...

Property: topic.creation.groups
Type: List of String types
Default: empty
Possible Values: The group default is always defined for topic configurations. The values of this property refer to additional groups.
Description: A list of group aliases that will be used to define per-group topic configurations for matching topics. If the topic configs feature is enabled, the group default always exists and matches all topics.

Property: topic.creation.$alias.include
Type: List of String types
Default: empty
Possible Values: Comma-separated list of exact topic names or regular expressions.
Description: A list of strings that represent either exact topic names or regular expressions that may match topic names. Topics that match this inclusion list have this group's specific configuration applied to them. $alias applies to any group defined in topic.creation.groups but not the default.

Property: topic.creation.$alias.exclude
Type: List of String types
Default: empty
Possible Values: Comma-separated list of exact topic names or regular expressions.
Description: A list of strings that represent either exact topic names or regular expressions that may match topic names. Topics that match this exclusion list do not have this group's specific configuration applied to them. $alias applies to any group defined in topic.creation.groups but not the default. Note that exclusion rules take precedence over and override any inclusion rules for topics.

Property: topic.creation.$alias.replication.factor
Type: int
Default: n/a
Possible Values: >= 1 when a value is specified
Description: The replication factor for new topics created for this connector. This value must not be larger than the number of brokers in the Kafka cluster; otherwise an error will be thrown when the connector attempts to create a topic. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional, and if it is missing it takes the value of the default group.

Property: topic.creation.$alias.partitions
Type: int
Default: n/a
Possible Values: >= 1 when a value is specified
Description: The number of partitions for new topics created for this connector. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional, and if it is missing it takes the value of the default group.

Property: topic.creation.$alias.${kafkaTopicSpecificConfigName}
Type: several
Default: broker value
Description: Any of the Kafka topic-level configurations for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value will be used if that configuration is not specified for the rule. $alias applies to the default as well as any group defined in topic.creation.groups.
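The matching rules described by these properties can be illustrated with a small sketch. This is not the Connect implementation; it is a minimal Python model written under several assumptions: groups are tried in the order they are listed in topic.creation.groups and the first match wins, each include/exclude entry is treated as a regular expression that must match the whole topic name, and only replication.factor and partitions fall back to the default group. The function names (`parse_list`, `group_settings`, `resolve_topic_settings`) and the sample properties are hypothetical.

```python
import re

PREFIX = "topic.creation."


def parse_list(value):
    """Split a comma-separated property value into trimmed, non-empty items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def matches_any(topic, patterns):
    # Assumption: every entry is a regex that must match the full topic name;
    # an exact topic name is just a regex without special characters.
    return any(re.fullmatch(pattern, topic) for pattern in patterns)


def group_settings(props, alias):
    """Collect the settings of one group, leaving out its include/exclude lists."""
    head = f"{PREFIX}{alias}."
    return {
        key[len(head):]: value
        for key, value in props.items()
        if key.startswith(head) and key[len(head):] not in ("include", "exclude")
    }


def resolve_topic_settings(topic, props):
    """Return (group alias, settings) that would apply to a new topic."""
    default = group_settings(props, "default")
    # Assumption: groups are evaluated in listed order and the first match wins.
    for alias in parse_list(props.get(PREFIX + "groups", "")):
        include = parse_list(props.get(f"{PREFIX}{alias}.include", ""))
        exclude = parse_list(props.get(f"{PREFIX}{alias}.exclude", ""))
        # Exclusion rules take precedence over inclusion rules.
        if matches_any(topic, exclude) or not matches_any(topic, include):
            continue
        settings = group_settings(props, alias)
        # Missing replication.factor / partitions fall back to the default group,
        # where both are required (a KeyError here means they were not set).
        for key in ("replication.factor", "partitions"):
            settings.setdefault(key, default[key])
        return alias, settings
    # The default group always exists and matches all topics.
    return "default", default


# Hypothetical connector properties, used only to exercise the sketch.
EXAMPLE_PROPS = {
    "topic.creation.groups": "compacted, fast",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "5",
    "topic.creation.compacted.include": "configurations.*",
    "topic.creation.compacted.cleanup.policy": "compact",
    "topic.creation.fast.include": "hpc.*",
    "topic.creation.fast.exclude": ".*internal",
    "topic.creation.fast.partitions": "1",
}

if __name__ == "__main__":
    for name in ("configurations.main", "hpc.jobs", "hpc.internal", "orders"):
        print(name, resolve_topic_settings(name, EXAMPLE_PROPS))
```

In this model a topic that is both included and excluded by a group is skipped by that group and may still fall through to a later group or to the default, which is one reading of "exclusion rules take precedence".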

...

Portion of an example source connector configuration using topic creation rules:
...
topic.creation.groups=inorder
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.inorder.include=status, orders.*
topic.creation.inorder.partitions=1
...

...

Example 3: By default, new topics created by Connect for this connector will have a replication factor of 3 and 5 partitions, while the key_value_topic and another.compacted.topic topics, or topics that begin with the prefix configurations, will be compacted and have a replication factor of 5 and 1 partition.

Portion of an example source connector configuration using topic creation rules:
...
topic.creation.groups=compacted
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.compacted.include=key_value_topic, another\\.compacted\\.topic, configurations.*
topic.creation.compacted.replication.factor=5
topic.creation.compacted.partitions=1
topic.creation.compacted.cleanup.policy=compact
...

...

Portion of an example source connector configuration using topic creation rules:
...
topic.creation.groups=compacted, highly_parallel
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.highly_parallel.include=hpc.*,parallel.*
topic.creation.highly_parallel.exclude=.*internal, .*metadata, .*config.*
topic.creation.highly_parallel.replication.factor=1
topic.creation.highly_parallel.partitions=1

topic.creation.compacted.include=configurations.*
topic.creation.compacted.cleanup.policy=compact
...

...

Therefore, in order to use this feature, the Kafka principal specified in the worker configuration and used for the source connectors (e.g., producer.*) must have permission to DESCRIBE and CREATE topics.

Note that when the worker's producer does not have the necessary privileges to DESCRIBE existing topics and CREATE missing topics, but a source connector does specify the topic.creation.* configuration properties, the worker will log a warning and will default to the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed.

Note that when the Connect worker starts up, it already has the ability to create in the Kafka cluster the internal topics used for storing connector configurations, connector and task statuses, and source connector offsets.

...

Portion of an example source connector configuration using topic creation rules:
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bob-secret";
...

If this feature is enabled and neither the worker properties nor the connector overrides allow for creation of new topics at runtime, an error will be logged and the task will fail.

If creating topics is not desired for security purposes, this feature should be disabled by setting topic.creation.enable=false. In this case, the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed will be in effect.

Compatibility, Deprecation, and Migration Plan

...