Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4125
 
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-3907

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Rich functions are an essential part of stream processing. There are several use cases in which users cannot express their business logic with the current non-rich methods, especially when init(Some params) and close() methods are needed.

Another motivation for this KIP is to introduce an on-demand commit() feature.

Public Interfaces

The following methods will be added to related classes.

 

  • KStream.java

 

Code Block
languagejava
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);


KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);


<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);


<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper);


<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);


<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor);


void foreach(final RichForeachAction<? super K, ? super V> action);


KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);


KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serde<KR> keySerde,
                                   final Serde<V> valSerde);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Serde<K> keySerde,
                             final Serde<V> thisValueSerde,
                             final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde);


<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde);


<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super V, ? super GV, ? extends RV> joiner);


<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);

 

 

  • KTable.java

 

 

...

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);


KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);


KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper,
                             final Serde<VR> valueSerde,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


void foreach(final RichForeachAction<? super K, ? super V> action);


<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);


<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serde<KR> keySerde,
                                       final Serde<VR> valueSerde);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Serde<VR> joinSerde,
                            final String queryableStoreName);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final StateStoreSupplier<KeyValueStore> storeSupplier);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Serde<VR> joinSerde,
                                final String queryableStoreName);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final StateStoreSupplier<KeyValueStore> storeSupplier);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Serde<VR> joinSerde,
                                 final String queryableStoreName);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier);

 

 

...

  

  • KGroupedStream

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> reducer);


KTable<K, V> reduce(final RichReducer<V> reducer,
                    final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> reducer,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final String queryableStoreName);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final StateStoreSupplier<WindowStore> storeSupplier);


KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final String queryableStoreName);


 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows);
 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final StateStoreSupplier<SessionStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);
 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);



<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);


<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);
 
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);


...

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> adder,
                    final Reducer<V> subtractor,
                    final String queryableStoreName);


KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<V> subtractor,
                    final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final RichReducer<V> subtractor,
                    final String queryableStoreName);


 
KTable<K, V> reduce(final RichReducer<V> adder,
                    final Reducer<V> subtractor);


KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<V> subtractor);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final RichReducer<V> subtractor);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final Reducer<V> subtractor,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<V> subtractor,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final RichReducer<V> subtractor,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final String queryableStoreName);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


  

 

 

 

 

 

Proposed changes

 

 

  • Limiting the ProcessorContext - RecordContext interface

We take a subset of the features of ProcessorContext and put them into the RecordContext interface.

  • Make record context open to public

Currently we set the record context through InternalProcessorContext (in StreamTask.updateProcessorContext()):

 

 

Code Block
languagejava
public private interfacevoid RecordContext {
    String applicationId();updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
	void commit()processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    long offset(processorContext.setCurrentNode(currNode);
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}

Once we need a conversion from ProcessorContext and RecordContext, we just cast: 

}
 

 

However, the record context is not available in ProcessorContext, so we make the following changes to make it "public":

 

Code Block
languagejava
 
// Sketch of ProcessorContext: the existing methods are elided; only the addition is shown.
public interface ProcessorContext {
  ...
  all existing methods
  ...
 
  // Accessor added so that the record context can be obtained from a ProcessorContext.
  RecordContext recordContext();
}
 
 
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
  ...
  /**
   * Returns the record context currently held by this processor context.
   */
  @Override
  public RecordContext recordContext() {
    // Read the field: invoking recordContext() here would call this very method again
    // and recurse until a StackOverflowError is thrown.
    // NOTE(review): assumes a recordContext field is accessible from this class - confirm.
    return this.recordContext;
  }
}
 

 

 

  • ExtendedRecordContext

We already have a RecordContext class with the desired set of methods. However, it does not have a commit() method, as this is related to ProcessorContext. In our ExtendedRecordContext class, we construct the commit() method from the ProcessorContext object.

Code Block
languagejava
/**
 * A {@link RecordContext} that additionally exposes a {@code commit()} operation.
 */
public interface ExtendedRecordContext extends RecordContext {

    /** Requests a commit. */
    void commit();
}

 
 
 
 
