Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • on-demand commit() feature
  • having access to RecordContext within an operator
  • having access to read-only key for XXXWithKey interfaces

...



Rich Interfaces

...

The following methods will be added to related classes.

 

  • KStream.java


 

Code Block
languagejava
KStream<K,public interface RichValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Joiner callback that receives the record key, the two values to combine,
 * and the {@link RecordContext}, and returns the combined result.
 *
 * @param <K>  key type
 * @param <V1> type of the first value
 * @param <V2> type of the second value
 * @param <VR> result type
 */
public interface RichValueJoiner<K, V1, V2, VR> {
    VR apply(
        final K key,
        final V1 value1,
        final V2 value2,
        final RecordContext recordContext
    );
}

/**
 * Mapper callback that receives a record's key, its value, and the
 * {@link RecordContext}, and returns a result of type {@code VR}.
 *
 * @param <K>  key type
 * @param <V>  value type
 * @param <VR> result type
 */
public interface RichKeyValueMapper<K, V, VR> {
    VR apply(
        final K key,
        final V value,
        final RecordContext recordContext
    );
}

/**
 * Reducer callback that receives the record key, two values of the same type,
 * and the {@link RecordContext}, and returns a single value of that type.
 *
 * @param <K> key type
 * @param <V> value type (both inputs and the result)
 */
public interface RichReducer<K, V> {
    V apply(
        final K key,
        final V value1,
        final V value2,
        final RecordContext recordContext
    );
}

/**
 * Aggregator callback that receives the record key, the record value, the
 * current aggregate, and the {@link RecordContext}, and returns an aggregate.
 *
 * @param <K>  key type
 * @param <V>  value type
 * @param <VA> aggregate type
 */
public interface RichAggregator<K, V, VA> {
    VA apply(
        final K key,
        final V value,
        final VA aggregate,
        final RecordContext recordContext
    );
}
 
/**
 * Action callback that receives a record's key, its value, and the
 * {@link RecordContext}, and returns nothing.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface RichForeachAction<K, V> {
    void apply(
        final K key,
        final V value,
        final RecordContext recordContext
    );
}

/**
 * Predicate callback that receives a record's key, its value, and the
 * {@link RecordContext}, and answers with a boolean.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface RichPredicate<K, V> {
    boolean test(
        final K key,
        final V value,
        final RecordContext recordContext
    );
}

 







Public Interfaces


KStream

 

Code Block
languagejava
KStream<K, V> filter(RichPredicate<V> filter(RichPredicate<? super K, ? super V> predicate);


KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);


<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);


/**
 * {@code mapValues} overload that accepts a {@link RichValueMapper}.
 * The mapper is typed with all three of its parameters (key, value, result).
 */
<VR> KStream<K, VR> mapValues(RichValueMapper<? super K, ? super V, ? extends VR> mapper);


<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);


/**
 * {@code flatMapValues} overload that accepts a {@link RichValueMapper} producing an {@code Iterable}.
 * The mapper is typed with all three of its parameters (key, value, result).
 */
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super K, ? super V, ? extends Iterable<? extends VR>> processor);


void foreach(final RichForeachAction<? super K, ? super V> action);


KStream<K, V> peek(final RichForeachAction<? super K, ? super V> actionpredicate);


KStream<K, V>[] branchfilterNot(final RichPredicate<RichPredicate<? super K, ? super V>... predicatespredicate);


<KR> KGroupedStream<KRKStream<KR, V> groupByselectKey(final RichKeyValueMapper<? super K, ? super V, ? extends KR> selectormapper);


<KR> KGroupedStream<KR<KR, VR> KStream<KR, V>VR> groupBymap(final RichKeyValueMapper<? super K, ? super V, KR>? selector,
extends KeyValue<? extends KR, ? extends VR>> mapper);


/**
 * {@code mapValues} overload that accepts a {@link RichValueMapper}.
 * The mapper is typed with all three of its parameters (key, value, result).
 */
<VR> KStream<K, VR> mapValues(RichValueMapper<? super K, ? super V, ? extends VR> mapper);


<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? finalextends Serde<KR> keySerde,
                                   final Serde<V> valSerde);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super VVR>>> mapper);


/**
 * {@code flatMapValues} overload that accepts a {@link RichValueMapper} producing an {@code Iterable}.
 * The mapper is typed with all three of its parameters (key, value, result).
 */
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super K, ? super V, ? extends Iterable<? extends VR>> processor);


void foreach(final RichForeachAction<? super K, ? super V> action);


KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);


KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super VOV, ? extends VR> joinerKR> selector,
                             final JoinWindows windows);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStreamSerde<KR> keySerde,
                             final RichValueJoiner<? super V, ? super VO,final ? extendsSerde<V> valSerde);


<VO, VR> joinerKStream<K,
 VR> join(final KStream<K, VO> otherStream,
                        final JoinWindows windows,
   final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                final Serde<K> keySerde,
           final JoinWindows windows);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
         final Serde<V> thisValueSerde,
                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
