Status
Current state: Under Discussion
Discussion thread: TBD
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The configuration for Kafka Connect distributed worker currently requires several configuration properties that define the names of the three internal Kafka topics that the workers will use for storing connector configs, offsets, and statuses:
config.storage.topic
offset.storage.topic
status.storage.topic
KIP-154 added the ability for the worker to create these topics if they do not exist when the worker starts up, and to use the following properties to define the replication factor and number of partitions for these new topics:
config.storage.replication.factor
– defaults to 3, value must be >= 1offset.storage.replication.factor
– defaults to 3, value must be >= 1status.storage.replication.factor
– defaults to 3, value must be >= 1offset.storage.partitions
– defaults to 25, value must be >= 1status.storage.partitions
– defaults to 5, value must be >= 1
At the time of KIP-154, the AdminClient API required the replication factor be greater than or equal to one. But more recently KIP-464 modified the AdminClient APIs to support creating topics by passing a replication factor of '-1' to signal the topic should be created with the broker's default.replication.factor
value. The Kafka Connect distributed worker configuration does not currently support setting the replication factor to `-1`.
The Kafka Connect distributed worker configuration also does not support defining or passing other topic settings when the Connect worker creates the configuration, offset, and status topics.
Public Interfaces
The allowed values for the following existing Connect distributed worker properties will be changed to also allow -1 to signal that the broker's default replication factor should be used for the newly created topics:
config.storage.replication.factor
offset.storage.replication.factor
status.storage.replication.factor
Also, the distributed worker configuration will be changed to recognize additional optional properties that match the following patterns:
Property pattern | Type | Default | Possible Values | Description |
---|---|---|---|---|
config.storage.<topic-specific-setting> | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector configurations. Here "<topic-specific-setting> " corresponds to any of the Kafka topic-level configurations for the version of the Kafka broker where the topic should be created. | |
offset.storage.<topic-specific-setting> | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores source offsets for source connectors. Here "<topic-specific-setting> " corresponds to any of the Kafka topic-level configurations for the version of the Kafka broker where the topic should be created. | |
status.storage.<topic-specific-setting> | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector and task statuses. Here "<topic-specific-setting> " corresponds to any of the Kafka topic-level configurations for the version of the Kafka broker where the topic should be created. |
These additional properties (without the prefix) will be passed to the new topic requests created by the Connect worker when it attempts to create the internal topics.
NOTE: This proposal does not affect Connect standalone behavior.
Proposed Changes
When the Connect distributed worker starts up, it currently attempts to create the configuration, offsets, and/or status topics used by the worker if any of those topics do not exist. With this proposal, the additional properties will be passed to the broker during these new topic requests. Note that like with KIP-154, these properties will not be used if the corresponding topic already exists.
Compatibility, Deprecation, and Migration Plan
These changes are backward compatible, and existing Connect distributed worker configurations will continue to work with no change in behavior.
Rejected Alternatives
None