Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4490

Released: 0.10.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

A new interface, GlobalKTable, represents a table that is fully replicated on each KafkaStreams instance. The interface is deliberately restrictive, as its primary purpose is to enable non-key joins without any repartitioning.

Code Block
languagejava
// Represents a Table that is fully replicated on each KafkaStreams instance
public interface GlobalKTable<K, V> {
}
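
Here is a minimal, hypothetical sketch in plain Java (not Kafka Streams code) of why full replication removes the need to repartition. `ReplicationSketch` and its methods are illustrative names. `String.hashCode()` stands in for Kafka's partitioner, which is an assumption. The sketch counts stream records whose join key would be owned by a different partition than the record key. With a partitioned table, those records would need a repartition step before the join. Against a full replica, every lookup is local.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration, not Kafka Streams code: compares a partitioned table
// with a fully replicated one when the join key is derived from the record value.
final class ReplicationSketch {

    // Stand-in partitioner (assumption): Kafka's default partitioner hashes the
    // serialized key with murmur2, but any deterministic hash shows the same effect.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts records whose join key is owned by a different partition than the
    // record itself. With a partitioned table, each of these records would have to be
    // repartitioned by the join key before the matching row could be found locally.
    static int recordsNeedingRepartition(Map<String, String> recordKeyToJoinKey, int numPartitions) {
        int count = 0;
        for (Map.Entry<String, String> e : recordKeyToJoinKey.entrySet()) {
            if (partitionFor(e.getKey(), numPartitions) != partitionFor(e.getValue(), numPartitions)) {
                count++;
            }
        }
        return count;
    }

    // With a full replica on every instance, any join key can be looked up locally,
    // so no repartitioning is needed regardless of how the join key hashes.
    static Optional<String> localLookup(Map<String, String> fullReplica, String joinKey) {
        return Optional.ofNullable(fullReplica.get(joinKey));
    }
}
```

Take two partitions and a record keyed "a" whose join key is "b". Under this stand-in partitioner, "a" lands on partition 1 and "b" on partition 0, so a co-partitioned join would need a repartition. The same lookup against a full replica stays local.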


 

KStream

Add overloaded methods for joining with GlobalKTable

Code Block
languagejava
/**
 * Perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable
 */
<K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> replicatedTable,
                                   final KeyValueMapper<K, V, K1> keyValueMapper,
                                   final ValueJoiner<V, V1, R> valueJoiner);

/**
 * Perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable
 */
<K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> table,
                                 final KeyValueMapper<K, V, K1> keyValueMapper,
                                 final ValueJoiner<V, V1, V2> joiner);
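
A small in-memory model can make the join and leftJoin semantics concrete. This is a hypothetical sketch, not the Kafka Streams implementation. `StreamGlobalJoinSketch` is an illustrative name, `java.util.function.BiFunction` stands in for `KeyValueMapper` and `ValueJoiner`, and a `Map` stands in for the local replica of the GlobalKTable. The sketch assumes that an inner join drops records without a match and that a left join passes `null` to the joiner.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of KStream-GlobalKTable join semantics (not Kafka code).
// BiFunction stands in for KeyValueMapper (key, value) -> K1 and ValueJoiner (V, V1) -> R.
final class StreamGlobalJoinSketch {

    // join: stream records whose mapped key has no match in the table are dropped.
    static <K, V, K1, V1, R> List<Map.Entry<K, R>> join(List<Map.Entry<K, V>> stream,
                                                        Map<K1, V1> globalTable,
                                                        BiFunction<K, V, K1> keyValueMapper,
                                                        BiFunction<V, V1, R> joiner) {
        return run(stream, globalTable, keyValueMapper, joiner, false);
    }

    // leftJoin: unmatched records are kept, and the joiner receives null for the table value.
    static <K, V, K1, V1, R> List<Map.Entry<K, R>> leftJoin(List<Map.Entry<K, V>> stream,
                                                            Map<K1, V1> globalTable,
                                                            BiFunction<K, V, K1> keyValueMapper,
                                                            BiFunction<V, V1, R> joiner) {
        return run(stream, globalTable, keyValueMapper, joiner, true);
    }

    private static <K, V, K1, V1, R> List<Map.Entry<K, R>> run(List<Map.Entry<K, V>> stream,
                                                               Map<K1, V1> globalTable,
                                                               BiFunction<K, V, K1> keyValueMapper,
                                                               BiFunction<V, V1, R> joiner,
                                                               boolean keepUnmatched) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : stream) {
            // Derive the GlobalKTable key from the stream record's key and value.
            K1 tableKey = keyValueMapper.apply(record.getKey(), record.getValue());
            // Local lookup against the full replica; there is no repartitioning step.
            V1 tableValue = tableKey == null ? null : globalTable.get(tableKey);
            if (tableValue != null || keepUnmatched) {
                // The result keeps the stream record's original key.
                out.add(new SimpleImmutableEntry<>(record.getKey(), joiner.apply(record.getValue(), tableValue)));
            }
        }
        return out;
    }
}
```

As an example, join orders (keyed by order id, with a customer id as the value) against a customer table, using `(k, v) -> v` as the mapper. `join` keeps only the matched orders. `leftJoin` also emits unmatched orders, with a `null` customer value passed to the joiner. In both cases the output keeps the order id as its key.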


 

KTable

Add overloaded methods for joining with GlobalKTable

 

Code Block
languagejava
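/**
 * Perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable
 */
<K1, V1, R> KTable<K, R> join(final GlobalKTable<K1, V1> globalTable,
                              final KeyValueMapper<K, V, K1> keyMapper,
                              final ValueJoiner<V, V1, R> joiner);

/**
 * Perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable
 */
<K1, V1, R> KTable<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
                                  final KeyValueMapper<K, V, K1> keyMapper,
                                  final ValueJoiner<V, V1, R> joiner);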
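
The KTable variants return a KTable, so the join result behaves like a table rather than a stream of events. The following hypothetical sketch models that under stated assumptions:

- the result is recomputed when the KTable side changes;
- a `null` KTable value deletes the joined row;
- changes on the GlobalKTable side are not propagated (this page does not specify that behavior).

`TableGlobalJoinSketch` and `onKTableUpdate` are illustrative names only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of a KTable-GlobalKTable join kept as a result table (not Kafka code).
// Assumptions: updates are driven by the KTable side, and a null value means "delete".
final class TableGlobalJoinSketch<K, V, K1, V1, R> {
    private final Map<K1, V1> globalTable;
    private final BiFunction<K, V, K1> keyMapper;   // stands in for KeyValueMapper
    private final BiFunction<V, V1, R> joiner;      // stands in for ValueJoiner
    private final boolean leftJoin;
    private final Map<K, R> result = new HashMap<>();

    TableGlobalJoinSketch(Map<K1, V1> globalTable, BiFunction<K, V, K1> keyMapper,
                          BiFunction<V, V1, R> joiner, boolean leftJoin) {
        this.globalTable = globalTable;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    // Applies one KTable update and returns a read-only view of the join result.
    Map<K, R> onKTableUpdate(K key, V newValue) {
        if (newValue == null) {
            // The KTable row was deleted, so its joined row disappears too.
            result.remove(key);
            return Collections.unmodifiableMap(result);
        }
        K1 tableKey = keyMapper.apply(key, newValue);
        V1 tableValue = tableKey == null ? null : globalTable.get(tableKey);
        if (tableValue == null && !leftJoin) {
            // Inner join: a row without a match is removed from the result, if present.
            result.remove(key);
        } else {
            // A left join keeps unmatched rows; the joiner then receives null.
            result.put(key, joiner.apply(newValue, tableValue));
        }
        return Collections.unmodifiableMap(result);
    }
}
```

Under these assumptions, an inner-join row vanishes as soon as the KTable value stops matching a GlobalKTable key. With `leftJoin = true`, the same row would stay, with the joiner seeing `null`.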


 


KStreamBuilder

Code Block
languagejava
/**
 * Add a GlobalKTable to the topology using the provided Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                             final Serde<V> valSerde,
                                             final String topic,
                                             final String storeName);

/**
 * Add a GlobalKTable to the topology using default Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                             final String storeName);
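
Both `globalTable` overloads build a table from a topic. The only difference is whether the Serdes are passed in or taken from the defaults. The hypothetical `GlobalReplicaSketch` below is a rough sketch of what populating such a table involves. It assumes the usual changelog semantics: the latest value per key wins, and a `null` value is a delete. It replays every partition of the source topic into one local map. It works on already-deserialized strings, so Serdes are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kafka code) of how a GlobalKTable's local replica could be
// populated: every instance replays all partitions of the source topic.
final class GlobalReplicaSketch {

    // partitions.get(p) holds the records of partition p in offset order.
    static Map<String, String> build(List<List<Map.Entry<String, String>>> partitions) {
        Map<String, String> replica = new HashMap<>();
        // Unlike a regular KTable, which only reads the partitions assigned to its task,
        // a global table reads every partition.
        for (List<Map.Entry<String, String>> partition : partitions) {
            for (Map.Entry<String, String> record : partition) {
                if (record.getValue() == null) {
                    replica.remove(record.getKey());                   // tombstone: delete the key
                } else {
                    replica.put(record.getKey(), record.getValue());   // upsert: latest value wins
                }
            }
        }
        return replica;
    }
}
```

Because a given key always lives in a single partition, replaying partitions one after another still leaves each key with its latest value.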

TopologyBuilder

Code Block
languagejava
// Add a global store that is backed by a source topic to the TopologyBuilder
public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                   final String sourceName,
                                                   final Deserializer keyDeserializer,
                                                   final Deserializer valueDeserializer,
                                                   final String topic,
                                                   final String processorName,
                                                   final ProcessorSupplier stateUpdateSupplier);

// All global state stores will be part of a single ProcessorTopology;
// this provides an easy way to build that topology
public synchronized ProcessorTopology buildGlobalStateTopology();

// Retrieve the global state stores from the builder
public synchronized Map<String, StateStore> globalStateStores();
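
`addGlobalStore` ties together a store, a source topic, key and value deserializers, and a processor that applies each record to the store. The hypothetical `GlobalStoreWiringSketch` below models that wiring with plain Java types:

- `Function<byte[], String>` stands in for `Deserializer`;
- a factory of `BiConsumer` stands in for the `ProcessorSupplier`;
- a `HashMap` stands in for the `StateStore`.

It illustrates the data flow only and is not the `TopologyBuilder` implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch (not TopologyBuilder itself) of how a global store, its source topic,
// key/value deserializers and a state-update processor fit together.
final class GlobalStoreWiringSketch {

    private static final class Registration {
        final Map<String, String> store = new HashMap<>();
        final Function<byte[], String> keyDeserializer;
        final Function<byte[], String> valueDeserializer;
        final BiConsumer<String, String> updateProcessor;

        Registration(Function<byte[], String> keyDeserializer,
                     Function<byte[], String> valueDeserializer,
                     Function<Map<String, String>, BiConsumer<String, String>> updateSupplier) {
            this.keyDeserializer = keyDeserializer;
            this.valueDeserializer = valueDeserializer;
            // The supplier creates an update processor bound to this store.
            this.updateProcessor = updateSupplier.apply(store);
        }
    }

    private final Map<String, Registration> byTopic = new HashMap<>();
    private final Map<String, Map<String, String>> storesByName = new LinkedHashMap<>();

    // Loosely mirrors addGlobalStore(store, sourceName, keyDeserializer, valueDeserializer,
    // topic, processorName, stateUpdateSupplier); source and processor names are omitted.
    GlobalStoreWiringSketch addGlobalStore(String storeName,
                                           String topic,
                                           Function<byte[], String> keyDeserializer,
                                           Function<byte[], String> valueDeserializer,
                                           Function<Map<String, String>, BiConsumer<String, String>> updateSupplier) {
        Registration registration = new Registration(keyDeserializer, valueDeserializer, updateSupplier);
        byTopic.put(topic, registration);
        storesByName.put(storeName, registration.store);
        return this;
    }

    // Delivers one raw record from a source topic: deserialize it, then hand it to the update processor.
    void onRecord(String topic, byte[] key, byte[] value) {
        Registration registration = byTopic.get(topic);
        if (registration == null) {
            throw new IllegalArgumentException("no global store is fed by topic " + topic);
        }
        String k = registration.keyDeserializer.apply(key);
        String v = value == null ? null : registration.valueDeserializer.apply(value);
        registration.updateProcessor.accept(k, v);
    }

    // Analogue of globalStateStores(): every global store, looked up by name.
    Map<String, Map<String, String>> globalStateStores() {
        return Collections.unmodifiableMap(storesByName);
    }

    // A trivial UTF-8 String deserializer for the example.
    static String utf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

In this sketch, the meaning of a `null` value is left to the supplied update processor. One that calls `store.remove(k)` for `null` gives the usual delete semantics.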

 

 

...

  • Replicating per thread: this provides no benefit over replicating per instance, but adds replication overhead.
  • Introducing a broadcastJoin() API that uses the existing KTable interface and automatically converts it, internally, to a global table: the feeling is that this would muddy the API; it is better to be explicit with the type.
  • Supporting GlobalKTable-to-GlobalKTable joins: fully supporting bi-directional joins would require KeyValueMappers in both directions. It would also require either materializing the join in a physical StateStore (i.e., RocksDB) or adding yet another interface to map from the key of the join table to the keys of both input tables. We decided not to support GlobalKTable/GlobalKTable joins for now, but will revisit this should the need arise.
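
When a row in the right-hand table changes, a GlobalKTable/GlobalKTable join view has to find every left-hand row whose mapped key points at it. The hypothetical `BidirectionalJoinSketch` below (illustrative names, plain Java maps) contrasts a full scan of the left table with a reverse index from right keys to left keys. The index makes the lookup direct, but it is exactly the kind of extra state that would need to be materialized and kept up to date.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical sketch (not part of the accepted design): how a GlobalKTable/GlobalKTable
// join view would find the left-hand rows affected by a change to a right-hand key.
final class BidirectionalJoinSketch {

    // Without a reverse mapping, every left-hand row must be re-mapped and compared: O(n) per update.
    static <K, V, K1> Set<K> affectedLeftKeysByScan(Map<K, V> left,
                                                     BiFunction<K, V, K1> keyMapper,
                                                     K1 changedRightKey) {
        Set<K> affected = new HashSet<>();
        for (Map.Entry<K, V> row : left.entrySet()) {
            if (Objects.equals(keyMapper.apply(row.getKey(), row.getValue()), changedRightKey)) {
                affected.add(row.getKey());
            }
        }
        return affected;
    }

    // A reverse index (right key -> left keys) answers the same question with one lookup,
    // but it is additional state that would itself have to be materialized and maintained.
    static <K, V, K1> Map<K1, Set<K>> buildReverseIndex(Map<K, V> left, BiFunction<K, V, K1> keyMapper) {
        Map<K1, Set<K>> index = new HashMap<>();
        for (Map.Entry<K, V> row : left.entrySet()) {
            index.computeIfAbsent(keyMapper.apply(row.getKey(), row.getValue()), unused -> new HashSet<>())
                 .add(row.getKey());
        }
        return index;
    }
}
```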