Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> reducer);


KTable<K, V> reduce(final RichReducer<V> reducer,
                    final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> reducer,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final String queryableStoreName);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final StateStoreSupplier<WindowStore> storeSupplier);


KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final String queryableStoreName);


 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows);
 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final StateStoreSupplier<SessionStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);
 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);



<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);


<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);
 
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);


 

 

  • KGroupedTable

 

 

...

We extract a subset of the features of ProcessorContext into a new RecordContext interface:

Code Block
languagejava
public interface RecordContext {KTable<K, V> reduce(final RichReducer<V> adder,
    String applicationId();
       TaskId taskId();
    StreamsMetrics metrics();
   final String topic();Reducer<V> subtractor,
    int partition();
	void commit();
    long offset();
    long timestamp();
    Map<String,final Object>String appConfigs(queryableStoreName);


KTable<K, V> reduce(final Reducer<V> Map<Stringadder,
 Object> appConfigsWithPrefix(String prefix);
}
 
 
public interface ProcessorContext extends RecordContext {
   // all methods but the ones in RecordContext
}

 

Whenever we need to convert a ProcessorContext to a RecordContext, we simply cast:

Code Block
languagejava
private class KStreamMapProcessorfinal extendsRichReducer<V> AbstractProcessor<Ksubtractor,
 V> {
    @Override
    public void init(ProcessorContext processorContext) {
      final String super.init(processorContextqueryableStoreName);


KTable<K, V> reduce(final RichReducer<V> adder,
       richMapper.init((RecordContext) processorContext);  				// HERE WE MAKE A CAST
    }

 final RichReducer<V>  @Overridesubtractor,
    public void process(final K key, final V value) {
        V1final newValue = mapper.apply(key, value);
        context().forward(key, newValue);String queryableStoreName);


 
KTable<K, V> reduce(final RichReducer<V> adder,
    }

    @Override
    public void close() {
     final Reducer<V>  super.close(subtractor);

    
KTable<K, V> reduce(final Reducer<V> adder,
    mapper.close();
    }
}

  • Rich Interfaces

 

 

Code Block
languagejava
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContextRichReducer<V> recordContextsubtractor);
}

public interface RichValueJoiner<V1, V2, VR> {
KTable<K, V> reduce(final RichReducer<V> adder,
             VR apply(final V1 value1, final V2 value2, final RecordContextRichReducer<V> recordContextsubtractor);
}

public interface RichKeyValueMapper<KKTable<K, V, VR> {V> reduce(final RichReducer<V> adder,
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V> {
    V apply(final VReducer<V> value1subtractor, final V value2, final RecordContext recordContext);
}


public interface RichInitializer<VA> {

                   VA apply(final RecordContextStateStoreSupplier<KeyValueStore> recordContextstoreSupplier);
}


public interface Aggregator<K, V, VA> {
KTable<K, V> reduce(final Reducer<V> adder,
      VA apply(final K key, final V value, final VA aggregate,     final RecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
RichReducer<V> subtractor,
             void apply(final K key, final V value, final RecordContextStateStoreSupplier<KeyValueStore> recordContextstoreSupplier);
}


public interface RichPredicate<K, V> {
KTable<K, V> reduce(final RichReducer<V> adder,
                   boolean test(final KRichReducer<V> key, final V value,subtractor,
                    final RecordContextStateStoreSupplier<KeyValueStore> recordContextstoreSupplier);
}

 

The same semantics apply to other interfaces as well. 

 

So we don't need to add any overloaded methods to the public APIs. Internally, we make two changes:

  1. Change the constructor type of all related Processors to accept rich interfaces
  2. Create converters from non-rich to rich interfaces

 

Code Block
languagejava
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {

    private final RichValueMapper<K, V, V1> mapper;

    public KStreamMapValues(RichValueMapper<K, V, V1> mapper) {
 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final String queryableStoreName);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                      this.mapper = mapper;
    }

 final RichAggregator<? super @Override
K, ? super V, publicVR> Processor<Ksubtractor,
  V> get() {
        return new KStreamMapProcessor();
    }

    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
 final Serde<VR> aggValueSerde);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> @Overrideinitializer,
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
     final RichAggregator<? super K, ? super  mapper.init((RecordContext) processorContext);
V, VR> adder,
           }

        @Override
        public void process(final RichAggregator<? super K, key? super V, finalVR> Vsubtractor,
 value) {
            V1 newValue = mapper.apply(key, value);
            context().forward(key, newValue);
   final StateStoreSupplier<KeyValueStore> storeSupplier);


 

 

 

 

 

 

 

Proposed changes

 

 

  • Limiting the ProcessorContext - RecordContext interface

We extract a subset of the features of ProcessorContext into a new RecordContext interface:

Code Block
languagejava
public interface RecordContext {
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
    void commit();
    long offset();
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
public interface ProcessorContext extends RecordContext {
   // all methods but the ones in RecordContext
}

 

Whenever we need to convert a ProcessorContext to a RecordContext, we simply cast:

Code Block
languagejava
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext}
    }
}
 
 
 
 
 
static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) {
    Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null" super.init(processorContext);
    if  (valueJoinerWithKey instanceof RichValueJoiner) {
        return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey; richMapper.init((RecordContext) processorContext);  				// HERE WE MAKE A CAST
    }

 else {
  @Override
    public void returnprocess(final newK RichValueJoiner<Kkey, T1,final T2,V R>(value) {
        V1 newValue =  @Overridemapper.apply(key, value);
        context().forward(key, newValue);
   public void init() {}

    @Override
    public void close() {
 @Override
       super.close();
      public void mapper.close() {;
    }
}

            @Override
  • Rich Interfaces

 

 

Code Block
languagejava
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final    RecordContext recordContext);
}

public R apply(K keyinterface RichValueJoiner<V1, T1 value1V2, T2 value2)VR> {
    VR apply(final V1 value1, final V2 value2, final     return valueJoinerWithKey.apply(key, value1, value2);
     RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final  }
        };
    }K key, final V value, final RecordContext recordContext);
}

staticpublic <K,interface T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner)RichReducer<V> {
    V apply(final V value1, final V value2, final RecordContext recordContext);
}


public interface RichInitializer<VA> {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>()VA apply(final RecordContext recordContext);
}


public interface RichAggregator<K, V, VA> {
    VA apply(final K key, @Override
final V value, final VA aggregate, final RecordContext recordContext);
}
 
public R apply(K keyinterface RichForeachAction<K, T1V> value1,{
 T2 value2) {
 void apply(final K key, final V value, final RecordContext recordContext);
}


public interface return valueJoiner.apply(value1, value2);RichPredicate<K, V> {
    boolean test(final K key, final }
V value, final RecordContext }recordContext);
}




 

Rejected Alternatives

None yet.