Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As of 0.11.0.0, Kafka Connect can automatically create its internal topics using the new AdminClient (see KIP-154), but it still relies upon the broker to auto-create new topics to which source connector records are written. This is error prone, as it's easy for the topics to be created with an inappropriate cleanup policy, replication factor, and/or number of partitions. Some users would rather not configure their brokers with auto.create.topics.enable=true, and in these cases users must manually pre-create the necessary topics. That, of course, can be quite challenging for some source connectors that choose topics dynamically based upon the source and that result in large numbers of topics.
Kafka Connect should instead create the topics automatically for source connectors. New topics for a connector may need to have a variety of topic-specific settings like the replication factor, the number of partitions, the cleanup policy, the minimum number of in-sync replicas (ISRs), and whether unclean leaders can be elected. This means that users need to have quite a bit of control over these settings, and may need different settings for different topics. And, this should work in a backward compatible way for connectors built using earlier versions of Kafka Connect.
This feature does not affect sink connectors and does not change the topic-specific settings on any existing topics.
Public Interfaces and Proposed Changes
This proposal defines a flexible way to specify topic-specific settings for new topics by introducing the concept of topic creation rules that are specified for a source connector entirely through connector-level configuration properties. Each topic creation rule has a name, a regular expression that is used to determine whether the rule applies to a particular topic, and the topic-specific settings that would be used when creating the new topic. When Kafka Connect determines that a new topic may need to be created, it will find and use only the first topic creation rule that applies to the new topic's name, pass that rule's topic-specific settings to the source connector for validation / override, and then use these final topic-specific settings when creating the new topic.
Note that if no topic creation rules are defined or when no topic creation rules match, the Kafka Connect framework will not attempt to create the topic before sending source records to that topic. Therefore, by default Kafka Connect will rely upon the broker to auto-create topics as needed.
Configuration
Property | Type | Default | Possible Values | Description |
---|---|---|---|---|
topic.creation | list | <empty> | <any> | The names of the topic creation rules, in the order that they should be matched against new topics used by the source connector. |
topic.creation.${ruleName}.regex | string | n/a | valid regex | The regular expression that will be applied to new topics. The first rule whose regular expression matches the new topic name will be used. |
topic.creation.${ruleName}.policy | enum | "create" | "autocreate", "create", "fail" | How new topics to which the rule applies should be created. When set to "created", Connect will attempt to use the Admin API to explicitly create the topic. When set to "autocreated", Connect will attempt to let the broker autocreate the topic per its own settings; all topic-specific settings on this rule will be ignored. When set to "fail", any topic to which the rule applies will result in a failure of the task if the topic does not already exist. |
topic.creation.${ruleName}.replication.factor | int | 3 | >= 1 when a value is specified | The replication factor for the topics created using this rule. |
topic.creation.${ruleName}.partitions | int | 1 | >= 1 when a value is specified | The number of partitions for the topics created using this rule. |
topic.creation.${ruleName}.${kafkaTopicSpecificConfigName} | n/a | Any of the Kafka topic-level configurations. The broker's topic-level configuration value will be used if that configuration is not specified for the rule. |
The following example defines two topic creation rules named "firstRule" and "defaultRule":
topic.creation=firstRule,defaultRule
topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false
topic.creation.defaultRule.regex=.*
topic.creation.defaultRule.replication.factor=3
topic.creation.defaultRule.partitions=1
topic.creation.defaultRule.cleanup.policy=compact
topic.creation.defaultRule.min.insync.replicas=2
topic.creation.defaultRule.unclean.leader.election.enable=false
This style of configuration properties is very similar to those defined for Single Message Transforms. These properties can appear in the connector's configuration in any order, but the order of the names in "topic.creation" is important and defines the order in which the framework evaluates whether each rule applies to a topic with a given name. For example, if a new topic named "MyPrefixABC" is to be created, the framework would first use the regular expression of the "firstRule" to see if it matched the topic name "MyPrefixABC". Because it does, the topic-specific settings defined in the properties beginning with "topic.creation.firstRule." would be used and passed to the connector for validation / overrides and ultimately used to create the topic. However, a topic named "XYZ" would not match the "firstRule" but would match the "defaultRule", and thus the topic-specific settings defined in the configuration properties beginning with "topic.creation.defaultRule." would be used and passed to the connector for validation / overrides and ultimately used to create the topic.
Here is another example that shows how to use one rule for topics whose names begin with "MyPrefix", and a second default rule that will result in connector failure if the framework needs to create topics because they don't yet exist and are used in source records. This provides a way to enforce that a source connector does not attempt to create unwanted topics.
topic.creation=firstRule,failRule
topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false
topic.creation.failRule.regex=.*
topic.creation.failRule.policy=fail
org.apache.kafka.connect.source.SourceTask
by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings
:
public abstract class SourceTask implements Task { ... /** * Determine the topic-specific settings for a new topic to which the {@link SourceRecord} {@link #poll() produced by this task} * are to be written. This method is called whenever Connect sees a topic in the {@link SourceRecord}s for the first time and when * verifies that the topic does not already exist. * <p> * By default this method simply returns the supplied initial settings. Implementations can override this method * to set the topic-specific settings that should be used when creating the new topic. The broker's own * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object. * </p> * * @param settings the initial settings; never null * @param currentClusterSize the current number of brokers in the cluster, which can be used as an upper limit on the replication factor; always positive * @return the topic-specific settings; may be null if Connect should not attempt to create the topic (and potentially rely upon the broker auto-creating the topic) */ public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) { return settings; } }
We will also add the new interface org.apache.kafka.connect.storage.TopicSettings
that has methods for SourceTask.settingsForNewTopic
implementation to easily access and update the topic-specific settings:
/** * Topic-specific settings for a new topic. */ public interface TopicSettings { /** * The log cleanup policy for segments beyond the retention window */ enum CleanupPolicy { /** * Ensures that Kafka will always retain at least the last known value for each message key within the log of * data for a single topic partition. */ COMPACT, /** * Discard old log data after a fixed period of time or when the log reaches some predetermined size. */ DELETE, /** * {@link #COMPACT Compact} to retain at least the last known value for each message key <i>and</i> * {@link #DELETE delete} messages after a period of time. */ COMPACT_AND_DELETE } /** * Get the name of the topic. * @return the name of the topic */ String name(); /** * Get the number of partitions. * @return the number of partitions; always positive */ int partitions(); /** * Get the replication factor. * @return the replication factor; always positive */ short replicationFactor(); /** * Get the cleanup policy. * @return the cleanup policy; may be null if the broker's default setting for new topics is to be used */ CleanupPolicy cleanupPolicy(); /** * Get the maximum time the broker will retain a log before discarding old log segments to free up space. * This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}. * * @param unit the time unit in which the retention is defined; may not be null * @return the retention time, or -1 if there is no time limit; may be null if the broker's default setting is to be used */ Long retentionTime(TimeUnit unit); /** * Get the maximum size a partition (which consists of log segments) can grow to before the broker discards old log segments * to free up space. This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}. * * @return the number of bytes, or -1 if there is no size limit; may be null if the broker's default setting is to be used */ Long retentionBytes(); /** * Get the minimum number of in-sync replicas that must exist for the topic to remain available. * @return the minimum number of in-sync replicas; may be null if the broker's default setting for new topics is to be used */ Short minInSyncReplicas(); /** * Get whether the broker is allowed to elect an unclean leader for the topic. * @return true if unclean leader election is allowed (potentially leading to data loss), or false otherwise; * may be null if the broker's default setting for new topics is to be used */ Boolean uncleanLeaderElection(); /** * Get the value for the named topic-specific configuration. * @param name the name of the topic-specific configuration * @return the configuration value, or null if the specified configuration has not been set */ String config(String name); /** * Get the values for the set topic-specific configuration. * @return the map of configuration values keyed by configuration name; never null */ Map<String, String> configs(); /** * Specify the desired number of partitions for the topic. * * @param numPartitions the desired number of partitions; must be positive * @return this settings object to allow methods to be chained; never null */ TopicSettings partitions(int numPartitions); /** * Specify the desired replication factor for the topic. * * @param replicationFactor the desired replication factor; must be positive * @return this settings object to allow methods to be chained; never null */ TopicSettings replicationFactor(short replicationFactor); /** * Specify the desired cleanup policy for the topic. * @param policy the cleanup policy; may not be null * @return this settings object to allow methods to be chained; never null */ TopicSettings cleanupPolicy(CleanupPolicy policy); /** * Specify the maximum time the broker will retain a log before discarding old log segments to free up space. * This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}. * * @param time the retention time, or -1 if there is no time limit * @param unit the time unit; may not be null * @return this settings object to allow methods to be chained; never null */ TopicSettings retentionTime(long time, TimeUnit unit); /** * Specify the maximum size a partition (which consists of log segments) can grow to before the broker discards old log segments * to free up space. This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}. * * @param bytes the number of bytes, or -1 if there is no size limit * @return this settings object to allow methods to be chained; never null */ TopicSettings retentionBytes(long bytes); /** * Specify the minimum number of in-sync replicas required for this topic. * * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive * @return this settings object to allow methods to be chained; never null */ TopicSettings minInSyncReplicas(short minInSyncReplicas); /** * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs * are available. * * @param allow true if unclean leaders can be elected, or false if they are not allowed * @return this settings object to allow methods to be chained; never null */ TopicSettings uncleanLeaderElection(boolean allow); /** * Specify the name and value of the topic-specific setting for the topic, overwriting any corresponding property that was * previously set. * * @param configName the name of the topic-specific configuration property * @param configValue the value for the topic-specific configuration property * @return this settings object to allow methods to be chained; never null */ TopicSettings with(String configName, String configValue); /** * Specify the name and value of one or more topic-specific settings for the topic, overwriting any corresponding property that was * previously set. * * @param configs the desired topic configuration properties, or null if all existing properties should be cleared * @return this settings object to allow methods to be chained; never null */ TopicSettings with(Map<String, String> configs); /** * Clear all topic-specific settings from this definition. Unless other topic-specific settings are defined after this method is * called, the broker's default topic settings will be used when the topic is created. * * @return this settings object to allow methods to be chained; never null */ TopicSettings clear(); }
Kafka Connect will provide an implementation of TopicSettings
. When Kafka Connect sees a topic in the SourceRecords that the worker has not yet seen since starting up, it will check whether that topic exists using the Admin API of the broker. If the topic does not exist, it will find the first applicable topic creation rule, instantiate a TopicSettings
object with that rule's topic-specific settings, pass it to the SourceTask
's settingsForNewTopic
method, and use the resulting TopicSettings
instance and its topic-specific settings when the framework attempts to create the new topic. Note that Kafka Connect will altogether skip creating the new topic if no topic creation rule applies to the topic or if the settingsForNewTopic
method returns null. Kafka Connect will log these activities at the appropriate level.
Kafka Connect will do nothing if the broker does not support the Admin API methods to check for the existence of a topic and to create a new topic. This is equivalent to relying upon auto-topic creation.
Note that the Java API does not allow the connector to override the creation policy for a topic. The creation policy must only be set in the connector configuration.
Compatibility, Deprecation, and Migration Plan
When users upgrade an existing Kafka Connect installation, they do not need to change any configurations or upgrade any connectors: this feature will not be enabled and Kafka Connect will behave exactly as before by relying upon the broker to auto-create any new topics or upon users to manually create the topics before they are used. There are no plans to remove this legacy behavior.
After upgrading, they must alter the configuration of any source connector to enable the creation of new topics, by defining the topic.creation
property with the names of one or more rules and the corresponding topic.creation.${ruleName}.*
properties for each of the rules.
The developers of source connectors do not need to update or rebuild their connectors, since this proposal's Java API changes are binary compatible. However, if they want their source connector to validate and/or override any of the topic-specific settings, the developers must override the SourceTask
's settingsForNewTopic
method and release their connector. For example, a source connector whose topics should always be compacted can always call settings.cleanupPolicy(COMPACT)
to effectively define the connector's own constraints. Or, a source connector may want to simply log warnings when some topic-specific settings are potentially incorrect (e.g., the number of available brokers or the replication factor does not provide enough replication).
This feature does not affect sink connectors.
This feature does not change the topic-specific settings on any existing topics.
Rejected Alternatives
Several alternative designs were considered but ultimately rejected:
- Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topics. This approach was rejected because it still relies upon the connector implementation to address/handle all variation in topic-specific settings that might be desired between new topics; because connector users have very little control over the topic-specific settings; and because the connector to be modified to take advantage of the new feature and would therefore not work with older connectors.
- Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows for connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility since it still relies upon the source connector to determine the settings for each of the topics.
- Change only the connector configuration properties. This approach is identical to this proposal except there are no changes to the Java API. This gives users a lot of flexibility and control over the topic-specific settings for each of the topics created by the connector, but it does not offer any way for the source connector to validate any of these settings. For example, a connector that really requires all new topics be compacted could not enforce this.
- Allow the connector or its tasks to explicitly create topics. This adds complexity that has not yet been requested.
- Allow the connector to modify the topic-specific settings on an existing topic. This can be complicated, since not all topic settings can be easily changed. It also would introduce potential conflicts between a connector and other admin clients that are attempting to change the topic configuration settings to different values. Such a scenario would be extremely confusing to users, since they might not expect that the source connector is configured to modify the topic settings for an existing topic.