...

Code Block
languagejava
package org.apache.kafka.streams.processor;


public interface ProcessorContext {
  // existing overloads of forward() that we keep
  <K, V> void forward(K key, V value);

  // existing overloads of forward() that we deprecate
  <K, V> void forward(K key, V value, int childIndex);
  <K, V> void forward(K key, V value, String childName);


  // new overload of forward()
  <K, V> void forward(K key, V value, To to);

  // other existing methods omitted for brevity
}

We also add a new auxiliary class to specify optional arguments for ProcessorContext#forward():

Code Block
languagejava
package org.apache.kafka.streams.processor;
 
public class To {
  private To(String childName, int childIndex, long timestamp);


  public static To child(String childName);
  public static To child(int childIndex);
  public static To all();
 
  public To withTimestamp(long timestamp);
}

 

Proposed Changes

We add one new overload of ProcessorContext#forward() that takes one additional parameter of type To to specify optional arguments like childName, childIndex, or output record timestamp. If users call the new overload and set a timestamp on the To object, the output record gets the specified timestamp assigned. For the existing methods, or if no timestamp is specified on To, the default contract applies: the output record gets the input record timestamp.
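
The timestamp contract described above can be illustrated with a small self-contained sketch. It does not use the Kafka Streams library: To here is a simplified stand-in that mirrors the proposed factory methods (childIndex is left out for brevity), and SketchContext, Forwarded, and the NO_TIMESTAMP sentinel are hypothetical names introduced only for this illustration. How the real implementation represents "no timestamp set" is not specified in this proposal.

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardTimestampSketch {

    // Simplified stand-in for the proposed To class; not the real Kafka class.
    static final class To {
        // Hypothetical sentinel meaning "no timestamp was set on this To".
        static final long NO_TIMESTAMP = -1L;

        final String childName; // null means "all children"
        final long timestamp;

        private To(String childName, long timestamp) {
            this.childName = childName;
            this.timestamp = timestamp;
        }

        static To child(String childName) {
            return new To(childName, NO_TIMESTAMP);
        }

        static To all() {
            return new To(null, NO_TIMESTAMP);
        }

        // Returns a copy of this To with the output record timestamp set.
        To withTimestamp(long timestamp) {
            return new To(childName, timestamp);
        }
    }

    // One forwarded output record, as captured by the sketch context.
    static final class Forwarded {
        final Object key;
        final Object value;
        final long timestamp;
        final String childName;

        Forwarded(Object key, Object value, long timestamp, String childName) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.childName = childName;
        }
    }

    // Hypothetical minimal context that applies the timestamp contract.
    static final class SketchContext {
        private final long inputTimestamp;
        final List<Forwarded> output = new ArrayList<>();

        SketchContext(long inputTimestamp) {
            this.inputTimestamp = inputTimestamp;
        }

        // Existing overload: always inherits the input record timestamp.
        <K, V> void forward(K key, V value) {
            forward(key, value, To.all());
        }

        // New overload: uses the timestamp on To if one was set.
        <K, V> void forward(K key, V value, To to) {
            long ts = to.timestamp == To.NO_TIMESTAMP ? inputTimestamp : to.timestamp;
            output.add(new Forwarded(key, value, ts, to.childName));
        }
    }

    public static void main(String[] args) {
        SketchContext context = new SketchContext(1000L);
        context.forward("k", "v");                              // input timestamp
        context.forward("k", "v", To.all().withTimestamp(42L)); // explicit timestamp
        context.forward("k", "v", To.child("sink"));            // input timestamp
        for (Forwarded f : context.output) {
            System.out.println(f.childName + " @ " + f.timestamp);
        }
    }
}
```

In this sketch only the second call overrides the timestamp; the other two fall back to the input record timestamp, which matches the default contract stated above.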

...