Status
Current state: "Under Discussion"
Discussion thread: TBD
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods, especially when init(Some params) and close() methods are needed.
Public Interfaces
This KIP builds on top of KIP-149; that is, we build rich functions on top of the "withKey" interfaces. So there is no public interface change apart from the ones shown in KIP-149.
The reason is that, once the withKey interfaces are part of the public API, we will not need any overloaded methods for rich functions.
Moreover, we separated the onlyValue (ValueMapper) and withKey (ValueMapperWithKey) interfaces (no inheritance relation) to enable lambdas. Therefore, when introducing the rich functions, we have to choose one of them to extend.
It is better to select the more general interface (withKey in this case) for rich functions.
Proposed Changes
There are 2 main issues to consider while introducing rich functions:
1. Limiting the ProcessorContext for the init(Some param) method
2. Introducing rich functions in a backwards-compatible way
Limiting the ProcessorContext - RecordContext interface
We take a subset of the features of ProcessorContext and put them into the RecordContext interface:
public interface RecordContext {
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
    long offset();
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}

public interface ProcessorContext extends RecordContext {
    // all methods but the ones in RecordContext
}
Whenever we need to convert a ProcessorContext to a RecordContext, we just cast:
private class KStreamMapProcessor extends AbstractProcessor<K, V> {

    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        // The cast happens here: the rich function only sees the RecordContext view.
        mapper.init((RecordContext) processorContext);
    }

    @Override
    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
        mapper.close();
    }
}
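To illustrate why the cast is enough to limit what a rich function can reach, here is a minimal, self-contained sketch. The nested interfaces are simplified stand-ins for the ones in this KIP (not the real Kafka Streams types), and FakeProcessorContext and TopicRecorder are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;

class ContextNarrowingSketch {

    // Simplified stand-in: only a few of the RecordContext methods listed above.
    interface RecordContext {
        String topic();
        int partition();
        long offset();
    }

    // Simplified stand-in: forward() represents a capability rich functions should not reach.
    interface ProcessorContext extends RecordContext {
        void forward(Object key, Object value);
    }

    interface RichFunction {
        void init(RecordContext recordContext);
        void close();
    }

    // Hypothetical in-memory context used only for this illustration.
    static final class FakeProcessorContext implements ProcessorContext {
        final List<String> forwarded = new ArrayList<>();
        @Override public String topic() { return "input-topic"; }
        @Override public int partition() { return 0; }
        @Override public long offset() { return 42L; }
        @Override public void forward(Object key, Object value) { forwarded.add(key + "=" + value); }
    }

    // Hypothetical rich function that records the topic it was initialized with.
    static final class TopicRecorder implements RichFunction {
        String seenTopic;
        boolean closed;

        @Override
        public void init(RecordContext recordContext) {
            // Only RecordContext methods are visible here;
            // recordContext.forward(...) would not compile.
            seenTopic = recordContext.topic();
        }

        @Override
        public void close() { closed = true; }
    }

    static String describe() {
        ProcessorContext processorContext = new FakeProcessorContext();
        TopicRecorder fn = new TopicRecorder();
        // Because ProcessorContext extends RecordContext, this is a plain upcast.
        fn.init((RecordContext) processorContext);
        fn.close();
        return fn.seenTopic + ":" + fn.closed;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Since the cast is an upcast, it cannot fail at runtime; its only effect is to hide the ProcessorContext-specific methods from the rich function at compile time.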
Rich Interfaces
If the interface is value-only (like ValueJoiner, ValueMapper), we extend its rich interface from its withKey counterpart.
If the interface is key-value (KeyValueMapper), we extend its rich interface from itself.
public interface RichFunction {
    void init(RecordContext recordContext);

    void close();
}

public interface ValueMapperWithKey<K, V, VR> {

    /**
     * Map the given value to a new value.
     *
     * @param key   the key of the record
     * @param value the value to be mapped
     * @return the new value
     */
    VR apply(final K key, final V value);
}

public interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
}

public interface RichKeyValueMapper<K, V, VR> extends KeyValueMapper<K, V, VR>, RichFunction {
}
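From the user's side, a rich mapper might look like the following sketch. It is self-contained, so the nested interfaces are reduced stand-ins for the ones proposed above (RecordContext is cut down to two methods), and TaggingMapper and runOnce are hypothetical names that are not part of this KIP.

```java
class RichValueMapperSketch {

    // Reduced stand-ins for the interfaces proposed in this KIP.
    interface RecordContext {
        String applicationId();
        String topic();
    }

    interface RichFunction {
        void init(RecordContext recordContext);
        void close();
    }

    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K key, V value);
    }

    interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
    }

    // Hypothetical user function: sets up state in init() and releases it in close().
    static final class TaggingMapper implements RichValueMapper<String, Integer, String> {
        private String tag;

        @Override
        public void init(RecordContext recordContext) {
            tag = recordContext.applicationId() + "/" + recordContext.topic();
        }

        @Override
        public String apply(String key, Integer value) {
            return tag + ":" + key + "=" + (value * 2);
        }

        @Override
        public void close() {
            tag = null;
        }
    }

    // Drives the init -> apply -> close lifecycle once, the way a processor would.
    static String runOnce(String key, int value) {
        RecordContext recordContext = new RecordContext() {
            @Override public String applicationId() { return "demo-app"; }
            @Override public String topic() { return "orders"; }
        };
        TaggingMapper mapper = new TaggingMapper();
        mapper.init(recordContext);
        try {
            return mapper.apply(key, value);
        } finally {
            mapper.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("a", 21));
    }
}
```

In this sketch the mapper cannot be written as a lambda, because RichValueMapper has three abstract methods; that is the trade-off for getting the init and close hooks.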
The same semantics apply to other interfaces as well.
So we don't need to add any overloaded methods to the public APIs. Internally, we make 2 changes:
- Change the constructor type of all related Processors to accept rich interfaces
- Create converters from non-rich to rich interfaces
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {

    private final RichValueMapper<K, V, V1> mapper;

    public KStreamMapValues(RichValueMapper<K, V, V1> mapper) {
        this.mapper = mapper;
    }

    @Override
    public Processor<K, V> get() {
        return new KStreamMapProcessor();
    }

    private class KStreamMapProcessor extends AbstractProcessor<K, V> {

        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            mapper.init((RecordContext) processorContext);
        }

        @Override
        public void process(final K key, final V value) {
            V1 newValue = mapper.apply(key, value);
            context().forward(key, newValue);
        }

        @Override
        public void close() {
            super.close();
            mapper.close();
        }
    }
}

static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) {
    Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null");
    if (valueJoinerWithKey instanceof RichValueJoiner) {
        // Already rich: pass it through unchanged.
        return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey;
    } else {
        // Not rich: wrap it with no-op init() and close().
        return new RichValueJoiner<K, T1, T2, R>() {
            @Override
            public void init(RecordContext recordContext) {}

            @Override
            public void close() {}

            @Override
            public R apply(K key, T1 value1, T2 value2) {
                return valueJoinerWithKey.apply(key, value1, value2);
            }
        };
    }
}

static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>() {
        @Override
        public R apply(K key, T1 value1, T2 value2) {
            // The key is ignored for value-only joiners.
            return valueJoiner.apply(value1, value2);
        }
    };
}
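The same two-step conversion could be applied to value mappers. The sketch below is an assumption made by analogy with the joiner converters above: convertToValueMapperWithKey and convertToRichValueMapper are hypothetical names, and the nested interfaces are minimal stand-ins (RecordContext is left empty) so that the example compiles on its own.

```java
import java.util.Objects;

class MapperConverterSketch {

    // Minimal stand-ins for the interfaces discussed in this KIP.
    interface RecordContext { }

    interface RichFunction {
        void init(RecordContext recordContext);
        void close();
    }

    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K key, V value);
    }

    interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
    }

    // Step 1 (hypothetical helper): value-only -> withKey, ignoring the key.
    static <K, V, VR> ValueMapperWithKey<K, V, VR> convertToValueMapperWithKey(final ValueMapper<V, VR> valueMapper) {
        Objects.requireNonNull(valueMapper, "valueMapper can't be null");
        return (key, value) -> valueMapper.apply(value);
    }

    // Step 2 (hypothetical helper): withKey -> rich, with no-op init() and close().
    static <K, V, VR> RichValueMapper<K, V, VR> convertToRichValueMapper(final ValueMapperWithKey<K, V, VR> mapperWithKey) {
        Objects.requireNonNull(mapperWithKey, "valueMapper can't be null");
        if (mapperWithKey instanceof RichValueMapper) {
            // Already rich: pass it through unchanged.
            return (RichValueMapper<K, V, VR>) mapperWithKey;
        }
        return new RichValueMapper<K, V, VR>() {
            @Override public void init(RecordContext recordContext) { }
            @Override public void close() { }
            @Override public VR apply(K key, V value) { return mapperWithKey.apply(key, value); }
        };
    }

    // A plain lambda travels through both converters and is then used like a rich function.
    static int lengthViaRich(String key, String value) {
        ValueMapper<String, Integer> length = String::length;
        ValueMapperWithKey<String, String, Integer> withKey = convertToValueMapperWithKey(length);
        RichValueMapper<String, String, Integer> rich = convertToRichValueMapper(withKey);
        rich.init(null); // no-op for converted mappers
        try {
            return rich.apply(key, value);
        } finally {
            rich.close();
        }
    }

    // Converting an already-rich mapper should return the very same instance.
    static boolean richIsPassedThrough() {
        ValueMapperWithKey<String, String, Integer> withKey = (key, value) -> value.length();
        RichValueMapper<String, String, Integer> rich = convertToRichValueMapper(withKey);
        return convertToRichValueMapper(rich) == rich;
    }

    public static void main(String[] args) {
        System.out.println(lengthViaRich("k", "hello"));
        System.out.println(richIsPassedThrough());
    }
}
```

With converters of this shape, the internal processors only ever need to handle the rich variant, while existing user code that passes lambdas keeps compiling unchanged.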
Rejected Alternatives
Not yet.