Current state: "Under Discussion"

Discussion thread: TBD here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Rich functions are an essential part of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods, especially when init(Some params) and close() methods are needed.

Public Interfaces

 

  • KStream.java

 

 

JIRA: KAFKA-4125, KAFKA-4218, KAFKA-4726
 


Motivation

This KIP is combined with KIP-149 and provides a hybrid solution for rich functions in Streams and for accessing read-only keys within the ValueJoiner, ValueTransformer and ValueMapper interfaces.

 

Rich functions are an essential part of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods. For example:

  • having access to RecordContext within an operator
  • having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces



Rich Interfaces

 

Code Block
languagejava
 
public interface RichInitializer<V, K> {
    V apply(K key);
}

public interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V, K> {
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

 
public interface RichMerger<K, V> {
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

public interface RichValueTransformer<V, VR, K> {
    void init(final ProcessorContext context);

    VR transform(final V value, final K key);

    void close();
}


 
public interface RichValueTransformerSupplier<V, VR, K> {

    RichValueTransformer<V, VR, K> get();
}
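
To make the intent of these signatures more concrete, the following is a minimal, self-contained sketch of how user code might implement two of the rich interfaces. The RichValueMapper and RichPredicate declarations are copied from the proposal above; the RecordContext shown here is a trimmed-down, hypothetical stand-in, and RichFunctionSketch, contextOf, TAG_WITH_ORIGIN and IS_RECENT are illustrative names that are not part of the KIP.

```java
// Hypothetical, trimmed-down stand-in for the RecordContext discussed in this KIP.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// Signatures as proposed in the Rich Interfaces section.
interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

class RichFunctionSketch {

    // Illustrative helper: builds a fixed context so the functions can be tried
    // outside of a running topology.
    static RecordContext contextOf(final String topic, final int partition,
                                   final long offset, final long timestamp) {
        return new RecordContext() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
            @Override public long timestamp() { return timestamp; }
        };
    }

    // Maps the value while reading (not changing) the key and the record coordinates.
    static final RichValueMapper<String, String, String> TAG_WITH_ORIGIN =
        (value, key, ctx) -> key + "=" + value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();

    // Keeps only records whose timestamp is at least 1000 (an arbitrary threshold).
    static final RichPredicate<String, String> IS_RECENT =
        (key, value, ctx) -> ctx.timestamp() >= 1_000L;

    public static void main(final String[] args) {
        final RecordContext ctx = contextOf("orders", 2, 42L, 1_500L);
        System.out.println(TAG_WITH_ORIGIN.apply("pending", "order-1", ctx)); // order-1=pending@orders-2:42
        System.out.println(IS_RECENT.test("order-1", "pending", ctx));        // true
    }
}
```

Because each interface has a single abstract method, implementations can be written as lambdas, much like the existing non-rich functions.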


 


 

Public Interfaces


KStream

 

Code Block
languagejava
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper);



void foreach(final RichForeachAction<? super K, ? super V> action);
KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);
KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);
 
<VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier,
                                    final String... stateStoreNames);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serialized<KR, V> serialized);
 

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows,
                             final Joined<K, V, VO> joined);
 

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows,
                                 final Joined<K, V, VO> joined);
 

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows,
                                  final Joined<K, V, VO> joined);
 

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner,
                             final Joined<K, V, VT> joined);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner,
                                 final Joined<K, V, VT> joined);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super V, ? super GV, ? extends RV, ? super K> joiner);
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super V, ? super GV, ? extends RV, ? super K> valueJoiner);


 





KTable

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate,
                       final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);

<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serialized<KR, VR> serialized);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

 

 

 

 

 

 

 

KGroupedStream

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V, K> reducer);

KTable<K, V> reduce(final RichReducer<V, K> reducer,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator);
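
As an illustration of what the key-aware RichInitializer and the context-aware RichAggregator make possible, here is a small, self-contained sketch. It does not use Kafka Streams: RichAggregateSketch and its aggregate() helper are hypothetical stand-ins for the per-key state handling of a grouped aggregation, record offsets are simply list positions, and RecordContext is reduced to a single method for brevity.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal stand-in for the RecordContext discussed in this KIP.
interface RecordContext {
    long offset();
}

// Signatures as proposed in the Rich Interfaces section.
interface RichInitializer<V, K> {
    V apply(K key);
}

interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

class RichAggregateSketch {

    // Illustrative only: folds the records per key, using the list index as the offset.
    static <K, V, VA> Map<K, VA> aggregate(final List<Map.Entry<K, V>> records,
                                           final RichInitializer<VA, K> initializer,
                                           final RichAggregator<K, V, VA> aggregator) {
        final Map<K, VA> store = new LinkedHashMap<>();
        for (int i = 0; i < records.size(); i++) {
            final long offset = i;
            final RecordContext context = () -> offset;
            final K key = records.get(i).getKey();
            // The initializer receives the key, so the initial aggregate can depend on it.
            final VA current = store.containsKey(key) ? store.get(key) : initializer.apply(key);
            store.put(key, aggregator.apply(key, records.get(i).getValue(), current, context));
        }
        return store;
    }

    static Map<String, String> demo() {
        final List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<String, String>("a", "x"));
        records.add(new SimpleEntry<String, String>("b", "y"));
        records.add(new SimpleEntry<String, String>("a", "z"));

        final RichInitializer<String, String> initializer = key -> key + ":";
        final RichAggregator<String, String, String> aggregator =
            (key, value, aggregate, ctx) -> aggregate + value + "@" + ctx.offset() + ";";

        return aggregate(records, initializer, aggregator);
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // {a=a:x@0;z@2;, b=b:y@1;}
    }
}
```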

 

 

 

SessionWindowedKStream

 

The aggregate() methods take 3 rich interfaces, so converting every possible combination to its rich counterpart would produce a lot of overloads. Instead, I propose to add one overload per method that takes all of the rich interfaces.

 

Code Block
languagejava
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final RichMerger<? super K, T> sessionMerger);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, Windowed<K>> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final RichMerger<? super K, VR> sessionMerger,
                                       final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, SessionStore<Bytes, byte[]>> materializedAs);

TimeWindowedKStream

Code Block
languagejava
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

  • Limiting the ProcessorContext - RecordContext interface

We take a subset of the features of ProcessorContext and put them into the RecordContext interface.

Code Block
languagejava
public interface RecordContext {
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
    void commit();
    long offset();
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}

public interface ProcessorContext extends RecordContext {
    // all methods but the ones in RecordContext
}

Whenever we need to convert a ProcessorContext to a RecordContext, we just cast:

Code Block
languagejava
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        mapper.init((RecordContext) processorContext);  // HERE WE MAKE A CAST
    }

    @Override
    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
        mapper.close();
    }
}

  • Rich Interfaces

Code Block
languagejava
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR> {
    VR apply(final V1 value1, final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V> {
    V apply(final V value1, final V value2, final RecordContext recordContext);
}

public interface RichInitializer<VA> {
    VA apply(final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

The same semantics apply to the other interfaces as well.

So we don't need to add any overloaded methods to the public APIs. Internally we perform 2 changes:

  1. Change the constructor type of all related Processors to accept rich interfaces
  2. Create converters from non-rich to rich interfaces






 

 

 

 

Code Block
languagejava
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {

    private final RichValueMapper<K, V, V1> mapper;

    public KStreamMapValues(RichValueMapper<K, V, V1> mapper) {
        this.mapper = mapper;
    }

    @Override
    public Processor<K, V> get() {
        return new KStreamMapProcessor();
    }

    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            mapper.init((RecordContext) processorContext);
        }

        @Override
        public void process(final K key, final V value) {
            V1 newValue = mapper.apply(key, value);
            context().forward(key, newValue);
        }

        @Override
        public void close() {
            super.close();
            mapper.close();
        }
    }
}

static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) {
    Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null");
    if (valueJoinerWithKey instanceof RichValueJoiner) {
        return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey;
    } else {
        return new RichValueJoiner<K, T1, T2, R>() {
            @Override
            public void init() {}

            @Override
            public void close() {}

            @Override
            public R apply(K key, T1 value1, T2 value2) {
                return valueJoinerWithKey.apply(key, value1, value2);
            }
        };
    }
}

static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>() {
        @Override
        public R apply(K key, T1 value1, T2 value2) {
            return valueJoiner.apply(value1, value2);
        }
    };
}

KGroupedTable

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);

Proposed changes

  • Move RecordContext from .processor.internals to .processor

  • Make record context open to public

Currently we set the record context through InternalProcessorContext (StreamTask.updateProcessorContext()):

Code Block
languagejava
// the below code snippet already exists, this is just for background.
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}

A sample processor should look like this:

Code Block
languagejava
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private RecordContext recordContext;       // this line is added in this KIP

    ...

    @Override
    public void process(final K1 key, final V1 value) {

        recordContext = new RecordContext() {  // recordContext initialization is added in this KIP

            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                context().forward(key, joiner.apply(value, value2, key, recordContext));
            }
        }
    }
}
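
The same pattern can be tried outside of Kafka Streams with the self-contained sketch below. It is only an approximation under stated assumptions: JoinProcessorSketch is a hypothetical class, a plain Map stands in for the KTable value getter, the record coordinates are passed to process() directly instead of being read from the processor context, and joined results are collected in a list instead of being forwarded downstream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in with the four accessors used by the sample processor above.
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
}

// Signature as proposed in the Rich Interfaces section.
interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

class JoinProcessorSketch<K, V1, V2, R> {
    private final Map<K, V2> table;                     // stand-in for the KTable value getter
    private final RichValueJoiner<V1, V2, R, K> joiner;
    private final boolean leftJoin;
    final List<R> forwarded = new ArrayList<>();        // stand-in for context().forward(...)

    JoinProcessorSketch(final Map<K, V2> table,
                        final RichValueJoiner<V1, V2, R, K> joiner,
                        final boolean leftJoin) {
        this.table = table;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    void process(final String recordTopic, final int recordPartition,
                 final long recordOffset, final long recordTimestamp,
                 final K key, final V1 value) {
        // A fresh read-only view of the current record is handed to the joiner.
        final RecordContext recordContext = new RecordContext() {
            @Override public long offset() { return recordOffset; }
            @Override public long timestamp() { return recordTimestamp; }
            @Override public String topic() { return recordTopic; }
            @Override public int partition() { return recordPartition; }
        };

        if (key != null && value != null) {
            final V2 value2 = table.get(key);
            if (leftJoin || value2 != null) {
                forwarded.add(joiner.apply(value, value2, key, recordContext));
            }
        }
    }

    static List<String> demo() {
        final Map<String, String> table = new HashMap<>();
        table.put("k1", "T1");

        final RichValueJoiner<String, String, String, String> joiner =
            (v1, v2, key, ctx) -> key + ":" + v1 + "+" + v2 + "@" + ctx.topic() + "/" + ctx.partition() + "/" + ctx.offset();

        final JoinProcessorSketch<String, String, String, String> processor =
            new JoinProcessorSketch<String, String, String, String>(table, joiner, false);

        processor.process("orders", 0, 7L, 100L, "k1", "v"); // matches the table entry
        processor.process("orders", 0, 8L, 101L, "k2", "w"); // no match, dropped by the inner join
        return processor.forwarded;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [k1:v+T1@orders/0/7]
    }
}
```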

 

Rejected Alternatives

Not yet.