...
Property | Type | Default | Possible Values | Description |
---|---|---|---|---|
topic.creation.groups | List of String types | empty | The group default is always defined for topic configurations. The values of this property refer to additional groups. | A list of group aliases that will be used to define per-group topic configurations for matching topics. If the topic configs feature is enabled, the group default always exists and matches all topics. |
topic.creation.$alias.include | List of String types | empty | Comma-separated list of exact topic names or regular expressions. | A list of strings that represent either exact topic names or regular expressions that may match topic names. Topics that match an entry in this list receive this group's specific configuration. $alias applies to any group defined in topic.creation.groups but not the default. |
topic.creation.$alias.exclude | List of String types | empty | Comma-separated list of exact topic names or regular expressions. | A list of strings that represent either exact topic names or regular expressions that may match topic names. Topics that match an entry in this list do not receive this group's specific configuration. $alias applies to any group defined in topic.creation.groups but not the default. Note that exclusion rules take precedence and override any inclusion rules for topics. |
topic.creation.$alias.replication.factor | int | n/a | >= 1 when a value is specified | The replication factor for new topics created for this connector. This value must not be larger than the number of brokers in the Kafka cluster; otherwise an error will be thrown when the connector attempts to create a topic. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional, and if it is missing it takes the value of the default group. |
topic.creation.$alias.partitions | int | n/a | >= 1 when a value is specified | The number of partitions for new topics created for this connector. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional, and if it is missing it takes the value of the default group. |
topic.creation.$alias.${kafkaTopicSpecificConfigName} | several | broker value | | Any of the Kafka topic-level configurations for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value will be used if that configuration is not specified for the rule. $alias applies to the default as well as any group defined in topic.creation.groups. |
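The include/exclude rules in the table can be illustrated with a small Python sketch. It is not Connect's implementation: the `matching_group` helper and the shape of the `groups` dictionary are hypothetical, and two behaviors are assumed rather than stated in the table, namely that a pattern must match the whole topic name and that groups are tried in the order they are declared. The one rule taken directly from the table is that an exclusion match overrides an inclusion match, with the default group as the fallback.

```python
import re


def matching_group(topic, groups):
    """Return the alias of the first group that claims `topic`, else "default".

    `groups` maps alias -> {"include": [...], "exclude": [...]} (hypothetical shape).
    Assumption: patterns must match the whole topic name, and groups are
    evaluated in declaration order.
    """
    for alias, rules in groups.items():
        included = any(re.fullmatch(p, topic) for p in rules.get("include", []))
        excluded = any(re.fullmatch(p, topic) for p in rules.get("exclude", []))
        # Exclusion rules take precedence over inclusion rules.
        if included and not excluded:
            return alias
    # The default group always exists and matches every topic.
    return "default"


groups = {
    "compacted": {"include": ["configurations.*"]},
    "highly_parallel": {
        "include": ["hpc.*", "parallel.*"],
        "exclude": [".*internal", ".*metadata", ".*config.*"],
    },
}

print(matching_group("hpc.jobs", groups))                # highly_parallel
print(matching_group("hpc.internal", groups))            # default (excluded)
print(matching_group("configurations.connect", groups))  # compacted
```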
...
Code Block | ||||
---|---|---|---|---|
| ||||
...
topic.creation.groups=inorder
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
topic.creation.inorder.include=status, orders.*
topic.creation.inorder.partitions=1
... |
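As a rough illustration of how a group's effective settings could be derived from a configuration like the one above, the following Python sketch applies the inheritance rule from the property table: replication.factor and partitions fall back to the default group when a group does not set them. The `effective_settings` helper is hypothetical and is not part of Connect; values are kept as strings, as they would appear in a properties file.

```python
# Only these two settings are inherited from the default group; other
# topic-level configs fall back to the broker's value when unspecified.
INHERITED = ("replication.factor", "partitions")


def effective_settings(props, alias):
    """Return the topic settings that would apply to group `alias` (hypothetical helper)."""
    prefix = f"topic.creation.{alias}."
    settings = {
        key[len(prefix):]: value
        for key, value in props.items()
        if key.startswith(prefix) and key[len(prefix):] not in ("include", "exclude")
    }
    for name in INHERITED:
        if name not in settings:
            # Raises KeyError if the default group omits a required setting.
            settings[name] = props[f"topic.creation.default.{name}"]
    return settings


props = {
    "topic.creation.groups": "inorder",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "5",
    "topic.creation.inorder.include": "status, orders.*",
    "topic.creation.inorder.partitions": "1",
}

print(effective_settings(props, "inorder"))
# {'partitions': '1', 'replication.factor': '3'}
```

Here the inorder group overrides only the partition count, so its replication factor comes from the default group.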
...
Example 3: By default, new topics created by Connect for this connector will have a replication factor of 3 and 5 partitions, while the key_value_topic and another.compacted.topic topics, or topics that begin with the prefix configurations, will be compacted and have a replication factor of 5 and 1 partition.
Code Block | ||||
---|---|---|---|---|
| ||||
...
topic.creation.groups=compacted
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
topic.creation.compacted.include=key_value_topic, another\\.compacted\\.topic, configurations.*
topic.creation.compacted.replication.factor=5
topic.creation.compacted.partitions=1
topic.creation.compacted.cleanup.policy=compact
... |
...
Code Block | ||||
---|---|---|---|---|
| ||||
...
topic.creation.groups=compacted, highly_parallel
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
topic.creation.highly_parallel.include=hpc.*,parallel.*
topic.creation.highly_parallel.exclude=.*internal, .*metadata, .*config.*
topic.creation.highly_parallel.replication.factor=1
topic.creation.highly_parallel.partitions=1
topic.creation.compacted.include=configurations.*
topic.creation.compacted.cleanup.policy=compact
... |
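With several groups in play, every `topic.creation.$alias.*` key has to use an alias that is declared in topic.creation.groups (or default). A pre-deployment check along the following lines can flag mismatches. This is only a sketch: `undeclared_aliases` is a hypothetical helper, not a Connect API, and the misspelled alias in the sample input is made up for illustration.

```python
def undeclared_aliases(props):
    """Return a sorted list of aliases used in topic.creation.* keys but never declared."""
    declared = {
        alias.strip()
        for alias in props.get("topic.creation.groups", "").split(",")
        if alias.strip()
    }
    declared.add("default")  # the default group always exists

    prefix = "topic.creation."
    # Keys under the prefix that do not carry a group alias.
    non_group_keys = {"topic.creation.groups", "topic.creation.enable"}

    unknown = set()
    for key in props:
        if not key.startswith(prefix) or key in non_group_keys:
            continue
        alias = key[len(prefix):].split(".", 1)[0]
        if alias not in declared:
            unknown.add(alias)
    return sorted(unknown)


sample = {
    "topic.creation.groups": "compacted, highly_parallel",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "5",
    "topic.creation.highly_parallel.include": "hpc.*,parallel.*",
    "topic.creation.compactd.cleanup.policy": "compact",  # deliberately misspelled alias
}

print(undeclared_aliases(sample))  # ['compactd']
```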
...
Therefore, in order to use this feature, the Kafka principal specified in the worker configuration and used for the source connectors (e.g., producer.*) must have the permission to DESCRIBE and CREATE topics.
Note that when the worker's producer does not have the necessary privileges to DESCRIBE existing and CREATE missing topics but a source connector does specify the topic.creation.* configuration properties, the worker will log a warning and will default to the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed.
Note that when the Connect worker starts up, it already has the ability to create in the Kafka cluster the internal topics used for storing connector configurations, connector and task statuses, and source connector offsets.
...
Code Block | ||||
---|---|---|---|---|
| ||||
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";
admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bob-secret";
... |
If neither the worker properties nor the connector overrides allow for the creation of new topics at runtime and this feature is enabled, an error will be logged and the task will fail.
If creating topics is not desired for security purposes, this feature should be disabled by setting topic.creation.enable=false. In this case, the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed will be in effect.
Compatibility, Deprecation, and Migration Plan
...