
As of 0.11.0.0, Kafka Connect can automatically create its internal topics using the new AdminClient (see KIP-154), but it still relies upon the broker to auto-create the new topics to which source connector records are written. This is error-prone, as it's easy for the topics to be created with an inappropriate cleanup policy, replication factor, and/or number of partitions. Some users would rather not configure their brokers with auto.create.topics.enable=true, and in these cases users must manually pre-create the necessary topics. That can be quite challenging for source connectors that choose topics dynamically based upon the source and that therefore write to large numbers of topics.

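Kafka Connect should instead create new topics for source connectors itself, and allow the topic-specific settings of those new topics to be customized. New topics for a connector may need a variety of topic-specific settings, such as the replication factor, the number of partitions, the cleanup policy, the minimum number of in-sync replicas (ISRs), and whether unclean leaders can be elected.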

This means that users need quite a bit of control over these settings, and may need different settings for different topics. This should also work in a backward-compatible way for connectors built using earlier versions of Kafka Connect.

This feature does not affect sink connectors and does not change the topic-specific settings on any existing topics.

Public Interfaces and Proposed Changes

This proposal defines a flexible way to specify topic-specific settings for new topics by introducing the concept of topic creation rules that are specified for a source connector entirely through connector-level configuration properties. Each topic creation rule has a name, a regular expression that is used to determine whether the rule applies to a particular topic, and the topic-specific settings that would be used when creating the new topic. When Kafka Connect determines that a new topic may need to be created, it will find the first topic creation rule that applies to the new topic's name and then use the topic-specific settings specified by that rule. Kafka Connect will then use a new Java API to let the source connector validate and override these topic-specific settings, and then will use these final topic-specific settings when creating the new topic.

Note that if no topic creation rules are defined, or if none of them match the new topic's name, the Kafka Connect framework will not attempt to create the topic before sending source records to it. Therefore, by default Kafka Connect will rely upon the broker to auto-create topics as needed.
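
The first-match rule selection described above can be sketched as follows. This is a minimal sketch, not Kafka Connect code: the TopicCreationRule and RuleSelector classes are hypothetical, and the sketch assumes a rule's regular expression must match the entire topic name (the proposal does not say whether a partial match is enough).

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical representation of one named topic creation rule (illustrative only).
final class TopicCreationRule {
    final String name;
    final Pattern regex;

    TopicCreationRule(String name, String regex) {
        this.name = name;
        this.regex = Pattern.compile(regex);
    }

    // Assumption: the rule applies only if the regex matches the whole topic name.
    boolean appliesTo(String topic) {
        return regex.matcher(topic).matches();
    }
}

final class RuleSelector {
    // Returns the first rule, in the order listed by topic.creation, that applies to
    // the topic. An empty result means Kafka Connect would not create the topic and
    // would leave it to the broker's auto-creation.
    static Optional<TopicCreationRule> firstMatch(List<TopicCreationRule> rules, String topic) {
        for (TopicCreationRule rule : rules) {
            if (rule.appliesTo(topic)) {
                return Optional.of(rule);
            }
        }
        return Optional.empty();
    }
}
```

With firstRule (MyPrefix.*) listed before defaultRule (.*), a topic named MyPrefixOrders selects firstRule and any other topic falls through to defaultRule. If only firstRule were defined, other topics would get an empty result and be left to broker auto-creation.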

Configuration

We will add a configuration property that lists the user-defined names of zero or more topic creation rules:
  • topic.creation

Each named topic creation rule is then defined by one configuration property that specifies the regular expression used to match topic names, plus additional configuration properties that define the topic-specific settings:

  • topic.creation.${ruleName}.regex
  • topic.creation.${ruleName}.replication.factor
  • topic.creation.${ruleName}.partitions
  • topic.creation.${ruleName}.${kafkaTopicSpecificConfigName}

None of these new connector configuration properties has a default value, although any topic-specific setting not specified when Kafka Connect creates the topic will inherit the broker's corresponding topic-specific settings.
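
One way the framework could turn these properties into an ordered list of rules is sketched below. The ParsedRule and TopicCreationConfigParser names are hypothetical, and the sketch assumes that rule names contain no dots, that a rule without a regex is a configuration error, and that every property other than regex, partitions, and replication.factor is passed through as a topic-level config. Unset partitions and replication factor stay null, so the broker's defaults would apply.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder for one parsed topic creation rule.
final class ParsedRule {
    final String name;
    final String regex;
    final Integer partitions;               // null: not set, so the broker default would apply
    final Short replicationFactor;          // null: not set, so the broker default would apply
    final Map<String, String> topicConfigs; // all other topic-specific settings

    ParsedRule(String name, String regex, Integer partitions,
               Short replicationFactor, Map<String, String> topicConfigs) {
        this.name = name;
        this.regex = regex;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.topicConfigs = Collections.unmodifiableMap(topicConfigs);
    }
}

final class TopicCreationConfigParser {
    private static final String PREFIX = "topic.creation";

    // Returns the rules in the order listed by topic.creation; an empty list means
    // the feature is disabled and brokers would auto-create topics as before.
    static List<ParsedRule> parse(Map<String, String> connectorConfig) {
        List<ParsedRule> rules = new ArrayList<>();
        String names = connectorConfig.get(PREFIX);
        if (names == null || names.trim().isEmpty()) {
            return rules;
        }
        for (String rawName : names.split(",")) {
            String ruleName = rawName.trim();
            if (ruleName.isEmpty()) {
                continue;
            }
            String rulePrefix = PREFIX + "." + ruleName + ".";
            String regex = null;
            Integer partitions = null;
            Short replicationFactor = null;
            Map<String, String> topicConfigs = new LinkedHashMap<>();
            for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
                if (!entry.getKey().startsWith(rulePrefix)) {
                    continue;
                }
                String key = entry.getKey().substring(rulePrefix.length());
                switch (key) {
                    case "regex": regex = entry.getValue(); break;
                    case "partitions": partitions = Integer.valueOf(entry.getValue()); break;
                    case "replication.factor": replicationFactor = Short.valueOf(entry.getValue()); break;
                    default: topicConfigs.put(key, entry.getValue()); // e.g. cleanup.policy
                }
            }
            if (regex == null) { // assumption: a rule without a regex is invalid
                throw new IllegalArgumentException("Missing property " + rulePrefix + "regex");
            }
            rules.add(new ParsedRule(ruleName, regex, partitions, replicationFactor, topicConfigs));
        }
        return rules;
    }
}
```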

The following example defines two topic creation rules named "firstRule" and "defaultRule" that are applied in that order:

 

topic.creation=firstRule,defaultRule

topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false

topic.creation.defaultRule.regex=.*
topic.creation.defaultRule.replication.factor=3
topic.creation.defaultRule.partitions=1
topic.creation.defaultRule.cleanup.policy=compact
topic.creation.defaultRule.min.insync.replicas=2
topic.creation.defaultRule.unclean.leader.election.enable=false

This style of configuration properties is very similar to those defined for Single Message Transforms.

Java API

To give source connector implementations the ability to validate or override some or all of these topic-specific settings, we will modify the following existing abstract class in the Kafka Connect public API:

  • org.apache.kafka.connect.source.SourceTask

by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings:

public abstract class SourceTask implements Task {
    // ...

    /**
     * Determine the topic-specific settings for a new topic to which the {@link SourceRecord}s
     * {@link #poll() produced by this task} are to be written. This method is called whenever
     * a new topic is seen in the {@link SourceRecord}s.
     * <p>
     * By default this method simply returns the supplied initial settings. Implementations can override this method
     * to set the topic-specific settings that should be used when creating the new topic. The broker's own
     * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object.
     * </p>
     *
     * @param settings the initial settings; never null
     * @return the topic-specific settings; may be null if the broker should auto-create the topic
     */
    public TopicSettings settingsForNewTopic(TopicSettings settings) {
        return settings;
    }
}

 

We will also add the new interface org.apache.kafka.connect.storage.TopicSettings, whose methods let SourceTask.settingsForNewTopic implementations easily access and update the topic-specific settings:

 

package org.apache.kafka.connect.storage;

import java.util.Map;

/**
 * Topic-specific settings for a new topic.
 */
public interface TopicSettings {

    /**
     * The log cleanup policy for segments beyond the retention window.
     */
    enum CleanupPolicy {
        /**
         * Ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition.
         */
        COMPACT,
        /**
         * Discard old log data after a fixed period of time or when the log reaches some predetermined size.
         */
        DELETE,
        /**
         * {@link #COMPACT Compact} to retain at least the last known value for each message key <i>and</i>
         * {@link #DELETE delete} messages after a period of time.
         */
        COMPACT_AND_DELETE;
    }

    /**
     * Get the name of the topic.
     * @return the name of the topic
     */
    String name();

    /**
     * Get the number of partitions.
     * @return the number of partitions; always positive
     */
    int partitions();

    /**
     * Get the replication factor.
     * @return the replication factor; always positive
     */
    short replicationFactor();

    /**
     * Get the cleanup policy.
     * @return the cleanup policy; may be null if the broker's default setting for new topics is to be used
     */
    CleanupPolicy cleanupPolicy();

    /**
     * Get the minimum number of in-sync replicas that must exist for the topic to remain available.
     * @return the minimum number of in-sync replicas; may be null if the broker's default setting for new topics is to be used
     */
    Short minInSyncReplicas();

    /**
     * Get whether the broker is allowed to elect an unclean leader for the topic.
     * @return true if unclean leader election is allowed (potentially leading to data loss), or false otherwise;
     * may be null if the broker's default setting for new topics is to be used
     */
    Boolean uncleanLeaderElection();

    /**
     * Get the value for the named topic-specific configuration.
     * @param name the name of the topic-specific configuration
     * @return the configuration value, or null if the specified configuration has not been set
     */
    Object config(String name);

    /**
     * Get the values of all topic-specific configuration properties that have been set.
     * @return the map of configuration values keyed by configuration name; never null
     */
    Map<String, Object> config();

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings partitions(int numPartitions);

    /**
     * Specify the desired replication factor for the topic.
     *
     * @param replicationFactor the desired replication factor; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings replicationFactor(short replicationFactor);

    /**
     * Specify the desired cleanup policy for the topic.
     * @param policy the cleanup policy; may not be null
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings cleanupPolicy(CleanupPolicy policy);

    /**
     * Specify the minimum number of in-sync replicas required for this topic.
     *
     * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings minInSyncReplicas(short minInSyncReplicas);

    /**
     * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs
     * are available.
     *
     * @param allow true if unclean leaders can be elected, or false if they are not allowed
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings uncleanLeaderElection(boolean allow);

    /**
     * Specify a single configuration property for the topic, overwriting any previously set value for that property.
     *
     * @param configName the name of the topic-specific configuration property
     * @param configValue the value for the topic-specific configuration property
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings config(String configName, Object configValue);

    /**
     * Specify the configuration properties for the topic, overwriting all previously set properties,
     * including {@link #cleanupPolicy(CleanupPolicy)}, {@link #minInSyncReplicas(short)}, and {@link #uncleanLeaderElection(boolean)}.
     *
     * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings config(Map<String, Object> configs);
}
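
To show how the typed setters and the generic config methods might interact, here is a hypothetical, trimmed in-memory class (SimpleTopicSettings) mirroring part of the interface. It is a sketch under stated assumptions, not the implementation Kafka Connect would provide: it assumes the typed setters are stored as ordinary topic configs under Kafka's own property names (cleanup.policy, min.insync.replicas), so that config(Map) clears them as its Javadoc describes. COMPACT_AND_DELETE is mapped to Kafka's "compact,delete" value.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, trimmed stand-in for part of the proposed TopicSettings API;
// not the implementation Kafka Connect would provide.
final class SimpleTopicSettings {
    enum CleanupPolicy { COMPACT, DELETE, COMPACT_AND_DELETE }

    private final String name;
    private int partitions;
    private short replicationFactor;
    // Assumption: typed settings are kept as ordinary topic configs under Kafka's names.
    private final Map<String, Object> configs = new LinkedHashMap<>();

    SimpleTopicSettings(String name, int partitions, short replicationFactor) {
        this.name = name;
        partitions(partitions);
        replicationFactor(replicationFactor);
    }

    String name() { return name; }
    int partitions() { return partitions; }
    short replicationFactor() { return replicationFactor; }
    Object config(String configName) { return configs.get(configName); }
    Map<String, Object> config() { return Collections.unmodifiableMap(configs); }

    SimpleTopicSettings partitions(int numPartitions) {
        if (numPartitions <= 0) throw new IllegalArgumentException("partitions must be positive");
        this.partitions = numPartitions;
        return this; // returning this allows calls to be chained
    }

    SimpleTopicSettings replicationFactor(short factor) {
        if (factor <= 0) throw new IllegalArgumentException("replication factor must be positive");
        this.replicationFactor = factor;
        return this;
    }

    SimpleTopicSettings cleanupPolicy(CleanupPolicy policy) {
        if (policy == null) throw new NullPointerException("policy may not be null");
        String value = policy == CleanupPolicy.COMPACT ? "compact"
                : policy == CleanupPolicy.DELETE ? "delete"
                : "compact,delete"; // Kafka's list syntax for both policies
        return config("cleanup.policy", value);
    }

    SimpleTopicSettings minInSyncReplicas(short minInSyncReplicas) {
        if (minInSyncReplicas <= 0) throw new IllegalArgumentException("must be positive");
        return config("min.insync.replicas", minInSyncReplicas);
    }

    // Sets (or replaces) a single topic-specific property.
    SimpleTopicSettings config(String configName, Object configValue) {
        configs.put(configName, configValue);
        return this;
    }

    // Replaces ALL previously set properties, including the cleanup policy.
    SimpleTopicSettings config(Map<String, Object> newConfigs) {
        configs.clear();
        if (newConfigs != null) configs.putAll(newConfigs);
        return this;
    }
}
```

Because every setter returns the same object, an override can chain calls such as settings.partitions(5).cleanupPolicy(...) and return the result directly.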

 

 


 

Kafka Connect will provide an implementation of TopicSettings. When Kafka Connect determines it should attempt to create a new topic, it will find the first applicable topic creation rule, instantiate a TopicSettings object with that rule's topic-specific settings, pass it to the SourceTask's settingsForNewTopic method, and use the resulting TopicSettings instance and its topic-specific settings when the framework attempts to create the new topic. Note that Kafka Connect will altogether skip creating the new topic if no topic creation rule applies to the topic or if the settingsForNewTopic method returns null. Kafka Connect will log these activities at the appropriate level. 
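
The step after a rule has matched might look like the following sketch. To keep it self-contained, a plain Map of topic configs stands in for TopicSettings (partitions and replication factor are left out) and a UnaryOperator stands in for the task's settingsForNewTopic method; the NewTopicStep name is hypothetical. The final values are converted to strings because the AdminClient's NewTopic.configs(Map<String, String>) takes string-valued topic configs; the actual AdminClient call is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical sketch: a Map stands in for TopicSettings and a UnaryOperator
// stands in for SourceTask.settingsForNewTopic.
final class NewTopicStep {

    // Returns the topic configs to create the topic with, or an empty Optional when
    // the task returned null, meaning creation is skipped and the broker may
    // auto-create the topic instead.
    static Optional<Map<String, String>> finalTopicConfigs(
            Map<String, Object> ruleSettings,
            UnaryOperator<Map<String, Object>> settingsForNewTopic) {
        // Pass a copy so the task cannot alter the rule's settings used for other topics.
        Map<String, Object> result = settingsForNewTopic.apply(new LinkedHashMap<>(ruleSettings));
        if (result == null) {
            return Optional.empty();
        }
        Map<String, String> configs = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : result.entrySet()) {
            // Unset (null) values are omitted so the broker's defaults apply.
            if (entry.getValue() != null) {
                configs.put(entry.getKey(), String.valueOf(entry.getValue()));
            }
        }
        return Optional.of(configs);
    }
}
```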

Compatibility, Deprecation, and Migration Plan

When users upgrade an existing Kafka Connect installation, they do not need to change any configurations or upgrade any connectors: this feature will not be enabled and Kafka Connect will behave exactly as before by relying upon the broker to auto-create any new topics or upon users to manually create the topics before they are used. There are no plans to remove this legacy behavior.

After upgrading, users who want Kafka Connect to create new topics must alter the configuration of each such source connector, defining the topic.creation property with the names of one or more rules and the corresponding topic.creation.${ruleName}.* properties for each of the rules.

The developers of source connectors do not need to update or rebuild their connectors, since this proposal's Java API changes are binary compatible. However, if they want their source connector to validate and/or override any of the topic-specific settings, the developers must override the SourceTask's settingsForNewTopic method and release a new version of their connector. For example, a source connector whose topics should always be compacted can always call settings.cleanupPolicy(CleanupPolicy.COMPACT) to effectively define the connector's own constraints.
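
A sketch of such an override follows. So that it compiles without Kafka Connect, it uses a hypothetical, trimmed copy of the proposed TopicSettings interface, and the hypothetical CompactedTopicsSourceTask class does not extend SourceTask; in a real connector the method would override SourceTask.settingsForNewTopic.

```java
// Trimmed, hypothetical stand-in for the proposed TopicSettings interface, so the
// example compiles without Kafka Connect.
interface TopicSettings {
    enum CleanupPolicy { COMPACT, DELETE, COMPACT_AND_DELETE }

    TopicSettings cleanupPolicy(CleanupPolicy policy);
}

// Hypothetical task for a connector whose topics must always be compacted. In a real
// connector this method would override SourceTask.settingsForNewTopic.
class CompactedTopicsSourceTask {
    public TopicSettings settingsForNewTopic(TopicSettings settings) {
        // Keep whatever the matching rule supplied, but force compaction; the setter
        // returns the same settings object, so it can be returned directly.
        return settings.cleanupPolicy(TopicSettings.CleanupPolicy.COMPACT);
    }
}
```

Returning the supplied object, rather than null, keeps Kafka Connect responsible for creating the topic with the rule's other settings intact.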

This feature does not affect sink connectors and does not change the topic-specific settings on any existing topics.

Rejected Alternatives

Several alternative designs were considered but ultimately rejected:

  1. Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration, yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topic. This approach was rejected because it still relies upon the connector implementation to handle all variation in topic-specific settings that might be desired between new topics; because connector users have very little control over the topic-specific settings; and because the connector would have to be modified to take advantage of the new feature, so the feature would not work with older connectors.
  2. Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows for connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility since it still relies upon the source connector to determine the settings for each of the topics.
  3. Change only the connector configuration properties. This approach is identical to this proposal except there are no changes to the Java API. This gives users a lot of flexibility and control over the topic-specific settings for each of the topics created by the connector, but it does not offer any way for the source connector to validate any of these settings. For example, a connector that really requires all new topics be compacted could not enforce this.

 


 
