...
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
This KIP combines KIP-149 and provides a hybrid solution to rich functions in Streams and accessing read-only keys within the ValueJoiner, ValueTransformer, and ValueMapper interfaces.
Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods. For example:
...
- having access to RecordContext within an operator
- having access to a read-only key for the ValueJoiner, ValueTransformer, ValueMapper interfaces
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichInitializer<V, K> {
    V apply(final K key);
}

public interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V, K> {
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

public interface RichMerger<K, V> {
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

public interface RichValueTransformer<V, VR, K> {
    void init(final ProcessorContext context);
    VR transform(final V value, final K key);
    void close();
}

public interface RichValueTransformerSupplier<V, VR, K> {
    RichValueTransformer<V, VR, K> get();
}
|
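To make the intent of the rich interfaces concrete, here is a minimal, self-contained sketch of a mapper that follows the RichValueMapper<V, VR, K> shape from this KIP. It does not use the Kafka Streams library: the nested RecordContext and RichValueMapper types are stand-ins declared only for this example, and fixedContext and TAG_WITH_ORIGIN are hypothetical names.

```java
public class RichValueMapperSketch {

    // Hypothetical stand-in for the RecordContext this KIP exposes.
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
    }

    // Stand-in with the RichValueMapper<V, VR, K> shape: value, read-only key, record context.
    interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    // Hypothetical helper that builds a fixed RecordContext for the demo.
    static RecordContext fixedContext(final String topic, final int partition,
                                      final long offset, final long timestamp) {
        return new RecordContext() {
            @Override public long offset() { return offset; }
            @Override public long timestamp() { return timestamp; }
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
        };
    }

    // Tags each value with its key and the topic, partition and offset it came from.
    static final RichValueMapper<String, String, String> TAG_WITH_ORIGIN =
        (value, key, ctx) ->
            key + "=" + value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();

    public static void main(final String[] args) {
        final RecordContext ctx = fixedContext("orders", 2, 42L, 1_000L);
        // prints "order-7=paid@orders-2:42"
        System.out.println(TAG_WITH_ORIGIN.apply("paid", "order-7", ctx));
    }
}
```

A plain ValueMapper could not produce this output, because it sees neither the key nor the record's source coordinates.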
Public Interfaces
...
Code Block | ||
---|---|---|
| ||
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate); KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate); <KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); <KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper); <VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> VR, ? super K> mapper); <KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);VR>, ? super K> mapper); void foreach(final RichForeachAction<? super K, ? super V> action); KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action); KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates); <VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR>VR, ? super K> valueTransformerSupplier, final String... stateStoreNames); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector, final Serialized<KR, V> serialized); <KR> KGroupedStream<KR <VO, VR> KStream<K, V>VR> groupByjoin(final RichKeyValueMapper<?KStream<K, superVO> KotherStream, ? super V, KR> selector, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? finalsuper Serde<KR>K> keySerdejoiner, final Serde<V>JoinWindows valSerdewindows); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joinerVR, ? super K> joiner, final JoinWindows windows, final Joined<K, V, VO> joined); <VO, VR> KStream<K, VR> joinleftJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? 
extends VR>VR, ? super K> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final Joined<K, V, VO> joined); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final RichValueJoiner<? super V, ? super VO, ? extendsfinal VR>JoinWindows joinerwindows, final JoinWindows windows); Joined<K, V, VO> joined); <VO, VR> KStream<K, VR> leftJoinouterJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR>VR, ? super K> joiner, final JoinWindows windows, final Joined<K, V, VO> joined); ); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final RichValueJoiner<?Joined<K, super V, ? super VO, ? extends VR> joiner, VO> joined); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? JoinWindowssuper windowsK, ? super V, ? super VT, ? extends final Joined<K, V, VO> joinedVR> joiner); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,, final Joined<K, V, VT> joined); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super VK, ? super VTV, ? extends super VT, ? extends VR> joiner, final Joined<K, V, VT> joined); <VT <GK, GV, VR>RV> KStream<K, VR>RV> leftJoinjoin(final KTable<KGlobalKTable<GK, VT>GV> tableglobalKTable, final RichValueJoiner<RichKeyValueMapper<? super VK, ? super VTV, ? 
extends VR>GK> joinerkeyValueMapper, final RichValueJoiner<? Serde<K>super keySerdeK, ? super V, ? super GV, ? extends final Serde<V> valSerde); RV> joiner); <GK, GV, RV> KStream<K, RV> joinleftJoin(final GlobalKTable<GK, GV> globalKTable, final KeyValueMapper<? super final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joinervalueJoiner); <GK |
KTable
Code Block | ||
---|---|---|
| ||
KTable<K, GV, RV> KStream<K, RV> joinV> filter(final GlobalKTable<GK,RichPredicate<? GV>super globalKTableK, ? super V> predicate); KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final RichKeyValueMapper<? super K, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> filterNot(final RichPredicate<? super VK, ? extends GK> keyValueMapper, super V> predicate); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> mapValues(final ValueJoiner<RichValueMapper<? super V, ? superextends GVVR, ? extendssuper RV>K> joinermapper); <GK,<VR> GVKTable<K, RV> KStream<KVR> mapValues(final RichValueMapper<? super V, RV>? join(finalextends GlobalKTable<GKVR, ? super GV>K> globalKTablemapper, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, KR> mapper); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, final RichValueJoiner<? super V, ? super GV, ?final extendsSerialized<KR, RV>VR> joinerserialized); <GK<VO, GV, RV> KStream<KVR> KTable<K, RV>VR> leftJoinjoin(final GlobalKTable<GKKTable<K, GV>VO> globalKTableother, final RichValueJoiner<? final KeyValueMapper<super V, ? super KVO, ? superextends VVR, ? extendssuper GK> keyValueMapper, K> joiner); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super GVVO, ? extends RV> valueJoiner); <GKVR, GV,? RV>super KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable, K> joiner, final RichKeyValueMapper<? super K, ? super V, ? 
extends GK> keyValueMapper, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<RichValueJoiner<? super V, ? super GVVO, ? extends RV> valueJoiner VR, ? super K> joiner); <GK<VO, GV, RV> KStream<KVR> KTable<K, RV>VR> leftJoin(final GlobalKTable<GKKTable<K, GV>VO> globalKTableother, final ValueJoiner<? super finalK, RichKeyValueMapper<? super KV, ? super VVO, ? extends GK>VR> keyValueMapperjoiner, final Materialized<K, VR, final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner); |
KTable.java
Code Block | ||
---|---|---|
| ||
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName); KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filterNot(final RichPredicate<? super KKeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super V> predicateK> joiner); <VO, final StateStoreSupplier<KeyValueStore> storeSupplier); VR> KTable<K, V>VR> filterNotouterJoin(final RichPredicate<?KTable<K, superVO> Kother, ? super V> predicate, final String queryableStoreName); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super VRichValueJoiner<? super V, ? super VO, ? extends VR, ? extendssuper VR>K> mapperjoiner, final Serde<VR> valueSerde, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
KGroupedStream
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> reducer); KTable<K, V> reduce(final RichReducer<V, K> reducer, final StateStoreSupplier<KeyValueStore> storeSupplier); void foreach(final RichForeachAction<? superfinal KMaterialized<K, ?V, super V> actionKeyValueStore<Bytes, byte[]>> materialized); <KR><VR> KStream<KRKTable<K, V>VR> toStreamaggregate(final RichKeyValueMapper<?RichInitializer<VR, superK> Kinitializer, ? super V, ? extends KR> mapper); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper< final RichAggregator<? super K, ? super V, KeyValue<KRVR> aggregator, VR>> selector, final Serde<KR>Materialized<K, keySerdeVR, KeyValueStore<Bytes, final Serde<VR> valueSerdebyte[]>> materialized); <VO, VR><VR> KTable<K, VR> joinaggregate(final KTable<KRichInitializer<VR, VO>K> otherinitializer, final RichValueJoiner<RichAggregator<? super VK, ? super VOV, VR> aggregator); |
SessionWindowedKStream
The aggregate() methods take three rich interfaces, so converting every possible combination of arguments to its rich counterpart would produce a large number of overloads. I therefore propose adding a single overload that takes all rich interfaces.
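One way to see why a single all-rich overload can be enough: if each of three arguments could be either plain or rich, a method would need up to 2^3 = 8 variants. With only the all-rich variant, a caller holding a plain function can wrap it. The sketch below illustrates that idea under stated assumptions. All types are local stand-ins rather than the Kafka Streams classes, the RichInitializer shape (taking only the key) is assumed, and richInitializer, richAggregator and aggregateOnce are hypothetical helpers that are not part of this KIP. A merger would follow the same pattern.

```java
public class RichAdapterSketch {

    // Hypothetical minimal record context for the demo.
    interface RecordContext { long offset(); }

    // Plain and rich stand-ins; the RichInitializer shape is an assumption.
    interface Initializer<VA> { VA apply(); }
    interface RichInitializer<VA, K> { VA apply(K key); }
    interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }
    interface RichAggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate, RecordContext recordContext);
    }

    // Hypothetical adapter: ignores the key and delegates to the plain initializer.
    static <VA, K> RichInitializer<VA, K> richInitializer(final Initializer<VA> plain) {
        return key -> plain.apply();
    }

    // Hypothetical adapter: ignores the record context and delegates to the plain aggregator.
    static <K, V, VA> RichAggregator<K, V, VA> richAggregator(final Aggregator<K, V, VA> plain) {
        return (key, value, aggregate, ctx) -> plain.apply(key, value, aggregate);
    }

    // Stand-in for the single all-rich overload: one aggregation step for one record.
    static <K, V, VA> VA aggregateOnce(final RichInitializer<VA, K> initializer,
                                       final RichAggregator<K, V, VA> aggregator,
                                       final K key, final V value, final RecordContext ctx) {
        return aggregator.apply(key, value, initializer.apply(key), ctx);
    }

    // Runs one step with wrapped plain functions: starts at 0 and adds the value.
    static long demo() {
        final Initializer<Long> zero = () -> 0L;
        final Aggregator<String, Long, Long> sum = (key, value, aggregate) -> aggregate + value;
        final RecordContext ctx = () -> 7L;
        return aggregateOnce(richInitializer(zero), richAggregator(sum), "k", 5L, ctx);
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```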
Code Block | ||
---|---|---|
| ||
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer? extends VR> joiner); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final RichAggregator<? super K, ? super V, T> aggregator, final Serde<VR> joinSerde, final RichMerger<? super K, T> sessionMerger); <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, Windowed<K>> initializer, final String queryableStoreName); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<RichAggregator<? super VK, ? super VOV, ? extends VR> joineraggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final RichMerger<? super K, VR> sessionMerger, final RichValueJoiner<? super V, ? superfinal VOMaterialized<K, ?VR, extends VR> joinerSessionStore<Bytes, byte[]>> materialized); <VOKTable<Windowed<K>, V> reduce(final RichReducer<V, VR> KTable<KK> reducer); KTable<Windowed<K>, VR>V> leftJoinreduce(final KTable<KRichReducer<V, VO>K> otherreducer, final RichValueJoiner<?Materialized<K, super V, ? super VO, ? extends VR> joiner, SessionStore<Bytes, byte[]>> materializedAs); , |
TimeWindowedKStream
Code Block | ||
---|---|---|
| ||
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer, final Serde<VR> joinSerde, RichAggregator<? super K, ? super V, VR> aggregator); <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer, final String queryableStoreName); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final RichAggregator<? super K, ? super V, VR> aggregator, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer, final StateStoreSupplier<KeyValueStore> storeSupplier); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized); |
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> adder, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner); <VO, VR> KTable<K, VR> outerJoin(final KTable<KRichReducer<V, VO>K> othersubtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> reduce(final RichReducer<V, K> adder, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final RichReducer<V, K> subtractor); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Serde<VR> joinSerde, final RichAggregator<? super K, ? super V, VR> adder, final String queryableStoreName); <VO, VR> KTable<K, VR> outerJoin(final KTable<K,RichAggregator<? VO>super otherK, ? super V, VR> subtractor, final RichValueJoiner<? super V, ? super VO, ? extends VR> joinerMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, final? StateStoreSupplier<KeyValueStore> storeSupplier); |
KGroupedStream
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V> reducer); KTable<K, V> reduce(final RichReducer<V> reducer, super V, VR> adder, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> reducer, final RichAggregator<? super K, ? super V, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final String queryableStoreName); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final String queryableStoreName); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? 
super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); |
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V> adder,
final Reducer<V> subtractor,
final String queryableStoreName);
KTable<K, V> reduce(final Reducer<V> adder,
final RichReducer<V> subtractor,
final String queryableStoreName);
KTable<K, V> reduce(final RichReducer<V> adder,
final RichReducer<V> subtractor,
final String queryableStoreName);
KTable<K, V> reduce(final RichReducer<V> adder,
final Reducer<V> subtractor);
KTable<K, V> reduce(final Reducer<V> adder,
final RichReducer<V> subtractor);
KTable<K, V> reduce(final RichReducer<V> adder,
final RichReducer<V> subtractor);
KTable<K, V> reduce(final RichReducer<V> adder,
final Reducer<V> subtractor,
final StateStoreSupplier<KeyValueStore> storeSupplier);
KTable<K, V> reduce(final Reducer<V> adder,
final RichReducer<V> subtractor,
final StateStoreSupplier<KeyValueStore> storeSupplier);
KTable<K, V> reduce(final RichReducer<V> adder,
final RichReducer<V> subtractor,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final RichAggregator<? super K, ? super V, VR> adder,
final RichAggregator<? super K, ? super V, VR> subtractor,
final String queryableStoreName);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final RichAggregator<? super K, ? super V, VR> adder,
final RichAggregator<? super K, ? super V, VR> subtractor);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final RichAggregator<? super K, ? super V, VR> adder,
final RichAggregator<? super K, ? super V, VR> subtractor,
final Serde<VR> aggValueSerde,
final String queryableStoreName);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final RichAggregator<? super K, ? super V, VR> adder,
final RichAggregator<? super K, ? super V, VR> subtractor,
final Serde<VR> aggValueSerde);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final RichAggregator<? super K, ? super V, VR> adder,
final RichAggregator<? super K, ? super V, VR> subtractor,
final StateStoreSupplier<KeyValueStore> storeSupplier);
|
Proposed changes
Make record context open to public
StreamTask.updateProcessorContext()
) :Code Block | ||
---|---|---|
| ||
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
processorContext.setCurrentNode(currNode);
}
|
Thus, the record context is not available in ProcessorContext. We make the following changes to make it public:
Code Block | ||
---|---|---|
| ||
public interface ProcessorContext {
...
RecordContext recordContext();
}
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
...
@Override
public RecordContext recordContext() {
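return this.recordContext; // return the stored record context (assumed to be an inherited field); calling recordContext() here would recurse without end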
}
}
|
Add commit() to RecordContext
We already have a RecordContext class with the desired set of methods. However, it does not have a commit() method. In this KIP we add a commit() method to the RecordContext interface.
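As a rough illustration of what commit() on the record context would allow, the sketch below declares a stand-in RecordContext with the four accessors used elsewhere in this KIP plus commit(), and a user function that requests a commit based on the record's offset. The CountingRecordContext class and the commitEvery helper are hypothetical and exist only for this example. In a real topology the commit request would be forwarded to the processor context.

```java
public class RecordContextSketch {

    // Stand-in for the RecordContext interface with the commit() method added by this KIP.
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
        void commit();
    }

    // Hypothetical in-memory implementation that only counts commit requests.
    static final class CountingRecordContext implements RecordContext {
        private long offset;
        private int commits;

        void setOffset(final long offset) { this.offset = offset; }
        int commits() { return commits; }

        @Override public long offset() { return offset; }
        @Override public long timestamp() { return 0L; }
        @Override public String topic() { return "demo-topic"; }
        @Override public int partition() { return 0; }
        @Override public void commit() { commits++; }
    }

    // Hypothetical user logic: request a commit whenever the offset is a multiple of n.
    static void commitEvery(final RecordContext ctx, final long n) {
        if (ctx.offset() % n == 0) {
            ctx.commit();
        }
    }

    // Feeds offsets 0..9 through commitEvery with n = 5 and returns the number of commits.
    static int demo() {
        final CountingRecordContext ctx = new CountingRecordContext();
        for (long offset = 0; offset < 10; offset++) {
            ctx.setOffset(offset);
            commitEvery(ctx, 5);
        }
        return ctx.commits();
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```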
...
language | java |
---|
VR> subtractor);
|
Move RecordContext from .processor.internals to .processor
...
A sample processor should look like this:
...
Code Block | ||
---|---|---|
| ||
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
    ...
    private RecordContext recordContext; // this line is added in this KIP
    ...
    @Override
    public void process(final K1 key, final V1 value) {
        recordContext = new RecordContext() { // recordContext initialization is added in this KIP
            @Override
            public void commit() {
                context().commit();
            }

            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                context().forward(key, joiner.apply(value, value2, recordContext));
            }
        }
    }
}
|
...