Status
Current state: Draft
Discussion thread: here
JIRA: here
Released: [0.10.1.0]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Certain datasets require modifications to messages, either as they come into Kafka (source) or as they go out to another data store (sink). For example, we may need to remove user-identifiable information such as Social Security numbers (SSNs) from Kafka messages while exporting data into a Hadoop cluster for analysis.
Currently, the "data transfer" logic is locked inside a single method of each Task class, so the Kafka Connect API offers no hook for modifying messages in flight. The problem is worse when a Kafka Connect implementation is provided by a third party and the user cannot modify its source. As things stand, each user-specific modification would require changing the source of the Connect API itself.
Public Interfaces
To support transformers, we need to introduce an abstract class that others can extend to create their own custom Transformer.
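import java.util.Map;

public abstract class Transformer<T1, T2> {
    // Converts one input message of type T1 into an output message of type T2.
    public abstract T2 transform(T1 t1);
    // Optional configuration hook; does nothing unless overridden.
    public void initialize(Map<String, String> props) {}
}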
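As an illustration of how the abstract class above might be extended, here is a minimal sketch of a transformer that masks SSN-shaped strings, in the spirit of the example in the Motivation section. The class name SsnMaskingTransformer, the "ssn.mask" property, and the use of plain String messages are assumptions made for this sketch and are not part of the proposal. The base class is repeated (without the public modifier) only so that the snippet compiles on its own.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Base class as proposed above, repeated so this sketch is self-contained.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Hypothetical transformer: masks anything shaped like a US SSN (ddd-dd-dddd).
class SsnMaskingTransformer extends Transformer<String, String> {
    private static final Pattern SSN = Pattern.compile("\\b\\d{3}-\\d{2}-\\d{4}\\b");

    private String mask = "***-**-****";

    @Override
    public void initialize(Map<String, String> props) {
        // "ssn.mask" is an illustrative property name, not defined by the proposal.
        mask = props.getOrDefault("ssn.mask", mask);
    }

    @Override
    public String transform(String message) {
        // quoteReplacement keeps '$' or '\' in a custom mask from being read as group references.
        return SSN.matcher(message).replaceAll(Matcher.quoteReplacement(mask));
    }
}
```

A real transformer would more likely operate on Connect record types than on raw strings; strings are used here only to keep the sketch short.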
This would also need a new property in the Kafka Connect config files:
transformers=org.AbcTransformer,org2.XyzTransformer
The transformers are initialised in the order given in this property.
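One way the runtime could turn the transformers property into an ordered list of initialised instances is sketched below. This is an assumption about a possible implementation, not something the proposal specifies: TransformerLoader, UpperCaseTransformer, and ExclaimTransformer are hypothetical names, and the sketch assumes each transformer has a no-argument constructor and receives the full config map in initialize().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Base class as proposed above, repeated so this sketch is self-contained.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Two hypothetical transformers, present only so the loader has something to load.
class UpperCaseTransformer extends Transformer<Object, Object> {
    @Override
    public Object transform(Object message) { return String.valueOf(message).toUpperCase(); }
}

class ExclaimTransformer extends Transformer<Object, Object> {
    @Override
    public Object transform(Object message) { return message + "!"; }
}

// Hypothetical helper: reads the "transformers" property and instantiates each class in order.
final class TransformerLoader {
    static final String TRANSFORMERS_CONFIG = "transformers";

    static List<Transformer<Object, Object>> load(Map<String, String> props) {
        List<Transformer<Object, Object>> chain = new ArrayList<>();
        String value = props.getOrDefault(TRANSFORMERS_CONFIG, "").trim();
        if (value.isEmpty()) {
            return chain; // no transformers configured
        }
        for (String className : value.split("\\s*,\\s*")) {
            try {
                // Assumes a no-argument constructor on every transformer class.
                @SuppressWarnings("unchecked")
                Transformer<Object, Object> t = (Transformer<Object, Object>)
                        Class.forName(className).getDeclaredConstructor().newInstance();
                t.initialize(props);
                chain.add(t); // list order mirrors the order in the property
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot load transformer " + className, e);
            }
        }
        return chain;
    }
}
```

Failing fast with an exception when a class cannot be loaded is one possible choice; a real implementation might prefer to report this through Connect's config validation instead.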
To accommodate the transformers, we can split the "functional" methods into logically separate parts from the user's point of view.
Proposed Changes
Besides splitting the put() and poll() methods of SinkTask and SourceTask respectively, this change introduces an optional chain of Transformers that operate on each message as it passes from one system to another, one of which is always Kafka (consistent with the Kafka Connect API).
Users can define their own Transformers and assemble the chain according to the modifications they want to apply to the messages.
Compatibility, Deprecation, and Migration Plan
This change will break the interface for SourceTask and SinkTask.
If we do not want to break or deprecate the existing interface, we could derive the new API from a separate class, check the task's type at runtime, and call the matching methods. That way existing code keeps working against the old API, while the new API supports Transformers.
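The runtime type check could look roughly like the sketch below. All names here (LegacySinkTask, TransformingSinkTask, write(), SinkDispatcher) are hypothetical stand-ins chosen for illustration. Records are modelled as plain strings and transformers as UnaryOperator<String> instead of the proposed Transformer class, purely to keep the example short and self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Stand-in for the existing API: a single method does all the work.
abstract class LegacySinkTask {
    public abstract void put(List<String> records);
}

// Hypothetical new base class. It extends the old one so that existing
// runtime code can still hold a reference to it.
abstract class TransformingSinkTask extends LegacySinkTask {
    // Hypothetical split-out step that receives already-transformed records.
    public abstract void write(List<String> records);

    @Override
    public final void put(List<String> records) {
        write(records); // a runtime unaware of transformers still works
    }
}

// Hypothetical dispatcher showing the runtime check described above.
final class SinkDispatcher {
    static void deliver(LegacySinkTask task,
                        List<String> records,
                        List<UnaryOperator<String>> transformers) {
        if (task instanceof TransformingSinkTask) {
            List<String> transformed = new ArrayList<>();
            for (String record : records) {
                String current = record;
                for (UnaryOperator<String> t : transformers) {
                    current = t.apply(current); // apply in chain order
                }
                transformed.add(current);
            }
            ((TransformingSinkTask) task).write(transformed);
        } else {
            task.put(records); // old tasks see the records untouched
        }
    }
}
```

With this shape, tasks written against the old class are never handed transformed records, so their behaviour stays exactly as before.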
Test Plan
Tests could run Kafka Connect both with and without transformers. With transformers, the output messages should show the modifications made by each transformer, applied in chain order, relative to the input messages. Without transformers, the messages should be unchanged.
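The core of such a test can be sketched without a running Connect cluster, under the assumption that the chain is applied as a simple left-to-right fold over the configured transformers. ChainCheck, applyChain(), upper(), and prefix() are hypothetical helpers written for this sketch, and messages are again plain strings.

```java
import java.util.List;
import java.util.Map;

// Base class as proposed above, repeated so this sketch is self-contained.
abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
    public void initialize(Map<String, String> props) {}
}

// Hypothetical test helpers.
final class ChainCheck {
    // Applies each transformer in list order; an empty chain returns the message unchanged.
    static String applyChain(List<Transformer<String, String>> chain, String message) {
        String current = message;
        for (Transformer<String, String> t : chain) {
            current = t.transform(current);
        }
        return current;
    }

    // Transformer that upper-cases the message.
    static Transformer<String, String> upper() {
        return new Transformer<String, String>() {
            @Override
            public String transform(String s) { return s.toUpperCase(); }
        };
    }

    // Transformer that prepends a fixed prefix to the message.
    static Transformer<String, String> prefix(final String p) {
        return new Transformer<String, String>() {
            @Override
            public String transform(String s) { return p + s; }
        };
    }
}
```

For the input "abc", the chain [upper(), prefix("id:")] yields "id:ABC", the reversed chain yields "ID:ABC", and an empty chain yields "abc". Using two transformers that do not commute lets a test confirm that chain order is respected and not just that each transformer ran.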
Rejected Alternatives