Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-4490
Released: 0.10.2
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
A new interface to represent the GlobalKTable. This interface is deliberately restrictive as its primary purpose is to enable non-key joins without having to do any re-partitioning.
Code Block | ||
---|---|---|
| ||
// Represents a Table that is fully replicated on each KafkaStreams instance
public interface GlobalKTable<K, V> {
} |
KStream
Add overloaded methods for joining with GlobalKTable
Code Block | ||
---|---|---|
| ||
/**
 * perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable
 */
<K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> replicatedTable,
                                   final KeyValueMapper<K, V, K1> keyValueMapper,
                                   final ValueJoiner<V, V1, R> valueJoiner);
/**
 * perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable
 */
<K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> table,
                                 final KeyValueMapper<K, V, K1> keyValueMapper,
                                 final ValueJoiner<V, V1, V2> joiner); |
KTable
Add overloaded methods for joining with GlobalKTable
Code Block | ||
---|---|---|
| ||
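/**
 * perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable
 */
<K1, V1, R> KTable<K, R> join(final GlobalKTable<K1, V1> globalTable,
                              final KeyValueMapper<K, V, K1> keyMapper,
                              final ValueJoiner<V, V1, R> joiner);
/**
 * perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable
 */
<K1, V1, R> KTable<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
                                  final KeyValueMapper<K, V, K1> keyMapper,
                                  final ValueJoiner<V, V1, R> joiner); |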
KStreamBuilder
Code Block | ||
---|---|---|
| ||
/**
 * Add a GlobalKTable to the topology using the provided Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                             final Serde<V> valSerde,
                                             final String topic,
                                             final String storeName)
/**
 * Add a GlobalKTable to the topology using default Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                             final String storeName) |
TopologyBuilder
Code Block |
---|
// add a Global Store that is backed by a source topic to the TopologyBuilder
public synchronized TopologyBuilder addGlobalStore(final StateStore store,
final String sourceName,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier)
// A method to add Global Stores that are not backed by a source topic, i.e., the view that is the result of a join between two GlobalKTables
public synchronized TopologyBuilder addGlobalStore(final StateStore store)
// All Global State Stores will be part of a single ProcessorTopology
// this provides an easy way to build that topology
public synchronized ProcessorTopology buildGlobalStateTopology();
// Retrieve the Global State Stores from the builder.
public synchronized Map<String, StateStore> globalStateStores() |
Proposed Changes
On close of the GlobalKTable, we will write out a checkpoint file to the global table’s state directory, located under the global sub-directory of StreamsConfig.STATE_DIR_CONFIG, i.e., /state.dir/appId/global, containing the current offset for each partition in the GlobalKTable. The checkpoint file will be used to recover during restarts. On restart we will first check for the existence of the checkpoint file; if it exists, we will read in the current offsets for each partition, seek to those offsets, and start consuming. If the checkpoint file doesn’t exist, we will restore from the earliest offset.
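The checkpoint-and-recover flow described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the Kafka Streams implementation: the class name `GlobalCheckpointSketch`, the one-line-per-partition file format, and the `EARLIEST` sentinel are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: names and the "partition offset" line format are assumptions.
final class GlobalCheckpointSketch {

    // Sentinel meaning "no checkpoint found, restore from the earliest offset".
    static final long EARLIEST = -1L;

    // On close: persist the current offset of every partition, one per line.
    static void writeCheckpoint(Path file, Map<Integer, Long> offsets) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : new TreeMap<>(offsets).entrySet()) {
            lines.add(e.getKey() + " " + e.getValue());
        }
        try {
            Path parent = file.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            Files.write(file, lines);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // On restart: start from the checkpointed offsets if the file exists,
    // otherwise mark every partition as EARLIEST.
    static Map<Integer, Long> startingOffsets(Path file, int numPartitions) {
        Map<Integer, Long> result = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            result.put(p, EARLIEST);
        }
        if (!Files.exists(file)) {
            return result;
        }
        try {
            for (String line : Files.readAllLines(file)) {
                String[] parts = line.trim().split(" ");
                result.put(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return result;
    }
}
```

In a real consumer, an offset returned here would presumably be passed to a seek call, and `EARLIEST` would translate into seeking to the beginning of the partition.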
...
Code Block |
---|
// Order, Product and EnrichedOrder are illustrative application types; the key types are assumed
final GlobalKTable<Long, Product> products = builder.globalTable("product", "products-store");
final KStream<String, Order> orders = builder.stream("orders");
orders.leftJoin(products,
                (key, value) -> value.productId(),
                (order, product) -> new EnrichedOrder(order, product)); |
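To make the non-key join semantics concrete, the following plain-Java model treats the global table as an in-memory map available in full on every instance. It is a sketch, not Kafka Streams code: `GlobalJoinSketch` and `Record` are hypothetical names, and it assumes that an inner join drops stream records with no match while a left join passes `null` to the joiner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of a stream joined against a fully replicated table.
final class GlobalJoinSketch {

    // Minimal stand-in for a keyed stream record.
    static final class Record<K, V> {
        final K key;
        final V value;

        Record(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // keyValueMapper derives the table key from the stream record's (key, value),
    // so the stream never has to be re-partitioned by the table's key.
    static <K, V, K1, V1, R> List<Record<K, R>> join(List<Record<K, V>> stream,
                                                     Map<K1, V1> globalTable,
                                                     BiFunction<K, V, K1> keyValueMapper,
                                                     BiFunction<V, V1, R> valueJoiner,
                                                     boolean leftJoin) {
        List<Record<K, R>> out = new ArrayList<>();
        for (Record<K, V> r : stream) {
            K1 tableKey = keyValueMapper.apply(r.key, r.value);
            V1 tableValue = globalTable.get(tableKey);
            if (tableValue == null && !leftJoin) {
                continue; // inner join: skip records without a matching table entry
            }
            // The output keeps the stream record's original key.
            out.add(new Record<>(r.key, valueJoiner.apply(r.value, tableValue)));
        }
        return out;
    }
}
```

Because the lookup is a local map access, the model shows why no re-partitioning topic is needed: every instance can resolve any table key on its own.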
Compatibility, Deprecation, and Migration Plan
...
- Unit tests to validate that all the individual components work as expected.
- Integration and/or System tests to ensure that the feature works correctly end-to-end.
Rejected Alternatives and Future Work
- Replicating per task: Each StreamTask would have its own complete replica of the table. Though this would provide the ability to do best-effort time synchronization, it would be too costly in terms of resource usage.
- Replicating per thread: Doesn’t provide any benefit beyond per instance, but adds replication overhead.
- Introduce a broadcastJoin() API that uses the existing KTable interface and automatically converts it, internally, to a global table. The feeling is that this would muddy the API, i.e., it is better to be explicit with the type.
- Supporting GlobalKTable to GlobalKTable joins: Fully supporting bi-directional joins would require KeyValueMappers, and either materializing the join in a physical StateStore, i.e., RocksDB, or having yet another interface to map from the key of the join table to the keys of both input tables. We decided that for now we will not support GlobalKTable/GlobalKTable joins, but will revisit this in the future should the need arise.