THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
To accommodate the transformers, we can split the “functional” methods into separate parts from a user logical POV:
SourceTask’s poll() method can broken into:
- fetch(): Fetch the data from the source, potentially as a list of objects in the native representation of the source
- Optional transformer phase.
- push(): Convert the list of object or any other format to List<SourceRecord> which gets saved in Kafka
Code POV:
List<SourceRecord> poll() {
...
obj = transformer.transform(obj);
}
push(obj);
}
SinkTask’s push() method can be broken down into:
- pull(): Transform the Collection<SinkRecord> to a native type that the destination recognises
- Optional transformer phase.
- store(): Get the transformed or the native type from pull to store into the destination
Code POV:
void put(Collection<SinkRecord> records) {
obj = pull(records);
for(transformer : transformers) {
obj = transformer.transform(obj);
}
store(obj);
}
Passing the complete collection from Transformer to Transformer will allow modification across a set of messages. For example if someone wants to filter messages based on certain messages received before the current one, that would be possible easily.
Alternatively, we could use a message to message transformation.
...