...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In streams applications it is common to chain multiple joins together in order to enrich a large dataset with some, often smaller, side data. For example: enriching an Order with customer data. This also maps onto the relational world, where it is common to join fact tables with dimensional data. The facts are often large, with frequent updates and/or new data arrivals, e.g., a Purchases stream, whereas the dimensional data is generally much smaller with a potentially less frequent update rate, e.g., Products.
...
A convenient solution, where the dimensions are of a manageable size, is to replicate the dimension data, in their entirety, onto each Kafka Streams node. This then allows joins to be performed without the prerequisite that the fact stream be partitioned by the dimension table’s key, i.e., all dimension data is available to all partitions of the fact table, so we could perform non-key based joins from a fact to many dimensions regardless of their partitioning key, all at low cost.
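To make the idea concrete, here is a minimal in-memory sketch of a non-key join against a fully replicated dimension. It is a simplified model, not the Kafka Streams API: the class `ReplicatedJoinSketch`, the `Purchase` type, and the sample data are all hypothetical and exist only for illustration. The point it shows is that the facts are keyed by order id, yet the lookup uses a key extracted from the fact itself, so no repartitioning by product id is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Simplified in-memory model of a join against replicated dimension data.
// All names here are hypothetical; this is not the Kafka Streams API.
final class ReplicatedJoinSketch {

    // Hypothetical fact record: keyed by orderId, with a foreign key to a product.
    static final class Purchase {
        final String orderId;
        final String productId;

        Purchase(String orderId, String productId) {
            this.orderId = orderId;
            this.productId = productId;
        }
    }

    // Joins one partition of facts against a complete local copy of the dimension.
    // keyMapper derives the dimension key from each fact, so the facts do not
    // have to be partitioned by that key.
    static <F, DK, DV, R> List<R> join(List<F> factPartition,
                                       Map<DK, DV> replicatedDimension,
                                       Function<F, DK> keyMapper,
                                       BiFunction<F, DV, R> joiner) {
        List<R> results = new ArrayList<>();
        for (F fact : factPartition) {
            DV dimensionValue = replicatedDimension.get(keyMapper.apply(fact));
            if (dimensionValue != null) { // inner join: skip facts with no match
                results.add(joiner.apply(fact, dimensionValue));
            }
        }
        return results;
    }

    static List<String> demo() {
        // The whole dimension is available locally, whatever its partitioning.
        Map<String, String> products = new HashMap<>();
        products.put("p1", "Widget");
        products.put("p2", "Gadget");

        // One partition of the fact stream, partitioned by order id.
        List<Purchase> partition = Arrays.asList(
                new Purchase("o1", "p2"),
                new Purchase("o2", "p1"),
                new Purchase("o3", "p9")); // no such product

        return join(partition, products,
                p -> p.productId,
                (p, name) -> p.orderId + " -> " + name);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Every node holding a full copy of `products` is what makes the lookup local; the trade-off is the memory and replication cost of that copy, which is why the approach suits dimensions of a manageable size.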
Example
We have a bespoke website for selling custom-made goods. When a purchase is made, we want to enrich the data so that we can immediately produce and send detailed invoices to customers. We have three inputs:
...
Now we’ve eliminated the repartitioning, intermediate topics, and the need to co-partition all the input topics. So the flow now looks like:
Public Interfaces
GlobalKTable
A new interface to represent the GlobalKTable. This interface is deliberately restrictive as its primary purpose is to enable non-key joins without having to do any re-partitioning.
    // Represents a table that is fully replicated on each KafkaStreams instance
    public interface GlobalKTable<K, V> {

        <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
                                            final KeyValueMapper<K, V, K1> keyMapper,
                                            final ValueJoiner<V, V1, R> joiner,
                                            final String queryableViewName);

        <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
                                                final KeyValueMapper<K, V, K1> keyMapper,
                                                final ValueJoiner<V, V1, R> joiner,
                                                final String queryableViewName);
    }
KStream
Add overloaded methods for joining with GlobalKTable
    /**
     * Perform a left join with a GlobalKTable using the provided KeyValueMapper
     * to map from the (key, value) of the KStream to the key of the GlobalKTable.
     */
    <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> replicatedTable,
                                       final KeyValueMapper<K, V, K1> keyValueMapper,
                                       final ValueJoiner<V, V1, R> valueJoiner);

    /**
     * Perform a join with a GlobalKTable using the provided KeyValueMapper
     * to map from the (key, value) of the KStream to the key of the GlobalKTable.
     */
    <K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> table,
                                     final KeyValueMapper<K, V, K1> keyValueMapper,
                                     final ValueJoiner<V, V1, V2> joiner);
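The difference between the two stream-side methods can be sketched with a small in-memory model. This is an illustration under stated assumptions, not the actual implementation: `StreamGlobalJoinSemantics` and its sample data are hypothetical, `BiFunction` stands in for both `KeyValueMapper` and `ValueJoiner`, and a plain `Map` stands in for the global table. It assumes the usual Kafka Streams convention that an inner join drops unmatched records while a left join passes `null` to the joiner.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of join vs. leftJoin between a stream and a global table.
final class StreamGlobalJoinSemantics {

    // The output keeps the stream's key K; only the value changes to R.
    static <K, V, K1, V1, R> List<Map.Entry<K, R>> join(
            List<Map.Entry<K, V>> stream,
            Map<K1, V1> globalTable,
            BiFunction<K, V, K1> keyValueMapper, // stands in for KeyValueMapper
            BiFunction<V, V1, R> joiner,         // stands in for ValueJoiner
            boolean leftJoin) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : stream) {
            // Map the stream record's (key, value) to the key of the global table.
            K1 tableKey = keyValueMapper.apply(record.getKey(), record.getValue());
            V1 tableValue = globalTable.get(tableKey);
            if (tableValue == null && !leftJoin) {
                continue; // inner join: no match, no output
            }
            // Left join: the joiner sees null when there is no match.
            out.add(new SimpleEntry<>(record.getKey(),
                    joiner.apply(record.getValue(), tableValue)));
        }
        return out;
    }

    static List<String> demo(boolean leftJoin) {
        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Alice");

        // Stream of (orderId, customerId); c2 has no entry in the table.
        List<Map.Entry<String, String>> orders = new ArrayList<>();
        orders.add(new SimpleEntry<>("o1", "c1"));
        orders.add(new SimpleEntry<>("o2", "c2"));

        List<Map.Entry<String, String>> joined = join(
                orders, customers,
                (orderId, customerId) -> customerId,
                (customerId, name) -> customerId + "=" + name,
                leftJoin);

        List<String> rendered = new ArrayList<>();
        for (Map.Entry<String, String> e : joined) {
            rendered.add(e.getKey() + ":" + e.getValue());
        }
        return rendered;
    }

    public static void main(String[] args) {
        System.out.println("join:     " + demo(false));
        System.out.println("leftJoin: " + demo(true));
    }
}
```

Note that the result is still keyed by the order id in both cases, which mirrors the `KStream<K, R>` return type in the signatures above.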
KTable
Add overloaded methods for joining with GlobalKTable
    /**
     * Perform a join with a GlobalKTable using the provided KeyValueMapper
     * to map from the (key, value) of the KTable to the key of the GlobalKTable.
     */
    <K1, V1, R> KTable<K, R> join(final GlobalKTable<K1, V1> globalTable,
                                  final KeyValueMapper<K, V, K1> keyMapper,
                                  final ValueJoiner<V, V1, R> joiner);

    /**
     * Perform a left join with a GlobalKTable using the provided KeyValueMapper
     * to map from the (key, value) of the KTable to the key of the GlobalKTable.
     */
    <K1, V1, R> KTable<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
                                      final KeyValueMapper<K, V, K1> keyMapper,
                                      final ValueJoiner<V, V1, R> joiner);
KStreamBuilder
    /**
     * Add a GlobalKTable to the topology using the provided Serdes.
     */
    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                 final Serde<V> valSerde,
                                                 final String topic,
                                                 final String storeName)

    /**
     * Add a GlobalKTable to the topology using default Serdes.
     */
    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                 final String storeName)
TopologyBuilder
    // Add a Global Store that is backed by a source topic to the TopologyBuilder
    public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                       final String sourceName,
                                                       final Deserializer keyDeserializer,
                                                       final Deserializer valueDeserializer,
                                                       final String topic,
                                                       final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier)

    // Add a Global Store that is not backed by a source topic, i.e., the view
    // that is the result of a join between two GlobalKTables
    public synchronized TopologyBuilder addGlobalStore(final StateStore store)

    // All Global State Stores will be part of a single ProcessorTopology;
    // this provides an easy way to build that topology
    public synchronized ProcessorTopology buildGlobalStateTopology()

    // Retrieve the Global State Stores from the builder
    public synchronized Map<String, StateStore> globalStateStores()
...
Compatibility, Deprecation, and Migration Plan
- None - this is a new feature and doesn’t impact any existing usages.
Test Plan
- Unit tests to validate that all the individual components work as expected.
- Integration and/or System tests to ensure that the feature works correctly end-to-end.
Rejected Alternatives
Replicating per task: Each StreamTask would have its own complete replica of the table. Although this would make best-effort time synchronization possible, it would be too costly in terms of resource usage.
Replicating per thread: This provides no benefit beyond per-instance replication, but adds replication overhead.
Introducing a broadcastJoin() API: This would use the existing KTable interface and automatically convert it, internally, to a global table. The feeling is that this would muddy the API, i.e., it is better to be explicit with the type.