Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-12192

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Historically, the `topology.optimizations` configuration for Kafka Streams was meant to help ensure backwards compatibility (see KIP-295: Add Streams Configuration Allowing for Optional Topology Optimization) and assumed that there would be only two valid options: none and all. There are some optimizations under that umbrella, however, that may have unintended side-effects. Specifically, the source-changelog optimization that reuses the source topic as a changelog has two issues:

First, it assumes that the serde being used is symmetric (i.e. serialize(deserialize(bytes)) == bytes). Because of this assumption, Kafka Streams is able to pass the source topic name (instead of the changelog topic name, which doesn't exist) into the serializer that writes to the RocksDB store (see KAFKA-10179). To illustrate why this is not always desirable, consider the following use case:

In this situation, Kafka Streams will register a new schema to the source topic subject in Schema Registry, even though no messages with that schema exist in the source topic.

Second, it assumes that there are no "dropped" messages in the deserialization exception handler (see KAFKA-8037 for more details).

For the two reasons above, some users may want to disable the source-changelog optimization while keeping other optimizations under the `topology.optimizations` config enabled. In the future, there may also be more optimizations that users may want to selectively enable/disable.

Public Interfaces

We will add the following configuration to `StreamsConfig`:

TOPOLOGY_OPTIMIZATION_DISABLED_LIST_CONFIG = "topology.optimization.disabled.list";
TOPOLOGY_OPTIMIZATION_DISABLED_LIST_DOC = 
    "If topology.optimization" is set to \"all\", this configuration allows users to selectively disable some optimizations. " 
    + "Currently, users can toggle the following optimizations: " + SOURCE_CHANGELOG;

SOURCE_CHANGELOG_OPTIMIZATION = "source_changelog";

...

.define(TOPOLOGY_OPTIMIZATION_DISABLED_LIST_CONFIG,
        Type.LIST,
        "",
        ValidList.in(SOURCE_CHANGELOG_OPTIMIZATION),
        Importance.LOW,
        TOPOLOGY_OPTIMIZATION_DISABLED_LIST_DOC)


Proposed Changes

If the topology.optimization.disabled.list contains "source_changelog", then the InternalStreamsBuilder class will not call the optimizeKTableSourceTopics() method as part of maybePerformOptimizations.

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives