...

List<SourceRecord> poll() {
   obj = fetch();
   for(transformer : transformers) {
       obj = transformer.transform(obj);
   }
   push(obj);
}

SinkTask’s put() method can be broken down into:

...

void put(Collection<SinkRecord> records) {
   obj = pull(records);
   for(transformer : transformers) {
       obj = transformer.transform(obj);
   }
   store(obj);
}

Passing the complete collection from one Transformer to the next allows modifications that span a set of messages. For example, filtering a message based on messages received before it becomes straightforward.
Alternatively, we could use a message-to-message transformation, where each Transformer sees one message at a time.
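
The two options can be contrasted with a small sketch. Everything named here (BatchTransformer, RecordTransformer, DropDuplicates, perRecord, applyAll) is hypothetical and invented for illustration; none of it is part of the Kafka Connect API, and plain strings stand in for records. It assumes a collection-level transformer receives the whole batch, so it can drop a message because an equal one appeared earlier, while a message-level transformer only ever sees the current message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TransformerSketch {

    // Hypothetical collection-level contract: sees the whole batch at once.
    interface BatchTransformer<R> {
        Collection<R> transform(Collection<R> records);
    }

    // Hypothetical message-level contract: sees one record at a time.
    // In this sketch, returning null means "drop the record".
    interface RecordTransformer<R> {
        R transform(R record);
    }

    // Collection-level filter: drops a record if an equal record
    // appeared earlier in the same batch.
    static class DropDuplicates<R> implements BatchTransformer<R> {
        @Override
        public Collection<R> transform(Collection<R> records) {
            Set<R> seen = new HashSet<>();
            List<R> out = new ArrayList<>();
            for (R record : records) {
                if (seen.add(record)) { // add() returns false for a repeat
                    out.add(record);
                }
            }
            return out;
        }
    }

    // Adapter: lifts a message-level transformer into a collection-level one.
    static <R> BatchTransformer<R> perRecord(RecordTransformer<R> transformer) {
        return records -> {
            List<R> out = new ArrayList<>();
            for (R record : records) {
                R transformed = transformer.transform(record);
                if (transformed != null) {
                    out.add(transformed);
                }
            }
            return out;
        };
    }

    // Mirrors the loop in the pseudocode above: each transformer's
    // output becomes the next transformer's input.
    static <R> Collection<R> applyAll(Collection<R> records,
                                      List<BatchTransformer<R>> transformers) {
        Collection<R> current = records;
        for (BatchTransformer<R> transformer : transformers) {
            current = transformer.transform(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<BatchTransformer<String>> chain = new ArrayList<>();
        chain.add(new DropDuplicates<>());
        chain.add(perRecord((String s) -> s.toUpperCase()));
        System.out.println(applyAll(Arrays.asList("a", "b", "a"), chain)); // prints [A, B]
    }
}
```

Under these assumptions the message-level contract is simply a special case of the collection-level one (via the adapter), whereas a filter such as DropDuplicates cannot be written against the message-level contract without keeping state inside the transformer.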

...