Status

Current state: Draft

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Certain datasets require modifications to messages either as they come into Kafka (source) or as they go out to another data store (sink). For example, we may need to remove personally identifiable information such as SSNs from Kafka messages while exporting data into a Hadoop cluster for analysis.

Currently, the "data transfer" logic is locked inside a single method of each Task class, so the Kafka Connect API offers no hook for modifying messages in flight. This is especially limiting when a Kafka Connect jar is provided by a third party and the user cannot modify its source. Without such a hook, each user's custom modifications would require changing the source of the Connect API itself.

Public Interfaces

For the transformers, we need to introduce an abstract class that others can extend to create their own custom Transformer.
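As a rough illustration of what such a base class could look like, here is a minimal sketch. The class name `Transformer`, its generic parameter, and the `transform` signature are assumptions made for this example and are not an existing Kafka Connect type; plain `String` values stand in for Connect records so the snippet has no external dependencies.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical base class: name and signature are assumptions for illustration only.
abstract class Transformer<R> {
    // Receives the whole batch and returns the (possibly modified or filtered) batch.
    abstract Collection<R> transform(Collection<R> records);
}

// Example subclass: masks anything shaped like an SSN (ddd-dd-dddd) in String values.
class SsnMaskingTransformer extends Transformer<String> {
    @Override
    Collection<String> transform(Collection<String> records) {
        List<String> out = new ArrayList<>(records.size());
        for (String value : records) {
            out.add(value.replaceAll("\\d{3}-\\d{2}-\\d{4}", "***-**-****"));
        }
        return out;
    }
}
```

A real implementation would operate on SinkRecord or SourceRecord values rather than strings; the string form is used here only to keep the sketch self-contained.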

...

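void put(Collection<SinkRecord> records) {
    // pull() and store() are the two halves that put() is split into
    Collection<SinkRecord> obj = pull(records);
    // apply each configured transformer in chain order
    for (Transformer transformer : transformers) {
        obj = transformer.transform(obj);
    }
    store(obj);
}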

Passing the complete collection from Transformer to Transformer allows modifications that span a set of messages. For example, a transformer could filter a message based on messages received before it.
Alternatively, we could use a message-to-message transformation, where each transformer sees one message at a time.
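The two styles can be contrasted with a small sketch. Everything named here (`Transformer`, `DropRepeatsTransformer`, `PerRecordTransformer`) is hypothetical and exists only for this example. The first subclass needs the batch-level contract because it keeps state about earlier messages; the second shows that a message-to-message function can be adapted to the same contract.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical batch-level contract (an assumption, not an existing Connect type).
abstract class Transformer<R> {
    abstract Collection<R> transform(Collection<R> records);
}

// Batch-level: drops a record if an equal record was already seen.
// State is kept across calls, so records from earlier batches also count.
class DropRepeatsTransformer<R> extends Transformer<R> {
    private final Set<R> seen = new HashSet<>();

    @Override
    Collection<R> transform(Collection<R> records) {
        List<R> out = new ArrayList<>();
        for (R record : records) {
            if (seen.add(record)) { // add() returns false for a repeat
                out.add(record);
            }
        }
        return out;
    }
}

// Message-to-message alternative, adapted to the batch contract.
class PerRecordTransformer<R> extends Transformer<R> {
    private final Function<R, R> fn;

    PerRecordTransformer(Function<R, R> fn) {
        this.fn = fn;
    }

    @Override
    Collection<R> transform(Collection<R> records) {
        List<R> out = new ArrayList<>(records.size());
        for (R record : records) {
            out.add(fn.apply(record));
        }
        return out;
    }
}
```

The trade-off is that a per-message contract is simpler to implement and reason about, while the batch contract is the only one of the two that can express filtering or reordering based on other messages.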

Proposed Changes

Besides splitting the put() and poll() methods of SinkTask and SourceTask, respectively, into separate steps, the change introduces an optional chain of Transformers that operates on messages as they are passed from one system to another, one of them being Kafka (consistent with the Kafka Connect API).

Users can define their own Transformers and assemble the chain according to the modifications they want to perform on the messages.

Compatibility, Deprecation, and Migration Plan

This change will break the interface for SourceTask and SinkTask. 

If we do not want to deprecate the existing interface, we could derive the new API from a separate class, check the task's type at runtime, and call the appropriate methods. That way neither existing code nor the new API would break, and only tasks written against the new API would support Transformers.
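A minimal sketch of that runtime check follows. All names (`LegacySinkTask`, `TransformingSinkTask`, `TaskRunner`) are hypothetical stand-ins invented for this example, with `String` in place of SinkRecord; the only point is the `instanceof` dispatch between the old single-method contract and the new split contract.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.UnaryOperator;

// Stand-in for the existing task contract: a single put() method.
abstract class LegacySinkTask {
    abstract void put(Collection<String> records);
}

// Hypothetical new base class that splits put() into pull() and store().
abstract class TransformingSinkTask extends LegacySinkTask {
    abstract Collection<String> pull(Collection<String> records);

    abstract void store(Collection<String> records);

    @Override
    void put(Collection<String> records) {
        store(pull(records)); // keeps callers of the old method working
    }
}

// Hypothetical framework-side dispatch.
final class TaskRunner {
    static void deliver(LegacySinkTask task, Collection<String> records,
                        List<UnaryOperator<Collection<String>>> transformers) {
        if (task instanceof TransformingSinkTask) {
            TransformingSinkTask t = (TransformingSinkTask) task;
            Collection<String> batch = t.pull(records);
            for (UnaryOperator<Collection<String>> transformer : transformers) {
                batch = transformer.apply(batch);
            }
            t.store(batch);
        } else {
            task.put(records); // old tasks: unchanged behavior, no transformers applied
        }
    }
}
```

With this shape, a third-party task compiled against the old contract keeps running unmodified, at the cost of silently ignoring any configured transformers; whether that should instead be reported as a configuration error is left open here.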

Test Plan

Tests could include running Kafka Connect code with and without transformers. With transformers, the output messages should reflect each transformer's modifications, applied in chain order, relative to the input messages. Without transformers, the messages should be unchanged.
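The two cases can be expressed as a small self-checking sketch. The class `TransformerChainCheck` and its helpers are hypothetical; `runChain` stands in for the chain loop inside put() or poll(), and messages are plain strings. A real test would run against actual connector tasks and a test framework.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

final class TransformerChainCheck {
    // Applies each transformer in order; hypothetical stand-in for the chain loop.
    static Collection<String> runChain(Collection<String> input,
                                       List<UnaryOperator<Collection<String>>> chain) {
        Collection<String> batch = input;
        for (UnaryOperator<Collection<String>> transformer : chain) {
            batch = transformer.apply(batch);
        }
        return batch;
    }

    // Helper: a transformer that appends a suffix to every message.
    static UnaryOperator<Collection<String>> append(String suffix) {
        return batch -> {
            List<String> out = new ArrayList<>();
            for (String message : batch) {
                out.add(message + suffix);
            }
            return out;
        };
    }

    // Throws AssertionError if either case from the test plan fails.
    static void verify() {
        List<String> input = Arrays.asList("m1", "m2");

        // Without transformers: messages pass through unchanged.
        check(runChain(input, Collections.emptyList()).equals(input));

        // With transformers: modifications appear in chain order.
        Collection<String> out = runChain(input, Arrays.asList(append("-a"), append("-b")));
        check(out.equals(Arrays.asList("m1-a-b", "m2-a-b")));
    }

    private static void check(boolean condition) {
        if (!condition) {
            throw new AssertionError("transformer chain check failed");
        }
    }
}
```

Using suffix-appending transformers makes the ordering visible in the output itself: swapping the two transformers would produce "m1-b-a", so the check fails if the chain is applied in the wrong order.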

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.