...

At the time of KIP-154, the AdminClient API required that the replication factor be greater than or equal to one. More recently, KIP-464 modified the AdminClient APIs to support creating topics with a replication factor of `-1`, which signals that the topic should be created with the broker's `default.replication.factor` value. The Kafka Connect distributed worker configuration does not currently support setting the replication factor to `-1`.

The Kafka Connect distributed worker configuration also does not support defining or passing other topic settings when the Connect worker creates the configuration, offset, and status topics. 

Both of these changes will allow Kafka administrators to set up sensible defaults for all topics (including the desired replication factor), and will allow a Connect distributed worker configuration to use those defaults where appropriate and to override topic-specific settings for the internal topics as necessary. As an example, consider a user who wants to use the Kafka cluster's default replication factor but override the minimum ISR for Connect's three internal topics. This could be done with:

```text
# Use the broker's default replication factor for these topics
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
# Override any broker default min ISR to always use 3
config.storage.min.insync.replicas=3
offset.storage.min.insync.replicas=3
status.storage.min.insync.replicas=3
```


Public Interfaces

The allowed values for the following three existing Connect distributed worker properties will be changed to also allow -1 to signal that the broker's default replication factor should be used for the newly created topics:

...
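The widened rule for these replication factor properties can be illustrated with a minimal sketch. It is written in Python purely for illustration and is not Connect's actual validator; the function name `parse_replication_factor` is hypothetical. It assumes only what is stated above: `-1` means "use the broker default", and any other value must be at least one.

```python
def parse_replication_factor(raw: str) -> int:
    """Parse a replication factor setting under the proposed rule.

    Hypothetical helper for illustration only.
    Accepts -1 (use the broker's default) or any integer >= 1.
    """
    value = int(raw)
    if value == -1 or value >= 1:
        return value
    raise ValueError(
        f"replication factor must be -1 (broker default) or >= 1, got {value}"
    )
```

Under this sketch, `"-1"` and `"3"` are accepted, while `"0"` and `"-2"` are rejected.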

Also, the distributed worker configuration will be changed to recognize additional optional properties that match the following patterns, and to pass them (without the prefix) to the new topic requests created by the Connect distributed worker when it attempts to create the internal topics:

| Property pattern | Type | Default | Description | Excluded properties |
|---|---|---|---|---|
| `config.storage.<topic-specific-setting>` | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector configurations. Here "<topic-specific-setting>" corresponds to any of the Kafka topic-level configurations for the version of the Kafka broker where the topic should be created. | `partitions` (always set to 1); `cleanup.policy` (always set to `compact`) |
| `offset.storage.<topic-specific-setting>` | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores source offsets for source connectors. Here "<topic-specific-setting>" corresponds to any of the Kafka topic-level configurations for the version of the Kafka broker where the topic should be created. | `cleanup.policy` (always set to `compact`) |
| `status.storage.<topic-specific-setting>` | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector and task statuses. Here "<topic-specific-setting>" corresponds to any of the Kafka topic-level configurations for the version of the Kafka broker where the topic should be created. | `cleanup.policy` (always set to `compact`) |


Note that some topic-specific properties are excluded because the distributed worker always sets them to specific values. If a distributed worker configuration sets any of these excluded properties, the worker will issue a warning that such properties should not be set, and will ignore them.
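The prefix handling and the treatment of excluded properties can be sketched as follows. This is an illustrative Python sketch under stated assumptions, not the Connect implementation: the function name `topic_settings` and the `WORKER_LEVEL` set are hypothetical, and the set of existing worker-level suffixes that share these prefixes (`topic`, `replication.factor`, `partitions`) is an assumption made so that those keys are not passed along as topic settings. The excluded properties per prefix come from the table above.

```python
import logging

log = logging.getLogger("connect.worker.sketch")

# Excluded topic-level settings per prefix, as listed in the table above.
EXCLUDED = {
    "config.storage.": {"partitions", "cleanup.policy"},
    "offset.storage.": {"cleanup.policy"},
    "status.storage.": {"cleanup.policy"},
}

# Assumption for this sketch: suffixes of existing worker-level properties
# that share the prefixes and are handled elsewhere by the worker.
WORKER_LEVEL = {"topic", "replication.factor", "partitions"}


def topic_settings(worker_props, prefix):
    """Return (settings, ignored_keys) for one internal topic.

    settings holds the prefixed properties with the prefix stripped;
    ignored_keys lists excluded properties that were set and skipped.
    """
    settings = {}
    ignored = []
    for key, value in worker_props.items():
        if not key.startswith(prefix):
            continue
        name = key[len(prefix):]
        if name in EXCLUDED[prefix]:
            # Warn and ignore rather than fail, per the proposal.
            log.warning("Property %s should not be set and will be ignored", key)
            ignored.append(key)
        elif name in WORKER_LEVEL:
            continue  # existing worker-level property, not a topic setting
        else:
            settings[name] = value
    return settings, ignored
```

For example, given a worker configuration containing `config.storage.min.insync.replicas=3` and `config.storage.cleanup.policy=delete`, this sketch would pass `min.insync.replicas=3` to the topic creation request and report `config.storage.cleanup.policy` as ignored.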

This proposal does not otherwise change any other behavior of how or when the Connect worker creates internal topics or uses the properties related to internal topics. This proposal also does not affect Connect standalone behavior.

...

These changes are backward compatible, and existing Connect distributed worker configurations will continue to work with no change in behavior. All new properties are prefixed with one of three prefixes (`config.storage.`, `offset.storage.`, and `status.storage.`) already used for other worker-level properties, and thus are not expected to clash with any properties used for REST Extensions or config providers.

Rejected Alternatives

  1. When one of the excluded topic-specific settings is used, cause the worker to fail. This would be equivalent to adding a default value and a validator that allows only the one permitted value. This was rejected (in favor of a warning) to follow the existing convention that extra or unknown properties do not cause a failure.