Status

Current state:  Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As of 0.11.0.0, Kafka Connect can automatically create its internal topics using the new AdminClient (see KIP-154), but it still relies upon the broker to auto-create the topics to which source connector records are written. This is error-prone, because such topics can easily be created with an inappropriate cleanup policy, replication factor, and/or number of partitions. Some users would rather not configure their brokers with auto.create.topics.enable=true, and in these cases they must manually pre-create the necessary topics. That can be quite challenging for source connectors that choose topics dynamically based upon the source and that produce large numbers of topics.

Kafka Connect should instead create the topics automatically and allow the source connector to specify/customize the topic-specific settings for each of the new topics. 

Public Interfaces

We will modify the following existing abstract class in the Kafka Connect public API:

  • org.apache.kafka.connect.source.SourceTask

by adding a non-abstract method with the following signature:

  • public TopicSettings settingsForNewTopic(TopicSettings settings);

This new method will be called by the Kafka Connect framework when it determines that a new topic may need to be created. It allows the framework to provide default topic-specific settings to the source connector, and allows the source connector to override any or all of these settings. The method's default implementation will return null, signaling that Kafka Connect should keep the current behavior of letting the broker auto-create the topic. The same applies to source connectors that were compiled against an earlier version of Kafka Connect without this feature but are deployed on a later version that has it. However, if the developers of a source connector override this method and return a TopicSettings, Kafka Connect will use those settings when it uses the AdminClient to explicitly create the new topic.
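As a rough illustration of how a connector might use this hook, the following self-contained sketch overrides settingsForNewTopic. The SketchTopicSettings and SketchSourceTask types are simplified stand-ins declared only for this example (they are not the proposed API, which is described later in this document), and the "audit-" topic prefix, the partition count, and all class names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Entry point comes first so the file can be launched directly with `java`.
class SettingsForNewTopicSketch {
    public static void main(String[] args) {
        SketchSourceTask task = new AuditAwareTask();
        SketchTopicSettings audit = task.settingsForNewTopic(new SketchTopicSettings("audit-logins"));
        System.out.println(audit.name() + ": partitions=" + audit.partitions() + ", config=" + audit.config());
    }
}

// Simplified stand-in for the proposed TopicSettings interface (a small subset only).
class SketchTopicSettings {
    private final String name;
    private int partitions = 1; // assumed initial value for this example
    private final Map<String, Object> config = new HashMap<>();

    SketchTopicSettings(String name) {
        this.name = name;
    }

    String name() { return name; }

    int partitions() { return partitions; }

    Map<String, Object> config() { return config; }

    SketchTopicSettings partitions(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.partitions = numPartitions;
        return this; // returning this allows calls to be chained
    }

    SketchTopicSettings config(String configName, Object configValue) {
        config.put(configName, configValue);
        return this;
    }
}

// Simplified stand-in for SourceTask, reduced to the one method under discussion.
abstract class SketchSourceTask {
    // Default described in the proposal: null means "let the broker auto-create the topic".
    public SketchTopicSettings settingsForNewTopic(SketchTopicSettings settings) {
        return null;
    }
}

// Hypothetical connector task that customizes only topics with an "audit-" prefix.
class AuditAwareTask extends SketchSourceTask {
    @Override
    public SketchTopicSettings settingsForNewTopic(SketchTopicSettings settings) {
        if (settings.name().startsWith("audit-")) {
            return settings.partitions(6).config("cleanup.policy", "delete");
        }
        return settings; // accept the framework-supplied initial settings unchanged
    }
}
```

A task that does not override the method inherits the null-returning default, so in this sketch it would keep relying on broker auto-creation.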

We will also add the following interface to the Kafka Connect public API:

  • org.apache.kafka.connect.storage.TopicSettings

This interface will contain methods that allow the settingsForNewTopic method implementations to easily access and modify the initial topic-specific settings supplied as a parameter to the method. The initial topic-specific settings for a given topic will be determined by new connector-level configurations.

 

Proposed Changes

This section outlines in more detail the changes to the Java API, connector configuration, and behaviors.

Java API

This proposal adds a single non-abstract method to the existing org.apache.kafka.connect.source.SourceTask class:

public abstract class SourceTask implements Task {
    ...

    /**
     * Determine the topic-specific settings for a new topic to which the {@link SourceRecord}s
     * {@link #poll() produced by this task} are to be written. This method is called whenever a new topic
     * is seen in the {@link SourceRecord}s.
     * <p>
     * By default this method returns null, which leaves creation of the topic to the broker. Implementations
     * can override this method to set the topic-specific settings that should be used when creating the new
     * topic. The broker's own topic-specific configuration settings will be used as defaults for any settings
     * not set via the resulting object.
     * </p>
     *
     * @param settings the initial settings; never null
     * @return the topic-specific settings; may be null if the broker should auto-create the topic
     */
    public TopicSettings settingsForNewTopic(TopicSettings settings) {
        return null;
    }
}

and adds a new org.apache.kafka.connect.storage.TopicSettings interface: 

import java.util.Map;

/**
 * Topic-specific settings for a new topic.
 */
public interface TopicSettings {

    /**
     * The log cleanup policy for segments beyond the retention window.
     */
    enum CleanupPolicy {
        /**
         * Ensures that Kafka will always retain at least the last known value for each message key within
         * the log of data for a single topic partition.
         */
        COMPACT,
        /**
         * Discard old log data after a fixed period of time or when the log reaches some predetermined size.
         */
        DELETE,
        /**
         * {@link #COMPACT Compact} to retain at least the last known value for each message key <i>and</i>
         * {@link #DELETE delete} messages after a period of time.
         */
        COMPACT_AND_DELETE;
    }

    /**
     * Get the name of the topic.
     * @return the name of the topic
     */
    String name();

    /**
     * Get the number of partitions.
     * @return the number of partitions; always positive
     */
    int partitions();

    /**
     * Get the replication factor.
     * @return the replication factor; always positive
     */
    short replicationFactor();

    /**
     * Get the cleanup policy.
     * @return the cleanup policy; may be null if the broker's default setting for new topics is to be used
     */
    CleanupPolicy cleanupPolicy();

    /**
     * Get the minimum number of in-sync replicas that must exist for the topic to remain available.
     * @return the minimum number of in-sync replicas; may be null if the broker's default setting for new
     * topics is to be used
     */
    Short minInSyncReplicas();

    /**
     * Get whether the broker is allowed to elect an unclean leader for the topic.
     * @return true if unclean leader election is allowed (potentially leading to data loss), or false otherwise;
     * may be null if the broker's default setting for new topics is to be used
     */
    Boolean uncleanLeaderElection();

    /**
     * Get the value for the named topic-specific configuration.
     * @param name the name of the topic-specific configuration
     * @return the configuration value, or null if the specified configuration has not been set
     */
    Object config(String name);

    /**
     * Get the values of all topic-specific configurations that have been set.
     * @return the map of configuration values keyed by configuration name; never null
     */
    Map<String, Object> config();

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings partitions(int numPartitions);

    /**
     * Specify the desired replication factor for the topic.
     *
     * @param replicationFactor the desired replication factor; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings replicationFactor(short replicationFactor);

    /**
     * Specify the desired cleanup policy for the topic.
     * @param policy the cleanup policy; may not be null
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings cleanupPolicy(CleanupPolicy policy);

    /**
     * Specify the minimum number of in-sync replicas required for this topic.
     *
     * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings minInSyncReplicas(Short minInSyncReplicas);

    /**
     * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs
     * are available.
     *
     * @param allow true if unclean leaders can be elected, or false if they are not allowed
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings uncleanLeaderElection(Boolean allow);

    /**
     * Specify a single configuration property for the topic, overwriting any previously-set value
     * for that property.
     *
     * @param configName the name of the topic-specific configuration property
     * @param configValue the value for the topic-specific configuration property
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings config(String configName, Object configValue);

    /**
     * Specify the configuration properties for the topic, overwriting all previously-set properties,
     * including {@link #cleanupPolicy(CleanupPolicy)}, {@link #minInSyncReplicas(Short)}, and
     * {@link #uncleanLeaderElection(Boolean)}.
     *
     * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
     * @return this settings object to allow methods to be chained; never null
     */
    TopicSettings config(Map<String, Object> configs);

}
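The Javadoc above implies that the typed setters and the generic config methods share one underlying set of properties, since config(Map) overwrites the cleanup policy and the other typed settings. The sketch below shows one way that could work, assuming (this is not stated in the proposal) that the typed settings are stored under the Kafka topic-level configuration names cleanup.policy and min.insync.replicas. InMemoryTopicSettings is a hypothetical class covering only a subset of the methods, and it takes a primitive short for brevity.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class TopicSettingsConfigSketch {
    public static void main(String[] args) {
        InMemoryTopicSettings settings = new InMemoryTopicSettings()
                .cleanupPolicy(InMemoryTopicSettings.CleanupPolicy.COMPACT_AND_DELETE)
                .minInSyncReplicas((short) 2);
        System.out.println(settings.config());
        // The cast is needed: a bare null matches both config(String) and config(Map).
        settings.config((Map<String, Object>) null);
        System.out.println(settings.config());
    }
}

// Hypothetical in-memory holder; assumes typed settings live in the same map as generic ones.
class InMemoryTopicSettings {
    enum CleanupPolicy {
        COMPACT("compact"), DELETE("delete"), COMPACT_AND_DELETE("compact,delete");

        final String value; // value of the cleanup.policy topic configuration

        CleanupPolicy(String value) { this.value = value; }
    }

    static final String CLEANUP_POLICY = "cleanup.policy";
    static final String MIN_INSYNC_REPLICAS = "min.insync.replicas";

    private final Map<String, Object> config = new HashMap<>();

    InMemoryTopicSettings cleanupPolicy(CleanupPolicy policy) {
        if (policy == null) {
            throw new IllegalArgumentException("policy may not be null");
        }
        config.put(CLEANUP_POLICY, policy.value);
        return this;
    }

    InMemoryTopicSettings minInSyncReplicas(short minInSyncReplicas) {
        if (minInSyncReplicas <= 0) {
            throw new IllegalArgumentException("minInSyncReplicas must be positive");
        }
        config.put(MIN_INSYNC_REPLICAS, minInSyncReplicas);
        return this;
    }

    // Returns null when unset, meaning the broker's default applies.
    Short minInSyncReplicas() {
        Object value = config.get(MIN_INSYNC_REPLICAS);
        return value == null ? null : Short.valueOf(value.toString());
    }

    Object config(String name) { return config.get(name); }

    // Defensive, read-only copy so callers cannot bypass the setters.
    Map<String, Object> config() {
        return Collections.unmodifiableMap(new HashMap<>(config));
    }

    InMemoryTopicSettings config(String configName, Object configValue) {
        config.put(configName, configValue);
        return this;
    }

    // Replaces every property, including those set through the typed setters.
    InMemoryTopicSettings config(Map<String, Object> configs) {
        config.clear();
        if (configs != null) {
            config.putAll(configs);
        }
        return this;
    }
}
```

One detail the sketch surfaces: because config(String) and config(Map) both accept a single reference argument, callers who want to clear all properties have to cast the null argument to disambiguate the overload.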

 

Configuration

A transformation chain will be configured at the connector-level. The order of transformations is defined by the transforms config which represents a list of aliases. An alias in transforms implies that some additional keys are configurable:
transforms.$alias.type – fully qualified class name for the transformation
transforms.$alias.* – all other keys as defined in Transformation.config() are embedded with this prefix

Example:

 

 

transforms=tsRouter,insertKafkaCoordinates
 
transforms.tsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.tsRouter.topic.format=${topic}-${timestamp}
transforms.tsRouter.timestamp.format=yyyyMMdd
  
transforms.insertKafkaCoordinates.type=org.apache.kafka.connect.transforms.InsertInValue
transforms.insertKafkaCoordinates.topic=kafka_topic
transforms.insertKafkaCoordinates.partition=kafka_partition
transforms.insertKafkaCoordinates.offset=kafka_offset

 

The initial settings supplied to settingsForNewTopic will be chosen to create topics with:

  • a single partition
  • a replication factor of 3
  • cleanup.policy=compact
  • min.insync.replicas=2
  • unclean.leader.election.enable=false
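To make the flow concrete, here is a minimal sketch of how the framework side might build these initial settings and then act on the connector's answer. It is only an illustration under stated assumptions: the class and method names are hypothetical, a plain map of topic-level configuration names stands in for TopicSettings, and a UnaryOperator stands in for the task's settingsForNewTopic method.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class InitialSettingsSketch {
    // Initial values taken from the list above.
    static final int INITIAL_PARTITIONS = 1;
    static final short INITIAL_REPLICATION_FACTOR = 3;

    // Topic-level configuration names and values matching the list above.
    static Map<String, String> initialTopicConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("cleanup.policy", "compact");
        config.put("min.insync.replicas", "2");
        config.put("unclean.leader.election.enable", "false");
        return config;
    }

    // Hypothetical decision step; the hook plays the role of settingsForNewTopic.
    static String plan(String topic, UnaryOperator<Map<String, String>> hook) {
        Map<String, String> result = hook.apply(initialTopicConfig());
        if (result == null) {
            // A null answer keeps the existing behavior.
            return topic + ": rely on broker auto-creation";
        }
        return topic + ": create explicitly with partitions=" + INITIAL_PARTITIONS
                + ", replicationFactor=" + INITIAL_REPLICATION_FACTOR
                + ", config=" + result;
    }

    public static void main(String[] args) {
        // A connector that accepts the initial settings unchanged.
        System.out.println(plan("orders", config -> config));
        // A connector that does not override the method and so returns null.
        System.out.println(plan("legacy", config -> null));
    }
}
```

In a real implementation the explicit-creation branch would hand the resulting settings to the AdminClient rather than format a string.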

 

 



 




Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

