...

Code Block
languagejava
 
KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<K, V> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<K, V> adder,
                    final Reducer<V> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<K, V> adder,
                    final RichReducer<K, V> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);



KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<K, V> subtractor);
KTable<K, V> reduce(final RichReducer<K, V> adder,
                    final Reducer<V> subtractor);
KTable<K, V> reduce(final RichReducer<K, V> adder,
                    final RichReducer<K, V> subtractor);



<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);




<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final Aggregator<? super K, ? super V, VR> subtractor);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);



<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde);



<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

 

 

 

 

 

 

Proposed changes

For clarity, changed, updated, and added code snippets are marked in blue.

  • Make the record context publicly accessible

...

Code Block
languagejava
 private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}
 

 

Thus, the record context is not available in ProcessorContext. We therefore make the following changes to make it "public":

...

Code Block
languagejava
 
public interface ProcessorContext {
  ...
  ...
  RecordContext recordContext();
}
 
 
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
  ...
  @Override
  public RecordContext recordContext() {
    // Return the stored field; calling recordContext() here would recurse indefinitely.
    return this.recordContext;
  }
}
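To illustrate how user code might read record metadata through the proposed accessor, here is a minimal, self-contained sketch. The interfaces below are simplified stand-ins declared only for this example (they are not the real Kafka Streams classes), and `SimpleRecordContext`, `SimpleProcessorContext`, and `RecordContextDemo` are hypothetical names.

```java
// Hypothetical stand-ins for the types discussed above; not the real library classes.
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
}

interface ProcessorContext {
    RecordContext recordContext(); // the accessor proposed in this section
}

// Immutable holder for record metadata, similar in role to ProcessorRecordContext.
final class SimpleRecordContext implements RecordContext {
    private final long timestamp;
    private final long offset;
    private final int partition;
    private final String topic;

    SimpleRecordContext(long timestamp, long offset, int partition, String topic) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.partition = partition;
        this.topic = topic;
    }

    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
}

// Stores the current record context in a field and exposes it through the accessor.
final class SimpleProcessorContext implements ProcessorContext {
    private RecordContext recordContext;

    void setRecordContext(RecordContext recordContext) {
        this.recordContext = recordContext;
    }

    @Override
    public RecordContext recordContext() {
        return recordContext;
    }
}

final class RecordContextDemo {
    // User-side code: describe where the current record came from.
    static String describe(ProcessorContext context) {
        RecordContext rc = context.recordContext();
        return rc.topic() + "-" + rc.partition() + "@" + rc.offset();
    }

    public static void main(String[] args) {
        SimpleProcessorContext context = new SimpleProcessorContext();
        context.setRecordContext(new SimpleRecordContext(1000L, 42L, 3, "orders"));
        System.out.println(describe(context)); // prints "orders-3@42"
    }
}
```

The sketch mirrors the flow shown earlier: the runtime sets a fresh record context before each record is processed, and user code only reads it.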
 

...

  • Add commit() to RecordContext

Currently, the RecordContext interface has most of the methods needed for this KIP. However, it does not have a commit() method. In this KIP we add a commit() method to the RecordContext interface.
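As a rough sketch of what the addition could look like from the caller's side, the example below declares a simplified stand-in interface with a commit() method. It assumes commit() merely requests a commit, in the same spirit as the existing ProcessorContext#commit(); the KIP text here does not spell out the semantics. `CommittableRecordContext`, `DelegatingRecordContext`, `CommitDemo`, and the `Runnable` hook are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RecordContext with the proposed commit() method added.
interface CommittableRecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
    void commit(); // proposed addition; assumed here to request a commit
}

// Forwards commit() to a callback supplied by whoever owns the context.
final class DelegatingRecordContext implements CommittableRecordContext {
    private final long timestamp;
    private final long offset;
    private final int partition;
    private final String topic;
    private final Runnable commitRequest; // hypothetical hook, not part of the KIP

    DelegatingRecordContext(long timestamp, long offset, int partition,
                            String topic, Runnable commitRequest) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.partition = partition;
        this.topic = topic;
        this.commitRequest = commitRequest;
    }

    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public void commit() { commitRequest.run(); }
}

final class CommitDemo {
    // Request a commit whenever the record offset is a multiple of n.
    static boolean commitEveryNth(CommittableRecordContext context, long n) {
        if (context.offset() % n == 0) {
            context.commit();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger commits = new AtomicInteger();
        for (long offset = 1; offset <= 10; offset++) {
            commitEveryNth(
                new DelegatingRecordContext(0L, offset, 0, "orders", commits::incrementAndGet), 5);
        }
        System.out.println(commits.get()); // prints 2 (offsets 5 and 10)
    }
}
```

Delegating to a callback keeps the record context a plain metadata holder while still letting rich functions trigger a commit without holding a reference to the full ProcessorContext.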

...