Status

Current state: Adopted

Discussion thread: here

Current active discussion thread: here

Previous discussion threads: here and here

Vote thread (current): here

JIRA: KAFKA-5295

Released: AK 2.6.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As of 0.11.0.0, Kafka Connect can automatically create its internal topics using the new AdminClient (see KIP-154), but it still relies upon the broker to auto-create new topics to which source connector records are written. This is error prone, as it's easy for the topics to be created with an inappropriate cleanup policy, replication factor, and/or number of partitions. Some Kafka clusters disable auto topic creation via auto.create.topics.enable=false, and in these cases users creating connectors must manually pre-create the necessary topics. That, of course, can be quite challenging for source connectors that choose topics dynamically based upon the source and that result in large numbers of topics.

Kafka Connect should instead be able to create the topics automatically for source connectors, using a replication factor, a number of partitions, and other topic-specific settings (such as the cleanup policy, the minimum number of in-sync replicas (ISRs), and whether unclean leaders can be elected) declared in the source connector configuration. Additionally, after the introduction of KIP-458, Kafka Connect may optionally create source connector topics using connector-specific Kafka client settings that are declared in the source connector's configuration through the appropriate overrides. If these properties are not specified, Connect keeps its previous behavior of relying upon the topics to exist or to be auto-created by the broker. This should work in a backward compatible way for connectors built using earlier versions of Kafka Connect. Operators of Connect clusters should also be able to enable or disable this feature.

Public Interfaces and Proposed Changes

This proposal defines a flexible way for source connector configurations to specify topic-specific settings for new topics by introducing the concept of topic creation rules that are specified for a source connector entirely through connector-level configuration properties. Each topic creation rule has a name, a regular expression that is used to determine whether the rule applies to a particular topic, and the topic-specific settings that would be used when creating the new topic. When Kafka Connect determines that a new topic may need to be created, it will find and use only the first topic creation rule that applies to the new topic's name, pass that rule's topic-specific settings to the source connector for validation / override, and then use these final topic-specific settings when creating the new topic.

Note that if no topic creation rules are defined or when no topic creation rules match, the Kafka Connect framework will not attempt to create the topic before sending source records to that topic. Therefore, by default Kafka Connect will rely upon the broker to auto-create topics as needed. 

Configuration

We will add a configuration property that lists the user-defined names of zero or more topic creation rules:
  • topic.creation

Each of the named topic creation rules is then defined by one configuration property that defines the regular expression used to match topic names, plus additional configuration properties that define the topic-specific settings:

  • topic.creation.${ruleName}.regex
  • topic.creation.${ruleName}.replication.factor
  • topic.creation.${ruleName}.partitions
  • topic.creation.${ruleName}.${kafkaTopicSpecificConfigName}

None of these new connector configuration properties has a default value, although any topic-specific setting not specified when Kafka Connect creates the topic will inherit the broker's corresponding topic-specific settings.

The following example defines two topic creation rules named "firstRule" and "defaultRule":

 

...

topic.creation=firstRule,defaultRule
 
topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false
  
topic.creation.defaultRule.regex=.*
topic.creation.defaultRule.replication.factor=3
topic.creation.defaultRule.partitions=1
topic.creation.defaultRule.cleanup.policy=compact
topic.creation.defaultRule.min.insync.replicas=2
topic.creation.defaultRule.unclean.leader.election.enable=false

These properties can appear in the connector's configuration in any order, but the order of the names in topic.creation is important and defines the order in which the framework evaluates whether each rule applies to a topic with a given name. For example, if a new topic named "MyPrefixABC" is to be created, the framework would first use the regular expression of the "firstRule" to see if it matched the topic name "MyPrefixABC". Because it does, the topic-specific settings defined in the topic.creation.firstRule.* properties would be used and passed to the connector for validation / overrides and ultimately used to create the topic. However, a topic named "XYZ" would match the defaultRule, and thus the topic-specific settings defined in the topic.creation.defaultRule.* properties would be used and passed to the connector for validation / overrides and ultimately used to create the topic.
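The first-match evaluation described above can be sketched as follows. This is a minimal illustration, not Connect's implementation: the class FirstMatchRules and its methods are hypothetical, and the sketch assumes that a rule's regular expression must match the whole topic name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Kafka Connect API.
final class FirstMatchRules {
    // Insertion order mirrors the order of the rule names listed in topic.creation.
    private final Map<String, Pattern> rules = new LinkedHashMap<>();

    // Registers a rule; call this in the same order as the names appear in topic.creation.
    FirstMatchRules add(String ruleName, String regex) {
        rules.put(ruleName, Pattern.compile(regex));
        return this;
    }

    // Returns the name of the first rule that applies, or empty if no rule matches.
    Optional<String> ruleFor(String topic) {
        for (Map.Entry<String, Pattern> rule : rules.entrySet()) {
            // Assumption: the regex has to match the entire topic name.
            if (rule.getValue().matcher(topic).matches()) {
                return Optional.of(rule.getKey());
            }
        }
        return Optional.empty();
    }
}
```

With the two rules from the example registered in the order firstRule, defaultRule, the topic "MyPrefixABC" resolves to firstRule and "XYZ" resolves to defaultRule. With no rules registered the result is empty, which corresponds to relying on broker auto-creation.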

This style of configuration properties is very similar to those defined for Single Message Transforms.

Java API

To give source connector implementations the ability to validate or override some or all of these topic-specific settings, we will modify the following existing abstract class in the Kafka Connect public API:

  • org.apache.kafka.connect.source.SourceTask

by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings:

Changes to SourceTask.java:
public abstract class SourceTask implements Task {
    ...
    /**
     * Determine the topic-specific settings for a new topic to which the {@link SourceRecord} {@link #poll() produced by this task}
     * are to be written. This method is called whenever Connect sees a topic in the {@link SourceRecord}s for the first time and
     * verifies that the topic does not already exist.
     * <p>
     * By default this method simply returns the supplied initial settings. Implementations can override this method
     * to set the topic-specific settings that should be used when creating the new topic. The broker's own
     * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object.
     * </p>
     *
     * @param settings the initial settings; never null
     * @param currentClusterSize the current number of brokers in the cluster, which can be used as an upper limit on the replication factor; always positive
     * @return the topic-specific settings; may be null if Connect should not attempt to create the topic (and potentially rely upon the broker auto-creating the topic)
     */
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        return settings;
    }
}
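
As an illustration of how an overriding implementation might use the currentClusterSize argument, the sketch below caps a requested replication factor at the number of brokers. It is deliberately detached from SourceTask and TopicSettings so that it runs on its own. The class ReplicationCap and its method are hypothetical names, and the capping policy itself is an assumption rather than something this proposal prescribes.

```java
// Hypothetical helper for illustration only; not part of the Kafka Connect API.
final class ReplicationCap {
    // Returns the requested replication factor, limited to the current number of brokers.
    static short cap(short requested, int currentClusterSize) {
        if (requested < 1) {
            throw new IllegalArgumentException("replication factor must be positive: " + requested);
        }
        if (currentClusterSize < 1) {
            throw new IllegalArgumentException("cluster size must be positive: " + currentClusterSize);
        }
        // The result never exceeds 'requested', so the narrowing cast is safe.
        return (short) Math.min(requested, currentClusterSize);
    }
}
```

An override of settingsForNewTopic could pass the result of such a helper to the replicationFactor setter of the settings object before returning it.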

...

This proposal defines a simple way for source connector configurations to specify whether topics to which the source connector will write should be created by Connect if those topics do not already exist. This feature is enabled by default for new Connect workers, though it can be disabled via a new Connect worker configuration property. Topic creation at runtime is relevant only to source connectors and does not affect sink connectors. It also does not change the topic-specific settings on any existing topics.

Worker Configuration

The proposed feature is enabled by default, and its activation is controlled by the single worker configuration property topic.creation.enable. To use this feature, every Connect worker in the cluster must run with topic.creation.enable=true.

Even with this property set to true after an upgrade, the feature is only used for source connectors whose configuration specifies at least the replication factor and number of partitions for at least one group, as described below.


This proposal adds one new Connect worker configuration, which must be set identically on all workers in the Connect cluster:

Property: topic.creation.enable
Type: boolean
Default: true
Possible Values: true, false
Description: Whether the Connect worker should allow source connector configurations to define topic creation settings. When true, source connectors can use this feature. When false, new source connector configurations that use these topic.creation.* properties would error, while these configs would be ignored (and a warning reported) for previously-registered source connector configs that used these properties.


Source Connector Configuration

This proposal adds several source connector configuration properties that specify the default replication factor, number of partitions, and other topic-specific settings that Connect uses to create any topic to which the source connector writes and that does not exist at the time the source connector generates its records. None of these properties has a default value, so this feature is enabled for a connector only when it is enabled for the Connect cluster and the source connector configuration specifies at least the replication factor and number of partitions for at least one group. To use the default values configured in the Kafka broker, set the replication factor or the number of partitions to -1.

Different classes of configuration properties can be defined through groups. Group definitions follow a pattern similar to the one previously used to define properties for transformations in Kafka Connect. The groups are listed in the property topic.creation.groups. The hierarchy of groups is built on top of a single implicit group called default. The default group always exists and does not need to be listed explicitly in topic.creation.groups (if it is, it will be ignored with a warning message).


Property: topic.creation.groups
Type: List of String types
Default: empty
Possible Values: The group default is always defined for topic configurations. The values of this property refer to additional groups.
Description: A list of group aliases that will be used to define per-group topic configurations for matching topics. The group default always exists and matches all topics.

Property: topic.creation.$alias.include
Type: List of String types
Default: empty
Possible Values: Comma separated list of exact topic names or regular expressions.
Description: A list of strings that represent regular expressions that may match topic names. Topics that match this inclusion list get this group's specific configuration. $alias applies to any group defined in topic.creation.groups but not to the default group.

Property: topic.creation.$alias.exclude
Type: List of String types
Default: empty
Possible Values: Comma separated list of exact topic names or regular expressions.
Description: A list of strings that represent regular expressions that may match topic names. Topics that match this exclusion list do not get this group's specific configuration. $alias applies to any group defined in topic.creation.groups but not to the default group. Note that exclusion rules take precedence over and override any inclusion rules for topics.

Property: topic.creation.$alias.replication.factor
Type: int
Default: n/a
Possible Values: >= 1 for a specific valid value, or -1 to use the broker's default value
Description: The replication factor for new topics created for this connector. This value must not be larger than the number of brokers in the Kafka cluster; otherwise an error will be thrown when the connector attempts to create a topic. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional, and if it is missing it takes the value of the default group.

Property: topic.creation.$alias.partitions
Type: int
Default: n/a
Possible Values: >= 1 for a specific valid value, or -1 to use the broker's default value
Description: The number of partitions for new topics created for this connector. For the default group this configuration is required. For any other group defined in topic.creation.groups this config is optional, and if it is missing it takes the value of the default group.

Property: topic.creation.$alias.${kafkaTopicSpecificConfigName}
Type: several
Default: broker value
Description: Any of the Kafka topic-level configurations for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value will be used if that configuration is not specified for the group. $alias applies to the default group as well as any group defined in topic.creation.groups.
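
The inheritance of the replication factor and number of partitions from the default group, together with the allowed value range, can be sketched as below. This is a minimal illustration under stated assumptions: the class GroupDefaults and its method are hypothetical, and the connector configuration is modeled as a plain map of property names to string values.

```java
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Kafka Connect API.
final class GroupDefaults {
    private static final String PREFIX = "topic.creation.";
    // Sentinel meaning "use the broker's default value".
    static final int USE_BROKER_DEFAULT = -1;

    // Resolves "replication.factor" or "partitions" for a group, falling back to the default group.
    static int effective(Map<String, String> connectorConfig, String alias, String setting) {
        String raw = connectorConfig.get(PREFIX + alias + "." + setting);
        if (raw == null) {
            // A non-default group that omits the setting takes the default group's value.
            raw = connectorConfig.get(PREFIX + "default." + setting);
        }
        if (raw == null) {
            throw new IllegalArgumentException(PREFIX + "default." + setting + " is required");
        }
        int value = Integer.parseInt(raw.trim());
        if (value != USE_BROKER_DEFAULT && value < 1) {
            throw new IllegalArgumentException(setting + " must be >= 1 or -1, got " + value);
        }
        return value;
    }
}
```

For a configuration that sets the default group to a replication factor of 3 and 5 partitions, and a group named inorder to 1 partition, this resolves inorder to 1 partition and an inherited replication factor of 3.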

Note that these configuration properties will be forwarded to the connector via its initialization methods (e.g. start or reconfigure). Also note that the Kafka topic-level configurations do vary by Kafka version, so source connectors should specify only those topic settings that the Kafka broker knows about. Topic settings rejected by the Kafka broker will result in the connector failing with an exception, to avoid silently ignoring invalid topic creation properties. 

If the connector fails to create a topic for any reason, the task that attempts to create that topic will be stopped and will have to be manually restarted once the issue that resulted in its failure is resolved. Given that topic creation is supported during runtime, such failures can happen at any time during the lifetime of a connector and not only during its initial deployment.

The configuration properties that accept regular expressions interpret them as Java regular expressions.

Topic configs always have at least one group, the default group. Its required config properties are the replication factor and the number of partitions. It also has an implicit inclusion list that matches all topics and an implicit exclusion list that is empty. Therefore, configuring the include and exclude properties for the default group is not required, and any such values will be ignored (with a warning message in the logs).

In terms of configuration composition between groups and precedence order: 

  • A topic might get its configuration properties from one or more groups
  • Groups are listed in order of preference within the property topic.creation.groups, with the highest-preference group first and the lowest-preference group last. The default group does not have to be listed explicitly and always has the lowest priority. Because preference is determined solely by the sequence of groups in topic.creation.groups, the order in which group configurations appear within a Java properties file or a JSON-encoded configuration does not matter.
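
The precedence rules above can be sketched as a small resolver that walks the groups in preference order and falls back to the default group. This is a minimal illustration, not Connect's implementation. TopicGroupResolver and Group are hypothetical names, and the sketch assumes that a pattern must match the whole topic name and that only the highest-preference matching group is selected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Kafka Connect API.
final class TopicGroupResolver {
    static final class Group {
        final String alias;
        final List<Pattern> include = new ArrayList<>();
        final List<Pattern> exclude = new ArrayList<>();

        // includeCsv and excludeCsv are comma separated lists, as in the connector properties.
        Group(String alias, String includeCsv, String excludeCsv) {
            this.alias = alias;
            compileInto(include, includeCsv);
            compileInto(exclude, excludeCsv);
        }

        private static void compileInto(List<Pattern> target, String csv) {
            for (String item : csv.split(",")) {
                String trimmed = item.trim();
                if (!trimmed.isEmpty()) {
                    target.add(Pattern.compile(trimmed));
                }
            }
        }

        // Exclusion takes precedence over inclusion.
        boolean matches(String topic) {
            for (Pattern p : exclude) {
                if (p.matcher(topic).matches()) {
                    return false;
                }
            }
            for (Pattern p : include) {
                if (p.matcher(topic).matches()) {
                    return true;
                }
            }
            return false;
        }
    }

    // Groups must be passed in the order they appear in topic.creation.groups.
    static String groupFor(List<Group> groupsInPreferenceOrder, String topic) {
        for (Group group : groupsInPreferenceOrder) {
            if (group.matches(topic)) {
                return group.alias;
            }
        }
        // The implicit default group matches every topic and has the lowest priority.
        return "default";
    }
}
```

Under these assumptions, a group that includes hpc.* but excludes .*internal selects the topic hpc.jobs, while hpc.internal falls through to the default group.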

These properties have no effect if the feature is disabled on the Connect cluster via topic.creation.enable=false in the cluster's worker configurations.

Configuration Examples

The following are examples that demonstrate the additions in the configuration of source connectors that this KIP is proposing. For simplicity, these examples show only snippets of the connector configuration properties that deal with topic creation and they are shown in Java properties format. The replication factor and number of partitions must be specified at least for the default group in the source connector configuration in order to enable topic creation for the connector.


Example 1: All new topics created by Connect for this connector will have replication factor of 3 and 5 partitions. Since default is the only group of topic creation properties, the config topic.creation.groups can be skipped:

Portion of an example source connector configuration using topic creation rules:
...
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
...


Example 2: By default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitions with the exception of topics that match the inclusion list of the inorder group, which will have 1 partition:

Portion of an example source connector configuration using topic creation rules:
...
topic.creation.groups=inorder
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.inorder.include=status, orders.*
topic.creation.inorder.partitions=1
...


Example 3: By default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitions, while the topics key_value_topic and another.compacted.topic, as well as topics that begin with the prefix configurations, will be compacted and have a replication factor of 5 and 1 partition.

Portion of an example source connector configuration using topic creation rules:
...
topic.creation.groups=compacted
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.compacted.include=key_value_topic, another\\.compacted\\.topic, configurations.*
topic.creation.compacted.replication.factor=5
topic.creation.compacted.partitions=1
topic.creation.compacted.cleanup.policy=compact
...
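
The doubled backslashes in the include list above follow from the Java properties format, in which a backslash is itself an escape character. The sketch below loads such a line with java.util.Properties and shows that the resulting value is a regular expression with escaped, literal dots. The class EscapedIncludeDemo is a hypothetical name used only for illustration, and matching against the whole topic name is an assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Kafka Connect API.
final class EscapedIncludeDemo {
    // Parses text in Java properties syntax and returns the include value of the compacted group.
    static String loadInclude(String propertiesText) {
        Properties props = new Properties();
        try {
            // Properties.load turns each "\\" in the file into a single backslash.
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("topic.creation.compacted.include");
    }

    // Assumption: an include entry must match the entire topic name.
    static boolean matches(String includeRegex, String topic) {
        return Pattern.matches(includeRegex, topic);
    }
}
```

After loading, the entry written as another\\.compacted\\.topic becomes the regular expression another\.compacted\.topic, which matches only the topic another.compacted.topic. An unescaped dot would also match any other character in those positions.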


Example 4: By default, new topics created by Connect for this connector will have replication factor of 3 and 5 partitions, while topics that begin with the prefix configurations will be compacted. Additionally, topics that match the inclusion list of highly_parallel and don't match its exclusion list will have replication factor of 1 and 1 partition.

Portion of an example source connector configuration using topic creation rules:
...
topic.creation.groups=compacted, highly_parallel
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.highly_parallel.include=hpc.*,parallel.*
topic.creation.highly_parallel.exclude=.*internal, .*metadata, .*config.*
topic.creation.highly_parallel.replication.factor=1
topic.creation.highly_parallel.partitions=1

topic.creation.compacted.include=configurations.*
topic.creation.compacted.cleanup.policy=compact
...

Sink Connector Configuration

This feature does not affect sink connectors or their configuration. Any topic creation properties added to sink connectors will be ignored and will produce a warning in the log.

REST API

The existing Connect REST API includes several resources whose request and response payloads will be affected by this proposal, although the structure of those payloads is already dependent upon the specific type of connector. Applications that use the REST API must already expect such variation, and therefore should not break after the extensions proposed by this KIP are introduced.

Security

When topic creation is enabled in the Connect worker, the worker may attempt to create topics to which the source connector(s) write that are not known to exist. The Admin API allows the Connect worker to request that these topics be created, but the worker will only attempt to create topics that do not already exist.

Therefore, in order to use this feature, the Kafka principal specified in the worker configuration and used for the source connectors (e.g., producer.*) must have the permission to DESCRIBE and CREATE topics. 

Note that when the Connect worker starts up, it already has the ability to create in the Kafka cluster the internal topics used for storing connector configurations, connector and task statuses, and source connector offsets.

To address cases in which the security settings must differ between the Connect worker (and its ability to create Connect's internal topics) and a source connector that needs to be allowed to create new topics using the feature described in this KIP, Connect supports such specialization via the config overrides that were introduced with KIP-458. For example, if a specific source connector contains the right properties with the prefix admin.override., then this connector will be allowed to create new topics in cases where the Connect worker's settings would not be appropriate. The following two examples highlight two different use cases.


Example 5: The connector is deployed with special properties that work both for producing records and creating topics:

Portion of an example source connector configuration using topic creation rules:
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";
...


Example 6: The connector is deployed with one set of special properties for producing records and a different set for creating topics:

Portion of an example source connector configuration using topic creation rules:
...
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bob-secret";
...
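
One way to picture how connector-level admin.override. properties could combine with the worker's own admin client settings is sketched below. This is an illustration under stated assumptions, not Connect's implementation: the class AdminOverrides and its method are hypothetical, and the sketch assumes that an override simply replaces the worker-level value for the same key once the prefix is stripped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Kafka Connect API.
final class AdminOverrides {
    static final String ADMIN_OVERRIDE_PREFIX = "admin.override.";

    // Layers the connector's admin.override.* entries on top of the worker's admin settings.
    static Map<String, String> effectiveAdminConfig(Map<String, String> workerAdminConfig,
                                                    Map<String, String> connectorConfig) {
        Map<String, String> result = new HashMap<>(workerAdminConfig);
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(ADMIN_OVERRIDE_PREFIX)) {
                // Strip the prefix so the key is a plain Kafka client property name.
                result.put(key.substring(ADMIN_OVERRIDE_PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

In this sketch, properties with other prefixes, such as producer.override., are left out of the admin client settings.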

If neither the worker properties nor the connector overrides allow for the creation of new topics at runtime while this feature is enabled, an error will be logged and the task will fail.

If creating topics is not desired for security purposes, this feature should be disabled by setting topic.creation.enable=false. In this case, the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed will be in effect.

Compatibility, Deprecation, and Migration Plan

When users upgrade an existing Kafka Connect installation, they do not need to change any configurations or upgrade any connectors: this feature will be enabled by default. However, as previously-registered source connector configurations would not include any topic.creation.* configuration properties, Kafka Connect will behave exactly as before by assuming the topics exist or else will be auto-created by the broker.

After upgrading an entire Connect cluster, users must alter the configuration of any source connector to enable the creation of new topics, by adding the topic.creation.default.replication.factor and topic.creation.default.partitions properties plus optionally other topic.creation.default.* properties.

This feature will not affect source or sink connector implementations, as the connector API is unchanged and running connectors have no exposure to this feature.

Existing topics will also see no changes after connectors get reconfigured, since - as it's mentioned in the API description - these configuration properties apply only to new topics at the time that these properties are set. 

Finally, this feature uses Kafka's Admin API methods to check for the existence of a topic and to create new topics. If the broker does not support the Admin API methods, an error will be logged and the task will fail if this feature is enabled. If ACLs are used, the Kafka principal used in the Connect worker's producer.* settings is assumed to have privilege to create topics when needed; if not, then appropriate overrides will have to be present in the connector's configuration, and, finally, if that's not in place either, then an error will be logged and the task will fail if this feature is enabled. 

Rejected Alternatives

Several alternative designs were considered but ultimately rejected:

  1. Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topic (e.g., using the Admin API). This approach was rejected because it still relies upon the connector implementation to address/handle all variation in topic-specific settings that might be desired between new topics; because connector users have very little control over the topic-specific settings; and because the connector would have to be modified to take advantage of the new feature, so the feature would not work with older connectors.
  2. Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows for connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility since it still relies upon the source connector to determine the settings for each of the topics.
  3. Change the Java API and use connector configuration properties to define the topic-specific settings using rules that apply different settings to different topics. This approach was proposed in an earlier version of this KIP, but discussion highlighted that this was optimizing for the exceptional case where source connectors wrote to many topics and those topics needed different replication factors, number of partitions, and/or topic-specific settings. This resulted in a very complex configuration that was thought to be useful in a very small number of cases. It also exposed connectors to a new Java API, but again this would require changes in the source connector implementations and would restrict the Connect versions on which those connectors could be deployed.
  4. Allow the connector to modify the topic-specific settings on an existing topic. This can be complicated, since not all topic settings can be easily changed. It also would introduce potential conflicts between a connector and other admin clients that are attempting to change the topic configuration settings to different values. Such a scenario would be extremely confusing to users, since they might not expect that the source connector is configured to modify the topic settings for an existing topic.
  5. Should topic.creation.default.replication.factor have a default value? A default replication factor of 3 is a sensible default for production, but it would fail on small development clusters. By making this property be explicit, users that are configuring source connectors have to choose a value that makes sense for their Kafka cluster. It also has the advantage that not having a default means that this property is required to enable topic creation on a source connector, and this obviates the need for a separate topic.creation.enable in the connector configuration.

  6. Should the default value for topic.creation.default.replication.factor take into account the current number of brokers? Doing so would be very brittle and subject to transient network partitions and/or failed brokers, since the actual number of brokers might be smaller than the replication factor assumed by the user creating the connector configuration, and the user would have no feedback that a topic was created with fewer replicas than desired.
  7. Should the topic.creation.default.partitions have a default value? The only sensible default is 1, and that's not always very sensible.

  8. Should the Connect worker have a new topic.creation.enable property? This property allows operators of a Connect cluster to prevent source connectors from even using this feature. It would be possible (albeit more complicated) to not have the worker configuration property and to instead expect operators to use ACLs and instead give the Connect worker's producer CREATE topic permissions.

...

New interface TopicSettings.java:
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Topic-specific settings for a new topic.
 */
public interface TopicSettings {
    /**
     * The log cleanup policy for segments beyond the retention window
     */
    enum CleanupPolicy {
        /**
         * Ensures that Kafka will always retain at least the last known value for each message key within the log of
         * data for a single topic partition.
         */
        COMPACT,
        /**
         * Discard old log data after a fixed period of time or when the log reaches some predetermined size.
         */
        DELETE,
        /**
         * {@link #COMPACT Compact} to retain at least the last known value for each message key <i>and</i>
         * {@link #DELETE delete} messages after a period of time.
         */
        COMPACT_AND_DELETE
    }

    /**
     * Get the name of the topic.
     * @return the name of the topic
     */
    String name();

    /**
     * Get the number of partitions.
     * @return the number of partitions; always positive
     */
    int partitions();

    /**
     * Get the replication factor.
     * @return the replication factor; always positive
     */
    short replicationFactor();

    /**
     * Get the cleanup policy.
     * @return the cleanup policy; may be null if the broker's default setting for new topics is to be used
     */
    CleanupPolicy cleanupPolicy();

    /**
     * Get the maximum time the broker will retain a log before discarding old log segments to free up space.
     * This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @param unit the time unit in which the retention is defined; may not be null
     * @return the retention time, or -1 if there is no time limit; may be null if the broker's default setting is to be used
     */
    Long retentionTime(TimeUnit unit);

    /**
     * Get the maximum size a partition (which consists of log segments) can grow to before the broker discards old log segments
     * to free up space. This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @return the number of bytes, or -1 if there is no size limit; may be null if the broker's default setting is to be used
     */
    Long retentionBytes();

    /**
     * Get the minimum number of in-sync replicas that must exist for the topic to remain available.
     * @return the minimum number of in-sync replicas; may be null if the broker's default setting for new topics is to be used
     */
    Short minInSyncReplicas();

    /**
     * Get whether the broker is allowed to elect an unclean leader for the topic.
     * @return true if unclean leader election is allowed (potentially leading to data loss), or false otherwise;
     * may be null if the broker's default setting for new topics is to be used
     */
    Boolean uncleanLeaderElection();

    /**
     * Get the value for the named topic-specific configuration.
     * @param name the name of the topic-specific configuration
     * @return the configuration value, or null if the specified configuration has not been set
     */
    String config(String name);

    /**
     * Get the values for the set topic-specific configuration.
     * @return the map of configuration values keyed by configuration name; never null
     */
    Map<String, String> configs();

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings partitions(int numPartitions);

    /**
     * Specify the desired replication factor for the topic.
     *
     * @param replicationFactor the desired replication factor; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings replicationFactor(short replicationFactor);

    /**
     * Specify the desired cleanup policy for the topic.
     * @param policy the cleanup policy; may not be null
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings cleanupPolicy(CleanupPolicy policy);

    /**
     * Specify the maximum time the broker will retain a log before discarding old log segments to free up space.
     * This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @param time the retention time, or -1 if there is no time limit
     * @param unit the time unit; may not be null
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings retentionTime(long time, TimeUnit unit);

    /**
     * Specify the maximum size a partition (which consists of log segments) can grow to before the broker discards old log segments
     * to free up space. This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @param bytes the number of bytes, or -1 if there is no size limit
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings retentionBytes(long bytes);

    /**
     * Specify the minimum number of in-sync replicas required for this topic.
     *
     * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings minInSyncReplicas(short minInSyncReplicas);

    /**
     * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs
     * are available.
     *
     * @param allow true if unclean leaders can be elected, or false if they are not allowed
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings uncleanLeaderElection(boolean allow);

    /**
     * Specify the name and value of the topic-specific setting for the topic, overwriting any corresponding property that was
     * previously set.
     *
     * @param configName the name of the topic-specific configuration property
     * @param configValue the value for the topic-specific configuration property
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings with(String configName, String configValue);

    /**
     * Specify the name and value of one or more topic-specific settings for the topic, overwriting any corresponding property that was
     * previously set.
     *
     * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings with(Map<String, String> configs);

    /**
     * Clear all topic-specific settings from this definition. Unless other topic-specific settings are defined after this method is
     * called, the broker's default topic settings will be used when the topic is created.
     *
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings clear();
}
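As a rough illustration of how the typed setters above could relate to the generic config(String) and configs() accessors, the sketch below backs a few of them with a plain map. The class name MapBackedSettingsSketch is hypothetical, and the idea that each typed setter is stored under the matching broker topic configuration name (retention.ms, retention.bytes, min.insync.replicas, unclean.leader.election.enable) is an assumption about one possible implementation, not something this proposal specifies.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical, partial stand-in for an implementation of the proposed interface.
// Assumption: typed setters are stored under the broker's topic config names.
final class MapBackedSettingsSketch {
    private final Map<String, String> configs = new LinkedHashMap<>();

    MapBackedSettingsSketch retentionTime(long time, TimeUnit unit) {
        // -1 means "no time limit" and is stored without unit conversion.
        long millis = time < 0 ? -1L : unit.toMillis(time);
        return with("retention.ms", Long.toString(millis));
    }

    MapBackedSettingsSketch retentionBytes(long bytes) {
        return with("retention.bytes", Long.toString(bytes));
    }

    MapBackedSettingsSketch minInSyncReplicas(short minInSyncReplicas) {
        if (minInSyncReplicas < 1) {
            throw new IllegalArgumentException("minInSyncReplicas must be positive");
        }
        return with("min.insync.replicas", Short.toString(minInSyncReplicas));
    }

    MapBackedSettingsSketch uncleanLeaderElection(boolean allow) {
        return with("unclean.leader.election.enable", Boolean.toString(allow));
    }

    // Generic setter: overwrites any previous value for the same name.
    MapBackedSettingsSketch with(String configName, String configValue) {
        configs.put(configName, configValue);
        return this;
    }

    // Removes every setting so the broker defaults would apply.
    MapBackedSettingsSketch clear() {
        configs.clear();
        return this;
    }

    // Returns null when the setting was never specified.
    String config(String name) {
        return configs.get(name);
    }

    // Returns a read-only copy; never null.
    Map<String, String> configs() {
        return Collections.unmodifiableMap(new LinkedHashMap<>(configs));
    }

    // Returns null when unset, -1 when there is no time limit.
    Long retentionTime(TimeUnit unit) {
        String millis = configs.get("retention.ms");
        if (millis == null) {
            return null;
        }
        long value = Long.parseLong(millis);
        return value < 0 ? -1L : unit.convert(value, TimeUnit.MILLISECONDS);
    }
}
```

With this sketch, new MapBackedSettingsSketch().retentionTime(7, TimeUnit.DAYS).minInSyncReplicas((short) 2) chains as the interface intends, and the getters return null for anything not set, which mirrors the "broker's default setting is to be used" wording in the Javadoc.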

Kafka Connect will provide an implementation of TopicSettings. When Kafka Connect sees a topic in the SourceRecords that the worker has not yet seen since starting up, it will check whether that topic exists using the Admin API of the broker. If the topic does not exist, it will find the first applicable topic creation rule, instantiate a TopicSettings object with that rule's topic-specific settings, pass it to the SourceTask's settingsForNewTopic method, and use the resulting TopicSettings instance and its topic-specific settings when the framework attempts to create the new topic. Note that Kafka Connect will altogether skip creating the new topic if no topic creation rule applies to the topic or if the settingsForNewTopic method returns null. Kafka Connect will log these activities at the appropriate level. 

Kafka Connect will not attempt to create topics if the broker does not support the Admin API methods to check for the existence of a topic and to create a new topic. In that case the behavior is equivalent to relying upon the broker's auto-topic creation.
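The decision flow described above can be sketched as follows. This is a minimal illustration, not the framework's implementation: all class names are hypothetical, a plain map stands in for TopicSettings, an in-memory class stands in for the broker's Admin API, and the predicate used to decide whether a rule applies is an assumption made only so the example runs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for one topic creation rule: a matcher plus the rule's settings.
final class CreationRuleSketch {
    final Predicate<String> appliesTo;   // assumption: matching mechanism is illustrative only
    final Map<String, String> settings;

    CreationRuleSketch(Predicate<String> appliesTo, Map<String, String> settings) {
        this.appliesTo = appliesTo;
        this.settings = settings;
    }
}

// Hypothetical in-memory stand-in for the broker's Admin API.
final class InMemoryAdminSketch {
    final Map<String, Map<String, String>> topics = new HashMap<>();

    boolean exists(String topic) {
        return topics.containsKey(topic);
    }

    void create(String topic, Map<String, String> settings) {
        topics.put(topic, settings);
    }
}

final class TopicCreationFlowSketch {
    private final Set<String> seenTopics = new HashSet<>();
    private final InMemoryAdminSketch admin;
    private final List<CreationRuleSketch> rules;

    TopicCreationFlowSketch(InMemoryAdminSketch admin, List<CreationRuleSketch> rules) {
        this.admin = admin;
        this.rules = rules;
    }

    // Returns true only if this call created the topic.
    // settingsForNewTopic plays the role of the task's hook and may return null.
    boolean maybeCreate(String topic, UnaryOperator<Map<String, String>> settingsForNewTopic) {
        if (!seenTopics.add(topic)) {
            return false;                 // already handled since startup
        }
        if (admin.exists(topic)) {
            return false;                 // nothing to create
        }
        for (CreationRuleSketch rule : rules) {
            if (rule.appliesTo.test(topic)) {
                // Only the first applicable rule is used.
                Map<String, String> proposed = new LinkedHashMap<>(rule.settings);
                Map<String, String> result = settingsForNewTopic.apply(proposed);
                if (result == null) {
                    return false;         // the task vetoed creation
                }
                admin.create(topic, result);
                return true;
            }
        }
        return false;                     // no rule applies: skip creation
    }
}
```

Note that the sketch never retries a topic it has already seen, which matches the "not yet seen since starting up" wording but is otherwise a simplification.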

Compatibility, Deprecation, and Migration Plan

When users upgrade an existing Kafka Connect installation, they do not need to change any configurations or upgrade any connectors: this feature will not be enabled and Kafka Connect will behave exactly as before by relying upon the broker to auto-create any new topics or upon users to manually create the topics before they are used. There are no plans to remove this legacy behavior.

After upgrading, users who want Kafka Connect to create new topics must alter the configuration of each such source connector by defining the topic.creation property with the names of one or more rules, along with the corresponding topic.creation.${ruleName}.* properties for each of the rules.
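To make the relationship between topic.creation and the topic.creation.${ruleName}.* properties concrete, the sketch below groups a connector's configuration by rule name. It is illustrative only: the class and method names are hypothetical, it assumes the rule names are given as a comma-separated list, and the property suffixes used in the usage note are placeholders rather than names defined by this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the proposed API.
final class TopicCreationConfigSketch {
    static final String RULES_KEY = "topic.creation";

    // Groups topic.creation.${ruleName}.* properties by rule, keeping the listed rule order.
    // Assumption: the rule names are a comma-separated list.
    static Map<String, Map<String, String>> rulesFrom(Map<String, String> connectorConfig) {
        Map<String, Map<String, String>> rules = new LinkedHashMap<>();
        String ruleList = connectorConfig.get(RULES_KEY);
        if (ruleList == null || ruleList.trim().isEmpty()) {
            return rules; // feature not enabled: legacy behavior applies
        }
        for (String rawName : ruleList.split(",")) {
            String ruleName = rawName.trim();
            if (ruleName.isEmpty()) {
                continue;
            }
            String prefix = RULES_KEY + "." + ruleName + ".";
            Map<String, String> settings = new LinkedHashMap<>();
            for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    // Strip the rule prefix so only the setting name remains.
                    settings.put(entry.getKey().substring(prefix.length()), entry.getValue());
                }
            }
            rules.put(ruleName, settings);
        }
        return rules;
    }
}
```

For example, a configuration containing topic.creation=compacted,fallback and topic.creation.compacted.cleanup.policy=compact would yield two rules, with the first holding the single setting cleanup.policy=compact. A configuration without topic.creation yields no rules, which corresponds to the feature being disabled.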

The developers of source connectors do not need to update or rebuild their connectors, since this proposal's Java API changes are binary compatible. However, if they want their source connector to validate and/or override any of the topic-specific settings, the developers must override the SourceTask's settingsForNewTopic method and release their connector. For example, a source connector whose topics should always be compacted can always call settings.cleanupPolicy(COMPACT) to effectively define the connector's own constraints. Or, a source connector may want to simply log warnings when some topic-specific settings are potentially incorrect (e.g., the number of available brokers or the replication factor does not provide enough replication).
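A connector-side override along these lines might look like the sketch below. Everything here is a trimmed-down, hypothetical stand-in so the example compiles on its own: SketchTopicSettings mirrors only two of the proposed setters, SketchSourceTask stands in for SourceTask, and the single-argument signature of settingsForNewTopic is an assumption. The warnings list is used in place of a real logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the proposed types, reduced to what the example needs.
enum SketchCleanupPolicy { DELETE, COMPACT }

final class SketchTopicSettings {
    private SketchCleanupPolicy cleanupPolicy;
    private Short replicationFactor;

    SketchCleanupPolicy cleanupPolicy() {
        return cleanupPolicy;
    }

    Short replicationFactor() {
        return replicationFactor;
    }

    SketchTopicSettings cleanupPolicy(SketchCleanupPolicy policy) {
        this.cleanupPolicy = policy;
        return this;
    }

    SketchTopicSettings replicationFactor(short replicationFactor) {
        this.replicationFactor = replicationFactor;
        return this;
    }
}

abstract class SketchSourceTask {
    // Default behavior: accept the settings chosen by the matching rule unchanged.
    SketchTopicSettings settingsForNewTopic(SketchTopicSettings settings) {
        return settings;
    }
}

// A task whose topics must always be compacted.
final class CompactedTopicsTask extends SketchSourceTask {
    final List<String> warnings = new ArrayList<>(); // stand-in for a logger

    @Override
    SketchTopicSettings settingsForNewTopic(SketchTopicSettings settings) {
        Short replicationFactor = settings.replicationFactor();
        if (replicationFactor != null && replicationFactor < 2) {
            // Warn only; the user's choice is left in place.
            warnings.add("replication factor " + replicationFactor + " provides no redundancy");
        }
        // Enforce the connector's own constraint regardless of the rule's value.
        return settings.cleanupPolicy(SketchCleanupPolicy.COMPACT);
    }
}
```

The override both validates (by warning about a low replication factor) and overrides (by forcing compaction), which are the two uses this section describes. Returning the same settings object keeps the method compatible with chained calls.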

This feature does not affect sink connectors.

This feature does not change the topic-specific settings on any existing topics.

Rejected Alternatives

Several alternative designs were considered but ultimately rejected:

  1. Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topic. This approach was rejected because it still relies upon the connector implementation to address/handle all variation in topic-specific settings that might be desired between new topics; because connector users have very little control over the topic-specific settings; and because the connector would have to be modified to take advantage of the new feature, so the feature would not work with older connectors.
  2. Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows for connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility since it still relies upon the source connector to determine the settings for each of the topics.
  3. Change only the connector configuration properties. This approach is identical to this proposal except there are no changes to the Java API. This gives users a lot of flexibility and control over the topic-specific settings for each of the topics created by the connector, but it does not offer any way for the source connector to validate any of these settings. For example, a connector that really requires all new topics be compacted could not enforce this.
  4. Allow the connector or its tasks to explicitly create topics. This adds complexity that has not yet been requested.
  5. Allow the connector to modify the topic-specific settings on an existing topic. This can be complicated, since not all topic settings can be easily changed. It also would introduce potential conflicts between a connector and other admin clients that are attempting to change the topic configuration settings to different values. Such a scenario would be extremely confusing to users, since they might not expect that the source connector is configured to modify the topic settings for an existing topic.

...