Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

void put(Collection<SinkRecord> records) { 
 obj = pull(records);
 for(transformer : transformers) {
 obj = transformer.transform(obj);
 }
 store(obj);
}

Passing the complete collection from Transformer to Transformer will allow modification across a set of messages. For example if someone wants to filter messages based on certain messages received before the current one, that would be possible easily.
Alternatively, we could use a message to message transformation.

Proposed Changes

Besides breaking the functions push() and poll() of SinkTask and SourceTask respectively, the change introduced is the optional Transformers chain that users can introduce to operate on the message when being passed from one source to another, one of them being Kafka (as consistent with the Kafka Connect API.) 

...