JoinWindows windows,
                             final Serde<K> keySerde,
                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  final Serde<V> thisValueSerde,
                             final JoinWindowsSerde<VO> windowsotherValueSerde);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
        );


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                         final Serde<K> keySerde,
      final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                 final Serde<V> thisValSerde,
              final JoinWindows windows,
                 final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
       final Serde<K> keySerde,
                         final RichValueJoiner<? super V, ? super VO, ? extendsfinal VR>Serde<V> joinerthisValSerde,
                                  final JoinWindowsSerde<VO> windowsotherValueSerde);


/**
 * {@code outerJoin} overload (stream-stream, windowed) that accepts a {@link RichValueJoiner}.
 * The joiner is typed with all four of its parameters (key, this value, other value, result).
 */
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final Serde<K> keySerdeRichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final Serde<V>JoinWindows thisValueSerdewindows,
                                  final Serde<VO> otherValueSerde);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
   final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner)                      final Serde<VO> otherValueSerde);


/**
 * {@code join} overload (stream-table) that accepts a {@link RichValueJoiner}.
 * The joiner is typed with all four of its parameters (key, stream value, table value, result).
 */
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final Serde<K> keySerde,
      RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                       final Serde<V> valSerde);


<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> tableSerde<K> keySerde,
                             final    Serde<V> valSerde);


/**
 * {@code leftJoin} overload (stream-table) that accepts a {@link RichValueJoiner}.
 * The joiner is typed with all four of its parameters (key, stream value, table value, result).
 */
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner);


/**
 * {@code leftJoin} overload (stream-table, explicit serdes) that accepts a {@link RichValueJoiner}.
 * The joiner is typed with all four of its parameters (key, stream value, table value, result).
 */
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde);


/**
 * {@code join} overload (stream-GlobalKTable) that accepts a {@link RichKeyValueMapper} and a {@link RichValueJoiner}.
 * The joiner is typed with all four of its parameters (key, stream value, table value, result).
 */
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);


/**
 * {@code leftJoin} overload (stream-GlobalKTable) that accepts a {@link RichKeyValueMapper} and a {@link RichValueJoiner}.
 * The joiner is typed with all four of its parameters (key, stream value, table value, result).
 */
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> valueJoiner);

...

We already have a RecordContext class with the desired set of methods. However, it does not have a commit() method. In this KIP we add a commit() method to the RecordContext interface.

Code Block
languagejava
/**
 * Record context interface; its existing methods are elided below.
 */
public interface RecordContext {
    . . .

    /** On-demand commit entry point added to this interface. */
    void commit();
}
 
/**
 * {@link RecordContext} implementation whose {@code commit()} is unsupported.
 * Other members are elided below.
 */
public class ProcessorRecordContext implements RecordContext {
    . . .

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void commit() {
        // Must be public: interface methods are implicitly public and an
        // override may not reduce visibility.
        throw new UnsupportedOperationException("commit() is not supported in this context");
    }
}
 

 

 

Sample processor should look like this:

 

languagejava
public interface RecordContext {
	. . .
    void commit ();
}
 
public class ProcessorRecordContext implements RecordContext { 
   . . .
   @Override
   void commit 
Code Block
languagejava
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private RecordContext recordContext;

    ...

    @Override
	public void process(final K1 key, final V1 value) {

    	recordContext = new RecordContext() {
    		@Override
    		public void commit() {
        		context().commit();
    		}

    		@Override
    		public long offset() {
     throw new UnsupportedOperationException("commit() is not supported 		return context().recordContext().offset()in this context");
    		}
}
 

 

 

Sample processor should look like this:

 

Code Block
languagejava
class KStreamKTableJoinProcessor<K1, K2,  		@Override
    		public long timestamp() {
V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private 		return context().recordContext().timestamp();RecordContext recordContext;

    		}...

    		@Override
	public void process(final K1 key, 		publicfinal StringV1 topic(value) {

    	recordContext =   		return context().recordContext().topic();
    		}

new RecordContext() {
    		@Override
    		public intvoid partitioncommit() {
        		return context().recordContextcommit().partition();
    		}

      };

			@Override
 
   		public iflong (key != null && value != nulloffset() {
        final V2 value2 = valueGetter.get(keyMapper.apply(key, value))		return context().recordContext().offset();
    		}

    		@Override
 if (leftJoin || value2 != null		public long timestamp() {
           		return context().forward(key, joiner.apply(value, value2, recordContext));recordContext().timestamp();
    		}

        }		@Override
    }
}


}

 

  • Rich Interfaces

 

Code Block
languagejava
public interface RichValueMapper<V, VR>		public String topic() {
    VR apply(final V value, final RecordContext recordContext		return context().recordContext().topic();
    		}

public interface RichValueJoiner<V1, V2, VR> {
 		@Override
    		public VRint apply(final V1 value1, final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {partition() {
        		return context().recordContext().partition();
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V> {		}
      };

	
 
    Vif apply(final V value1, final V value2, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}


public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

key != null && value != null) {
        final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
        if (leftJoin || value2 != null) {
            context().forward(key, joiner.apply(value, value2, recordContext));    
        }
    }
}


}

 

Rejected Alternatives

Not yet.