...
Public Interfaces
KStream.java
We create a subset of features from ProcessorContext
and put into RecordContext
interface
Code Block | ||
---|---|---|
| ||
public interface RecordContext {
String applicationId();
TaskId taskId();
StreamsMetrics metrics();
String topic();
int partition();
long offset();
long timestamp();
Map<String, Object> appConfigs();
Map<String, Object> appConfigsWithPrefix(String prefix);
}
public interface ProcessorContext extends RecordContext {
// all methods but the ones in RecordContext
} |
Limiting the ProcessorContext - RecordContext interface
We create a subset of features from ProcessorContext
and put into RecordContext
interface
Code Block | ||
---|---|---|
| ||
public interface RecordContext {
String applicationId();
TaskId taskId();
StreamsMetrics metrics();
String topic();
int partition();
void commit();
long offset();
long timestamp();
Map<String, Object> appConfigs();
Map<String, Object> appConfigsWithPrefix(String prefix);
}
public interface ProcessorContext extends RecordContext {
// all methods but the ones in RecordContext
} |
Once we need a conversion from ProcessorContext
and RecordContext, we just cast:
Code Block | ||
---|---|---|
| ||
private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override public void init(ProcessorContext processorContext) { KStream<K, V> filter(Predicate<? super K, ? super V> predicate, final RecordContext recordContext); KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, final RecordContext recordContext); <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final RecordContext recordContext); <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, final RecordContext recordContext); <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, final RecordContext recordContext); <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final RecordContext recordContext); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, super.init(processorContext); richMapper.init((RecordContext) processorContext); // here finalmake ValueJoiner<?a supercast V, ? super VO,} ? extends VR> joiner,@Override public void process(final K key, final V value) { V1 newValue = mapper.apply(key, value); final JoinWindows windows, context().forward(key, newValue); } @Override public void close() { super.close(); final Serde<K> keySerde, mapper.close(); } } |
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichValueMapper<V, VR> { VR apply(final V value, final RecordContext recordContext); } public interface RichValueJoiner<V1, V2, VR> { VR apply(final Serde<V>V1 thisValueSerdevalue1, final V2 value2, final RecordContext recordContext); } public interface RichKeyValueMapper<K, V, VR> { VR apply(final K key, final V final Serde<VO> otherValueSerde, final RecordContext recordContext) <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final RecordContext recordContext); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Serde<K> keySerde, final Serde<V> thisValSerde, final Serde<VO> otherValueSerde, final RecordContext recordContext); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final RecordContext recordContext); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Serde<K> keySerde, final Serde<V> thisValueSerde, final Serde<VO> otherValueSerde, final RecordContext recordContext); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde, final Serde<V> valSerde, final RecordContext recordContext); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde, final Serde<V> valSerde, final RecordContext recordContext); <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable, final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final ValueJoiner<? super V, ? super GV, ? extends RV> joiner, final RecordContext recordContext); <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable, final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, final RecordContext recordContext); |
KTable.java
Code Block | ||
---|---|---|
| ||
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Serde<VR> valueSerde,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier); |
KGroupedStream.java
We create a subset of features from ProcessorContext
and put into RecordContext
interface
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final Reducer<V> reducer);
KTable<K, V> reduce(final Reducer<V> reducer,
final String queryableStoreName);
KTable<K, V> reduce(final Reducer<V> reducer,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
final String queryableStoreName);
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows);
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
final StateStoreSupplier<WindowStore> storeSupplier);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
final String queryableStoreName);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
final StateStoreSupplier<SessionStore> storeSupplier);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde,
final String queryableStoreName);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
final Serde<VR> aggValueSerde,
final String queryableStoreName);
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
final Serde<VR> aggValueSerde);
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
final StateStoreSupplier<WindowStore> storeSupplier);
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
final String queryableStoreName);
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde);
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
final StateStoreSupplier<SessionStore> storeSupplier); |
Limiting the ProcessorContext - RecordContext interface
We create a subset of features from ProcessorContext
and put into RecordContext
interface
Code Block | ||
---|---|---|
| ||
public interface RecordContext {
String applicationId();
TaskId taskId();
StreamsMetrics metrics();
String topic();
int partition();
long offset();
long timestamp();
Map<String, Object> appConfigs();
Map<String, Object> appConfigsWithPrefix(String prefix);
}
public interface ProcessorContext extends RecordContext {
// all methods but the ones in RecordContext
} |
Once we need a conversion from ProcessorContext
and RecordContext, we just cast:
Code Block | ||
---|---|---|
| ||
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
@Override
public void init(ProcessorContext processorContext) {
super.init(processorContext);
richMapper.init((RecordContext) processorContext); // here make a cast
}
@Override
public void process(final K key, final V value) {
V1 newValue = mapper.apply(key, value);
context().forward(key, newValue);
}
@Override
public void close() {
super.close();
mapper.close();
}
}
|
Rich Interfaces
If the interface is value-only (like ValueJoiner, ValueMapper
) we extend its rich interface from its withKey'ed counterpart.
If the interface is key-value (KeyValueMapper) we extend its rich interface from itself.
Code Block | ||
---|---|---|
| ||
public interface RichFunction { void init(RecordContext recordContext); void close(); } public interface ValueMapperWithKey<K, V, VR> { VRvalue, final RecordContext recordContext); } public interface RichReducer<V> { V apply(final V value1, final V value2, final RecordContext recordContext); } public interface RichInitializer<VA> { VA apply(final RecordContext recordContext); } public interface Aggregator<K, V, VA> { VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext); } public interface RichValueMapper<KRichForeachAction<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction { } public interface RichKeyValueMapper<K, V, VR> extends KeyValueMapper<K, V, VR>, RichFunction { } V> { void apply(final K key, final V value, final RecordContext recordContext); } |
The same semantics apply to other interfaces as well.
...