...
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
This KIP incorporates KIP-149 and provides a hybrid solution for rich functions in Streams and for accessing read-only keys within the ValueJoiner, ValueTransformer, and ValueMapper interfaces.
Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods. For example:
...
- having access to RecordContext within an operator
- having access to a read-only key for the ValueJoiner, ValueTransformer, and ValueMapper interfaces
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichInitializer<V, K> {
    V apply(final K key);
}

public interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V, K> {
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

public interface RichMerger<K, V> {
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

public interface RichValueTransformer<V, VR, K> {
    void init(final ProcessorContext context);
    VR transform(final V value, final K key);
    void close();
}

public interface RichValueTransformerSupplier<V, VR, K> {
    RichValueTransformer<V, VR, K> get();
}
|
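To illustrate how one of the interfaces above might be implemented, here is a minimal sketch of a RichValueMapper. The nested RecordContext is only a local stand-in that mirrors the accessors named in this KIP, and FixedContext, TAG_WITH_ORIGIN, tagWithOrigin and the sample values are hypothetical names introduced for the example.

```java
final class RichValueMapperSketch {

    // Stand-in for the RecordContext discussed in this KIP (assumption: only
    // the read accessors named in this document are modelled here).
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
    }

    // Same shape as the RichValueMapper proposed above.
    interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    // Hypothetical fixed context, used only to drive the example.
    static final class FixedContext implements RecordContext {
        @Override public long offset() { return 42L; }
        @Override public long timestamp() { return 1_000L; }
        @Override public String topic() { return "orders"; }
        @Override public int partition() { return 3; }
    }

    // Hypothetical mapper: prefixes the value with the record's origin and its read-only key.
    static final RichValueMapper<String, String, String> TAG_WITH_ORIGIN =
        (value, key, ctx) -> ctx.topic() + "-" + ctx.partition() + "@" + ctx.offset()
            + ":" + key + "=" + value;

    static String tagWithOrigin(final String key, final String value) {
        return TAG_WITH_ORIGIN.apply(value, key, new FixedContext());
    }

    public static void main(final String[] args) {
        System.out.println(tagWithOrigin("k1", "v1"));
    }
}
```

The mapper only reads the key and the context; it has no way to change the key, which is the read-only access this KIP is after.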
Public Interfaces
...
Code Block | ||
---|---|---|
| ||
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper);
void foreach(final RichForeachAction<? super K, ? super V> action);
KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);
KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);
<VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier,
                                    final String... stateStoreNames);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serialized<KR, V> serialized);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows,
                             final Joined<K, V, VO> joined);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows,
                                 final Joined<K, V, VO> joined);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows,
                                  final Joined<K, V, VO> joined);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
                             final Joined<K, V, VT> joined);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
                                 final Joined<K, V, VT> joined);
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> valueJoiner);
|
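The wildcard bounds in these signatures, such as RichValueMapper<? super V, ? extends VR, ? super K>, let a caller pass a mapper written against broader input types and a narrower result type. The following is a minimal sketch of that effect only: the mapValues helper below is hypothetical and works on a plain Map instead of a real KStream, and the RecordContext stand-in exposes just an offset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WildcardBoundsSketch {

    // Stand-in for RecordContext (assumption: a single accessor is enough here).
    interface RecordContext {
        long offset();
    }

    // Same shape as the RichValueMapper proposed in this KIP.
    interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    // Hypothetical helper that uses the same bounds as the proposed mapValues().
    static <K, V, VR> Map<K, VR> mapValues(
            final Map<K, V> input,
            final RichValueMapper<? super V, ? extends VR, ? super K> mapper) {
        final Map<K, VR> out = new LinkedHashMap<>();
        long offset = 0L;
        for (final Map.Entry<K, V> entry : input.entrySet()) {
            final long current = offset++;
            // The entry's position in the map plays the role of the record offset.
            out.put(entry.getKey(), mapper.apply(entry.getValue(), entry.getKey(), () -> current));
        }
        return out;
    }

    static Map<String, Number> demo() {
        // The mapper accepts any Object as value and key and returns an Integer,
        // yet it can be used where String inputs and a Number result are expected.
        final RichValueMapper<Object, Integer, Object> lengthPlusOffset =
            (value, key, ctx) -> value.toString().length() + (int) ctx.offset();
        final Map<String, String> input = new LinkedHashMap<>();
        input.put("a", "xx");
        input.put("b", "yyy");
        return WildcardBoundsSketch.<String, String, Number>mapValues(input, lengthPlusOffset);
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```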
KTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate,
                       final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serialized<KR, VR> serialized);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
KGroupedStream
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> reducer);
KTable<K, V> reduce(final RichReducer<V, K> reducer,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator);
|
SessionWindowedKStream
The aggregate() methods take three function arguments that each have a rich counterpart, so providing an overload for every combination of plain and rich arguments would lead to a large number of overloads. I therefore propose adding a single overload in which all of the arguments are rich.
Code Block | ||
---|---|---|
| ||
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final RichMerger<? super K, T> sessionMerger);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, Windowed<K>> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final RichMerger<? super K, VR> sessionMerger,
                                       final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, SessionStore<Bytes, byte[]>> materializedAs);
|
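With only an all-rich overload of aggregate(), a caller that has plain functions could still use it by wrapping them in small adapters that ignore the extra arguments. The sketch below is one way this might look and is not part of the proposal: richInitializer, richAggregator and countTwo are hypothetical helpers, and the interfaces are local stand-ins with the shapes given in this KIP.

```java
final class RichAdapterSketch {

    // Local stand-ins (assumptions) with the shapes used in this KIP.
    interface RecordContext {
        String topic();
    }

    interface Initializer<VA> {
        VA apply();
    }

    interface RichInitializer<VA, K> {
        VA apply(K key);
    }

    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    interface RichAggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate, RecordContext recordContext);
    }

    // Hypothetical adapter: the rich version simply ignores the key.
    static <VA, K> RichInitializer<VA, K> richInitializer(final Initializer<VA> plain) {
        return key -> plain.apply();
    }

    // Hypothetical adapter: the rich version simply ignores the record context.
    static <K, V, VA> RichAggregator<K, V, VA> richAggregator(final Aggregator<K, V, VA> plain) {
        return (key, value, aggregate, ctx) -> plain.apply(key, value, aggregate);
    }

    // Counts two records through the adapted functions.
    static long countTwo() {
        final Initializer<Long> zero = () -> 0L;
        final Aggregator<String, String, Long> count = (key, value, agg) -> agg + 1;

        final RichInitializer<Long, String> init = richInitializer(zero);
        final RichAggregator<String, String, Long> agg = richAggregator(count);

        long total = init.apply("k");
        total = agg.apply("k", "a", total, () -> "input-topic");
        total = agg.apply("k", "b", total, () -> "input-topic");
        return total;
    }

    public static void main(final String[] args) {
        System.out.println(countTwo());
    }
}
```

Adapters like these keep the number of overloads at one per method while still letting plain and rich functions be mixed at the call site.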
TimeWindowedKStream
Code Block | ||
---|---|---|
| ||
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
|
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);
|
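As a rough illustration of how an adder and a subtractor with the RichReducer shape could work together, the sketch below replaces an old value with a new one in a running total. It assumes value1 is the current aggregate and value2 the record's value, and that the subtractor runs before the adder; the update helper, the demo method and the RecordContext stand-in are hypothetical.

```java
final class AdderSubtractorSketch {

    // Stand-in for RecordContext (assumption: one accessor is enough here).
    interface RecordContext {
        String topic();
    }

    // Same shape as the RichReducer proposed in this KIP.
    interface RichReducer<V, K> {
        V apply(V value1, V value2, K key, RecordContext recordContext);
    }

    // Hypothetical helper: removes the old value from the aggregate, then adds the new one.
    static <V, K> V update(final V aggregate, final V oldValue, final V newValue, final K key,
                           final RichReducer<V, K> adder, final RichReducer<V, K> subtractor,
                           final RecordContext ctx) {
        final V withoutOld = subtractor.apply(aggregate, oldValue, key, ctx);
        return adder.apply(withoutOld, newValue, key, ctx);
    }

    static long demo() {
        final RichReducer<Long, String> adder = (agg, value, key, ctx) -> agg + value;
        final RichReducer<Long, String> subtractor = (agg, value, key, ctx) -> agg - value;
        // Start from 10, retract the old value 3, add the new value 5.
        return update(10L, 3L, 5L, "k", adder, subtractor, () -> "input-topic");
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```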
Proposed changes
Make record context open to public
Currently the record context is set internally, in StreamTask.updateProcessorContext():
Code Block | ||
---|---|---|
| ||
// The snippet below already exists; it is shown only for background.
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}
|
Thus, the record context is not available through the public ProcessorContext interface. We therefore make the following changes to expose it:
Code Block | ||
---|---|---|
| ||
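public interface ProcessorContext {
    ...
    RecordContext recordContext();
}

public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    ...
    @Override
    public RecordContext recordContext() {
        // Return the field (assumed to be inherited from AbstractProcessorContext);
        // calling this.recordContext() here would recurse forever.
        return this.recordContext;
    }
}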
|
Add commit() to RecordContext
We already have a RecordContext class with the desired set of methods. However, it does not have a commit() method. In this KIP we add a commit() method to the RecordContext interface.
...
Move RecordContext from .processor.internals to .processor
...
Sample processor should look like this:
...
Code Block | ||
---|---|---|
| ||
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
    ...
    private RecordContext recordContext;    // this line is added in this KIP
    ...
    @Override
    public void process(final K1 key, final V1 value) {
        recordContext = new RecordContext() {    // recordContext initialization is added in this KIP
            @Override
            public void commit() {
                context().commit();
            }

            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                // the key is passed as well, matching RichValueJoiner.apply(value1, value2, key, recordContext)
                context().forward(key, joiner.apply(value, value2, key, recordContext));
            }
        }
    }
}
|
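The delegation used in the sample processor can be tried out in isolation. The sketch below is a simplified model, not the Streams implementation: ProcessorContext here is a hypothetical two-method stand-in, FakeContext and delegating are names introduced for the example, and the joiner follows the RichValueJoiner shape from this KIP.

```java
final class DelegatingContextSketch {

    // RecordContext with the methods overridden in the sample processor.
    interface RecordContext {
        void commit();
        long offset();
        long timestamp();
        String topic();
        int partition();
    }

    // Hypothetical stand-in for the two ProcessorContext calls the sample relies on.
    interface ProcessorContext {
        void commit();
        RecordContext recordContext();
    }

    // Same shape as the RichValueJoiner proposed in this KIP.
    interface RichValueJoiner<V1, V2, VR, K> {
        VR apply(V1 value1, V2 value2, K key, RecordContext recordContext);
    }

    // Builds a RecordContext that forwards every call to the processor context.
    static RecordContext delegating(final ProcessorContext context) {
        return new RecordContext() {
            @Override public void commit() { context.commit(); }
            @Override public long offset() { return context.recordContext().offset(); }
            @Override public long timestamp() { return context.recordContext().timestamp(); }
            @Override public String topic() { return context.recordContext().topic(); }
            @Override public int partition() { return context.recordContext().partition(); }
        };
    }

    // Fake processor context that counts commit requests and serves fixed metadata.
    static final class FakeContext implements ProcessorContext {
        int commits = 0;

        private final RecordContext current = new RecordContext() {
            @Override public void commit() { }
            @Override public long offset() { return 7L; }
            @Override public long timestamp() { return 1_000L; }
            @Override public String topic() { return "left-topic"; }
            @Override public int partition() { return 0; }
        };

        @Override public void commit() { commits++; }
        @Override public RecordContext recordContext() { return current; }
    }

    static String demo() {
        final FakeContext context = new FakeContext();
        final RichValueJoiner<String, String, String, String> joiner = (left, right, key, ctx) -> {
            ctx.commit();    // forwarded to the processor context
            return key + ":" + left + "+" + right + "@" + ctx.topic() + "/" + ctx.offset();
        };
        final String joined = joiner.apply("l", "r", "k", delegating(context));
        return joined + " commits=" + context.commits;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```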
...