...
Code Block | ||
---|---|---|
| ||
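public interface ProcessorContext {
    ...
    // all existing methods
    ...
    RecordContext recordContext();
}

public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    ...
    @Override
    public RecordContext recordContext() {
        // Return the recordContext field (assumed to be inherited from AbstractProcessorContext);
        // calling this.recordContext() here would recurse indefinitely.
        return this.recordContext;
    }
}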
|
Add commit() to RecordContext
We already have the RecordContext interface with the desired set of methods. However, it does not have a commit() method, because committing is tied to ProcessorContext. In this KIP we add a commit() method to the RecordContext interface.
Code Block | ||
---|---|---|
| ||
public interface RecordContext {
    . . .
    void commit();
}

public class ProcessorRecordContext implements RecordContext {
    . . .
    @Override
    public void commit() {
        throw new UnsupportedOperationException("commit() is not supported in this context");
    }
}
|
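To make the intended commit() behaviour concrete, here is a minimal, self-contained sketch; it is not the actual Kafka Streams code. The `RecordContext` interface below is a simplified local stand-in that declares only the methods discussed in this section. `DefaultRecordContext`, `CommittingRecordContext` and `commitRejected` are hypothetical names, and the `Runnable` stands in for a call to `ProcessorContext.commit()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RecordContextCommitSketch {

    // Simplified stand-in for the RecordContext interface (not the real Kafka type).
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
        void commit();
    }

    // Hypothetical metadata-only context: commit() is rejected.
    static class DefaultRecordContext implements RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;
        private final long timestamp;

        DefaultRecordContext(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        @Override public long offset() { return offset; }
        @Override public long timestamp() { return timestamp; }
        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }

        @Override
        public void commit() {
            throw new UnsupportedOperationException("commit() is not supported in this context");
        }
    }

    // Hypothetical wrapper: reuses the metadata but forwards commit() to a callback,
    // which in a real processor would be ProcessorContext.commit().
    static class CommittingRecordContext implements RecordContext {
        private final RecordContext metadata;
        private final Runnable commitAction;

        CommittingRecordContext(RecordContext metadata, Runnable commitAction) {
            this.metadata = metadata;
            this.commitAction = commitAction;
        }

        @Override public long offset() { return metadata.offset(); }
        @Override public long timestamp() { return metadata.timestamp(); }
        @Override public String topic() { return metadata.topic(); }
        @Override public int partition() { return metadata.partition(); }
        @Override public void commit() { commitAction.run(); }
    }

    // Returns true when the given context refuses to commit.
    static boolean commitRejected(RecordContext context) {
        try {
            context.commit();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        RecordContext plain = new DefaultRecordContext("orders", 0, 42L, 1000L);
        AtomicInteger commits = new AtomicInteger();
        RecordContext committing = new CommittingRecordContext(plain, commits::incrementAndGet);

        System.out.println("plain context rejects commit: " + commitRejected(plain));
        System.out.println("wrapped context rejects commit: " + commitRejected(committing));
        System.out.println("commit requests seen: " + commits.get());
    }
}
```

The wrapper keeps the record metadata read-only and makes the commit path explicit, so user code only gets a working commit() when it is given a context that is backed by a processor.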
A sample processor would look like this:
Code Block | ||
---|---|---|
| ||
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
    ...
    private RecordContext recordContext;
    ...
    @Override
    public void process(final K1 key, final V1 value) {
        // Expose the current record's metadata and delegate commit() to the ProcessorContext.
        recordContext = new RecordContext() {
            @Override
            public void commit() {
                context().commit();
            }

            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                context().forward(key, joiner.apply(value, value2, recordContext));
            }
        }
    }
}
|
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR> {
    VR apply(final V1 value1, final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V> {
    V apply(final V value1, final V value2, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}
|
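As an illustration of how user code might consume these interfaces, the following is a small, self-contained sketch. It redeclares simplified local copies of `RecordContext`, `RichValueMapper` and `RichPredicate` so that it compiles without Kafka on the classpath. `fixedContext`, `TAG_WITH_ORIGIN` and `notOlderThan` are hypothetical helpers introduced only for this example.

```java
class RichFunctionsSketch {

    // Simplified stand-in for the RecordContext interface (not the real Kafka type).
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
        void commit();
    }

    // Local copies of two of the rich interfaces listed above.
    interface RichValueMapper<V, VR> {
        VR apply(V value, RecordContext recordContext);
    }

    interface RichPredicate<K, V> {
        boolean test(K key, V value, RecordContext recordContext);
    }

    // Hypothetical helper: a context with fixed metadata, standing in for the one
    // the framework would supply for each record.
    static RecordContext fixedContext(final String topic, final int partition,
                                      final long offset, final long timestamp) {
        return new RecordContext() {
            @Override public long offset() { return offset; }
            @Override public long timestamp() { return timestamp; }
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public void commit() { /* no-op in this sketch */ }
        };
    }

    // Mapper that tags each value with the topic, partition and offset it came from.
    static final RichValueMapper<String, String> TAG_WITH_ORIGIN =
        (value, ctx) -> value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();

    // Predicate that keeps only records whose timestamp is at or after the cutoff.
    static <K, V> RichPredicate<K, V> notOlderThan(final long cutoffTimestamp) {
        return (key, value, ctx) -> ctx.timestamp() >= cutoffTimestamp;
    }

    public static void main(String[] args) {
        RecordContext ctx = fixedContext("events", 0, 7L, 1_500L);
        RichPredicate<String, String> recent = notOlderThan(1_000L);

        System.out.println(TAG_WITH_ORIGIN.apply("click", ctx));
        System.out.println(recent.test("user-1", "click", ctx));
    }
}
```

Because each rich interface has a single abstract method, implementations can be written as lambdas, with the record metadata arriving as an ordinary extra argument.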
Rejected Alternatives
...