...

Status

Current state: Adopted

Current active discussion thread: here

Previous discussion threads: here and here

Vote thread (current): here

JIRA: KAFKA-5295

Released: AK 2.6.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As of 0.11.0.0, Kafka Connect can automatically create its internal topics using the new AdminClient (see KIP-154), but it still relies upon the broker to auto-create new topics to which source connector records are written. This is error prone, as it's easy for the topics to be created with an inappropriate cleanup policy, replication factor, and/or number of partitions. Some Kafka clusters disable auto topic creation via auto.create.topics.enable=false, and in these cases users creating connectors must manually pre-create the necessary topics. That, of course, can be quite challenging for some source connectors that choose topics dynamically based upon the source and that result in large numbers of topics.

...

This proposal adds several source connector configuration properties that specify the default replication factor, number of partitions, and other topic-specific settings to be used by Connect to create any topic to which the source connector writes that does not exist at the time the source connector generates its records. None of these properties has a default value, so this feature is enabled for a connector only when the feature is enabled for the Connect cluster and the source connector configuration specifies at least the replication factor and number of partitions for at least one group. Users may choose to use the default values specified in the Kafka broker by setting the replication factor or the number of partitions, respectively, to -1.

Different classes of configuration properties can be defined through the definition of groups. Group definition follows a pattern that resembles the one used previously to define properties for transformations in Kafka Connect. The groups are listed in the property topic.creation.groups. The hierarchy of groups is built on top of a single implicit group called default. The default group always exists and does not need to be listed explicitly in topic.creation.groups (if it is, it will be ignored with a warning message).

...

Property: topic.creation.groups
Type: List of String types
Default: empty
Possible Values: The group default is always defined for topic configurations. The values of this property refer to additional groups.
Description: A list of group aliases that will be used to define per-group topic configurations for matching topics. The group default always exists and matches all topics.

Property: topic.creation.$alias.include
Type: List of String types
Default: empty
Possible Values: Comma separated list of exact topic names or regular expressions.
Description: A list of strings that represent either exact topic names or regular expressions that may match topic names. This list is used to include topics that match its values and to apply this group's specific configuration to those topics. $alias applies to any group defined in topic.creation.groups but not to the default group.

Property: topic.creation.$alias.exclude
Type: List of String types
Default: empty
Possible Values: Comma separated list of exact topic names or regular expressions.
Description: A list of strings that represent either exact topic names or regular expressions that may match topic names. This list is used to exclude topics that match its values, so that this group's specific configuration is not applied to those topics. $alias applies to any group defined in topic.creation.groups but not to the default group. Note that exclusion rules take precedence and override any inclusion rules for topics.

Property: topic.creation.$alias.replication.factor
Type: int
Default: n/a
Possible Values: >= 1 for a specific valid value, or -1 to use the broker's default value
Description: The replication factor for new topics created for this connector. This value must not be larger than the number of brokers in the Kafka cluster; otherwise an error will be thrown when the connector attempts to create a topic. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional and, if it is missing, it takes the value of the default group.

Property: topic.creation.$alias.partitions
Type: int
Default: n/a
Possible Values: >= 1 for a specific valid value, or -1 to use the broker's default value
Description: The number of partitions for new topics created for this connector. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional and, if it is missing, it takes the value of the default group.

Property: topic.creation.$alias.${kafkaTopicSpecificConfigName}
Type: several
Default: broker value
Description: Any of the Kafka topic-level configurations for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value will be used if that configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

Note that these configuration properties will be forwarded to the connector via its initialization methods (e.g. start or reconfigure). Also note that the Kafka topic-level configurations do vary by Kafka version, so source connectors should specify only those topic settings that the Kafka broker knows about. Topic settings rejected by the Kafka broker will result in the connector failing with an exception, to avoid silently ignoring invalid topic creation properties.
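
The value rules for the replication factor and partitions properties (a value >= 1, or -1 for the broker's default) could be checked along the following lines. This is only a minimal sketch: the class name, method name, and constant are hypothetical and are not part of the proposal or of Connect's code base.

```java
import java.util.OptionalInt;

final class TopicSettingSketch {
    private TopicSettingSketch() {}

    // Hypothetical constant name; the proposal only specifies the value -1.
    static final int USE_BROKER_DEFAULT = -1;

    /**
     * Interprets a configured replication factor or partition count.
     * Returns an empty OptionalInt when the broker's default should be used,
     * the explicit value when it is >= 1, and rejects everything else.
     */
    static OptionalInt explicitValue(String propertyName, int configured) {
        if (configured == USE_BROKER_DEFAULT) {
            return OptionalInt.empty();
        }
        if (configured < 1) {
            throw new IllegalArgumentException(
                propertyName + " must be >= 1, or -1 to use the broker default; got " + configured);
        }
        return OptionalInt.of(configured);
    }
}
```

With this sketch, a value of -1 yields an empty result (defer to the broker), 3 yields 3, and 0 is rejected with an exception.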

Configuration properties that accept regular expressions expect them in Java regular expression syntax.
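
Because the patterns are Java regular expressions rather than shell-style globs, a trailing * does not mean "any suffix". The sketch below illustrates the difference with java.util.regex.Pattern. It assumes that a pattern must match the whole topic name, which this document does not state explicitly, and the helper class is hypothetical.

```java
import java.util.regex.Pattern;

final class TopicRegexSketch {
    private TopicRegexSketch() {}

    // Assumption: an include/exclude entry must match the entire topic name
    // (Pattern.matches semantics), not just a substring of it.
    static boolean matchesWholeName(String regex, String topic) {
        return Pattern.matches(regex, topic);
    }
}
```

Under that assumption, orders.* matches a topic named orders-eu, whereas orders* does not: it only matches "order" followed by zero or more "s" characters. A literal dot has to be escaped, so the regex another\.compacted\.topic matches only that exact name; in a Java properties file the backslash itself must be doubled.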

Topic configs always have at least one group, the default group. This group requires two config properties: the replication factor and the number of partitions. It also has an implicit inclusion list that matches all topics and an implicit exclusion list that is empty. Therefore, configuring the inclusion or exclusion list for the default topic config group is not required, and any such setting will be ignored (with a warning message in the logs).

In terms of configuration composition between groups and precedence order: 

  • A topic might get its configuration properties from one or more groups.
  • Groups are listed in order of preference within the property topic.creation.groups, with the highest-priority group first and the lowest-priority group last. The default group does not have to be listed explicitly and always has the lowest priority. Because preference is defined solely by the sequence of groups in topic.creation.groups, the order in which group configurations appear within a Java properties file or a JSON-encoded configuration does not matter.

These properties have no effect if the feature is disabled on the Connect cluster via topic.creation.enable=false in the cluster's worker configurations.

Configuration Examples

The following are examples that demonstrate the additions in the configuration of source connectors that this KIP is proposing. For simplicity, these examples show only snippets of the connector configuration properties that deal with topic creation and they are shown in Java properties format. The replication factor and number of partitions must be specified at least for the default group in the source connector configuration in order to enable topic creation for the connector.

Example 1: All new topics created by Connect for this connector will have replication factor of 3 and 5 partitions. Since default is the only group of topic creation properties, the config topic.creation.groups can be skipped:

Portion of an example source connector configuration using topic creation rules
...
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
...

 

If the connector fails to create a topic for any reason, the task that attempts to create that topic will be stopped and will have to be manually restarted once the issue that caused the failure is resolved. Because topics are created at runtime, such failures can occur at any time during the lifetime of a connector, not only during its initial deployment.

Example 2: By default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitions, with the exception of topics that match the inclusion list of the inorder group, which will have 1 partition:

Portion of an example source connector configuration using topic creation rules
...
topic.creation.groups=inorder
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.inorder.include=status, orders.*
topic.creation.inorder.partitions=1
...


Example 3: By default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitions, while the key_value_topic and another.compacted.topic topics or topics that begin with the prefix configurations will be compacted and have a replication factor of 5 and 1 partition.

Portion of an example source connector configuration using topic creation rules
...
topic.creation.groups=compacted
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.compacted.include=key_value_topic, another\\.compacted\\.topic, configurations.*
topic.creation.compacted.replication.factor=5
topic.creation.compacted.partitions=1
topic.creation.compacted.cleanup.policy=compact
...

Example 4: By default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitions, while topics that begin with the prefix configurations will be compacted. Additionally, topics that match the inclusion list of highly_parallel and don't match its exclusion list will have replication factor of 1 and 1 partition.

Portion of an example source connector configuration using topic creation rules
...
topic.creation.groups=compacted, highly_parallel
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.highly_parallel.include=hpc.*,parallel.*
topic.creation.highly_parallel.exclude=.*internal, .*metadata, .*config.*
topic.creation.highly_parallel.replication.factor=1
topic.creation.highly_parallel.partitions=1

topic.creation.compacted.include=configurations.*
topic.creation.compacted.cleanup.policy=compact
...
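
The way Example 4 resolves a topic name to its settings can be sketched as follows. This is a simplified model, not Connect's implementation: the class and method names are hypothetical, the first matching group in topic.creation.groups order is assumed to win, patterns are assumed to match the whole topic name, and only the replication factor and partition count are modelled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

final class TopicGroupSketch {
    private TopicGroupSketch() {}

    /** Hypothetical holder for one group's settings; a null Integer means "not set". */
    static final class Group {
        final String alias;
        final List<String> include;
        final List<String> exclude;
        final Integer replicationFactor;
        final Integer partitions;

        Group(String alias, List<String> include, List<String> exclude,
              Integer replicationFactor, Integer partitions) {
            this.alias = alias;
            this.include = include;
            this.exclude = exclude;
            this.replicationFactor = replicationFactor;
            this.partitions = partitions;
        }

        // Exclusion rules take precedence over inclusion rules.
        boolean matches(String topic) {
            return anyMatch(include, topic) && !anyMatch(exclude, topic);
        }
    }

    // Assumption: each regex must match the whole topic name.
    static boolean anyMatch(List<String> regexes, String topic) {
        for (String regex : regexes) {
            if (Pattern.matches(regex, topic)) {
                return true;
            }
        }
        return false;
    }

    /** Picks the first matching group in priority order, falling back to the default group. */
    static String resolve(List<Group> groupsInPriorityOrder, Group defaults, String topic) {
        Group chosen = defaults;
        for (Group group : groupsInPriorityOrder) {
            if (group.matches(topic)) {
                chosen = group;
                break;
            }
        }
        // Settings missing from a group are taken from the default group.
        int rf = chosen.replicationFactor != null ? chosen.replicationFactor : defaults.replicationFactor;
        int parts = chosen.partitions != null ? chosen.partitions : defaults.partitions;
        return chosen.alias + " rf=" + rf + " partitions=" + parts;
    }

    /** Resolves a topic against the groups of Example 4 (compacted, highly_parallel, default). */
    static String resolveExample4(String topic) {
        Group defaults = new Group("default",
            Arrays.asList(".*"), Collections.<String>emptyList(), 3, 5);
        Group compacted = new Group("compacted",
            Arrays.asList("configurations.*"), Collections.<String>emptyList(), null, null);
        Group highlyParallel = new Group("highly_parallel",
            Arrays.asList("hpc.*", "parallel.*"),
            Arrays.asList(".*internal", ".*metadata", ".*config.*"), 1, 1);
        return resolve(Arrays.asList(compacted, highlyParallel), defaults, topic);
    }
}
```

In this model a topic named hpc-jobs resolves to highly_parallel with replication factor 1 and 1 partition, hpc-internal is excluded from highly_parallel and falls back to the default group, and configurations-db resolves to compacted while inheriting replication factor 3 and 5 partitions from the default group.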

Sink Connector Configuration

This feature does not affect sink connectors or their configuration. Any topic creation properties added to sink connectors will be ignored and will produce a warning in the log.

REST API

The existing Connect REST API includes several resources whose request and response payloads will be affected by this proposal, although the structure of those payloads is already dependent upon the specific type of connector. Applications that use the REST API must already expect such variation, and therefore should not break after the extensions proposed by this KIP are introduced.

Security

When topic creation is enabled in the Connect worker, the worker may attempt to create topics to which the source connector(s) write that are not known to exist. The Admin API allows the Connect worker to request that these topics be created, but the worker will only attempt to create topics that do not already exist.

Therefore, in order to use this feature, the Kafka principal specified in the worker configuration and used for the source connectors (e.g., producer.*) must have the permission to DESCRIBE and CREATE topics. 

Note that when the Connect worker starts up, it already has the ability to create in the Kafka cluster the internal topics used for storing connector configurations, connector and task statuses, and source connector offsets.

To address cases in which the security settings need to differ between the Connect worker, which needs to create Connect's internal topics, and a source connector that needs to be allowed to create new topics using the feature described in this KIP, Connect will allow such specialization via the config overrides that were introduced with KIP-458. For example, if a specific source connector contains the right properties with the prefix admin.override., then this connector will be allowed to create new topics in cases where the Connect worker's settings would not be appropriate. The following two examples highlight two different use cases.


Example 5: The connector is deployed with special properties that work both for producing records and creating topics:

Portion of an example source connector configuration using topic creation rules
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";
...


Example 6: The connector is deployed with one set of special properties for producing records and a different set for creating topics:

Portion of an example source connector configuration using topic creation rules
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bob-secret";
...
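
One way to picture how the admin.override. properties of Example 6 could take effect is as a layering step: the worker's settings form the base, and connector properties carrying the prefix replace them after the prefix is stripped. The sketch below is illustrative only; the class and method names are hypothetical, and it does not describe how Connect or KIP-458 validates or applies overrides.

```java
import java.util.HashMap;
import java.util.Map;

final class AdminOverrideSketch {
    private AdminOverrideSketch() {}

    static final String ADMIN_OVERRIDE_PREFIX = "admin.override.";

    /**
     * Builds the settings for the client that creates topics: worker-level
     * settings first, then connector entries that carry the admin.override.
     * prefix (with the prefix removed) layered on top.
     */
    static Map<String, String> adminSettings(Map<String, String> workerAdminSettings,
                                             Map<String, String> connectorConfig) {
        Map<String, String> merged = new HashMap<>(workerAdminSettings);
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(ADMIN_OVERRIDE_PREFIX)) {
                merged.put(key.substring(ADMIN_OVERRIDE_PREFIX.length()), entry.getValue());
            }
        }
        return merged;
    }
}
```

Applied to Example 6, the merged settings would carry bob's sasl.jaas.config for topic creation, while the producer.override. entry is left out because it targets the producer rather than the admin client.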

If this feature is enabled and neither the worker properties nor the connector overrides allow the creation of new topics at runtime, an error will be logged and the task will fail.

If creating topics is not desired for security purposes, this feature should be disabled by setting topic.creation.enable=false. In this case, the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed will be in effect.

Compatibility, Deprecation, and Migration Plan

...

Finally, this feature uses Kafka's Admin API methods to check for the existence of a topic and to create new topics. If the broker does not support the Admin API methods, an error will be logged and the task will fail if this feature is enabled. If ACLs are used, the Kafka principal used in the Connect worker's producer.* settings is assumed to have the privilege to create topics when needed; if not, then appropriate overrides will have to be present in the connector's configuration. If those are not in place either, an error will be logged and the task will fail if this feature is enabled.

Rejected Alternatives

Several alternative designs were considered but ultimately rejected:

...