...

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

At the same time, diagnosing issues in MM2 distributed mode can be quite challenging. Each flow has a dedicated Connect group, and for each flow, the Connectors have the same name. This causes the following monitoring and diagnostic issues with MM2:

  1. The producers and admins managed by the Connect framework for the tasks will get the exact same client.ids in each flow. In the MM2 logs, the logs of such Kafka clients cannot be distinguished easily/at all.
  2. The Connector log context provided by Connect contains the same Connector name/Task ID in each flow. In the MM2 logs, lines coming from inside the context cannot be distinguished easily/at all.
  3. The toString methods of some entities (such as WorkerConnector, WorkerTask) used in logs, as well as the internal Connect thread names, only contain the Connector name/Task ID, so those logs cannot be distinguished easily/at all.

To address this problem, MM2 distributed mode should also provide the replication flow in the client.ids, the log context, and the thread names.

Public Interfaces

MM2

...

dedicated mode

...

logging change

  • New MDC context key in MM2 distributed mode: flow.context. It contains the replication flow, using the following pattern: [<SOURCE>-><TARGET>] e.g. "[primary->backup]".
  • New configuration in MM2 distributed mode: add.flow.context (boolean, default: false). When enabled, the following changes are applied:

MM2 distributed mode client.id and log change

  • The client.id configuration of the Connect-managed Kafka clients (used by the Tasks) will be prefixed with the flow, using the following pattern: <SOURCE>-><TARGET>| e.g. "primary->backup|".
  • Entities that have a toString implementation used in logs and rely on the Connector name/Task ID will also contain the flow (WorkerConnector, WorkerTask and all their subclasses).
  • The Connector context (named "connector.context" in the MDC) will also be prefixed with the flow, using the following pattern: <SOURCE>-><TARGET>| e.g. "primary->backup|".
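The sketch below shows how the flow-specific strings above could be derived from a flow's source and target cluster aliases. It assumes three things. First, the flow is rendered as <SOURCE>-><TARGET>. Second, the flow.context value is the flow in brackets followed by a trailing space, which is what the "[A->B] [MirrorSourceConnector|task-0]" example log lines later in this page imply. Third, the "|" separator is used for the client.id and connector.context prefixes. FlowNaming and its methods are hypothetical and are not part of Kafka.

```java
/**
 * Hypothetical helper (not a Kafka class) that derives the flow-specific
 * strings described in this KIP from a replication flow's cluster aliases.
 */
final class FlowNaming {
    private FlowNaming() {
    }

    /** The replication flow itself, e.g. "primary->backup". */
    static String flow(String source, String target) {
        return source + "->" + target;
    }

    /**
     * Assumed flow.context MDC value: the flow in brackets plus a trailing space,
     * so "%X{flow.context}%X{connector.context}" renders like "[A->B] [Connector|task-0] ".
     */
    static String flowContext(String source, String target) {
        return "[" + flow(source, target) + "] ";
    }

    /** Prepends "<SOURCE>-><TARGET>|" to an existing value, such as a client.id. */
    static String withFlowPrefix(String source, String target, String value) {
        return flow(source, target) + "|" + value;
    }

    public static void main(String[] args) {
        // Prints "[primary->backup] " (note the trailing space).
        System.out.println(flowContext("primary", "backup"));
        // Illustrative client.id only; the actual default client.id is not specified by this KIP.
        System.out.println(withFlowPrefix("primary", "backup", "connector-producer-MirrorSourceConnector-0"));
    }
}
```

Keeping the flow rendering in one place means the MDC value, the client.id prefix, and the connector.context prefix cannot drift apart.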

Proposed Changes

Connect code changes

  • LoggingContext: all context-creator methods will now accept a nullable "String prefix" parameter which, if not null, will be prepended to the connector.context value with the "|" separator.
  • WorkerConfig: add a new method with the following signature: String contextPrefix()
    The implementation in WorkerConfig will return null; DistributedConfig will not override it either.
  • Worker: if WorkerConfig.contextPrefix returns non-null, use it to prefix the client.ids of the managed Kafka clients with the "|" separator.
  • WorkerConnector, WorkerTask and their subclasses: store the contextPrefix; if it is not null, toString also includes it.
  • Important note: since all of the above relies on WorkerConfig.contextPrefix returning non-null, the behavior of Connect itself will not change.
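Here is a minimal sketch of the contextPrefix() hook pattern described above. It uses simplified, hypothetical stand-ins: SketchWorkerConfig for WorkerConfig and SketchWorker for Worker. SketchMirrorWorkerConfig is an assumed MM2-side subclass, because the proposal does not say which class returns the non-null prefix. The point it illustrates is that plain Connect keeps the null default, so client.id is left untouched.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for Connect's Worker. */
final class SketchWorker {
    private final SketchWorkerConfig config;

    SketchWorker(SketchWorkerConfig config) {
        this.config = config;
    }

    /** Builds client properties, prefixing client.id only when a context prefix is configured. */
    Map<String, Object> clientConfigs(String defaultClientId) {
        Map<String, Object> props = new HashMap<>();
        String prefix = config.contextPrefix();
        props.put("client.id", prefix == null ? defaultClientId : prefix + "|" + defaultClientId);
        return props;
    }

    public static void main(String[] args) {
        String clientId = "connector-producer-MirrorSourceConnector-0"; // illustrative value
        // Plain Connect: contextPrefix() is null, so client.id is unchanged.
        System.out.println(new SketchWorker(new SketchWorkerConfig()).clientConfigs(clientId));
        // MM2: the hypothetical subclass supplies the flow as the prefix.
        System.out.println(new SketchWorker(new SketchMirrorWorkerConfig("primary", "backup")).clientConfigs(clientId));
    }
}

/** Stand-in for WorkerConfig with the proposed hook; returns null by default. */
class SketchWorkerConfig {
    String contextPrefix() {
        return null;
    }
}

/** Hypothetical MM2-side config; the proposal does not name the class that returns the prefix. */
class SketchMirrorWorkerConfig extends SketchWorkerConfig {
    private final String flow;

    SketchMirrorWorkerConfig(String source, String target) {
        this.flow = source + "->" + target;
    }

    @Override
    String contextPrefix() {
        return flow;
    }
}
```

A null-returning default keeps the change opt-in at the code level. Only a subclass that overrides contextPrefix() alters client.ids.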

MM2 code changes

...

  • flow.context will be exported in all locations where connector context information is currently exported.
  • The connect-log4j.properties file will be changed by adding a comment about the flow.context key, and its usage in MM2:
# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log messages, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector.
#
# The `%X{flow.context}` parameter can be used in the layout in MM2 dedicated mode. flow.context includes flow-specific information
# in the log messages, where appropriate. This makes it easier to identify those log messages that apply to a
# specific replication flow.
#
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
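To show how a scoped flow.context export could behave, here is a minimal sketch. It is an AutoCloseable scope that sets the key on entry and restores the previous value on close, used with try-with-resources in the same style Connect uses to scope connector.context. To stay dependency-free, the per-thread map is a stand-in for SLF4J's MDC. render() is a hand-rolled approximation of "%X{flow.context}%m", in which an absent key renders as an empty string. FlowContextScope is hypothetical, and the "[A->B] " value format with its trailing space is assumed from the example log lines in this page.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical scope that exports flow.context for the duration of a block. */
final class FlowContextScope implements AutoCloseable {
    static final String FLOW_CONTEXT = "flow.context";

    // Stand-in for SLF4J's MDC: a per-thread key/value map (assumption, keeps the sketch self-contained).
    static final ThreadLocal<Map<String, String>> MDC = ThreadLocal.withInitial(HashMap::new);

    private final String previous;

    private FlowContextScope(String value) {
        // Remember the previous value so nested scopes restore correctly.
        this.previous = MDC.get().put(FLOW_CONTEXT, value);
    }

    static FlowContextScope forFlow(String source, String target) {
        return new FlowContextScope("[" + source + "->" + target + "] ");
    }

    @Override
    public void close() {
        if (previous == null) {
            MDC.get().remove(FLOW_CONTEXT);
        } else {
            MDC.get().put(FLOW_CONTEXT, previous);
        }
    }

    /** Approximates "%X{flow.context}%m": a missing key renders as "". */
    static String render(String message) {
        return MDC.get().getOrDefault(FLOW_CONTEXT, "") + message;
    }

    public static void main(String[] args) {
        System.out.println(render("outside")); // prints: outside
        try (FlowContextScope ignored = FlowContextScope.forFlow("A", "B")) {
            System.out.println(render("inside")); // prints: [A->B] inside
        }
        System.out.println(render("after")); // prints: after
    }
}
```

Because the value is only present inside the scope, a layout that references %X{flow.context} prints nothing outside it. That is why referencing the key in the log pattern is enough to opt in.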

Thread name changes

The Connect internal thread names will contain a |A->B suffix in MM2 mode. The following Connect internal thread names will change:

Only in MM2 dedicated mode:

  • SourceTaskOffsetCommitter.commitExecutorService (current: SourceTaskOffsetCommitter-0, proposed: SourceTaskOffsetCommitter-0|primary->backup)
  • WorkerTask thread name (current: task-thread-MyConnector-0, proposed: task-thread-MyConnector-0|primary->backup)
  • KafkaStatusBackingStore.sendRetryExecutor (current: status-store-retry-mm2-status.primary.internal, proposed: status-store-retry-mm2-status.primary.internal|primary->backup)
  • KafkaBasedLog.thread (current: KafkaBasedLog Work Thread - mm2-configs.primary.internal, proposed: KafkaBasedLog Work Thread - mm2-configs.primary.internal|primary->backup)

Both in Connect and in MM2 dedicated mode:

  • AbstractHerder.connectorExecutor (current: unnamed, proposed for Connect: connector-executor, proposed for MM2: connector-executor|primary->backup)
  • Worker.executor (current: unnamed, proposed for Connect: worker-executor, proposed for MM2: worker-executor|primary->backup)

Sidenote: the following Connect internal thread names will NOT change:

  • DistributedHerder internal threads - their names already contain the client.id, which contains the flow in MM2 dedicated mode.
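The naming rule above can be sketched with the standard java.util.concurrent ThreadFactory. When a flow is known (MM2), "|<flow>" is appended to the base thread name. When it is not (plain Connect), the base name is kept. FlowThreadNames and namedFactory are hypothetical helpers, not Kafka code. The sketch only reproduces the naming scheme listed above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers reproducing the proposed thread naming scheme. */
final class FlowThreadNames {
    private FlowThreadNames() {
    }

    /** Appends "|<flow>" in MM2 (flow != null); plain Connect (flow == null) keeps the base name. */
    static String threadName(String baseName, String flow) {
        return flow == null ? baseName : baseName + "|" + flow;
    }

    /** ThreadFactory that gives every created thread the (possibly suffixed) name. */
    static ThreadFactory namedFactory(String baseName, String flow) {
        String name = threadName(baseName, flow);
        return runnable -> new Thread(runnable, name);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(namedFactory("worker-executor", "primary->backup"));
        try {
            // Prints "worker-executor|primary->backup".
            executor.submit(() -> System.out.println(Thread.currentThread().getName())).get();
        } finally {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

Naming executors explicitly is also what turns the currently unnamed AbstractHerder and Worker executors into recognizable entries in thread dumps.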



Before and after the change (in each After entry, the additions are the %X{flow.context} layout parameter, the [A->B] flow.context value, and the |A->B thread name suffix):

  • Log pattern
      Before: [%d] %p [%t] %X{connector.context}%m (%c:%L)%n
      After:  [%d] %p [%t] %X{flow.context}%X{connector.context}%m (%c:%L)%n
  • MirrorSourceTask startup
      Before: [2023-06-12 11:07:42,246] INFO [task-thread-MirrorSourceConnector-0] [MirrorSourceConnector|task-0] Starting with 2 previously uncommitted partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:108)
      After:  [2023-06-12 11:07:42,246] INFO [task-thread-MirrorSourceConnector-0|A->B] [A->B] [MirrorSourceConnector|task-0] Starting with 2 previously uncommitted partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:108)
  • Connector logs
      Before: [2023-06-12 11:10:51,829] INFO [connector-thread-MirrorHeartbeatConnector] [MirrorHeartbeatConnector|worker] MirrorHeartbeatConfig values:
      After:  [2023-06-12 11:10:51,829] INFO [connector-thread-MirrorHeartbeatConnector|A->B] [A->B] [MirrorHeartbeatConnector|worker] MirrorHeartbeatConfig values:
  • Framework internal logs
      Before: [2023-06-12 11:10:51,975] INFO [connector-thread-MirrorSourceConnector] [MirrorSourceConnector|worker] Completed shutdown for WorkerConnector{id=MirrorSourceConnector} (org.apache.kafka.connect.runtime.WorkerConnector:287)
      After:  [2023-06-12 11:10:51,975] INFO [connector-thread-MirrorSourceConnector|A->B] [A->B] [MirrorSourceConnector|worker] Completed shutdown for WorkerConnector{id=MirrorSourceConnector} (org.apache.kafka.connect.runtime.WorkerConnector:287)

As the example log lines show, the Before lines do not provide the full context: they cannot be attributed to a specific flow. After the change, the lines contain the flow information both in the flow.context and in the thread names.
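To make the composition of the example lines explicit, the sketch below hand-renders the layout "[%d] %p [%t] %X{flow.context}%X{connector.context}%m (%c:%L)" (without %n) by plain string concatenation. It is not a logging framework. It assumes that an absent flow.context renders as an empty string, and that both context values carry a trailing space, as the example lines imply. With those inputs, the MirrorSourceTask startup example is reproduced character for character. LayoutSketch is a hypothetical name.

```java
/** Hypothetical, hand-rolled rendering of the After log pattern (no logging library involved). */
final class LayoutSketch {
    private LayoutSketch() {
    }

    /** Renders "[%d] %p [%t] %X{flow.context}%X{connector.context}%m (%c:%L)" without the trailing %n. */
    static String render(String date, String level, String thread, String flowContext,
                         String connectorContext, String message, String category, int line) {
        return "[" + date + "] " + level + " [" + thread + "] " + flowContext + connectorContext
                + message + " (" + category + ":" + line + ")";
    }

    public static void main(String[] args) {
        String date = "2023-06-12 11:07:42,246";
        String message = "Starting with 2 previously uncommitted partitions.";
        String category = "org.apache.kafka.connect.mirror.MirrorSourceTask";
        // Before: no thread name suffix and no flow.context (assumed to render as "").
        System.out.println(render(date, "INFO", "task-thread-MirrorSourceConnector-0", "",
                "[MirrorSourceConnector|task-0] ", message, category, 108));
        // After: "|A->B" thread name suffix plus the "[A->B] " flow.context value.
        System.out.println(render(date, "INFO", "task-thread-MirrorSourceConnector-0|A->B", "[A->B] ",
                "[MirrorSourceConnector|task-0] ", message, category, 108));
    }
}
```

Both the thread name and the flow.context carry the flow, so layouts that omit %t still distinguish the flows.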

There is some overlap between the thread names and the flow.context. This should not be a problem: many use cases do not include the thread name in the log layout, and for those the flow.context is still useful. The thread name change also improves diagnostics in other areas, such as when generating thread dumps or debugging a process

...

.

Compatibility, Deprecation, and Migration Plan

  • The impact on Connect is minimal: only internal thread names are changed.

  • MM2 distributed mode: the impact is minimal.

    • Connect internal thread names will be changed.

    • The new functionality is protected with a feature flag: users need to opt in after the upgrade by setting add.flow.context to true.

    • The flow context is only added if the user opts in by referring to the flow.context MDC key in their logging configuration.

Test Plan

Unit tests, focusing on the new MDC key flow.context.

Thread names are not tested, as they are not part of any contract (user-facing or programmatic).

Rejected Alternatives

Supporting the same feature in Connect, making it configurable

To have a unified feature set, we could also support the same prefixing strategy in Connect. This would require an extra Connect configuration to specify the prefix.

In the context of Connect, this does not make sense: in a single Connect cluster, Connector names are unique, and the existing client.id and logging context ensure that the diagnostic information is distinguishable.

Tracking the flow under a separate MDC context key

Instead of modifying the existing "connector.context", an extra context key could be introduced. This extra context value could be exported in the same scope as the connector.context. This would allow opting in by changing the logger configuration instead of changing the MM2 configs.

Updating the existing connector.context MDC value with the flow information

Instead of exporting a separate flow.context MDC value, the existing connector.context could be updated (prefixed or suffixed with the flow). This would require extra configuration to let users opt in to the new feature. It is also less flexible, because it would be tied to the connectors/tasks, while the flow information can be used in a broader context (e.g. in Connect worker internal logging). This would take the same amount of work as the proposed solution; only the configuration method would be different.