...

Public Interfaces

New methods

Processors without forcing repartitioning:

  • KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
  • KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)

...

Code Block
languagejava
  private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));

    // Key type is unchanged (String in, String out), so no repartitioning is flagged.
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
      @Override
      public void process(Record<String, String> record) {
        for (var s : record.value().split(",")) {
          context().forward(record.withValue("Hello " + s));
        }
      }
    }, Named.as("test-values")); // doesn't flag repartitioning

    // Key is replaced by the value length (Integer), so repartitioning is flagged.
    input.process(() -> new ContextualProcessor<String, String, Integer, String>() {
      @Override
      public void process(Record<String, String> record) {
        for (var s : record.value().split(",")) {
          context().forward(record.withKey(record.value().length()).withValue("Hello " + s));
        }
      }
    }, Named.as("test-keys")) // flags repartitioning
    .to("output", Produced.with(Serdes.Integer(), Serdes.String()));

    return builder.build();
  }

...

Code Block
languagejava
class ValueProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward> {
//...
    private KForward key;

    public void setRecordKey(KForward initialKey) {
        this.key = initialKey;
    }

    public void clearRecordKey() {
        this.key = null;
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(Record<K, V> record) {
        if (key != null) {
            if (record.key() != key) {  // referential equality is enough to enforce that the key is not updated
                throw new IllegalArgumentException("Key has changed while processing the record and requires enforcing repartitioning.");
            }
        }
        delegate.forward(record.withKey(key));
    }
//...
}

...

This ValueProcessorContext is to be used internally at the Processor level: setRecordKey(K) must be called before invoking the user-defined Processor, and clearRecordKey() right after.

NOTE: This API is not meant to be a public interface. It is only used internally by the processValues DSL operation to enforce at runtime that the key hasn't changed.
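
The set/clear protocol around the user-defined Processor can be sketched as follows. This is a simplified, self-contained model rather than the actual Kafka Streams internals: Rec, KeyGuard, and KeyGuardDemo are hypothetical stand-ins (for Record, the guarded context, and the internal wrapper, respectively), and the forwarded list plays the role of the delegate context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for org.apache.kafka.streams.processor.api.Record.
final class Rec<K, V> {
    final K key;
    final V value;
    Rec(K key, V value) { this.key = key; this.value = value; }
    <NK> Rec<NK, V> withKey(NK newKey) { return new Rec<>(newKey, value); }
    <NV> Rec<K, NV> withValue(NV newValue) { return new Rec<>(key, newValue); }
}

// Hypothetical stand-in for the guarded context: rejects forwards whose key
// is not the same object as the key of the record currently being processed.
final class KeyGuard<K, V> {
    final List<Rec<K, V>> forwarded = new ArrayList<>(); // plays the role of the delegate
    private K key;

    void setRecordKey(K initialKey) { this.key = initialKey; }
    void clearRecordKey() { this.key = null; }

    void forward(Rec<K, V> record) {
        if (key != null && record.key != key) { // referential check, no equals()
            throw new IllegalArgumentException("Key has changed while processing the record.");
        }
        forwarded.add(record);
    }
}

final class KeyGuardDemo {
    // Wraps a user-defined processing step: set the key before, clear it right after.
    static <K, V> void processGuarded(KeyGuard<K, V> context, Rec<K, V> input,
                                      BiConsumer<Rec<K, V>, KeyGuard<K, V>> userProcessor) {
        context.setRecordKey(input.key);
        try {
            userProcessor.accept(input, context);
        } finally {
            context.clearRecordKey(); // always reset, even if the user code throws
        }
    }

    public static void main(String[] args) {
        KeyGuard<String, String> context = new KeyGuard<>();
        Rec<String, String> input = new Rec<>("k1", "a,b");

        // Value-only change: the key object is untouched, so forwarding succeeds.
        processGuarded(context, input, (r, ctx) -> ctx.forward(r.withValue("Hello " + r.value)));
        System.out.println("forwarded: " + context.forwarded.size());

        // Key change: the guard rejects the forward.
        try {
            processGuarded(context, input, (r, ctx) -> ctx.forward(r.withKey("other")));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Clearing the key in a finally block is a choice made for this sketch; it keeps a failed record from leaving a stale key behind for the next one.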

...

Based on the previous example, if the key changes, a runtime exception will be thrown:

Code Block
  private Topology topology() {
    final var builder = new StreamsBuilder();
    final var input = builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
    input.processValues(() -> new ContextualProcessor<String, String, String, String>() {
      @Override
      public void process(Record<String, String> record) {
        for (var s : record.value().split(",")) {
          context().forward(record.withKey("FAIL!").withValue("Hello " + s)); // will throw a runtime exception because the key has changed on `processValues`
        }
      }
    }, Named.as("test"))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  } 

...

  • Transform API is broadly adopted for custom processing. Even though most functionality can be implemented with the new Processor API, this migration is not straightforward and will require reimplementing custom processors.
  • KTable#transformValues is an interesting use-case for ValueTransformerWithKey as the internal value is Change<V> — an internal type to handle record changes on KTable — not exposed as public API. An approach would be to deprecate this method in favor of .toStream().processValues().toTable().

...

This would involve extending the KStream/KTable APIs even more with specific Transform variants. If the new Processor API can support most Transform use cases and, in the long term, become the adopted option for transforming values, flat transforms, etc., then we should start getting the new Processor properly adopted in the DSL and let usage drive the next steps.

Runtime key validation on KStream#processValues vs. key output Void type

On one hand, with a Void key output type, users are forced to cast to Void and set the key to null. This can be documented in the API so that users are aware of this peculiarity of forwarding within processValues.

On the other hand, keeping the input key type as the output key type requires no change to the keys, but it can lead to key-checking runtime exceptions if the key is changed.

This KIP proposes the second option and performs runtime key validation to avoid allocating two Records via withKey() (one to set the key to Void null, and another to restore the previous key). Users need to be aware that changing the key within processValues leads to a runtime exception.
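
The allocation argument can be illustrated with a small, self-contained sketch. It does not use Kafka Streams classes: CountedRec is a hypothetical stand-in for Record that counts instances, and AllocationDemo models only the key handling of the two options (any withValue() call would allocate equally in both).

```java
// Hypothetical stand-in for Record that counts how many instances are created.
final class CountedRec<K, V> {
    static int allocations = 0;
    final K key;
    final V value;
    CountedRec(K key, V value) { this.key = key; this.value = value; allocations++; }
    <NK> CountedRec<NK, V> withKey(NK newKey) { return new CountedRec<>(newKey, value); }
}

final class AllocationDemo {
    // Option 1 (Void key output): the user nulls the key, the framework restores it.
    static int voidKeyOption(CountedRec<String, String> input) {
        int before = CountedRec.allocations;
        CountedRec<Void, String> userOutput = input.withKey((Void) null);    // user side
        CountedRec<String, String> restored = userOutput.withKey(input.key); // framework side
        if (restored.key != input.key) throw new IllegalStateException("key not restored");
        return CountedRec.allocations - before;
    }

    // Option 2 (runtime validation): the record is forwarded as is after a reference check.
    static int runtimeCheckOption(CountedRec<String, String> input) {
        int before = CountedRec.allocations;
        CountedRec<String, String> userOutput = input; // key untouched by the user
        if (userOutput.key != input.key) throw new IllegalArgumentException("Key has changed");
        return CountedRec.allocations - before;
    }

    public static void main(String[] args) {
        CountedRec<String, String> input = new CountedRec<>("k1", "v1");
        System.out.println("Void option, extra records: " + voidKeyOption(input));
        System.out.println("Runtime check, extra records: " + runtimeCheckOption(input));
    }
}
```

In this model the Void option creates two additional record instances per forward, while the runtime check creates none.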

References