...
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Rich functions are an essential part of stream processing. In several use cases, users cannot express their business logic with the current, non-rich functions. For example:
...
- having access to RecordContext within an operator
- having access to a read-only key in the ValueJoiner, ValueTransformer, and ValueMapper interfaces
...
```java
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.RecordContext; // RecordContext is moved to this package by this KIP

// Each public interface below would live in its own source file.
public interface RichInitializer<V, K> {
    V apply(K key);
}
public interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}
public interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}
public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}
public interface RichReducer<V, K> {
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}
public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}
public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}
public interface RichMerger<K, V> {
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}
public interface RichValueTransformer<V, VR, K> {
    void init(final ProcessorContext context);
    VR transform(final V value, final K key);
    @Deprecated
    VR punctuate(final long timestamp);
    void close();
}
public interface RichValueTransformerSupplier<V, VR, K> {
    RichValueTransformer<V, VR, K> get();
}
```
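To illustrate how the proposed interfaces might be used, here is a minimal, self-contained sketch. It declares a simplified stand-in for RecordContext (assumed to expose only offset(), timestamp(), topic() and partition()) together with copies of the proposed RichValueMapper and RichPredicate shapes, so it compiles without Kafka on the classpath. RichFunctionsDemo, TAG_WITH_ORIGIN, FROM_ORDERS_TOPIC and the "orders" topic are hypothetical names chosen for illustration only.

```java
// Simplified stand-in for the RecordContext this KIP makes public (assumption: only these accessors).
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
}

// Same shape as the proposed RichValueMapper<V, VR, K>.
interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

// Same shape as the proposed RichPredicate<K, V>.
interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

final class RichFunctionsDemo {
    private RichFunctionsDemo() { }

    // Builds a fixed, immutable record context (hypothetical helper for this example).
    static RecordContext context(final String topic, final int partition, final long offset, final long timestamp) {
        return new RecordContext() {
            @Override public long offset() { return offset; }
            @Override public long timestamp() { return timestamp; }
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
        };
    }

    // Uses both the read-only key and the record metadata, neither of which a plain ValueMapper can see.
    static final RichValueMapper<String, String, String> TAG_WITH_ORIGIN =
        (value, key, recordContext) ->
            key + "=" + value + "@" + recordContext.topic() + "-" + recordContext.partition() + ":" + recordContext.offset();

    // Keeps only records that were read from the (hypothetical) "orders" topic.
    static final RichPredicate<String, String> FROM_ORDERS_TOPIC =
        (key, value, recordContext) -> "orders".equals(recordContext.topic());
}
```

For a record with key "k1" and value "v1" at offset 42 of partition 2 of "orders", TAG_WITH_ORIGIN returns "k1=v1@orders-2:42", and FROM_ORDERS_TOPIC returns true.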
...
Proposed changes
Move RecordContext from .processor.internals to .processor
Make RecordContext public
...
```java
// The snippet below already exists; it is shown for background only.
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}
```
As the snippet shows, the record context is set on the processor context internally, but the ProcessorContext interface does not expose it. We therefore make the following changes to make it public:
```java
public interface ProcessorContext {
    // ... existing methods ...
    RecordContext recordContext(); // this line is added in this KIP
}

public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    // ...
    @Override
    public RecordContext recordContext() { // only this method is added in this KIP
        // Return the field itself; calling this.recordContext() here would recurse infinitely.
        return this.recordContext;
    }
}
```
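The following sketch shows the intended split using hypothetical, simplified classes rather than the real Kafka Streams internals: the runtime sets the record context before each record (compare updateProcessorContext(...) above), and user code reads it through the new public recordContext() accessor. SimpleRecordContext, PublicContext and SimpleProcessorContext are illustrative names only.

```java
// Simplified stand-in for RecordContext (assumption: only these accessors).
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
}

// Immutable metadata holder, similar in spirit to ProcessorRecordContext.
final class SimpleRecordContext implements RecordContext {
    private final long timestamp;
    private final long offset;
    private final int partition;
    private final String topic;

    SimpleRecordContext(final long timestamp, final long offset, final int partition, final String topic) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.partition = partition;
        this.topic = topic;
    }

    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
}

// The public view of the context: the only addition is the recordContext() accessor.
interface PublicContext {
    RecordContext recordContext();
}

final class SimpleProcessorContext implements PublicContext {
    private RecordContext recordContext;

    // Called by the runtime before each record is processed; not part of the public view.
    void setRecordContext(final RecordContext recordContext) {
        this.recordContext = recordContext;
    }

    @Override
    public RecordContext recordContext() {
        return recordContext; // returns the field, so there is no recursion
    }
}
```

Because user code only holds the PublicContext view, it can read the metadata of the current record but cannot replace it.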
Add commit() to RecordContext
Currently, the RecordContext interface has most of the methods this KIP requires, but it has no commit() method, so this KIP adds commit() to the RecordContext interface. Because ProcessorRecordContext implements RecordContext, the ProcessorRecordContext class must implement the new commit() method as well. However, because calling commit() is valid only through the RecordContext interface (at least for now), ProcessorRecordContext.commit() throws an exception.
```java
public interface RecordContext {
    // ... existing methods ...
    void commit(); // this line is added in this KIP
}

public class ProcessorRecordContext implements RecordContext {
    // ...
    @Override
    public void commit() { // this method is added in this KIP; it must be public to implement the interface method
        throw new UnsupportedOperationException("commit() is not supported in this context");
    }
}
```
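A minimal sketch of the intended commit() semantics, under the assumption that the RecordContext handed to user code delegates commit() to ProcessorContext.commit() (as the sample processor in this KIP does), while the internal ProcessorRecordContext rejects it. InternalRecordContext and UserFacingRecordContext are hypothetical names, and a Runnable stands in for ProcessorContext.commit().

```java
// Simplified RecordContext including the commit() method added by this KIP.
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
    void commit();
}

// Plays the role of ProcessorRecordContext: carries metadata but rejects commit().
final class InternalRecordContext implements RecordContext {
    private final long timestamp;
    private final long offset;
    private final int partition;
    private final String topic;

    InternalRecordContext(final long timestamp, final long offset, final int partition, final String topic) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.partition = partition;
        this.topic = topic;
    }

    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }

    @Override
    public void commit() {
        throw new UnsupportedOperationException("commit() is not supported in this context");
    }
}

// Context handed to user code: metadata is read from the internal context, and
// commit() is forwarded to a callback standing in for ProcessorContext.commit().
final class UserFacingRecordContext implements RecordContext {
    private final RecordContext delegate;
    private final Runnable commitRequest;

    UserFacingRecordContext(final RecordContext delegate, final Runnable commitRequest) {
        this.delegate = delegate;
        this.commitRequest = commitRequest;
    }

    @Override public long offset() { return delegate.offset(); }
    @Override public long timestamp() { return delegate.timestamp(); }
    @Override public String topic() { return delegate.topic(); }
    @Override public int partition() { return delegate.partition(); }

    @Override
    public void commit() {
        commitRequest.run();
    }
}
```

Keeping the throwing implementation internal means a commit can only be requested through the wrapper that knows which processor context to commit.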
A sample processor would then look like this:
```java
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
    // ...
    private RecordContext recordContext; // this line is added in this KIP
    // ...
    @Override
    public void process(final K1 key, final V1 value) {
        recordContext = new RecordContext() { // recordContext initialization is added in this KIP
            @Override
            public void commit() {
                context().commit();
            }
            @Override
            public long offset() {
                return context().recordContext().offset();
            }
            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }
            @Override
            public String topic() {
                return context().recordContext().topic();
            }
            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };
        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                // RichValueJoiner.apply takes (value1, value2, key, recordContext)
                context().forward(key, joiner.apply(value, value2, key, recordContext));
            }
        }
    }
}
```
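For experimentation outside Kafka, the join logic of the sample processor can be reduced to the following self-contained sketch. It is not the real KStreamKTableJoinProcessor: the KTable is modeled as a plain Map, the key mapper as a java.util.function.BiFunction, the record context is passed in explicitly, and forwarded results are collected in a list. StreamTableJoin and FixedRecordContext are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified stand-in for RecordContext (assumption: only these accessors).
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
}

// Same shape as the proposed RichValueJoiner<V1, V2, VR, K>.
interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

// Fixed metadata for a single record (hypothetical helper).
final class FixedRecordContext implements RecordContext {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    FixedRecordContext(final String topic, final int partition, final long offset, final long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
}

// Hypothetical analogue of the sample processor's process(...) method.
final class StreamTableJoin<K1, K2, V1, V2, R> {
    private final Map<K2, V2> table;
    private final BiFunction<K1, V1, K2> keyMapper;
    private final RichValueJoiner<V1, V2, R, K1> joiner;
    private final boolean leftJoin;
    final List<R> forwarded = new ArrayList<>();

    StreamTableJoin(final Map<K2, V2> table,
                    final BiFunction<K1, V1, K2> keyMapper,
                    final RichValueJoiner<V1, V2, R, K1> joiner,
                    final boolean leftJoin) {
        this.table = new HashMap<>(table); // defensive copy of the "table" contents
        this.keyMapper = keyMapper;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    // Same control flow as the sample processor: skip null keys/values, look up the
    // table, and call the rich joiner with the stream key and the record context.
    void process(final K1 key, final V1 value, final RecordContext recordContext) {
        if (key != null && value != null) {
            final V2 value2 = table.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                forwarded.add(joiner.apply(value, value2, key, recordContext));
            }
        }
    }
}
```

With an inner join (leftJoin = false), a stream record whose mapped key is missing from the table produces no output; with leftJoin = true, the joiner is still called, with a null table value.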
...