/**
 * Join processor sketch showing how an {@link ExtendedRecordContext} is assembled from the
 * processor context and handed to a rich joiner. Unrelated members are elided with "...".
 */
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private ExtendedRecordContext extendedRecordContext;

    ...

    /**
     * Builds the extended record context, then joins the incoming record against the value
     * looked up through {@code valueGetter} and forwards the joined result.
     *
     * @param key   key of the incoming record; nothing is forwarded when it is null
     * @param value value of the incoming record; nothing is forwarded when it is null
     */
    @Override
    public void process(final K1 key, final V1 value) {

        // commit() is delegated to the processor context; the record metadata accessors
        // are delegated to the record context exposed by the processor context.
        extendedRecordContext = new ExtendedRecordContext() {
            @Override
            public void commit() {
                context().commit();
            }

            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            // Forward when a match was found, or unconditionally when leftJoin is set.
            if (leftJoin || value2 != null) {
                context().forward(key, joiner.apply(value, value2, extendedRecordContext));
            }
        }
    }
}

 

  • Rich Interfaces

 

 

 

Code Block
languagejava
/**
 * Value mapper that additionally receives the record context.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the mapped result
 */
public interface RichValueMapper<V, VR> {
    /**
     * @param value         the value to map
     * @param recordContext context passed alongside the value; also exposes {@code commit()}
     * @return the mapped result
     */
    VR apply(final V value, final ExtendedRecordContext recordContext);
}

/**
 * Value joiner that additionally receives the record context.
 *
 * @param <V1> type of the first value
 * @param <V2> type of the second value
 * @param <VR> type of the joined result
 */
public interface RichValueJoiner<V1, V2, VR> {
    /**
     * @param value1        the first value of the join
     * @param value2        the second value of the join
     * @param recordContext context passed alongside the values; also exposes {@code commit()}
     * @return the joined result
     */
    VR apply(final V1 value1, final V2 value2, final ExtendedRecordContext recordContext);
}

/**
 * Key-value mapper that additionally receives the record context.
 *
 * @param <K>  type of the input key
 * @param <V>  type of the input value
 * @param <VR> type of the mapped result
 */
public interface RichKeyValueMapper<K, V, VR> {
    /**
     * @param key           the key of the record
     * @param value         the value of the record
     * @param recordContext context passed alongside the record; also exposes {@code commit()}
     * @return the mapped result
     */
    VR apply(final K key, final V value, final ExtendedRecordContext recordContext);
}

/**
 * Reducer that additionally receives the record context.
 *
 * @param <V> type of the values being combined and of the result
 */
public interface RichReducer<V> {
    /**
     * @param value1        the first value to combine
     * @param value2        the second value to combine
     * @param recordContext context passed alongside the values; also exposes {@code commit()}
     * @return the combined value
     */
    V apply(final V value1, final V value2, final ExtendedRecordContext recordContext);
}


/**
 * Initializer that receives a record context.
 *
 * @param <VA> type of the value produced
 */
public interface RichInitializer<VA> {
    // Takes the plain RecordContext, so commit() is not exposed through this parameter.
    VA apply(final RecordContext recordContext);
}


/**
 * Aggregator that additionally receives the record context.
 *
 * @param <K>  type of the record key
 * @param <V>  type of the record value
 * @param <VA> type of the aggregate
 */
public interface RichAggregator<K, V, VA> {
    /**
     * @param key           the key of the record
     * @param value         the value of the record
     * @param aggregate     the aggregate passed in
     * @param recordContext context passed alongside the record; also exposes {@code commit()}
     * @return the resulting aggregate
     */
    VA apply(final K key, final V value, final VA aggregate, final ExtendedRecordContext recordContext);
}
 
/**
 * Per-record action that additionally receives the record context.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
public interface RichForeachAction<K, V> {
    /**
     * @param key           the key of the record
     * @param value         the value of the record
     * @param recordContext context passed alongside the record; also exposes {@code commit()}
     */
    void apply(final K key, final V value, final ExtendedRecordContext recordContext);
}


/**
 * Predicate over a key-value pair that additionally receives the record context.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
public interface RichPredicate<K, V> {
    /**
     * @param key           the key of the record
     * @param value         the value of the record
     * @param recordContext context passed alongside the record; also exposes {@code commit()}
     * @return the outcome of the test
     */
    boolean test(final K key, final V value, final ExtendedRecordContext recordContext);
}

 

Rejected Alternatives

...