Status
Current state: "Under Discussion"
Discussion thread: TBD
JIRA:
Released: 2.6 (target)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Today we have multiple ways to infer and inherit serde along the topology, and only fall back to the configured serde when inference does not apply. More specifically, the serde overriding precedence is the following:
- Serde specified explicitly via the control objects, such as Consumed, Produced, Grouped, etc.
Serde inferred from the parent / child operators which are specified explicitly. For example, for a topology like builder.stream(.., Consumed.of(..)).groupByKey().reduce() where the source stream's key-value serdes are specified via `Consumed`, then the reduced table's key and value serdes can inherit from the source stream since their types are not changed.
- Serde specified via the global config ("default.key.serde" and "default.value.serde").
Since this serde overriding logic is executed implicitly, it is hard for users to infer which serdes are actually going to be used. And if a user mistakenly sets a serde or simply default to the configured serde which mismatches, they will not get informed until they start processing and get a runtime ClassCastException.
So I'd propose we augment the topology description with serde information on place that would execute serde, i.e. source / sink topics and state store operators.
Public Interfaces
The generated String from TopologyDescription#toString would be augmented with the serde information in the form of:
keySerde: [SerdeClassType], valueSerde: [SerdeClassType]
To illustrate with a concrete example, suppose we have the following application code:
builder.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) .groupBy(..., Grouped.with(STRING_SERDE, STRING_SERDE)) .windowedBy(...) .count(Materialized.as("counts")) .suppress(...withName("myname")) .toStream() .map(...) .to("output", Produced.with(STRING_SERDE, Serdes.Long()));
The current topology-description would be the following:
Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-KEY-SELECT-0000000001 Processor: KSTREAM-KEY-SELECT-0000000001 (stores: []) --> counts-repartition-filter <-- KSTREAM-SOURCE-0000000000 Processor: counts-repartition-filter (stores: []) --> counts-repartition-sink <-- KSTREAM-KEY-SELECT-0000000001 Sink: counts-repartition-sink (topic: counts-repartition) <-- counts-repartition-filter Sub-topology: 1 Source: counts-repartition-source (topics: [counts-repartition]) --> KSTREAM-AGGREGATE-0000000002 Processor: KSTREAM-AGGREGATE-0000000002 (stores: [counts]) --> myname <-- counts-repartition-source Processor: myname (stores: [myname-store]) --> KTABLE-TOSTREAM-0000000006 <-- KSTREAM-AGGREGATE-0000000002 Processor: KTABLE-TOSTREAM-0000000006 (stores: []) --> KSTREAM-MAP-0000000007 <-- myname Processor: KSTREAM-MAP-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- KTABLE-TOSTREAM-0000000006 Sink: KSTREAM-SINK-0000000008 (topic: output-suppressed) <-- KSTREAM-MAP-0000000007
With this proposal, the augmented topology-description would be the following:
Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input], keySerde: StringDeserializer, valueSerde: StringDeserializer) --> KSTREAM-KEY-SELECT-0000000001 Processor: KSTREAM-KEY-SELECT-0000000001 (stores: []) --> counts-repartition-filter <-- KSTREAM-SOURCE-0000000000 Processor: counts-repartition-filter (stores: []) --> counts-repartition-sink <-- KSTREAM-KEY-SELECT-0000000001 Sink: KSTREAM-SINK-0000000003 (topic: counts-repartition, keySerde: StringSerializer, valueSerde: StringSerializer) <-- counts-repartition-filter Sub-topology: 1 Source: counts-repartition-source (topics: [counts-repartition], keySerde: StringDeserializer, valueSerde: StringDeserializer) --> KSTREAM-AGGREGATE-0000000002 Processor: KSTREAM-AGGREGATE-0000000002 (stores: [(counts, serdes: [StringSerde, LongSerde])]) --> myname <-- counts-repartition-source Processor: myname (stores: [(myname-store, serdes: [SessionWindowedSerde, FullChangeSerde])]) --> KTABLE-TOSTREAM-0000000006 <-- KSTREAM-AGGREGATE-0000000002 Processor: KTABLE-TOSTREAM-0000000006 (stores: []) --> KSTREAM-MAP-0000000007 <-- myname Processor: KSTREAM-MAP-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- KTABLE-TOSTREAM-0000000006 Sink: KSTREAM-SINK-0000000008 (topic: output-suppressed, keySerde: StringSerializer, valueSerde: LongSerializer) <-- KSTREAM-MAP-0000000007
Proposed Changes
Note that the augmented topology description only contains the serde class name, but it does not necessarily include the inner class name.
Compatibility, Deprecation, and Migration Plan
If there are any applications that depends on parsing the string value for, e.g. visualizing the topology description, then their code needs to be updated accordingly. I think this is okay to break such compatibility without introducing a deprecation phase of it.
Rejected Alternatives
None.