...
The following example defines two topic creation rules named "firstRule" and "defaultRule":
topic.creation=firstRule,defaultRule
topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false
topic.creation.defaultRule.regex=.*
topic.creation.defaultRule.replication.factor=3
topic.creation.defaultRule.partitions=1
topic.creation.defaultRule.cleanup.policy=compact
topic.creation.defaultRule.min.insync.replicas=2
topic.creation.defaultRule.unclean.leader.election.enable=false
These properties can appear in the connector's configuration in any order, but the order of the names in topic.creation is important: it defines the order in which the framework evaluates whether each rule applies to a topic with a given name. For example, if a new topic named "MyPrefixABC" is to be created, the framework would first use the regular expression of "firstRule" to see whether it matches the topic name "MyPrefixABC". Because it does, the topic-specific settings defined in the topic.creation.firstRule.* properties would be passed to the connector for validation or override and ultimately used to create the topic. A topic named "XYZ", however, would not match "firstRule" but would match "defaultRule", so the topic-specific settings defined in the topic.creation.defaultRule.* properties would be passed to the connector for validation or override and ultimately used to create the topic.
This style of configuration properties is very similar to the one used for Single Message Transforms.
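The rule evaluation order described above can be sketched in a few lines of Java. This is only an illustration, not the framework's implementation: the class name TopicRuleMatcher and its methods are hypothetical, and the sketch assumes that a rule's regular expression must match the entire topic name, which the proposal does not state explicitly.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Connect: illustrates first-match rule selection.
final class TopicRuleMatcher {
    // Insertion order mirrors the order of the rule names listed in topic.creation.
    private final Map<String, Pattern> rules = new LinkedHashMap<>();

    TopicRuleMatcher(Map<String, String> connectorConfig) {
        String names = connectorConfig.getOrDefault("topic.creation", "");
        for (String name : names.split(",")) {
            String ruleName = name.trim();
            if (ruleName.isEmpty()) {
                continue;
            }
            String regex = connectorConfig.get("topic.creation." + ruleName + ".regex");
            if (regex != null) {
                rules.put(ruleName, Pattern.compile(regex));
            }
        }
    }

    /** Returns the name of the first rule whose regex matches the topic name, if any. */
    Optional<String> firstMatchingRule(String topic) {
        for (Map.Entry<String, Pattern> rule : rules.entrySet()) {
            // Assumption: the regex has to match the whole topic name.
            if (rule.getValue().matcher(topic).matches()) {
                return Optional.of(rule.getKey());
            }
        }
        return Optional.empty();
    }
}
```

With the example configuration, this sketch selects "firstRule" for the topic "MyPrefixABC" and "defaultRule" for the topic "XYZ". The order in which the properties themselves appear in the map does not matter, only the order of the names in topic.creation.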
Java API
To give source connector implementations the ability to validate or override some or all of these topic-specific settings, we will modify the following existing abstract class in the Kafka Connect public API:
...
by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings:
public abstract class SourceTask implements Task { /** |
We will also add the new interface org.apache.kafka.connect.storage.TopicSettings, whose methods allow a SourceTask.settingsForNewTopic implementation to easily access and update the topic-specific settings:
/** /** /** /** /** /** /** /** /** /** /** /** /** /** /** /** |
Kafka Connect will provide an implementation of TopicSettings. When Kafka Connect determines it should attempt to create a new topic, it will find the first applicable topic creation rule, instantiate a TopicSettings object with that rule's topic-specific settings, pass it to the SourceTask's settingsForNewTopic method, and use the resulting TopicSettings instance and its topic-specific settings when the framework attempts to create the new topic. Note that Kafka Connect will altogether skip creating the new topic if no topic creation rule applies to the topic or if the settingsForNewTopic method returns null. Kafka Connect will log these activities at the appropriate level.
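The flow described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the proposed API: SketchTopicSettings is a hypothetical stand-in for TopicSettings (whose actual methods are not reproduced here), TopicCreationFlow is a hypothetical helper, and the task's settingsForNewTopic method is modeled as a plain function.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the proposed TopicSettings interface; real method names may differ.
final class SketchTopicSettings {
    private final String topic;
    private final Map<String, String> configs;

    SketchTopicSettings(String topic, Map<String, String> ruleSettings) {
        this.topic = topic;
        this.configs = new HashMap<>(ruleSettings); // copy, so the rule itself is never mutated
    }

    String topic() {
        return topic;
    }

    String config(String key) {
        return configs.get(key);
    }

    SketchTopicSettings with(String key, String value) {
        configs.put(key, value);
        return this;
    }
}

// Hypothetical helper showing the decision the framework makes before creating a topic.
final class TopicCreationFlow {
    /** Returns the settings to create the topic with, or null if creation should be skipped. */
    static SketchTopicSettings resolve(String topic,
                                       Map<String, String> matchedRuleSettings,
                                       UnaryOperator<SketchTopicSettings> settingsForNewTopic) {
        if (matchedRuleSettings == null) {
            return null; // no topic creation rule applies: skip creation
        }
        SketchTopicSettings initial = new SketchTopicSettings(topic, matchedRuleSettings);
        // The task may return the input unchanged, modify it, or return null to skip creation.
        return settingsForNewTopic.apply(initial);
    }
}
```

In this sketch the default behavior corresponds to passing UnaryOperator.identity(), while a connector that requires compacted topics could pass s -> s.with("cleanup.policy", "compact") to override whatever the matching rule specified.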
...
- Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topic. This approach was rejected because it still relies upon the connector implementation to handle all variation in topic-specific settings that might be desired between new topics; because connector users would have very little control over the topic-specific settings; and because the connector would have to be modified to take advantage of the new feature, so the feature would not work with older connectors.
- Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows for connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility since it still relies upon the source connector to determine the settings for each of the topics.
- Change only the connector configuration properties. This approach is identical to this proposal except there are no changes to the Java API. This gives users a lot of flexibility and control over the topic-specific settings for each of the topics created by the connector, but it does not offer any way for the source connector to validate any of these settings. For example, a connector that really requires all new topics be compacted could not enforce this.
- Allow the connector or its tasks to explicitly create topics. This adds complexity that has not yet been requested.