THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- on-demand
commit()
feature - having access to RecordContext within an operator
- having access to read-only key for
XXXWithKey
interfaces
...
Rich Interfaces
...
The following methods will be added to related classes.
KStream.java
Code Block | ||
---|---|---|
| ||
KStream<K,public interface RichValueMapper<K, V, VR> { VR apply(final K key, final V value, final RecordContext recordContext); } public interface RichValueJoiner<K, V1, V2, VR> { VR apply(final K key, final V1 value1, final V2 value2, final RecordContext recordContext); } public interface RichKeyValueMapper<K, V, VR> { VR apply(final K key, final V value, final RecordContext recordContext); } public interface RichReducer<K, V> { V apply(final K key, final V value1, final V value2, final RecordContext recordContext); } public interface RichAggregator<K, V, VA> { VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext); } public interface RichForeachAction<K, V> { void apply(final K key, final V value, final RecordContext recordContext); } public interface RichPredicate<K, V> { boolean test(final K key, final V value, final RecordContext recordContext); } |
Public Interfaces
KStream
Code Block | ||
---|---|---|
| ||
KStream<K, V> filter(RichPredicate<V> filter(RichPredicate<? super K, ? super V> predicate); KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate); <KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); <KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper); <VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper); <KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor); void foreach(final RichForeachAction<? super K, ? super V> action); KStream<K, V> peek(final RichForeachAction<? super K, ? super V> actionpredicate); KStream<K, V>[] branchfilterNot(final RichPredicate<RichPredicate<? super K, ? super V>... predicatespredicate); <KR> KGroupedStream<KRKStream<KR, V> groupByselectKey(final RichKeyValueMapper<? super K, ? super V, ? extends KR> selectormapper); <KR> KGroupedStream<KR<KR, VR> KStream<KR, V>VR> groupBymap(final RichKeyValueMapper<? super K, ? super V, KR>? selector, extends KeyValue<? extends KR, ? extends VR>> mapper); <VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper); <KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? finalextends Serde<KR> keySerde, final Serde<V> valSerde); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final RichValueJoiner<? super VVR>>> mapper); <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor); void foreach(final RichForeachAction<? super K, ? super V> action); KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action); KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super VOV, ? extends VR> joinerKR> selector, final JoinWindows windows); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStreamSerde<KR> keySerde, final RichValueJoiner<? super V, ? super VO,final ? extendsSerde<V> valSerde); <VO, VR> joinerKStream<K, VR> join(final KStream<K, VO> otherStream, final JoinWindows windows, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<K> keySerde, final JoinWindows windows); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final Serde<V> thisValueSerde, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<VO> otherValueSerde); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, JoinWindows windows, final Serde<K> keySerde, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<V> thisValueSerde, final JoinWindowsSerde<VO> windowsotherValueSerde); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, ); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final Serde<K> keySerde, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<V> thisValSerde, final JoinWindows windows, final Serde<VO> otherValueSerde); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final Serde<K> keySerde, final RichValueJoiner<? super V, ? super VO, ? extendsfinal VR>Serde<V> joinerthisValSerde, final JoinWindowsSerde<VO> windowsotherValueSerde); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final Serde<K> keySerdeRichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<V>JoinWindows thisValueSerdewindows, final Serde<VO> otherValueSerde); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, Serde<K> keySerde, final Serde<V> thisValueSerde, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner) final Serde<VO> otherValueSerde); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final Serde<K> keySerde, RichValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Serde<V> valSerde); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> tableSerde<K> keySerde, final Serde<V> valSerde); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde, final Serde<V> valSerde); <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable, final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichValueJoiner<? super V, ? super GV, ? extends RV> joiner); <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable, final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner); |
...
We already have RecordContext
class with desired set of methods. However, it does not have commit()
method. In this KIP we add commit()
method to RecordContext
interface.
Code Block | ||
---|---|---|
| ||
public interface RecordContext {
. . .
void commit ();
}
public class ProcessorRecordContext implements RecordContext {
. . .
@Override
void commit () {
throw new UnsupportedOperationException("commit() is not supported in this context");
}
}
|
Sample processor should look like this:
| ||
public interface RecordContext {
. . .
void commit ();
}
public class ProcessorRecordContext implements RecordContext {
. . .
@Override
void commit | ||
Code Block | ||
| ||
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> { ... private RecordContext recordContext; ... @Override public void process(final K1 key, final V1 value) { recordContext = new RecordContext() { @Override public void commit() { context().commit(); } @Override public long offset() { throw new UnsupportedOperationException("commit() is not supported return context().recordContext().offset()in this context"); } } |
Sample processor should look like this:
Code Block | ||
---|---|---|
| ||
class KStreamKTableJoinProcessor<K1, K2, @Override public long timestamp() { V1, V2, R> extends AbstractProcessor<K1, V1> { ... private return context().recordContext().timestamp();RecordContext recordContext; }... @Override public void process(final K1 key, publicfinal StringV1 topic(value) { recordContext = return context().recordContext().topic(); } new RecordContext() { @Override public intvoid partitioncommit() { return context().recordContextcommit().partition(); } }; @Override public iflong (key != null && value != nulloffset() { final V2 value2 = valueGetter.get(keyMapper.apply(key, value)) return context().recordContext().offset(); } @Override if (leftJoin || value2 != null public long timestamp() { return context().forward(key, joiner.apply(value, value2, recordContext));recordContext().timestamp(); } } @Override } } } |
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichValueMapper<V, VR> public String topic() { VR apply(final V value, final RecordContext recordContext return context().recordContext().topic(); } public interface RichValueJoiner<V1, V2, VR> { @Override public VRint apply(final V1 value1, final V2 value2, final RecordContext recordContext); } public interface RichKeyValueMapper<K, V, VR> {partition() { return context().recordContext().partition(); VR apply(final K key, final V value, final RecordContext recordContext); } public interface RichReducer<V> { } }; Vif apply(final V value1, final V value2, final RecordContext recordContext); } public interface RichAggregator<K, V, VA> { VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext); } public interface RichForeachAction<K, V> { void apply(final K key, final V value, final RecordContext recordContext); } public interface RichPredicate<K, V> { boolean test(final K key, final V value, final RecordContext recordContext); } key != null && value != null) { final V2 value2 = valueGetter.get(keyMapper.apply(key, value)); if (leftJoin || value2 != null) { context().forward(key, joiner.apply(value, value2, recordContext)); } } } } |
Rejected Alternatives
Not yet.