...

Discussion thread: here

JIRA: here

Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

From KAFKA-3209, certain datasets require modifications to messages either coming into Kafka (source) or going out to another data store (sink). For example, we may need to strip personally identifiable information, such as SSNs, from Kafka data when exporting it into a Hadoop cluster for analysis.

...

For transformers, we need to introduce an abstract class that others can extend to create their own custom Transformer.

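import java.util.Map;

public abstract class Transformer<T1, T2> {
    // Converts an input of type T1 into an output of type T2.
    public abstract T2 transform(T1 t1);
    // Optional hook to configure the transformer from its properties; the default does nothing.
    public void initialize(Map<String, String> props) {}
}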
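To illustrate extending the class, here is a minimal sketch of a transformer for the SSN use case from the motivation. The class name SsnMaskingTransformer, the "mask" property key, and the list-of-strings payload are all hypothetical assumptions, and the abstract class is repeated so the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Same shape as the abstract class proposed above, repeated so this sketch compiles on its own.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Hypothetical transformer that masks US-style SSNs (ddd-dd-dddd) in each string of a list.
// The "mask" property key is an illustrative assumption, not an agreed config name.
class SsnMaskingTransformer extends Transformer<List<String>, List<String>> {
    private static final Pattern SSN = Pattern.compile("\\b\\d{3}-\\d{2}-\\d{4}\\b");
    private String mask = "***-**-****";

    @Override
    public void initialize(Map<String, String> props) {
        // Keep the current (default) mask when the property is absent.
        mask = props.getOrDefault("mask", mask);
    }

    @Override
    public List<String> transform(List<String> lines) {
        List<String> out = new ArrayList<>(lines.size());
        for (String line : lines) {
            // quoteReplacement stops '$' or '\' in the mask from being read as group references.
            out.add(SSN.matcher(line).replaceAll(Matcher.quoteReplacement(mask)));
        }
        return out;
    }
}
```

With mask=[REDACTED], this sketch would turn "ssn=123-45-6789" into "ssn=[REDACTED]".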

I was considering whether we should restrict the signature of transform(). We could pass a standard List<*Record>, but that would restrict what users can do.
I propose allowing transformers to map from any object type to any other object type. (I expect this will mostly be a List<> of custom objects, but other structures should work too, such as a HashMap when order does not matter, e.g. for a sink to a database system.) This includes modifying any field of a ConnectRecord, assuming the transformer knows what it is doing.
Users will be responsible for ensuring that each transformer's output type can be consumed by the next transformer in the chain. Generics can signify this compatibility, and we could provide helpers for standard transformations.
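Generics can make this compatibility check happen at compile time. The sketch below assumes a hypothetical Chain helper (not part of the proposal) that composes two transformers; its shared type parameter M only lets the first transformer's output type feed the second one's input type.

```java
import java.util.Map;

// Same shape as the proposed abstract class, repeated so this sketch compiles on its own.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Hypothetical helper: composes first (A -> M) with second (M -> B) into one A -> B transformer.
// Because both sides share M, an incompatible pairing is rejected by the compiler.
final class Chain<A, M, B> extends Transformer<A, B> {
    private final Transformer<A, M> first;
    private final Transformer<M, B> second;

    Chain(Transformer<A, M> first, Transformer<M, B> second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void initialize(Map<String, String> props) {
        first.initialize(props);
        second.initialize(props);
    }

    @Override
    public B transform(A input) {
        return second.transform(first.transform(input));
    }

    // Static factory so the compiler infers A, M and B from the arguments.
    static <A, M, B> Chain<A, M, B> of(Transformer<A, M> first, Transformer<M, B> second) {
        return new Chain<>(first, second);
    }
}
```

Given a Transformer<String, Integer> named length and a Transformer<Integer, String> named label, Chain.of(length, label) compiles to a Transformer<String, String>, while Chain.of(label, label) does not compile, because label produces a String but expects an Integer.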

Also, this would need a

...

property in the Kafka Connect config files that lists the transformers to initialise; they are applied in the order specified in that property.
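Since the property's name is elided above, the following sketch takes its value as an already-read, comma-separated list of class names. It assumes each transformer has an accessible no-argument constructor and receives the full connector config in initialize(); TransformerLoader, Trim and Upper are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Same shape as the proposed abstract class, repeated so this sketch compiles on its own.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Hypothetical loader: instantiates the listed classes in order and initialises each one.
final class TransformerLoader {
    @SuppressWarnings("unchecked")
    static List<Transformer<Object, Object>> load(String commaSeparatedClassNames,
                                                  Map<String, String> props)
            throws ReflectiveOperationException {
        List<Transformer<Object, Object>> chain = new ArrayList<>();
        for (String name : commaSeparatedClassNames.split(",")) {
            String className = name.trim();
            if (className.isEmpty()) {
                continue; // tolerate stray commas
            }
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            if (!(instance instanceof Transformer)) {
                throw new IllegalArgumentException(className + " is not a Transformer");
            }
            Transformer<Object, Object> transformer = (Transformer<Object, Object>) instance;
            transformer.initialize(props);
            chain.add(transformer);
        }
        return chain;
    }

    // Runs the input through every transformer, in the configured order.
    static Object apply(List<Transformer<Object, Object>> chain, Object input) {
        Object value = input;
        for (Transformer<Object, Object> transformer : chain) {
            value = transformer.transform(value);
        }
        return value;
    }
}

// Two sample transformers for illustration.
class Trim extends Transformer<String, String> {
    @Override
    public String transform(String s) { return s.trim(); }
}

class Upper extends Transformer<String, String> {
    @Override
    public String transform(String s) { return s.toUpperCase(); }
}
```

Loading "Trim, Upper" and applying the chain to "  kafka " yields "KAFKA". Because the element types are only known at runtime, the chain is typed Transformer<Object, Object>, so a mismatch between adjacent transformers surfaces as a ClassCastException rather than a compile error.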

 

SourceTask’s poll() method can be broken down into:
fetch(): Fetch data from the source, potentially as a list of objects in the source's native representation.
Optional transformer phase.
push(): Convert the list of objects (or any other format) into the List<SourceRecord> that gets saved in Kafka.
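From a code point of view:
List<SourceRecord> poll() throws InterruptedException {
    // fetch(): read data from the source in its native representation
    Object obj = fetch();
    // optional transformer phase: apply each configured transformer in order
    for (Transformer<Object, Object> transformer : transformers) {
        obj = transformer.transform(obj);
    }
    // push(): convert the result into the SourceRecords written to Kafka
    return push(obj);
}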
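To see the source-side split end to end without a Kafka dependency, here is a runnable sketch in which StubSourceRecord stands in for Kafka's SourceRecord. LineSourceTask, DropBlankLines and the in-memory list used as the source are hypothetical; a real task would extend SourceTask and build SourceRecord instances in push().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Same shape as the proposed abstract class, repeated so this sketch compiles on its own.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Stand-in for Kafka's SourceRecord so the sketch runs without Kafka on the classpath.
final class StubSourceRecord {
    final String topic;
    final String value;

    StubSourceRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical transformer that drops empty lines from a list of strings.
class DropBlankLines extends Transformer<Object, Object> {
    @Override
    public Object transform(Object in) {
        List<String> out = new ArrayList<>();
        for (Object o : (List<?>) in) {
            String line = (String) o;
            if (!line.isEmpty()) {
                out.add(line);
            }
        }
        return out;
    }
}

// Hypothetical source task whose poll() follows the fetch() -> transformers -> push() split.
class LineSourceTask {
    private final List<String> source; // pretend external system
    private final List<Transformer<Object, Object>> transformers;
    private final String topic;

    LineSourceTask(List<String> source, List<Transformer<Object, Object>> transformers, String topic) {
        this.source = source;
        this.transformers = transformers;
        this.topic = topic;
    }

    // fetch(): read data in the source's native form (here, raw strings).
    Object fetch() {
        return new ArrayList<>(source);
    }

    // push(): convert whatever the last transformer produced into records for Kafka.
    List<StubSourceRecord> push(Object obj) {
        List<StubSourceRecord> records = new ArrayList<>();
        for (Object line : (List<?>) obj) {
            records.add(new StubSourceRecord(topic, (String) line));
        }
        return records;
    }

    List<StubSourceRecord> poll() {
        Object obj = fetch();
        for (Transformer<Object, Object> transformer : transformers) {
            obj = transformer.transform(obj);
        }
        return push(obj);
    }
}
```

With DropBlankLines configured, a source holding "a", "" and "b" yields two records; with no transformers, push() receives the fetched list unchanged.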
SinkTask’s put() method can be broken down into:
pull(): Transform the Collection<SinkRecord> into a native type that the destination recognises.
Optional transformer phase.
store(): Store the output of the transformer phase (or, if no transformers are configured, the native type from pull()) into the destination.
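From a code point of view:
void put(Collection<SinkRecord> records) {
    // pull(): convert the SinkRecords into the destination's native representation
    Object obj = pull(records);
    // optional transformer phase: apply each configured transformer in order
    for (Transformer<Object, Object> transformer : transformers) {
        obj = transformer.transform(obj);
    }
    // store(): write the result into the destination
    store(obj);
}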
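A matching sink-side sketch, again without a Kafka dependency: StubSinkRecord stands in for SinkRecord, and a HashMap stands in for a database table. It also shows the case where a transformer works on a HashMap because row order does not matter. KeyValueSinkTask and TrimValues are hypothetical names.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same shape as the proposed abstract class, repeated so this sketch compiles on its own.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Stand-in for Kafka's SinkRecord so the sketch runs without Kafka on the classpath.
final class StubSinkRecord {
    final String key;
    final String value;

    StubSinkRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical transformer that trims whitespace from every value in a key -> value map.
class TrimValues extends Transformer<Object, Object> {
    @Override
    public Object transform(Object in) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<?, ?> e : ((Map<?, ?>) in).entrySet()) {
            String v = (String) e.getValue();
            out.put((String) e.getKey(), v == null ? null : v.trim());
        }
        return out;
    }
}

// Hypothetical sink task whose put() follows the pull() -> transformers -> store() split.
class KeyValueSinkTask {
    private final Map<String, String> table; // stand-in for the destination table
    private final List<Transformer<Object, Object>> transformers;

    KeyValueSinkTask(Map<String, String> table, List<Transformer<Object, Object>> transformers) {
        this.table = table;
        this.transformers = transformers;
    }

    // pull(): reshape records into the destination's native form. A HashMap suffices because
    // row order does not matter for this upsert-style sink; a record later in iteration order
    // overwrites an earlier one with the same key.
    Object pull(Collection<StubSinkRecord> records) {
        Map<String, String> rows = new HashMap<>();
        for (StubSinkRecord r : records) {
            rows.put(r.key, r.value);
        }
        return rows;
    }

    // store(): upsert the transformed rows into the destination.
    void store(Object obj) {
        for (Map.Entry<?, ?> e : ((Map<?, ?>) obj).entrySet()) {
            table.put((String) e.getKey(), (String) e.getValue());
        }
    }

    void put(Collection<StubSinkRecord> records) {
        Object obj = pull(records);
        for (Transformer<Object, Object> transformer : transformers) {
            obj = transformer.transform(obj);
        }
        store(obj);
    }
}
```

For a batch of (k1, " a "), (k2, "b"), (k1, " c "), the table ends up as {k1=c, k2=b}: pull() keeps the later k1 record and TrimValues strips the surrounding spaces.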

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

...