Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

 

 

  • KStream.java

     

We extract a subset of the ProcessorContext features into a separate RecordContext interface.

Code Block
languagejava
/**
 * Subset of the {@code ProcessorContext} features, split out into its own interface.
 */
public interface RecordContext {
    // Application-, task- and metrics-level accessors.
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    // Accessors named after record coordinates (presumably those of the record being processed; TODO confirm).
    String topic();
    int partition();
    long offset();
    long timestamp();
    // Configuration accessors; the second one takes a key prefix (exact filtering semantics are not shown here).
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
/**
 * Extends {@code RecordContext}, so the methods declared there are inherited instead of being redeclared here.
 */
public interface ProcessorContext extends RecordContext {
   // declares all remaining ProcessorContext methods, i.e. everything except the ones in RecordContext
}
  • Limiting the ProcessorContext - RecordContext interface

We extract a subset of the ProcessorContext features into a separate RecordContext interface.

Code Block
languagejava
/**
 * Subset of the {@code ProcessorContext} features, split out into its own interface.
 * This variant of the listing additionally exposes {@code commit()}.
 */
public interface RecordContext {
    // Application-, task- and metrics-level accessors.
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();

    // Accessors named after record coordinates (presumably those of the record being processed; TODO confirm).
    String topic();
    int partition();

    // Indented with spaces like its siblings (the original line used a tab).
    void commit();

    long offset();
    long timestamp();

    // Configuration accessors; the second one takes a key prefix (exact filtering semantics are not shown here).
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
/**
 * Extends {@code RecordContext}, so the methods declared there are inherited instead of being redeclared here.
 */
public interface ProcessorContext extends RecordContext {
   // declares all remaining ProcessorContext methods, i.e. everything except the ones in RecordContext
}

 

Whenever a ProcessorContext has to be passed where a RecordContext is expected, we simply cast it:

Code Block
languagejava
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
KStream<K, V> filter(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);
 
KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows, final RecordContext recordContext);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                super.init(processorContext);
        richMapper.init((RecordContext) processorContext);  				// here finalmake ValueJoiner<?a supercast
 V, ? super VO,}

 ? extends VR> joiner,@Override
    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
      final JoinWindows windows, context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
      final Serde<K> keySerde, mapper.close();
    }
}

  • Rich Interfaces

 

 

Code Block
languagejava
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR> {
    VR  apply(final Serde<V>V1 thisValueSerdevalue1,
 final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V         final Serde<VO> otherValueSerde, 
                             final RecordContext recordContext) 



<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows, 
								  final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde, 
								  final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde, 
							 final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde,
							     final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
								 final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, 
									 final RecordContext recordContext);

 

    • KTable.java

 

Code Block
languagejava
// KTable operator overloads as listed for this proposal. None of the signatures
// below takes a RecordContext parameter. Most operators come in three flavours:
// no store argument, a queryableStoreName, or a StateStoreSupplier<KeyValueStore>.

// filter: keeps a predicate-based subset of the table.
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);


KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);


KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


// filterNot: same overload set as filter.
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);


// mapValues: the store-backed overloads additionally take a Serde for the new value type VR.
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                             final Serde<VR> valueSerde,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


// join: table-table join driven by a ValueJoiner.
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Serde<VR> joinSerde,
                            final String queryableStoreName);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final StateStoreSupplier<KeyValueStore> storeSupplier);


// leftJoin: same overload set as join.
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Serde<VR> joinSerde,
                                final String queryableStoreName);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final StateStoreSupplier<KeyValueStore> storeSupplier);

// outerJoin: same overload set as join.
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Serde<VR> joinSerde,
                                 final String queryableStoreName);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier);

 

    • KGroupedStream.java

 

We extract a subset of the ProcessorContext features into a separate RecordContext interface.

Code Block
languagejava
// KGroupedStream operator overloads as listed for this proposal. None of the
// signatures below takes a RecordContext parameter. Both reduce and aggregate
// appear in three groups: non-windowed, windowed via Windows<W>, and
// session-windowed via SessionWindows.

// reduce, non-windowed: result is KTable<K, V>.
KTable<K, V> reduce(final Reducer<V> reducer);


KTable<K, V> reduce(final Reducer<V> reducer,
                    final String queryableStoreName);


KTable<K, V> reduce(final Reducer<V> reducer,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


// reduce, windowed via Windows<W>: result is keyed by Windowed<K>.
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final String queryableStoreName);


