Comment: Record adoption


Status

Current state: Adopted

Discussion thread: link

Vote thread: link

JIRA: KAFKA-9931

...

  • config.storage.replication.factor – defaults to 3, value must be >= 1
  • offset.storage.replication.factor – defaults to 3, value must be >= 1
  • status.storage.replication.factor – defaults to 3, value must be >= 1
  • offset.storage.partitions – defaults to 25, value must be >= 1
  • status.storage.partitions – defaults to 5, value must be >= 1

The topic used to store connector configurations must always have exactly one partition, which is why there is no configuration property to set the number of partitions for this topic.

At the time of KIP-154, the AdminClient API required the replication factor and number of partitions to be greater than or equal to one. More recently, KIP-464 modified the AdminClient APIs to support creating topics with a replication factor and/or number of partitions of '-1', which signals that the topic should be created using the broker's default.replication.factor value and/or the broker's num.partitions value. The Kafka Connect distributed worker configuration currently does not support setting the replication factor or number of partitions to `-1` to use the broker's defaults.

The Kafka Connect distributed worker configuration also does not support defining or passing other topic settings when the Connect worker creates the configuration, offset, and status topics. 

Both of these changes will allow Kafka administrators to set up sensible defaults for all topics (including the desired replication factor), and allow a Connect distributed worker configuration to use those defaults where appropriate and override the topic-specific settings for the internal topics as necessary.

As an example, consider a user that wants to use the default replication factor of the Kafka cluster but wants to override the minimum ISR for Connect's three internal topics, which could be done with:

# Use the broker's default replication factor for these topics
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
# Override any broker default min ISR to always use 3
config.storage.min.insync.replicas=3
offset.storage.min.insync.replicas=3
status.storage.min.insync.replicas=3


Public Interfaces

The allowed values for the following five existing Connect distributed worker properties will be changed to also allow -1, which signals that the broker's default replication factor or default number of partitions should be used for the newly created topics:

  • config.storage.replication.factor
  • offset.storage.replication.factor
  • status.storage.replication.factor
  • offset.storage.partitions
  • status.storage.partitions
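
The widened value range for these five properties can be sketched as a small validation helper. This is only an illustration of the rule "either -1 or at least 1"; the class name `InternalTopicSizing`, its methods, and the constant `USE_BROKER_DEFAULT` are hypothetical and are not taken from the Connect code base.

```java
import java.util.Optional;

/** Hypothetical helper; all names here are illustrative assumptions. */
final class InternalTopicSizing {
    /** Sentinel meaning "let the broker apply its own default". */
    static final int USE_BROKER_DEFAULT = -1;

    private InternalTopicSizing() {}

    /** Accepts -1 or any value >= 1; rejects 0 and anything below -1. */
    static int validate(String propertyName, int value) {
        if (value == USE_BROKER_DEFAULT || value >= 1) {
            return value;
        }
        throw new IllegalArgumentException(
            propertyName + " must be -1 (broker default) or >= 1, but was " + value);
    }

    /** Maps a validated value to an Optional; empty means "defer to the broker". */
    static Optional<Integer> toOptional(String propertyName, int value) {
        int checked = validate(propertyName, value);
        return checked == USE_BROKER_DEFAULT ? Optional.empty() : Optional.of(checked);
    }
}
```

With this sketch, `toOptional("config.storage.replication.factor", -1)` yields an empty `Optional`, while a value of 0 is rejected with an `IllegalArgumentException`.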


Also, the distributed worker configuration will be changed to recognize additional optional properties that match the following patterns, and to pass them (without the prefix) to the new topic requests created by the Connect distributed worker when it attempts to create the internal topics:

| Property pattern | Type | Default | Description | Excluded properties |
|---|---|---|---|---|
| config.storage.<topic-specific-setting> | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector configurations. Here "<topic-specific-setting>" must be any valid Kafka topic-level configuration for the version of the Kafka broker where the topic should be created; the Connect worker will fail upon startup if the "<topic-specific-setting>" is not known to the broker. | partitions (always set to 1); cleanup.policy (always set to `compact`) |
| offset.storage.<topic-specific-setting> | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores source offsets for source connectors. Here "<topic-specific-setting>" must be any valid Kafka topic-level configuration for the version of the Kafka broker where the topic should be created; the Connect worker will fail upon startup if the "<topic-specific-setting>" is not known to the broker. | cleanup.policy (always set to `compact`) |
| status.storage.<topic-specific-setting> | several | broker value | Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector and task statuses. Here "<topic-specific-setting>" must be any valid Kafka topic-level configuration for the version of the Kafka broker where the topic should be created; the Connect worker will fail upon startup if the "<topic-specific-setting>" is not known to the broker. | cleanup.policy (always set to `compact`) |


Note that some topic-specific properties are excluded because the distributed worker always sets specific values. Therefore, if a distributed worker configuration does set any of these excluded properties, the distributed worker will issue a warning that such properties should not be set and will be ignored.
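
As a rough illustration of the prefix stripping and the warn-and-ignore handling of excluded properties, the following sketch extracts topic settings from a worker configuration. It is a minimal sketch under stated assumptions: the class `InternalTopicSettings`, the `extract` method, the `WORKER_LEVEL` set, and the warning text are all hypothetical, and the real worker may structure this differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper; all names here are illustrative assumptions. */
final class InternalTopicSettings {
    /** Suffixes that are existing worker-level properties, not topic settings. */
    private static final Set<String> WORKER_LEVEL =
        Set.of("topic", "replication.factor", "partitions");

    private InternalTopicSettings() {}

    /**
     * Returns the topic settings found under the given prefix, with the prefix removed.
     * Excluded settings are skipped and reported through the warnings list.
     */
    static Map<String, String> extract(Map<String, String> workerProps, String prefix,
                                       Set<String> excluded, List<String> warnings) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : workerProps.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(prefix)) {
                continue; // belongs to another internal topic or is unrelated
            }
            String setting = key.substring(prefix.length());
            if (excluded.contains(setting)) {
                // Warn and ignore instead of failing the worker
                warnings.add("Ignoring '" + key + "': the worker always sets this value itself");
            } else if (!WORKER_LEVEL.contains(setting)) {
                result.put(setting, entry.getValue());
            }
        }
        return result;
    }
}
```

For the configuration topic, the excluded set would contain `partitions` and `cleanup.policy`; for the offset and status topics, only `cleanup.policy`. Checking the excluded set before the worker-level set is what lets `config.storage.partitions` produce a warning while `offset.storage.partitions` is treated as the existing worker property.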

The Connect worker will fail upon startup if any of the topic-settings specified in the above configurations are not known to the Kafka broker.

This proposal does not otherwise change how or when the Connect worker creates internal topics. NOTE: This proposal does not affect Connect standalone behavior.

...

These changes are backward compatible, and existing Connect distributed worker configurations will continue to work with no change in behavior, unless those configurations define unknown topic settings using any of the patterns defined above, in which case the Connect worker or MirrorMaker2 process will fail upon startup. All new properties are prefixed with one of three prefixes (`config.storage.`, `offset.storage.`, or `status.storage.`) already used for other worker-level properties, and thus are not expected to clash with any properties used for REST Extensions or config providers.

Rejected Alternatives

...

  1. When one of the excluded topic-specific settings is used, cause the worker to fail. This would be equivalent to adding a default value and a validator that allows only the one permitted value. This was rejected (in favor of a warning) to follow the existing convention that extra or unknown properties do not cause a failure.