...
Current active discussion thread: here
Previous discussion threads: here and here
...
Property | Type | Default | Possible Values | Description |
---|---|---|---|---|
topic.creation.groups | List of String types | empty | The group default is always defined for topic configurations. The values of this property refer to additional groups | A list of group aliases that will be used to define per group topic configurations for matching topics. If the feature if topic configs is enabled, The group default always exists and matches all topics. |
topic.creation.$alias.include | List of String types | empty | Comma separated list of exact topic names or regular expressions. | A list of strings that represent either exact topic names or regular expressions that may match topic names. This list is used to include topics that match their values and apply this group's specific configuration to the topics that match this inclusion list. $alias applies to any group defined in topic.creation.groups but not the default |
topic.creation.$alias.exclude | List of String types | empty | Comma separated list of exact topic names or regular expressions | A list of strings that represent either exact topic names or regular expressions that may match topic names. This list is used to exclude topics that match their values and refrain from applying this group's specific configuration to the topics that match this exclusion list. $alias applies to any group defined in topic.creation.groups but not the default |
topic.creation.$alias.replication.factor | int | n/a | >= 1 when a value is specified | The replication factor for new topics created for this connector. This value must not be smaller larger than the number of brokers in the Kafka cluster, or otherwise an error will be thrown when the connector will attempt to create a topic. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional and if it's missing it gets the value the default group |
topic.creation.$alias.partitions | int | n/a | >= 1 when a value is specified | The number of partitions new topics created for this connector. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional and if it's missing it gets the value the default group |
topic.creation.$alias.${kafkaTopicSpecificConfigName} | several | broker value | Any of the Kafka topic-level configurations for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value will be used if that configuration is not specified for the rule. $alias applies to the default as well as any group defined in topic.creation.groups |
Note that these configuration properties will be forwarded to the connector via its initialization methods (e.g. start
or reconfigure
). Also note that the Kafka topic-level configurations do vary by Kafka version, so source connectors should specify only those topic settings that the Kafka broker knows about. Topic settings rejected by the Kafka broker will result in the connector failing with an exception, to avoid silently ignoring invalid topic creation properties.
The configuration properties that accept regular expressions accept regex that are defined as Java regex.
...
Code Block | ||||
---|---|---|---|---|
| ||||
... topic.creation.groups=compacted topic.creation.default.replication.factor=3 topic.creation.default.partitions=5 topic.creation.compacted.include=key_value_topic, configurations* topic.creation.compacted.replication.factor=5 topic.creation.compacted.partitions=1 topic.creation.compacted.cleanup.policy=compact ... |
Example 4: By By default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitionsBy default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitions, while topics that begin with the prefix while topics that begin with the prefix configurations
will be compacted. Additionally, topics that match match the inclusion list of highly_parallel
and don't match its exclusion list will have replication factor of 1 and 1 partition.
...
To address cases in which the security settings need to differ for the Connect worker and its ability to create Connect's internal topics and a source connector that needs to be allowed to create new topics using the feature described in this KIP, Connect will give the ability of such specialization via the config overrides that were introduced with KIP-458. For example, if a specific source connector contains the right properties with the prefix admin.override.
then this connector will be allowed to create new topics in cases where the Connect worker's settings would not be the appropriate. The following two examples highlight two different use cases.
Example 5: The connector is deployed with special properties that work both for producing records and creating topics:
Code Block | ||||
---|---|---|---|---|
| ||||
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
... |
Example 6: The connector is deployed with special properties that work both for producing records and creating topics:
Code Block | ||||
---|---|---|---|---|
| ||||
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="bob" \
password="bob-secret";
... |
If creating topics is not desired for security purposesIf creating topics is not desired for security purposes, this feature should be disabled by setting topic.creation.enable=false
Compatibility, Deprecation, and Migration Plan
...