<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                 final Windows<W> windows);


<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final StateStoreSupplier<WindowStore> storeSupplier);

// reduce, session-windowed via SessionWindows.
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final String queryableStoreName);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final SessionWindows sessionWindows);

KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final StateStoreSupplier<SessionStore> storeSupplier);

// aggregate, non-windowed: takes an Initializer and an Aggregator producing VR.
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

// aggregate, windowed via Windows<W>.
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);

// aggregate, session-windowed: additionally takes a Merger (the sessionMerger parameter).
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);
  • Limiting the ProcessorContext - RecordContext interface

We extract a subset of the ProcessorContext features into a separate RecordContext interface.

Code Block
languagejava
/**
 * Subset of the {@code ProcessorContext} features, split out into its own interface.
 */
public interface RecordContext {
    // Application-, task- and metrics-level accessors.
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    // Accessors named after record coordinates (presumably those of the record being processed; TODO confirm).
    String topic();
    int partition();
    long offset();
    long timestamp();
    // Configuration accessors; the second one takes a key prefix (exact filtering semantics are not shown here).
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
/**
 * Extends {@code RecordContext}, so the methods declared there are inherited instead of being redeclared here.
 */
public interface ProcessorContext extends RecordContext {
   // declares all remaining ProcessorContext methods, i.e. everything except the ones in RecordContext
}

 

Whenever a ProcessorContext has to be passed where a RecordContext is expected, we simply cast it:

Code Block
languagejava
/**
 * Example processor showing where the {@code ProcessorContext} is handed to a rich
 * function as a {@code RecordContext}, and where the rich function is closed.
 *
 * NOTE(review): init() refers to {@code richMapper} while process() and close() refer to
 * {@code mapper}; neither field is declared in this snippet. Confirm whether they are
 * meant to be the same object.
 */
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        // NOTE(review): with ProcessorContext declared as extending RecordContext, this is an
        // upcast, so the compiler would accept the argument without the explicit cast.
        richMapper.init((RecordContext) processorContext);  				// the ProcessorContext is cast to RecordContext here
    }

    @Override
    public void process(final K key, final V value) {
        // V1 is not declared in this snippet; presumably the mapper's result type (TODO confirm).
        V1 newValue = mapper.apply(key, value);
        // The key is forwarded unchanged together with the mapped value.
        context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
        // Closes the mapper after the superclass close.
        mapper.close();
    }
}

  • Rich Interfaces

If the interface is value-only (such as ValueJoiner or ValueMapper), its rich interface extends the corresponding withKey variant (for example, ValueMapperWithKey).

If the interface already receives both key and value (such as KeyValueMapper), its rich interface extends the original interface itself.

 

Code Block
languagejava
/**
 * Lifecycle hooks shared by the rich interfaces: an init hook that receives the
 * {@code RecordContext} and a close hook.
 */
public interface RichFunction {
    // Receives the RecordContext; in the KStreamMapProcessor example it is invoked from the processor's init().
    void init(RecordContext recordContext);

    // In the KStreamMapProcessor example the mapper's close() is invoked from the processor's close().
    void close();
}
 
public interface ValueMapperWithKey<K, V, VR> {

    VRvalue, final RecordContext recordContext);
}

/**
 * Takes two values of type {@code V} plus the {@code RecordContext} and returns a {@code V}.
 */
public interface RichReducer<V> {
    V apply(final V value1, final V value2, final RecordContext recordContext);
}


/**
 * Produces a value of type {@code VA} given only the {@code RecordContext}.
 */
public interface RichInitializer<VA> {
    VA apply(final RecordContext recordContext);
}


/**
 * Takes a key, a value, the current aggregate and the {@code RecordContext}, and returns a {@code VA}.
 *
 * NOTE(review): the neighbouring context-aware interfaces are prefixed with "Rich"
 * (RichReducer, RichInitializer); confirm whether this one was meant to be named RichAggregator.
 */
public interface Aggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

 
public interface RichValueMapper<KRichForeachAction<K, V, VR>  extends ValueMapperWithKey<K, V, VR>, RichFunction {

}
 
public interface RichKeyValueMapper<K, V, VR>  extends KeyValueMapper<K, V, VR>, RichFunction {

}
 V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

 

The same semantics apply to other interfaces as well. 

...