Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As of 0.11.0.0, Kafka Connect can automatically create its internal topics using the new AdminClient (see KIP-154), but it still relies upon the broker to auto-create new topics to which source connector records are written. This is error-prone, as it's easy for the topics to be created with an inappropriate cleanup policy, replication factor, and/or number of partitions. Some users would rather not configure their brokers with auto.create.topics.enable=true, and in these cases users must manually pre-create the necessary topics. That can be quite challenging for source connectors that choose topics dynamically based upon the source and that produce large numbers of topics.
Kafka Connect should instead create the topics automatically for source connectors. New topics for a connector may need a variety of topic-specific settings, such as the replication factor, the number of partitions, the cleanup policy, the minimum number of in-sync replicas (ISRs), and whether unclean leaders can be elected. Users therefore need considerable control over these settings, and may need different settings for different topics. This should also work in a backward-compatible way for connectors built using earlier versions of Kafka Connect.
This feature does not affect sink connectors and does not change the topic-specific settings on any existing topics.
Public Interfaces and Proposed Changes
This proposal defines a flexible way to specify topic-specific settings for new topics by introducing the concept of topic creation rules that are specified for a source connector entirely through connector-level configuration properties. Each topic creation rule has a name, a regular expression that is used to determine whether the rule applies to a particular topic, and the topic-specific settings that would be used when creating the new topic. When Kafka Connect determines that a new topic may need to be created, it will find the first topic creation rule that applies to the new topic's name and then use the topic-specific settings specified by that rule. Kafka Connect will then use a new Java API to let the source connector validate and override these topic-specific settings, and then will use these final topic-specific settings when creating the new topic.
Note that if no topic creation rules are defined or when no topic creation rules match, the Kafka Connect framework will not attempt to create the topic before sending source records to that topic. Therefore, by default Kafka Connect will rely upon the broker to auto-create topics as needed.
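The rule selection described above can be sketched in a few lines of Java. This is only an illustration, not the proposed implementation: the class and method names (RuleMatching, Rule, firstMatch) are hypothetical, and the sketch assumes a rule's regular expression must match the entire topic name, which this proposal does not state explicitly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical helper: none of these names are defined by this proposal.
final class RuleMatching {

    /** A named rule reduced to its regular expression (settings omitted). */
    static final class Rule {
        final String name;
        final Pattern regex;

        Rule(String name, String regex) {
            this.name = name;
            this.regex = Pattern.compile(regex);
        }
    }

    /**
     * Returns the first rule, in list order, whose regex matches the entire
     * topic name (an assumption), or empty when no rule applies.
     */
    static Optional<Rule> firstMatch(List<Rule> rules, String topic) {
        for (Rule rule : rules) {
            if (rule.regex.matcher(topic).matches()) {
                return Optional.of(rule);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Rule> rules = Arrays.asList(
                new Rule("firstRule", "MyPrefix.*"),
                new Rule("defaultRule", ".*"));
        for (String topic : Arrays.asList("MyPrefix.orders", "audit-log")) {
            String chosen = firstMatch(rules, topic).map(r -> r.name).orElse("(none)");
            System.out.println(topic + " -> " + chosen);
        }
    }
}
```

An empty result corresponds to the case where Kafka Connect does not attempt to create the topic and the broker's auto-creation behavior applies.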
Configuration
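A new connector configuration property lists the names of the topic creation rules, in the order in which they are applied:
topic.creation
Each of the named topic creation rules is then defined by one configuration property that specifies the regular expression used to match topic names, plus additional configuration properties that specify the topic-specific settings: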
topic.creation.${ruleName}.regex
topic.creation.${ruleName}.replication.factor
topic.creation.${ruleName}.partitions
topic.creation.${ruleName}.${kafkaTopicSpecificConfigName}
None of these new connector configuration properties has a default value, although any topic-specific setting not specified when Kafka Connect creates the topic will inherit the broker's corresponding topic-specific settings.
The following example defines two topic creation rules named "firstRule" and "defaultRule" that are applied in that order:
topic.creation=firstRule,defaultRule
topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false
topic.creation.defaultRule.regex=.*
topic.creation.defaultRule.replication.factor=3
topic.creation.defaultRule.partitions=1
topic.creation.defaultRule.cleanup.policy=compact
topic.creation.defaultRule.min.insync.replicas=2
topic.creation.defaultRule.unclean.leader.election.enable=false
This style of configuration properties is very similar to those defined for Single Message Transforms.
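To make the property layout concrete, the following sketch groups a connector's topic.creation.* properties into per-rule maps, preserving the order given in topic.creation. It is an illustration under stated assumptions rather than the framework's actual parsing code: the class name TopicCreationConfigParser is hypothetical, and rejecting a rule that lacks a regex property is an assumption about validation that this proposal does not spell out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser: not part of the Kafka Connect API.
final class TopicCreationConfigParser {
    static final String PREFIX = "topic.creation";

    /**
     * Returns rule name -> that rule's properties with the
     * "topic.creation.<ruleName>." prefix stripped, in the order the rule
     * names are listed in "topic.creation". Empty when the property is unset.
     */
    static Map<String, Map<String, String>> parse(Map<String, String> connectorConfig) {
        Map<String, Map<String, String>> rules = new LinkedHashMap<>();
        String names = connectorConfig.get(PREFIX);
        if (names == null || names.trim().isEmpty()) {
            return rules; // no rules defined, so the feature is not enabled
        }
        for (String rawName : names.split(",")) {
            String name = rawName.trim();
            if (name.isEmpty()) {
                continue;
            }
            String rulePrefix = PREFIX + "." + name + ".";
            Map<String, String> settings = new LinkedHashMap<>();
            for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
                if (entry.getKey().startsWith(rulePrefix)) {
                    settings.put(entry.getKey().substring(rulePrefix.length()), entry.getValue());
                }
            }
            // Assumption: every rule must declare the regex it matches on.
            if (!settings.containsKey("regex")) {
                throw new IllegalArgumentException("Missing property " + rulePrefix + "regex");
            }
            rules.put(name, settings);
        }
        return rules;
    }
}
```

Applied to the example above, this would yield two entries, firstRule then defaultRule, each holding keys such as regex, replication.factor, partitions, and cleanup.policy.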
Java API
To give source connector implementations the ability to validate or override some or all of these topic-specific settings, we will modify the following existing abstract class in the Kafka Connect public API:
org.apache.kafka.connect.source.SourceTask
by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings:
public abstract class SourceTask implements Task { ... }
We will also add the new interface org.apache.kafka.connect.storage.TopicSettings, which has methods that a SourceTask.settingsForNewTopic implementation can use to easily access and update the topic-specific settings:
public interface TopicSettings { ... }
Kafka Connect will provide an implementation of TopicSettings. When Kafka Connect determines it should attempt to create a new topic, it will find the first applicable topic creation rule, instantiate a TopicSettings object with that rule's topic-specific settings, pass it to the SourceTask's settingsForNewTopic method, and use the resulting TopicSettings instance and its topic-specific settings when the framework attempts to create the new topic. Note that Kafka Connect will altogether skip creating the new topic if no topic creation rule applies to the topic or if the settingsForNewTopic method returns null. Kafka Connect will log these activities at the appropriate level.
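The decision flow in the preceding paragraph might look roughly like the sketch below. All names here are hypothetical: SettingsSketch is a bare stand-in for the proposed TopicSettings (whose real methods are not reproduced), the UnaryOperator parameter stands in for a task's settingsForNewTopic method, and passing null for the matched rule's settings is simply this sketch's way of representing "no rule applies".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the framework-side flow; not the actual implementation.
final class TopicCreationFlow {

    /** Minimal stand-in for TopicSettings: a topic name plus mutable settings. */
    static final class SettingsSketch {
        final String topic;
        final Map<String, String> values;

        SettingsSketch(String topic, Map<String, String> ruleSettings) {
            this.topic = topic;
            // Copy so that a task's changes never leak back into the rule itself.
            this.values = new LinkedHashMap<>(ruleSettings);
        }
    }

    /**
     * Returns the final settings to create the topic with, or empty when
     * creation should be skipped: either no rule applied (null rule settings)
     * or the task's hook returned null.
     */
    static Optional<SettingsSketch> resolve(String topic,
                                            Map<String, String> matchedRuleSettings,
                                            UnaryOperator<SettingsSketch> settingsForNewTopic) {
        if (matchedRuleSettings == null) {
            return Optional.empty();
        }
        SettingsSketch initial = new SettingsSketch(topic, matchedRuleSettings);
        return Optional.ofNullable(settingsForNewTopic.apply(initial));
    }
}
```

A caller would attempt to create the topic only when the returned Optional is present, and would otherwise send records as before and leave topic creation to the broker or to the user.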
Compatibility, Deprecation, and Migration Plan
When users upgrade an existing Kafka Connect installation, they do not need to change any configurations or upgrade any connectors: this feature will not be enabled and Kafka Connect will behave exactly as before by relying upon the broker to auto-create any new topics or upon users to manually create the topics before they are used. There are no plans to remove this legacy behavior.
After upgrading, users who want Kafka Connect to create new topics must alter the configuration of each such source connector by defining the topic.creation property with the names of one or more rules and the corresponding topic.creation.${ruleName}.* properties for each of the rules.
The developers of source connectors do not need to update or rebuild their connectors, since this proposal's Java API changes are binary compatible. However, if they want their source connector to validate and/or override any of the topic-specific settings, the developers must override the SourceTask's settingsForNewTopic method and release their connector. For example, a source connector whose topics should always be compacted can always call settings.cleanupPolicy(COMPACT) to effectively define the connector's own constraints. Or, a source connector may want to simply log warnings when some topic-specific settings are potentially incorrect (e.g., the number of available brokers or the replication factor does not provide enough replication).
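As an illustration of the compaction example, a connector's override could look something like the following. This is a sketch under assumptions: the TopicSettings interface here is a hypothetical two-method subset inferred only from the settings.cleanupPolicy(COMPACT) call mentioned above, SourceTaskSketch stands in for SourceTask reduced to the one proposed method, and the CleanupPolicy enum and MutableSettings class exist purely to make the example self-contained.

```java
// Hypothetical, self-contained sketch; the real interfaces may differ.
final class CompactedTopicsExample {

    enum CleanupPolicy { DELETE, COMPACT }

    /** Assumed minimal subset of the proposed TopicSettings interface. */
    interface TopicSettings {
        CleanupPolicy cleanupPolicy();

        TopicSettings cleanupPolicy(CleanupPolicy policy);
    }

    /** Stand-in for SourceTask: by default the settings are returned unchanged. */
    abstract static class SourceTaskSketch {
        public TopicSettings settingsForNewTopic(TopicSettings settings) {
            return settings;
        }
    }

    /** A task that forces every new topic to be compacted, whatever the rule said. */
    static final class CompactingTask extends SourceTaskSketch {
        @Override
        public TopicSettings settingsForNewTopic(TopicSettings settings) {
            return settings.cleanupPolicy(CleanupPolicy.COMPACT);
        }
    }

    /** An older-style task that does not override the new method. */
    static final class PassThroughTask extends SourceTaskSketch {
    }

    /** Simple mutable implementation used only to exercise the sketch. */
    static final class MutableSettings implements TopicSettings {
        private CleanupPolicy policy;

        MutableSettings(CleanupPolicy policy) {
            this.policy = policy;
        }

        @Override
        public CleanupPolicy cleanupPolicy() {
            return policy;
        }

        @Override
        public TopicSettings cleanupPolicy(CleanupPolicy policy) {
            this.policy = policy;
            return this;
        }
    }
}
```

PassThroughTask shows the backward-compatible case: a task that never overrides the method inherits the default and leaves the user's rule settings untouched.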
This feature does not affect sink connectors and does not change the topic-specific settings on any existing topics.
Rejected Alternatives
Several alternative designs were considered but ultimately rejected:
- Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topic. This approach was rejected because it still relies upon the connector implementation to handle all variation in topic-specific settings that might be desired between new topics; because connector users have very little control over the topic-specific settings; and because the connector would have to be modified to take advantage of the new feature, so the feature would not work with older connectors.
- Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows for connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility since it still relies upon the source connector to determine the settings for each of the topics.
- Change only the connector configuration properties. This approach is identical to this proposal except there are no changes to the Java API. This gives users a lot of flexibility and control over the topic-specific settings for each of the topics created by the connector, but it does not offer any way for the source connector to validate any of these settings. For example, a connector that really requires all new topics be compacted could not enforce this.