...

by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings:

Code Block
languagejava
titleChanges to SourceTask.java
public abstract class SourceTask implements Task {
    ...

    /**
     * Determine the topic-specific settings for a new topic to which the {@link SourceRecord}s {@link #poll() produced by this task}
     * are to be written. This method is called whenever Connect sees a topic in the {@link SourceRecord}s for the first time and
     * verifies that the topic does not already exist.
     * <p>
     * By default this method simply returns the supplied initial settings. Implementations can override this method
     * to set the topic-specific settings that should be used when creating the new topic. The broker's own
     * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object.
     * </p>
     *
     * @param settings the initial settings; never null
     * @param currentClusterSize the current number of brokers in the cluster, which can be used as an upper limit on the replication factor; always positive
     * @return the topic-specific settings; may be null if Connect should not attempt to create the topic (and potentially rely upon the broker auto-creating the topic)
     */
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        return settings;
    }
}
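As an illustration of how a connector might use this hook, the following is a minimal sketch rather than a definitive implementation. The `TopicSettings` and `SourceTask` types here are trimmed-down stand-ins for the proposed API, and `ChangelogSourceTask`, `SimpleTopicSettings`, and the `scratch.` topic prefix are hypothetical names introduced only for this example. The override caps the replication factor at the current cluster size, requires compaction, and returns null for topics it prefers to leave to the broker.

```java
// Hypothetical stand-in for the proposed TopicSettings interface, trimmed to
// the members this example uses.
interface TopicSettings {
    enum CleanupPolicy { COMPACT, DELETE, COMPACT_AND_DELETE }

    String name();
    short replicationFactor();
    CleanupPolicy cleanupPolicy();
    TopicSettings replicationFactor(short replicationFactor);
    TopicSettings cleanupPolicy(CleanupPolicy policy);
}

// Stand-in for SourceTask, reduced to the proposed method and its default behavior.
abstract class SourceTask {
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        return settings;
    }
}

// Hypothetical task whose topics should keep the latest value for each key.
class ChangelogSourceTask extends SourceTask {
    @Override
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        // Assumed naming convention: leave "scratch." topics to the broker.
        if (settings.name().startsWith("scratch.")) {
            return null; // Connect should not attempt to create this topic
        }
        // Never ask for more replicas than there are brokers.
        short capped = (short) Math.min(settings.replicationFactor(), currentClusterSize);
        return settings.replicationFactor(capped)
                       .cleanupPolicy(TopicSettings.CleanupPolicy.COMPACT);
    }
}

// Minimal mutable implementation, present only so the sketch can be exercised.
class SimpleTopicSettings implements TopicSettings {
    private final String name;
    private short replicationFactor;
    private CleanupPolicy cleanupPolicy; // null means "use the broker default"

    SimpleTopicSettings(String name, short replicationFactor) {
        this.name = name;
        this.replicationFactor = replicationFactor;
    }

    @Override public String name() { return name; }
    @Override public short replicationFactor() { return replicationFactor; }
    @Override public CleanupPolicy cleanupPolicy() { return cleanupPolicy; }

    @Override public TopicSettings replicationFactor(short replicationFactor) {
        this.replicationFactor = replicationFactor;
        return this;
    }

    @Override public TopicSettings cleanupPolicy(CleanupPolicy policy) {
        this.cleanupPolicy = policy;
        return this;
    }
}
```

Because every setter returns the settings object, the override can express its requirements as a single chained expression, and a connector that has no opinion can simply inherit the default.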



We will also add the new interface org.apache.kafka.connect.storage.TopicSettings, whose methods let a SourceTask.settingsForNewTopic implementation easily access and update the topic-specific settings:

Code Block
languagejava
titleNew interface TopicSettings.java
/**
 * Topic-specific settings for a new topic.
 */
public interface TopicSettings {
    /**
     * The log cleanup policy for segments beyond the retention window
     */
    enum CleanupPolicy {
        /**
         * Ensures that Kafka will always retain at least the last known value for each message key within the log of
         * data for a single topic partition.
         */
        COMPACT,
        /**
         * Discard old log data after a fixed period of time or when the log reaches some predetermined size.
         */
        DELETE,
        /**
         * {@link #COMPACT Compact} to retain at least the last known value for each message key <i>and</i>
         * {@link #DELETE delete} messages after a period of time.
         */
        COMPACT_AND_DELETE
    }

    /**
     * Get the name of the topic.
     * @return the name of the topic
     */
    String name();

    /**
     * Get the number of partitions.
     * @return the number of partitions; always positive
     */
    int partitions();

    /**
     * Get the replication factor.
     * @return the replication factor; always positive
     */
    short replicationFactor();

    /**
     * Get the cleanup policy.
     * @return the cleanup policy; may be null if the broker's default setting for new topics is to be used
     */
    CleanupPolicy cleanupPolicy();

    /**
     * Get the maximum time the broker will retain a log before discarding old log segments to free up space.
     * This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @param unit the time unit in which the retention is defined; may not be null
     * @return the retention time, or -1 if there is no time limit; may be null if the broker's default setting is to be used
     */
    Long retentionTime(TimeUnit unit);

    /**
     * Get the maximum size a partition (which consists of log segments) can grow to before the broker discards old log segments
     * to free up space. This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @return the number of bytes, or -1 if there is no size limit; may be null if the broker's default setting is to be used
     */
    Long retentionBytes();

    /**
     * Get the minimum number of in-sync replicas that must exist for the topic to remain available.
     * @return the minimum number of in-sync replicas; may be null if the broker's default setting for new topics is to be used
     */
    Short minInSyncReplicas();

    /**
     * Get whether the broker is allowed to elect an unclean leader for the topic.
     * @return true if unclean leader election is allowed (potentially leading to data loss), or false otherwise;
     * may be null if the broker's default setting for new topics is to be used
     */
    Boolean uncleanLeaderElection();

    /**
     * Get the value for the named topic-specific configuration.
     * @param name the name of the topic-specific configuration
     * @return the configuration value, or null if the specified configuration has not been set
     */
    String config(String name);

    /**
     * Get the values for the set topic-specific configuration.
     * @return the map of configuration values keyed by configuration name; never null
     */
    Map<String, String> configs();

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings partitions(int numPartitions);

    /**
     * Specify the desired replication factor for the topic.
     *
     * @param replicationFactor the desired replication factor; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings replicationFactor(short replicationFactor);

    /**
     * Specify the desired cleanup policy for the topic.
     * @param policy the cleanup policy; may not be null
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings cleanupPolicy(CleanupPolicy policy);

    /**
     * Specify the maximum time the broker will retain a log before discarding old log segments to free up space.
     * This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @param time the retention time, or -1 if there is no time limit
     * @param unit the time unit; may not be null
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings retentionTime(long time, TimeUnit unit);

    /**
     * Specify the maximum size a partition (which consists of log segments) can grow to before the broker discards old log segments
     * to free up space. This only applies when {@link #cleanupPolicy()} is set to {@link CleanupPolicy#DELETE}.
     *
     * @param bytes the number of bytes, or -1 if there is no size limit
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings retentionBytes(long bytes);

    /**
     * Specify the minimum number of in-sync replicas required for this topic.
     *
     * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings minInSyncReplicas(short minInSyncReplicas);

    /**
     * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs
     * are available.
     *
     * @param allow true if unclean leaders can be elected, or false if they are not allowed
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings uncleanLeaderElection(boolean allow);

    /**
     * Specify the name and value of the topic-specific setting for the topic, overwriting any corresponding property that was
     * previously set.
     *
     * @param configName the name of the topic-specific configuration property
     * @param configValue the value for the topic-specific configuration property
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings with(String configName, String configValue);

    /**
     * Specify the name and value of one or more topic-specific settings for the topic, overwriting any corresponding property that was
     * previously set.
     *
     * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings with(Map<String, String> configs);

    /**
     * Clear all topic-specific settings from this definition. Unless other topic-specific settings are defined after this method is
     * called, the broker's default topic settings will be used when the topic is created.
     *
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings clear();
}
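The interface leaves open how the typed accessors relate to the generic `config(String)` and `with(...)` methods. One possible arrangement, sketched here purely as an assumption, is to back the typed accessors with the configuration map, keyed by the broker's topic-level property names (`retention.ms` and `cleanup.policy`). The class `MapBackedTopicSettings` is hypothetical, covers only a few of the methods, and is not the implementation Kafka Connect will provide. It shows the three states of the retention getter: null for the broker default, -1 for no limit, and otherwise a value converted to the requested unit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical partial implementation; only a few TopicSettings methods are sketched.
class MapBackedTopicSettings {
    // Kafka topic-level configuration property names.
    static final String RETENTION_MS = "retention.ms";
    static final String CLEANUP_POLICY = "cleanup.policy";

    enum CleanupPolicy {
        COMPACT("compact"), DELETE("delete"), COMPACT_AND_DELETE("compact,delete");

        final String configValue;

        CleanupPolicy(String configValue) {
            this.configValue = configValue;
        }
    }

    private final Map<String, String> configs = new HashMap<>();

    // Setter: stores the value in milliseconds; any negative time means "no time limit".
    MapBackedTopicSettings retentionTime(long time, TimeUnit unit) {
        Objects.requireNonNull(unit, "unit may not be null");
        long millis = time < 0 ? -1L : unit.toMillis(time);
        configs.put(RETENTION_MS, Long.toString(millis));
        return this;
    }

    // Getter: null means "use the broker default", -1 means "no time limit".
    Long retentionTime(TimeUnit unit) {
        Objects.requireNonNull(unit, "unit may not be null");
        String value = configs.get(RETENTION_MS);
        if (value == null) {
            return null;
        }
        long millis = Long.parseLong(value);
        return millis < 0 ? -1L : unit.convert(millis, TimeUnit.MILLISECONDS);
    }

    MapBackedTopicSettings cleanupPolicy(CleanupPolicy policy) {
        Objects.requireNonNull(policy, "policy may not be null");
        configs.put(CLEANUP_POLICY, policy.configValue);
        return this;
    }

    MapBackedTopicSettings with(String configName, String configValue) {
        configs.put(configName, configValue);
        return this;
    }

    String config(String name) {
        return configs.get(name);
    }

    // Removes every setting so that the broker defaults apply again.
    MapBackedTopicSettings clear() {
        configs.clear();
        return this;
    }
}
```

With this arrangement, a value set through a typed setter is visible through `config(String)`, and `clear()` returns every accessor to its "broker default" state.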



Kafka Connect will provide an implementation of TopicSettings. When Kafka Connect determines it should attempt to create a new topic, it will find the first applicable topic creation rule, instantiate a TopicSettings object with that rule's topic-specific settings, pass it to the SourceTask's settingsForNewTopic method, and use the resulting TopicSettings instance and its topic-specific settings when the framework attempts to create the new topic. Note that Kafka Connect will altogether skip creating the new topic if no topic creation rule applies to the topic or if the settingsForNewTopic method returns null. Kafka Connect will log these activities at the appropriate level. 
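The decision flow described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the framework's actual code: `TopicCreationFlow`, `Settings`, and `Rule` are hypothetical names, a rule is assumed to match topics by regular expression and to carry only a partition count, and the task's settingsForNewTopic method is modeled as a `UnaryOperator`.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

final class TopicCreationFlow {
    // Hypothetical, reduced stand-in for TopicSettings.
    static final class Settings {
        final String topic;
        final int partitions;

        Settings(String topic, int partitions) {
            this.topic = topic;
            this.partitions = partitions;
        }
    }

    // Hypothetical topic creation rule: a topic name pattern plus the settings it supplies.
    static final class Rule {
        final Pattern topicPattern;
        final int partitions;

        Rule(String regex, int partitions) {
            this.topicPattern = Pattern.compile(regex);
            this.partitions = partitions;
        }

        boolean appliesTo(String topic) {
            return topicPattern.matcher(topic).matches();
        }
    }

    /**
     * Returns the settings to create the topic with, or null when topic creation
     * should be skipped (no rule applies, or the task's hook returned null).
     */
    static Settings resolve(String topic, List<Rule> rules, UnaryOperator<Settings> taskHook) {
        for (Rule rule : rules) {
            if (rule.appliesTo(topic)) {
                // Only the first applicable rule is used to build the initial settings.
                Settings initial = new Settings(topic, rule.partitions);
                return taskHook.apply(initial); // may be null: skip creation
            }
        }
        return null; // no rule applies: skip creation
    }
}
```

Returning null for both "no rule applies" and "the task declined" keeps the caller's logic simple: it creates the topic only when it receives a non-null result, and otherwise leaves the topic to the broker.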

Kafka Connect will do nothing if the broker does not support the Admin API methods to check for the existence of a topic and to create a new topic. This is equivalent to relying upon auto-topic creation.

Compatibility, Deprecation, and Migration Plan

...

This feature does not affect sink connectors.

This feature does not change the topic-specific settings on any existing topics.

...

  1. Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topic. This approach was rejected because it still relies upon the connector implementation to handle all variation in topic-specific settings that might be desired between new topics; because connector users would have very little control over the topic-specific settings; and because each connector would have to be modified to take advantage of the new feature, so the feature would not work with older connectors.
  2. Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility, since it still relies upon the source connector to determine the settings for each of the topics.
  3. Change only the connector configuration properties. This approach is identical to this proposal except there are no changes to the Java API. This gives users a lot of flexibility and control over the topic-specific settings for each of the topics created by the connector, but it does not offer any way for the source connector to validate any of these settings. For example, a connector that really requires all new topics be compacted could not enforce this.
  4. Allow the connector or its tasks to explicitly create topics. This adds complexity that has not yet been requested.
  5. Allow the connector to modify the topic-specific settings on an existing topic. This can be complicated, since not all topic settings can be easily changed. It would also introduce potential conflicts between a connector and other admin clients that are attempting to change the topic configuration settings to different values. Such a scenario would be extremely confusing to users, since they might not expect that the source connector is configured to modify the topic settings for an existing topic.