...

Code Block
languagejava
 
public interface ProcessorContext {
  ...
  all existing methods
  ...
 
  RecordContext recordContext();
}
 
 
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
  ...
  @Override
  public RecordContext recordContext() {
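    // Return the field (assumed to be held by AbstractProcessorContext);
    // calling this.recordContext() here would recurse forever.
    return this.recordContext;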
  }
}
 

 

 

  • Add commit() to RecordContext

We already have the RecordContext class with the desired set of methods. However, it does not have a commit() method, as this is related to ProcessorContext. In this KIP we add a commit() method to the RecordContext interface.

Code Block
languagejava
public interface RecordContext {
    . . .
    void commit();
}

public class ProcessorRecordContext implements RecordContext {
    . . .
    @Override
    public void commit() {
        throw new UnsupportedOperationException("commit() is not supported in this context");
    }
}
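
The difference between a context that can commit and one that cannot may be easier to see in a self-contained sketch. The RecordContext interface below is a stand-in that only mirrors the methods named in this KIP, not the Kafka class itself. ReadOnlyRecordContext, CommittingRecordContext, and the Runnable commit callback are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the RecordContext interface proposed in this KIP (not the Kafka class).
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
    void commit();
}

// Hypothetical metadata-only context: like ProcessorRecordContext, it rejects commit().
class ReadOnlyRecordContext implements RecordContext {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    ReadOnlyRecordContext(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }

    @Override
    public void commit() {
        throw new UnsupportedOperationException("commit() is not supported in this context");
    }
}

// Hypothetical context that forwards commit() to its owner, here modeled as a Runnable.
class CommittingRecordContext extends ReadOnlyRecordContext {
    private final Runnable commitCallback;

    CommittingRecordContext(String topic, int partition, long offset, long timestamp,
                            Runnable commitCallback) {
        super(topic, partition, offset, timestamp);
        this.commitCallback = commitCallback;
    }

    @Override
    public void commit() {
        commitCallback.run();
    }
}

class CommitSketch {
    public static void main(String[] args) {
        AtomicInteger commits = new AtomicInteger();
        RecordContext committing =
            new CommittingRecordContext("orders", 0, 42L, 1_000L, commits::incrementAndGet);
        committing.commit();
        System.out.println("commits requested: " + commits.get());

        RecordContext readOnly = new ReadOnlyRecordContext("orders", 0, 42L, 1_000L);
        try {
            readOnly.commit();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the caller cannot tell from the type alone whether commit() is supported, which is the trade-off of putting the method on the shared interface rather than on a separate extended one.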
 

 

 

A sample processor should look like this:

 

Code Block
languagejava

 
 
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private RecordContext recordContext;

    ...

    @Override
    public void process(final K1 key, final V1 value) {

        // Expose the current record's metadata and delegate commit() to the processor context.
        recordContext = new RecordContext() {
            @Override
            public void commit() {
                context().commit();
            }

            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                context().forward(key, joiner.apply(value, value2, recordContext));
            }
        }
    }
}

 

  • Rich Interfaces

 

 

 

Code Block
languagejava
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR> {
    VR apply(final V1 value1, final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V> {
    V apply(final V value1, final V value2, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}
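
As a rough illustration of how user code might look against one of these interfaces, the sketch below implements RichValueMapper as a lambda that tags each value with its record's origin. It is self-contained and does not use Kafka: RecordContext is a stand-in limited to the methods named in this KIP, while FixedRecordContext and the mapAll helper are hypothetical and exist only to drive the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for the RecordContext interface proposed in this KIP (not the Kafka class).
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
    void commit();
}

// Same shape as the RichValueMapper interface listed above.
interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

// Hypothetical fixed-value context used only to feed the mapper in this sketch.
final class FixedRecordContext implements RecordContext {
    private final String topic;
    private final int partition;
    private final long offset;

    FixedRecordContext(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override public long offset() { return offset; }
    @Override public long timestamp() { return 0L; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public void commit() { /* no-op in this sketch */ }
}

class RichMapperSketch {
    // Hypothetical driver: applies the mapper to each value with consecutive offsets.
    static <V, VR> List<VR> mapAll(List<V> values, String topic, int partition,
                                   long firstOffset, RichValueMapper<V, VR> mapper) {
        List<VR> out = new ArrayList<>();
        long offset = firstOffset;
        for (V value : values) {
            out.add(mapper.apply(value, new FixedRecordContext(topic, partition, offset++)));
        }
        return out;
    }

    public static void main(String[] args) {
        // The mapper reads record metadata that a plain ValueMapper cannot see.
        RichValueMapper<String, String> tagWithOrigin =
            (value, ctx) -> value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();

        System.out.println(mapAll(Arrays.asList("a", "b"), "orders", 3, 10L, tagWithOrigin));
    }
}
```

Because each rich interface has a single abstract method, it can be written as a lambda in the same way as its non-rich counterpart, with the context as one extra argument.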

 

Rejected Alternatives

...