...

Proposed Changes

  • Add an overloaded aggregate method that accepts an additional lateMessagesTopicName as its last parameter:

...

  • Code Block
    language: java
    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Named named,
                                           final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized,
                                           final String lateMessagesTopicName);


  • Optionally create an additional SinkNode if the respective parameter is set.
  • Conditionally forward late messages to that SinkNode.
  • Minor change to the forward implementation: by default each message is sent to all sub-nodes, but the new node for late messages should be excluded from this generic processing.
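
The routing described in the list above can be sketched with plain Java, without any Kafka dependency. This is only an illustrative model under stated assumptions: RecordingNode, WindowedProcessor and their fields are hypothetical names, not Kafka Streams classes; windows are assumed to be tumbling; and the raw record is forwarded instead of the updated aggregate. The point it tries to show is that the late-messages sink is optional and is kept out of the generic forward to the regular sub-nodes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the proposed routing.
// None of these classes are Kafka Streams classes.
final class LateMessageRouting {

    /** Stand-in for a sub-node: it only records what it receives. */
    static final class RecordingNode {
        final List<String> received = new ArrayList<>();

        void process(String key, String value) {
            received.add(key + "=" + value);
        }
    }

    /** Stand-in for the windowed-aggregate processor (tumbling windows assumed). */
    static final class WindowedProcessor {
        private final long windowSizeMs;
        private final long gracePeriodMs;
        private final List<RecordingNode> children = new ArrayList<>();
        private final RecordingNode lateSink; // null when no late-messages topic is configured
        private long streamTime = -1L;        // highest timestamp seen so far

        WindowedProcessor(long windowSizeMs, long gracePeriodMs, RecordingNode lateSink) {
            this.windowSizeMs = windowSizeMs;
            this.gracePeriodMs = gracePeriodMs;
            this.lateSink = lateSink;
        }

        void addChild(RecordingNode child) {
            children.add(child);
        }

        void process(String key, String value, long timestamp) {
            streamTime = Math.max(streamTime, timestamp);
            long windowEnd = timestamp - (timestamp % windowSizeMs) + windowSizeMs;
            boolean windowStillOpen = windowEnd + gracePeriodMs > streamTime;

            if (windowStillOpen) {
                // Generic forward: every regular sub-node gets the record.
                // The late sink is not in `children`, so it is skipped here.
                for (RecordingNode child : children) {
                    child.process(key, value);
                }
            } else if (lateSink != null) {
                // Conditional forward: only late records reach the late sink.
                lateSink.process(key, value);
            }
            // Otherwise the late record is dropped.
        }
    }
}
```

In this model, passing null as the late sink leaves late records dropped, while supplying a sink redirects them without affecting what the regular sub-nodes see.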

...

An alternative approach would be to give API users the ability to define an additional handler for late messages, e.g.

...

Code Block
language: java
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                       final Aggregator<? super K, ? super V, VR> aggregator,
                                       final Named named,
                                       final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized,
                                       final BiConsumer<? super K, ? super V> lateMessageConsumer);
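
To make the handler-based alternative more concrete, here is a small stand-alone sketch of how a BiConsumer<? super K, ? super V> could be invoked for late records. It relies only on the JDK; WindowedCounter and its methods are hypothetical names made up for illustration and are not part of Kafka Streams. It assumes tumbling windows with no grace period and counts records per key and window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified model; not the Kafka Streams implementation.
final class LateMessageHandlerSketch {

    /** Counts records per key and tumbling window; late records go to the handler. */
    static final class WindowedCounter<K, V> {
        private final long windowSizeMs;
        private final BiConsumer<? super K, ? super V> lateMessageConsumer;
        private final Map<String, Long> counts = new HashMap<>();
        private long streamTime = -1L; // highest timestamp seen so far

        WindowedCounter(long windowSizeMs,
                        BiConsumer<? super K, ? super V> lateMessageConsumer) {
            this.windowSizeMs = windowSizeMs;
            this.lateMessageConsumer = lateMessageConsumer;
        }

        void process(K key, V value, long timestamp) {
            streamTime = Math.max(streamTime, timestamp);
            long windowStart = timestamp - (timestamp % windowSizeMs);
            long windowEnd = windowStart + windowSizeMs;

            if (windowEnd > streamTime) {
                // Window is still open: update the aggregate (here, a count).
                counts.merge(key + "@" + windowStart, 1L, Long::sum);
            } else {
                // Window already closed: hand the record to the user-defined handler.
                lateMessageConsumer.accept(key, value);
            }
        }

        Map<String, Long> counts() {
            return counts;
        }
    }
}
```

The wildcard bounds let callers pass a general-purpose handler, for example a BiConsumer<Object, Object> that logs or collects late records. Writing them to a topic would then be one possible handler among several, which the user would have to implement.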



However, a common use case of Kafka Streams is writing data to topics.

...