...
Property | Type | Default | Possible Values | Description |
---|---|---|---|---|
topic.creation | list | <empty> | <any> | The names of the topic creation rules, in the order that they should be matched against new topics used by the source connector. |
topic.creation.${ruleName}.regex | string | n/a | valid regex | The regular expression that will be applied to new topics. The first rule whose regular expression matches the new topic name will be used. |
topic.creation.${ruleName}.policy | enum | "create" | "autocreate", "create", "fail" | How new topics to which the rule applies should be created. When set to "created", Connect will attempt to use the Admin API to explicitly create the topic. When set to "autocreated", Connect will attempt to let the broker autocreate the topic per its own settings; all topic-specific settings on this rule will be ignored. When set to "fail", any topic to which the rule applies will result in a failure of the task if the topic does not already exist. |
topic.creation.${ruleName}.replication.factor | int | 3 | >= 1 when a value is specified | The replication factor for the topics created using this rule. |
topic.creation.${ruleName}.partitions | int | 1 | >= 1 when a value is specified | The number of partitions for the topics created using this rule. |
topic.creation.${ruleName}.${kafkaTopicSpecificConfigName} | n/a | Any of the Kafka topic-level configurations. The broker's topic-level configuration value will be used if that configuration is not specified for the rule. |
The following example defines two topic creation rules named "firstRule" and "defaultRule":
topic.creation=firstRule,defaultRule
topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false
topic.creation.defaultRule.regex=.*
topic.creation.defaultRule.replication.factor=3
topic.creation.defaultRule.partitions=1
topic.creation.defaultRule.cleanup.policy=compact
topic.creation.defaultRule.min.insync.replicas=2
topic.creation.defaultRule.unclean.leader.election.enable=false
This style of configuration properties is very similar to those defined for . These properties can appear in the connector's configuration in any order, but the order of the names in "topic.creation" is important and defines the order in which the framework evaluates whether each rule applies to a topic with a given name. For example, if a new topic named "MyPrefixABC" is to be created, the framework would first use the regular expression of the "firstRule" to see if it matched the topic name "MyPrefixABC". Because it does, the topic-specific settings defined in the properties beginning with "topic.creation.firstRule." would be used and passed to the connector for validation / overrides and ultimately used to create the topic. However, a topic named "XYZ" would not match the "firstRule" but would match the "defaultRule", and thus the topic-specific settings defined in the configuration properties beginning with "topic.creation.defaultRule." would be used and passed to the connector for validation / overrides and ultimately used to create the topic.
Here is another example that shows how to use one rule for topics whose names begin with "MyPrefix", and a second default rule that will result in connector failure if the framework needs to create topics because they don't yet exist and are used in source records. This provides a way to enforce that a source connector does not attempt to create unwanted topics.
topic.creation=firstRule,failRule
topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false
topic.creation.failRule.regex=.*
topic.creation.failRule.policy=fail
Java API
...