...
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final Reducer<V> adder, final RichReducer<K, V> subtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<K, V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<K, V> adder, final RichReducer<K, V> subtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final Reducer<V> adder, final RichReducer<K, V> subtractor);
KTable<K, V> reduce(final RichReducer<K, V> adder, final Reducer<V> subtractor);
KTable<K, V> reduce(final RichReducer<K, V> adder, final RichReducer<K, V> subtractor);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final Serde<VR> aggValueSerde);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor, final Serde<VR> aggValueSerde);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final StateStoreSupplier<KeyValueStore> storeSupplier);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor, final StateStoreSupplier<KeyValueStore> storeSupplier);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final StateStoreSupplier<KeyValueStore> storeSupplier);
|
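The overloads above let a plain function be paired with a rich one. The following is a minimal sketch of that pairing, not the Kafka Streams API: `Reducer`, `RichReducer`, `RecordContext`, and `FixedRecordContext` are simplified stand-ins declared locally, and the parameter order of `RichReducer.apply` is an assumption, since the actual interface definition is not shown in this section.

```java
// Hypothetical stand-ins for illustration only; these are NOT the Kafka Streams types.
interface Reducer<V> {
    V apply(V aggregate, V value);
}

interface RecordContext {
    String topic();
    long offset();
}

// Assumed shape: the rich variant additionally receives the key and the record context.
interface RichReducer<K, V> {
    V apply(V aggregate, V value, K key, RecordContext context);
}

// Simple immutable context used to drive the example.
final class FixedRecordContext implements RecordContext {
    private final String topic;
    private final long offset;

    FixedRecordContext(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }

    @Override
    public String topic() { return topic; }

    @Override
    public long offset() { return offset; }
}

final class RichReduceSketch {
    // Plain adder: only sees the two values.
    static final Reducer<Long> ADDER = (aggregate, value) -> aggregate + value;

    // Rich subtractor: same arithmetic, but it can also read the key and record metadata.
    static final RichReducer<String, Long> SUBTRACTOR = (aggregate, value, key, context) -> {
        System.out.println("subtract for key=" + key
                + " from " + context.topic() + "@" + context.offset());
        return aggregate - value;
    };

    public static void main(String[] args) {
        Long sum = ADDER.apply(10L, 5L);
        Long restored = SUBTRACTOR.apply(sum, 5L, "user-1",
                new FixedRecordContext("input-topic", 42L));
        System.out.println(sum + " " + restored);
    }
}
```

Mixing a plain adder with a rich subtractor, as here, corresponds to the first overload in each group above; the other combinations follow the same pattern.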
Proposed changes
For clarity, changed, updated, and added code snippets are marked in blue.
Make record context open to public
...
Code Block | ||
---|---|---|
| ||
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}
|
Thus, the record context is not available in ProcessorContext. As a result, we make the following changes to make it "public":
...
Code Block | ||
---|---|---|
| ||
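public interface ProcessorContext {
    ...
    ...
    RecordContext recordContext();
}
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    ...
    @Override
    public RecordContext recordContext() {
        // Return the field (assumed to be inherited from AbstractProcessorContext);
        // calling this.recordContext() here would recurse infinitely.
        return this.recordContext;
    }
}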
|
...
Add commit() to RecordContext
The RecordContext interface already has most of the methods this KIP needs. However, it does not have a commit() method. This KIP adds commit() to the RecordContext interface.
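As a rough illustration of the addition, the sketch below declares a local `RecordContext` with the accessors implied by the `ProcessorRecordContext` constructor shown earlier, plus the new `commit()`. Everything here is a hypothetical stand-in rather than the Kafka Streams implementation: `SimpleRecordContext` only records that a commit was requested, and `maybeCommit` is an invented helper showing how user code might call it.

```java
// Hypothetical stand-in for the RecordContext interface after this KIP.
interface RecordContext {
    long timestamp();
    long offset();
    int partition();
    String topic();
    void commit(); // the method this KIP proposes to add
}

// Illustrative implementation: commit() only flags that a commit was requested.
// A real implementation would forward the request to the stream task.
final class SimpleRecordContext implements RecordContext {
    private final long timestamp;
    private final long offset;
    private final int partition;
    private final String topic;
    private boolean commitRequested = false;

    // Argument order mirrors ProcessorRecordContext(timestamp, offset, partition, topic).
    SimpleRecordContext(long timestamp, long offset, int partition, String topic) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.partition = partition;
        this.topic = topic;
    }

    @Override public long timestamp() { return timestamp; }
    @Override public long offset() { return offset; }
    @Override public int partition() { return partition; }
    @Override public String topic() { return topic; }
    @Override public void commit() { commitRequested = true; }

    boolean commitRequested() { return commitRequested; }
}

final class CommitSketch {
    // Hypothetical user logic: request a commit whenever the offset is a multiple of `interval`.
    static void maybeCommit(RecordContext context, long interval) {
        if (context.offset() % interval == 0) {
            context.commit();
        }
    }

    public static void main(String[] args) {
        SimpleRecordContext context = new SimpleRecordContext(1L, 100L, 0, "input-topic");
        maybeCommit(context, 50L);
        System.out.println(context.commitRequested()); // prints "true"
    }
}
```

With commit() on the context itself, a rich function that receives a RecordContext can request a commit without needing a reference to the ProcessorContext.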
...