Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4125
 
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-3907
                      
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4218
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4726
 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
// the below code snippet already exists, this is just for background. 
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}

 

 

 

 

Sample processor should look like this:Thus, the record context is not available in ProcessorContext interface. As a result, we make the following changes to make it "public"

 

Code Block
languagejava
 
public interface ProcessorContext {
  ...
  ...
  RecordContext recordContext(); 		    // this line is added in this KIP
}
 
 
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
  ...
  @Override
  public RecordContext recordContext() {					// only this method is added in this KIP
    return this.recordContext();
  }
}
 			

 

 

  • Add commit() to RecordContext

Currently RecordContext interface have most of the desired set of methods required in this KIP.

However, it does not have commit() method. In this KIP we add commit() method to  RecordContext interface.

 

Because ProcessorRecordContext implements RecordContext, we inherit newly added commit() method in ProcessorRecordContext interface. 

However, call to a commit() method, is valid only within RecordContext interface (at least for now), we throw an exception in ProcessorRecordContext.commit().

 

 

Code Block
languagejava
public interface RecordContext {
	. . .
    void commit ();    // this line is added in this KIP
}
 
public class ProcessorRecordContext implements RecordContext { 
   . . .
   @Override
   void commit () {                // this method is added in this KIP
     throw new UnsupportedOperationException("commit() is not supported in this context");
   }
}
 

 

 

Sample processor should look like this:

 

Code Block
languagejava
class KStreamKTableJoinProcessor<K1, K2, V1,class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private RecordContext recordContext;       // this line is added in this KIP

    ...

    @Override
	public void process(final K1 key, final V1 value) {

    	recordContext = new RecordContext() {               // recordContext initialization is added in this KIP
    		@Override
    		public void commit() {
        		context().commit();
    		}

    		@Override
    		public long offset() {
        		return context().recordContext().offset();
    		}

    		@Override
    		public long timestamp() {
        		return context().recordContext().timestamp();
    		}

    		@Override
    		public String topic() {
        		return context().recordContext().topic();
    		}

    		@Override
    		public int partition() {
        		return context().recordContext().partition();
    		}
      };

	
 
    if (key != null && value != null) {
        final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
        if (leftJoin || value2 != null) {
            context().forward(key, joiner.apply(value, value2, recordContext));    
        }
    }
}


}

...