...

public abstract class SourceTask implements Task {
...
    /**
     * Determine the topic-specific settings for a new topic to which the {@link SourceRecord}s {@link #poll() produced by this task}
     * are to be written. This method is called whenever a new topic is seen in the {@link SourceRecord}s, and sh
     * <p>
     * By default this method simply returns the supplied initial settings. Implementations can override this method
     * to set the topic-specific settings that should be used when creating the new topic. The broker's own
     * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object.
     * </p>
     *
     * @param settings the initial settings; never null
     * @param currentClusterSize the current number of brokers in the cluster, which can be used as an upper limit on the replication factor; always positive
     * @return the topic-specific settings; may be null if the broker should auto-create the topic
     */
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        return settings;
    }
}

 

We will also add a new interface, org.apache.kafka.connect.storage.TopicSettings, whose methods let implementations of SourceTask.settingsForNewTopic easily access and update the topic-specific settings:

...

Developers of source connectors do not need to update or rebuild their connectors, since this proposal's Java API changes are binary compatible. However, if they want their source connector to validate or override any of the topic-specific settings, they must override SourceTask's settingsForNewTopic method and release a new version of the connector. For example, a source connector whose topics should always be compacted can call settings.cleanupPolicy(COMPACT) for every new topic, effectively enforcing the connector's own constraints. Alternatively, a source connector may simply log warnings when some topic-specific settings are potentially incorrect (e.g., the replication factor is too low, or too few brokers are available, to provide enough replication).
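
As a rough illustration of the two cases just described, the sketch below overrides settingsForNewTopic to force compaction and to record a warning when the requested replication factor exceeds the cluster size. It is not the proposal's API: the TopicSettings interface here is a hypothetical stand-in (only cleanupPolicy(COMPACT) is named in the text; name(), replicationFactor(), and the CleanupPolicy enum are assumptions), SketchSourceTask stands in for SourceTask, and warnings are collected in a list rather than sent to a real logger.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the proposed TopicSettings interface (assumed shape). */
interface TopicSettings {
    enum CleanupPolicy { DELETE, COMPACT }

    String name();                                      // assumed accessor
    short replicationFactor();                          // assumed accessor
    CleanupPolicy cleanupPolicy();                      // assumed accessor
    TopicSettings cleanupPolicy(CleanupPolicy policy);  // the call named in the text
}

/** Minimal mutable implementation, for illustration only. */
class SimpleTopicSettings implements TopicSettings {
    private final String name;
    private final short replicationFactor;
    private CleanupPolicy policy = CleanupPolicy.DELETE;

    SimpleTopicSettings(String name, short replicationFactor) {
        this.name = name;
        this.replicationFactor = replicationFactor;
    }

    @Override public String name() { return name; }
    @Override public short replicationFactor() { return replicationFactor; }
    @Override public CleanupPolicy cleanupPolicy() { return policy; }

    @Override public TopicSettings cleanupPolicy(CleanupPolicy policy) {
        this.policy = policy;
        return this;
    }
}

/** Stand-in for SourceTask, reduced to the single method proposed above. */
abstract class SketchSourceTask {
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        return settings; // default behavior: accept the initial settings unchanged
    }
}

/** A task whose topics must always be compacted. */
class CompactingSourceTask extends SketchSourceTask {
    // Collected here instead of logged so the sketch has no logging dependency.
    final List<String> warnings = new ArrayList<>();

    @Override
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        // Validate: the cluster size is an upper limit on the replication factor.
        if (settings.replicationFactor() > currentClusterSize) {
            warnings.add("Topic '" + settings.name() + "' requests replication factor "
                    + settings.replicationFactor() + " but only " + currentClusterSize
                    + " broker(s) are available");
        }
        // Override: enforce the connector's own constraint on every new topic.
        return settings.cleanupPolicy(TopicSettings.CleanupPolicy.COMPACT);
    }
}
```

A connector that only wants to validate would return the settings unmodified after recording its warnings; one that only wants to enforce compaction could drop the replication check.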

This feature does not affect sink connectors and does not change the topic-specific settings on any existing topics.

...