Status

Current state: Accepted

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Currently MirrorMaker uses the deprecated AlterConfigs API when syncing topic configurations, in order to stay compatible with brokers as old as 0.11.0. In addition, MirrorMaker has a configurable ConfigPropertyFilter class to select the topic configuration properties to replicate, and uses org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter by default. The default class excludes certain topic configurations from replication, such as follower.replication.throttled.replicas and leader.replication.throttled.replicas.

However, the deprecated AlterConfigs API replaces any existing configuration with the new configuration. As a result, additional configurations set on a remote topic are cleared when MirrorMaker syncs topic configurations, whether or not they are excluded from replication. For example, this prevents running Cruise Control on the target cluster, as it may set follower.replication.throttled.replicas and leader.replication.throttled.replicas.

MirrorMaker should not clear topic configurations that are excluded from replication.
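The difference between the two update semantics can be illustrated with plain maps. This is a minimal sketch that models topic overrides as a `Map` and does not call the Kafka Admin client; the class and method names (`AlterConfigsSemantics`, `replaceAll`, `incrementalSet`) are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the two update semantics; no Kafka client is involved.
class AlterConfigsSemantics {

    // Replace-all semantics (deprecated AlterConfigs): the submitted map becomes
    // the complete set of overrides, so anything not in it is dropped.
    static Map<String, String> replaceAll(Map<String, String> target, Map<String, String> update) {
        return new HashMap<>(update);
    }

    // Incremental semantics (IncrementalAlterConfigs with SET operations): only
    // the submitted keys change, every other override is left alone.
    static Map<String, String> incrementalSet(Map<String, String> target, Map<String, String> update) {
        Map<String, String> result = new HashMap<>(target);
        result.putAll(update);
        return result;
    }

    public static void main(String[] args) {
        // Overrides currently on the target topic; the throttle was set by another tool.
        Map<String, String> target = new HashMap<>();
        target.put("retention.ms", "86400000");
        target.put("follower.replication.throttled.replicas", "0:1");

        // The only property MirrorMaker wants to sync from the source topic.
        Map<String, String> synced = Map.of("retention.ms", "3600000");

        String throttle = "follower.replication.throttled.replicas";
        System.out.println(replaceAll(target, synced).containsKey(throttle));     // false
        System.out.println(incrementalSet(target, synced).containsKey(throttle)); // true
    }
}
```

With replace-all semantics the throttle override disappears even though it was never part of the sync, which is the behaviour described above.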

In addition, source and target topic configurations can have different default values. Currently, MirrorMaker overwrites configurations on the target topic even if the configurations on the source topic have not been changed and merely have different default values. However, users may want to sync only the configurations that they have changed, and leave the ones with default values untouched.

Public Interfaces

A new method will be added to ConfigPropertyFilter.

Code Block
languagejava
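public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
  // New method: whether a source topic configuration that still has its
  // default value should be replicated to the target topic.
  boolean shouldReplicateSourceDefault(String prop);
}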

Proposed Changes

This KIP proposes to use the IncrementalAlterConfigs API for syncing topic configurations in MirrorMaker. The IncrementalAlterConfigs API has been available since it was introduced in Kafka 2.3.0, and it addresses the shortcoming of the deprecated AlterConfigs API. In order not to break compatibility, a new setting will be introduced to MirrorMaker that gives the option to enable or disable the IncrementalAlterConfigs API for syncing topic configurations. This new setting is expected to serve as a temporary measure until the next major release, when the API is always used.

It also proposes to add an option to use the target cluster default for the target topic configurations instead of the source cluster default. For example, if the user wants to use the target cluster default, the  ConfigPropertyFilter would be used to check if the configuration on the source topic has been dynamically set and decide which cluster default should be used. 

1) Add a new method updateTopicConfigsUsingIncrementalAlterConfigs() to MirrorSourceConnector.java, which takes a list of topic configurations as a parameter and calls incrementalAlterConfigs() on the target cluster.

2) Extend ConfigPropertyFilter with a method called shouldReplicateSourceDefault, which returns false by default. Users can configure this method to return true for a configuration whose source is DEFAULT_CONFIG.

3) Update MirrorSourceConnector.java to sync a configuration that still has its source default value only if shouldReplicateSourceDefault returns true.

  • A new configuration setting to MirrorMaker:
    • Name: use.incremental.alter.configs
    • Description: Deprecated. Whether to automatically use the IncrementalAlterConfigs API for syncing topic configurations. This configuration will be removed in Kafka 4.0 and the IncrementalAlterConfigs API will be used by default. Users should make sure that the target cluster is running Kafka 2.3.0 or later.
    • Type: String
    • Default: requested
    • When set to "requested", MirrorMaker will use the IncrementalAlterConfigs API for syncing topic configurations. If any request receives an error from an incompatible broker, it will fall back to the deprecated AlterConfigs API for the subsequent calls and log a WARN message, e.g. "The target cluster <ALIAS> is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations".
    • When explicitly set to "never", MirrorMaker will use the deprecated AlterConfigs API for syncing topic configurations.
    • When explicitly set to "required", MirrorMaker will use the IncrementalAlterConfigs API for syncing topic configurations. If it receives an error from an incompatible broker, MirrorMaker will report this to the user and fail the connector.
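The three values of use.incremental.alter.configs could be handled roughly as in the sketch below. It is not the MirrorMaker implementation: `FakeTargetCluster` is a hypothetical stand-in for the target cluster's admin client, `UnsupportedOperationException` stands in for whatever error an incompatible broker returns, and the class and field names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class IncrementalAlterConfigsFallback {
    enum Mode { REQUESTED, NEVER, REQUIRED }

    // Hypothetical stand-in for the target cluster; records which API was called.
    static class FakeTargetCluster {
        final boolean supportsIncremental;
        final List<String> calls = new ArrayList<>();

        FakeTargetCluster(boolean supportsIncremental) {
            this.supportsIncremental = supportsIncremental;
        }

        void incrementalAlterConfigs(String topic) {
            if (!supportsIncremental) {
                // Assumption: models the error returned by a broker older than 2.3.0.
                throw new UnsupportedOperationException("IncrementalAlterConfigs not supported");
            }
            calls.add("incremental:" + topic);
        }

        void alterConfigs(String topic) {
            calls.add("legacy:" + topic);
        }
    }

    private final FakeTargetCluster target;
    private final boolean failIfUnsupported;
    private boolean useIncremental;
    final List<String> warnings = new ArrayList<>();

    IncrementalAlterConfigsFallback(Mode mode, FakeTargetCluster target) {
        this.target = target;
        this.useIncremental = mode != Mode.NEVER;
        this.failIfUnsupported = mode == Mode.REQUIRED;
    }

    void syncTopicConfigs(String topic) {
        if (!useIncremental) {
            target.alterConfigs(topic); // "never", or "requested" after a fallback
            return;
        }
        try {
            target.incrementalAlterConfigs(topic);
        } catch (UnsupportedOperationException e) {
            if (failIfUnsupported) {
                // "required": surface the error and fail the connector.
                throw new IllegalStateException(
                        "Target cluster is not compatible with IncrementalAlterConfigs API", e);
            }
            // "requested": warn once and use the deprecated API for subsequent calls.
            warnings.add("The target cluster is not compatible with IncrementalAlterConfigs API. "
                    + "Therefore using deprecated AlterConfigs API for syncing topic configurations");
            useIncremental = false;
        }
    }
}
```

In this sketch the sync that hits the incompatible broker is not retried; only later syncs use the deprecated API, which follows the "for the subsequent calls" wording of the setting's description.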

...

4) Update syncTopicConfigs() in MirrorSourceConnector.java to call updateTopicConfigsUsingIncrementalAlterConfigs() instead of updateTopicConfigs() when use.incremental.alter.configs is set to "requested" or "required".

...

Currently, if users delete a topic configuration in the source cluster, it is also deleted from the target cluster due to the replace-all behaviour of the AlterConfigs API. With IncrementalAlterConfigs, however, the topic configuration in the target cluster would potentially be left unchanged. Therefore, when using the IncrementalAlterConfigs API, a topic configuration that has its default value on the source cluster will be explicitly cleared from the target cluster via a Delete operation.

The KIP also proposes adding the ability for users to choose between target cluster's default or source cluster's default when using IncrementalAlterConfigs for syncing topic configurations. For instance, users would be able to configure ConfigPropertyFilter to choose whether or not to replicate the default configurations of the source topic to the target cluster. This gives users a bit more control in how they want to replicate their topic configurations. 

  • Extend ConfigPropertyFilter with a new method called shouldReplicateSourceDefault, which will return false by default. If it returns false while the IncrementalAlterConfigs API is in use, the default configurations in the source cluster will be explicitly cleared from the target cluster using the Delete operation. Users will be able to configure this method to return true in order to include the source cluster's default configurations in the replication.
  • Extend DefaultConfigPropertyFilter with a new property "use.defaults.from" that accepts "source" or "target" values. This will allow users to control which cluster's defaults get applied on the target. Users can also use the existing "config.properties.exclude" property to exclude configurations from the replication regardless of which cluster's default is chosen.
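The way the filter decisions above could translate into per-property operations is sketched below. `SourceEntry`, `Op` and the trimmed-down `Filter` interface are hypothetical types used only here; they stand in for the described source configuration, the incremental alter operation and ConfigPropertyFilter respectively, and the sketch does not reflect the actual MirrorSourceConnector code.

```java
import java.util.ArrayList;
import java.util.List;

class DefaultAwareConfigSync {

    // Simplified stand-in for one described source topic configuration.
    static final class SourceEntry {
        final String name;
        final String value;
        final boolean isDefault; // true when the source topic uses the cluster default

        SourceEntry(String name, String value, boolean isDefault) {
            this.name = name;
            this.value = value;
            this.isDefault = isDefault;
        }
    }

    // Simplified stand-in for one incremental alter operation.
    static final class Op {
        final String type;  // "SET" or "DELETE"
        final String name;
        final String value; // null for DELETE

        Op(String type, String name, String value) {
            this.type = type;
            this.name = name;
            this.value = value;
        }

        @Override
        public String toString() {
            return type + " " + name + (value == null ? "" : "=" + value);
        }
    }

    // Hypothetical, trimmed-down filter with only the two decisions used here.
    interface Filter {
        boolean shouldReplicateConfigProperty(String prop);
        boolean shouldReplicateSourceDefault(String prop);
    }

    static List<Op> buildOps(List<SourceEntry> source, Filter filter) {
        List<Op> ops = new ArrayList<>();
        for (SourceEntry e : source) {
            if (!filter.shouldReplicateConfigProperty(e.name)) {
                continue; // excluded property: leave the target value untouched
            }
            if (e.isDefault && !filter.shouldReplicateSourceDefault(e.name)) {
                ops.add(new Op("DELETE", e.name, null)); // target reverts to its own default
            } else {
                ops.add(new Op("SET", e.name, e.value));
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        Filter filter = new Filter() {
            public boolean shouldReplicateConfigProperty(String prop) {
                return !prop.equals("follower.replication.throttled.replicas");
            }
            public boolean shouldReplicateSourceDefault(String prop) {
                return false; // behaves like use.defaults.from = target
            }
        };
        List<SourceEntry> source = List.of(
                new SourceEntry("retention.ms", "3600000", false),
                new SourceEntry("cleanup.policy", "delete", true),
                new SourceEntry("follower.replication.throttled.replicas", "", true));
        System.out.println(buildOps(source, filter));
        // prints [SET retention.ms=3600000, DELETE cleanup.policy]
    }
}
```

Emitting DELETE for source defaults also covers the case where a user removes an override on the source topic: the property becomes a default there, so the stale override is cleared on the target.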

Compatibility, Deprecation, and Migration Plan

By default, the new setting will be set to "requested", which means MirrorMaker will use IncrementalAlterConfigs if possible, and fall back to the AlterConfigs API if the target cluster is incompatible.

This means there will be a behaviour change while syncing topic configurations in MM2 after this KIP. For example, when retention.ms of a topic is changed in the source cluster, filtered configurations such as follower.replication.throttled.replicas currently get cleared on that topic in the target cluster. After this KIP, the current value of follower.replication.throttled.replicas will be kept in the target cluster unless the target brokers are running a version older than Kafka 2.3.0.

This setting will be marked as deprecated immediately.

...

  • use.incremental.alter.configs will be removed.
  • MirrorMaker will use the IncrementalAlterConfigs API to sync topic configurations.

...

Currently, the integration tests for MirrorMaker do not check whether the target topic has the expected configurations after applying configuration filters, hence this bug was not caught. New integration tests will be added that check target topic configurations once they have been synced and exercise the different settings of the new configuration.

Rejected Alternatives

  • We could have changed the current updateConfigs() method to call the DescribeConfigs API before AlterConfigs so that it had an incremental mode. This would have avoided the migration. However, the topic configurations could change between the DescribeConfigs and AlterConfigs calls, so we could still end up syncing incorrect topic configurations. We would also need to stop using the deprecated API at some point anyway.
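The race in this rejected alternative can be shown with a small simulation. It is a sketch over plain maps, not Kafka client code; `DescribeThenAlterRace`, `merge` and `run` are hypothetical names, and the "concurrent" change is simply placed between the two steps to make the interleaving explicit.

```java
import java.util.HashMap;
import java.util.Map;

class DescribeThenAlterRace {

    // Emulated incremental mode: start from the described snapshot, apply the
    // synced properties, and submit the result with replace-all semantics.
    static Map<String, String> merge(Map<String, String> snapshot, Map<String, String> update) {
        Map<String, String> merged = new HashMap<>(snapshot);
        merged.putAll(update);
        return merged;
    }

    static Map<String, String> run() {
        Map<String, String> target = new HashMap<>();
        target.put("retention.ms", "86400000");

        // Step 1: DescribeConfigs takes a snapshot of the target overrides.
        Map<String, String> snapshot = new HashMap<>(target);

        // Another tool changes the target topic between the two calls.
        target.put("leader.replication.throttled.replicas", "0:1");

        // Step 2: AlterConfigs replaces everything with the merged snapshot,
        // which does not contain the change made in between.
        target = merge(snapshot, Map.of("retention.ms", "3600000"));
        return target;
    }

    public static void main(String[] args) {
        System.out.println(run().containsKey("leader.replication.throttled.replicas")); // false
    }
}
```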