...
This is a proposal to get rid of the CombinedKey type in the return type. Internally we would use a CombinedKey and a CombinedKey Serde, and apply the mappers only at the processing boundaries (ValueGetterSupplier, context.forward). The data will still be serialized for repartitioning in a Kafka-specific format, which might prevent users from using their default tooling with these topics.
```java
/**
 * Joins one record of this KTable to n records of the other KTable;
 * an update in this KTable will update all n matching records, an update
 * in the other table will update only the one matching record.
 *
 * @param other the table containing n records for each K of this table
 * @param keyExtractor a {@code ValueMapper} returning the key of this table from the other table's value
 * @param customCombinedKey a {@code ValueMapper} allowing the CombinedKey to be wrapped in a custom object
 * @param combinedKey a {@code ValueMapper} allowing the custom object to be unwrapped again
 * @param joiner a {@code ValueJoiner} combining this table's value with the other table's value
 * @param keyOtherSerde serde for the other table's key
 * @param valueOtherSerde serde for the other table's value
 * @param joinValueSerde serde for the joined value
 * @param <KO> the resulting table's key type (the custom wrapper around the CombinedKey)
 * @param <VO> the resulting table's value type
 * @return a KTable keyed by {@code KO} containing the joined values
 */
<KO, VO, K1, V1> KTable<KO, VO> oneToManyJoin(KTable<K1, V1> other,
                                              ValueMapper<V1, K> keyExtractor,
                                              ValueMapper<CombinedKey<K1, K>, KO> customCombinedKey,
                                              ValueMapper<KO, CombinedKey<K1, K>> combinedKey,
                                              ValueJoiner<V, V1, VO> joiner,
                                              Serde<K1> keyOtherSerde,
                                              Serde<V1> valueOtherSerde,
                                              Serde<VO> joinValueSerde);
```
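To illustrate applying the mappers only at the processing boundaries, here is a minimal sketch with a hypothetical user key type. CombinedKey, OrderLineKey and BoundaryMappers are stand-ins defined only for this example, and java.util.function.Function replaces Kafka's ValueMapper so the snippet compiles on its own. Internally everything stays a CombinedKey; customCombinedKey is applied when a result is forwarded, and combinedKey when a value getter is queried with the user's key.

```java
import java.util.Objects;
import java.util.function.Function;

// Stand-in for the proposal's CombinedKey<K1, K>: K1 is the other table's key, K this table's key.
final class CombinedKey<K1, K> {
    final K1 otherKey;
    final K thisKey;

    CombinedKey(K1 otherKey, K thisKey) {
        this.otherKey = otherKey;
        this.thisKey = thisKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CombinedKey)) {
            return false;
        }
        CombinedKey<?, ?> other = (CombinedKey<?, ?>) o;
        return Objects.equals(otherKey, other.otherKey) && Objects.equals(thisKey, other.thisKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(otherKey, thisKey);
    }
}

// Hypothetical user-defined key type KO exposed by the result KTable.
final class OrderLineKey {
    final String lineId;
    final String orderId;

    OrderLineKey(String lineId, String orderId) {
        this.lineId = lineId;
        this.orderId = orderId;
    }
}

final class BoundaryMappers {
    private BoundaryMappers() {}

    // customCombinedKey: applied when a joined record is forwarded (internal key -> KO).
    static final Function<CombinedKey<String, String>, OrderLineKey> CUSTOM_COMBINED_KEY =
        ck -> new OrderLineKey(ck.otherKey, ck.thisKey);

    // combinedKey: applied when a value getter is queried with a KO (KO -> internal key).
    static final Function<OrderLineKey, CombinedKey<String, String>> COMBINED_KEY =
        ko -> new CombinedKey<>(ko.lineId, ko.orderId);
}
```

The two mappers are expected to be inverses of each other: if combinedKey does not recover the CombinedKey that customCombinedKey was given, lookups through the value getter would not find the stored records.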
...
Introduce an additional Serde. This approach is the counterpart to having a back-and-forth mapper: with it, any Kafka-specific serialization can be kept off the wire, and how the combined key is serialized is left entirely to the user.
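```java
package org.apache.kafka.streams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public abstract class CombinedKeySerde<K, K1> implements Serde<CombinedKey<K, K1>> {
    // Serializes only the K part of a CombinedKey<K, K1>; the resulting bytes must be
    // a true prefix of the serialized CombinedKey with the same K.
    public abstract Serializer<K> getPartialKeySerializer();
}
```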
```java
/**
 * Joins one record of this KTable to n records of the other KTable;
 * an update in this KTable will update all n matching records, an update
 * in the other table will update only the one matching record.
 *
 * @param other the table containing n records for each K of this table
 * @param keyExtractor a {@code ValueMapper} returning the key of this table from the other table's value
 * @param joiner a {@code ValueJoiner} combining this table's value with the other table's value
 * @param keyOtherSerde serde for the other table's key
 * @param valueOtherSerde serde for the other table's value
 * @param joinValueSerde serde for the joined value
 * @param combinedKeySerde serde for the resulting {@code CombinedKey<K, K1>}
 * @param <VO> the resulting table's value type
 * @return a KTable keyed by {@code CombinedKey<K, K1>} containing the joined values
 */
<VO, K1, V1> KTable<CombinedKey<K, K1>, VO> oneToManyJoin(KTable<K1, V1> other,
                                                         ValueMapper<V1, K> keyExtractor,
                                                         ValueJoiner<V, V1, VO> joiner,
                                                         Serde<K1> keyOtherSerde,
                                                         Serde<V1> valueOtherSerde,
                                                         Serde<VO> joinValueSerde,
                                                         CombinedKeySerde<K, K1> combinedKeySerde);
```
...
Streams
We will implement a default CombinedKeySerde that uses a regular length encoding for both fields. Calls to the "intrusive approach" would construct this default CombinedKeySerde and invoke the overload that takes a CombinedKeySerde. This works with all serde frameworks as long as the user is not interested in how the data is serialized in the topics.
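A minimal sketch of what such a default length encoding could look like, assuming both fields have already been turned into bytes by their own serializers. The class name LengthPrefixedCombinedKeyCodec and the exact layout ([4-byte length of K][K][4-byte length of K1][K1]) are assumptions for illustration; a real CombinedKeySerde would wrap Kafka Serializer instances rather than take raw byte arrays.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for CombinedKey<K, K1> using the layout
// [4-byte length of K][K bytes][4-byte length of K1][K1 bytes].
final class LengthPrefixedCombinedKeyCodec {
    private LengthPrefixedCombinedKeyCodec() {}

    // Full combined key from already-serialized K and K1 bytes.
    static byte[] serialize(byte[] k, byte[] k1) {
        return ByteBuffer.allocate(4 + k.length + 4 + k1.length)
            .putInt(k.length).put(k)
            .putInt(k1.length).put(k1)
            .array();
    }

    // Partial key: only the length-prefixed K part, which is a byte prefix
    // of every full key that has the same K.
    static byte[] serializePrefix(byte[] k) {
        return ByteBuffer.allocate(4 + k.length).putInt(k.length).put(k).array();
    }

    // Splits a full key back into {K bytes, K1 bytes}.
    static byte[][] deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] k = new byte[buf.getInt()];
        buf.get(k);
        byte[] k1 = new byte[buf.getInt()];
        buf.get(k1);
        return new byte[][] {k, k1};
    }
}
```

The length in front of K is what keeps keys unambiguous: without it, K = "ab", K1 = "c" and K = "a", K1 = "bc" would produce the same bytes, and the partial key for "a" would also match keys whose K is "ab".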
Protobuf / Avro / Thrift / Hadoop Writable / Custom
Users of these frameworks should find it easy to implement a CombinedKeySerde. Essentially, they define an object that wraps K and K1 as usual, keeping K1 as an optional field. The serializer returned from getPartialKeySerializer() would do the following:
- create such a wrapping object
- set the value for the K field
- serialize the wrapped object as usual.
This should be straightforward, and users could implement a CombinedKeySerde specific to their framework and reuse its logic without implementing a new Serde for each key pair.
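The three steps listed above can be sketched with a toy wrapper class. WrappedKey and WrappedKeyPartialSerializer are hypothetical, and the hand-rolled tag-length-value encoding only stands in for a real framework's serializer. It writes fields in field-number order and omits the unset optional K1, which is what makes the partial key a byte prefix of the full key.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical wrapper holding K and an optional K1, like a framework-generated
// message class would; both keys are Strings here for brevity.
final class WrappedKey {
    final String k;
    final Optional<String> k1;

    WrappedKey(String k, Optional<String> k1) {
        this.k = k;
        this.k1 = k1;
    }

    // Toy encoding: fields are written in field-number order and an unset
    // optional field is omitted entirely.
    byte[] serialize() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeField(out, 1, k);
        k1.ifPresent(v -> writeField(out, 2, v));
        return out.toByteArray();
    }

    private static void writeField(ByteArrayOutputStream out, int tag, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > 255) {
            throw new IllegalArgumentException("toy format supports values up to 255 bytes");
        }
        out.write(tag);           // 1-byte field tag
        out.write(bytes.length);  // 1-byte length
        out.write(bytes, 0, bytes.length);
    }
}

final class WrappedKeyPartialSerializer {
    private WrappedKeyPartialSerializer() {}

    // Create the wrapping object, set only K, serialize it as usual.
    static byte[] serializePartial(String k) {
        WrappedKey wrapper = new WrappedKey(k, Optional.empty());
        return wrapper.serialize();
    }
}
```

Whether a given framework actually writes K before K1 and omits an unset field should be verified for that framework rather than assumed.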
JSON
With JSON, implementing a CombinedKeySerde depends on the specific JSON framework. A full key would look like { "a" :{ "key":"a1" }, "b": {"key":"b5" } }; to generate a true prefix, one would have to generate { "a" :{ "key":"a1" }, which is not valid JSON. This invalid JSON will never leave the JVM, but depending on the framework it may be tricky to implement a serializer that generates it. We could provide users with a utility method to check that their serde satisfies our invariants.
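A utility of the kind suggested above could be as small as a byte-prefix check run against sample keys. PrefixInvariant and its methods are hypothetical, and plain java.util.function interfaces stand in for Kafka serializers so the sketch compiles on its own.

```java
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical checker for the invariant a CombinedKeySerde must satisfy:
// serializing only K must yield a byte prefix of serializing (K, K1).
final class PrefixInvariant {
    private PrefixInvariant() {}

    static boolean isBytePrefix(byte[] prefix, byte[] full) {
        // Length check first, so copyOf never pads the full key with zeros.
        return prefix.length <= full.length
            && Arrays.equals(prefix, Arrays.copyOf(full, prefix.length));
    }

    // Checks the invariant for one sample (k, k1) using stand-in serializer functions.
    static <K, K1> boolean holdsFor(K k, K1 k1,
                                    Function<K, byte[]> partialSerializer,
                                    BiFunction<K, K1, byte[]> fullSerializer) {
        return isBytePrefix(partialSerializer.apply(k), fullSerializer.apply(k, k1));
    }
}
```

With the JSON strings from the paragraph above, a partial serializer emitting { "a" :{ "key":"a1" } passes for ("a1", "b5"), while one emitting the valid JSON { "a" :{ "key":"a1" } } fails, because it diverges from the full key exactly where the full key continues with a comma.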
Proposed Changes
Goal
Given two relations A and B, there is one A for each B and there may be many Bs for each A. A is represented by the KTable on which the method described above is invoked, while B is represented by that method's first argument. We want to implement a set of processors that allows a user to create a new KTable in which A and B are joined based on the reference to A in B. A and B are represented as KTables, with B partitioned by B's key and A partitioned by A's key.
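To make the goal concrete, the expected result of the join can be illustrated on in-memory maps. This is only a sketch of the semantics, assuming inner-join behavior (a B whose referenced A does not exist produces no output); it is not the proposed processor topology. OneToManyJoinSemantics is a hypothetical name, and Map.entry (Java 9+) stands in for CombinedKey<K, K1>.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// In-memory illustration of the one-to-many join result, not a streaming implementation.
final class OneToManyJoinSemantics {
    private OneToManyJoinSemantics() {}

    // a: table A keyed by K; b: table B keyed by K1, where each B value references one A key.
    // The result is keyed by the pair (K, K1), mirroring CombinedKey<K, K1>.
    static <K, V, K1, V1, VO> Map<Map.Entry<K, K1>, VO> join(
            Map<K, V> a,
            Map<K1, V1> b,
            Function<V1, K> keyExtractor,
            BiFunction<V, V1, VO> joiner) {
        Map<Map.Entry<K, K1>, VO> result = new HashMap<>();
        for (Map.Entry<K1, V1> bRecord : b.entrySet()) {
            K aKey = keyExtractor.apply(bRecord.getValue());
            V aValue = a.get(aKey);
            if (aValue != null) { // inner join: B records without a matching A are dropped
                result.put(Map.entry(aKey, bRecord.getKey()),
                           joiner.apply(aValue, bRecord.getValue()));
            }
        }
        return result;
    }
}
```

In the streaming version the same result has to be maintained incrementally: an update to an A record affects all B records that reference it, while an update to a B record affects only its own output row, as the Javadoc of oneToManyJoin describes.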
...