Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Pull Request: #9493

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Common patterns for transforming "real-world" messages with anything but the simplest schemas often include a combination of the Cast, ReplaceField, and Flatten transforms. However, none of these supports any kind of transformation of Map or Array fields, and Flatten does not support them at all (the transformation simply fails with an error).

Some of this makes sense: Connect is, in my own words, a single-message transporter, and single message transformations are meant to be a one-in, one-out kind of flow.  In reality, however, we will sometimes have payloads attached as children in the messages.  I think KSQL is a good tool for approaching this problem at large scale, but if your Kafka environment is very large or very small, adding KSQL for just one or a few exceptions might not make sense.

I have solved some of these problems with customizations to the Cast and ReplaceField transforms, and I hope the community can get some value from the enhancements I have made.

So based on my experimentation and running different scenarios with customized transforms, the key things that help give a bit more flexibility when transforming more complicated messages are:

- the ability to apply the transforms recursively, so that configured field names are found and handled wherever they appear within nested Structs, Maps, and Arrays, not just at the top level of the message
- the ability to cast complex (non-primitive) values to a JSON-formatted string, so that child payloads can be carried downstream as a single field

With these in place, it works quite well to chain transforms together in a pattern sort of like the following (or other variations you can think of):
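For illustration only, a connector's transform chain could drop unwanted nested fields, cast the remaining complex children to JSON strings, and then flatten the result. The recursive property below is just a placeholder for whichever option name is finally chosen, and the field names are made up:

    transforms=dropNested,castComplex,flatten
    transforms.dropNested.type=org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.dropNested.exclude=debugInfo
    transforms.dropNested.recursive=true
    transforms.castComplex.type=org.apache.kafka.connect.transforms.Cast$Value
    transforms.castComplex.spec=lineItems:string
    transforms.castComplex.recursive=true
    transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value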

Public Interfaces

Cast

New configuration parameters will be added to the Cast Connect transform:

The default value of each new parameter is false, so any existing connectors will continue to work as they did before.  If a user wants to begin using these options, they can update their connector config files or send a PUT request to the Connect REST API for whichever connectors they want to update.
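For illustration, a connector configuration using the new Cast behavior might look something like this (recursive is a placeholder name here, and the field names are made up):

    transforms=cast
    transforms.cast.type=org.apache.kafka.connect.transforms.Cast$Value
    transforms.cast.spec=metadata:string,amount:float64
    transforms.cast.recursive=true

In this sketch, amount would be cast to float64 wherever a field with that name appears in the message, and metadata (a Struct or Map in this example) would be cast to a JSON-formatted string as described in the Proposed Changes below.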

ReplaceField

New configuration parameters will be added to the ReplaceField Connect transform:

The default value of each new parameter is false, so any existing connectors will continue to work as they did before.  If a user wants to begin using these options, they can update their connector config files or send a PUT request to the Connect REST API for whichever connectors they want to update.
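For illustration (again with recursive as a placeholder name and made-up field names), the new ReplaceField behavior could rename or drop fields no matter where they appear in the message:

    transforms=cleanup
    transforms.cleanup.type=org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.cleanup.renames=id:order_id
    transforms.cleanup.exclude=internalNotes
    transforms.cleanup.recursive=true

With recursion enabled, every field named id, at any depth, would be renamed to order_id, and every internalNotes child would be dropped.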

Proposed Changes

In order to make this change, a fair amount of the structure and flow of both transforms must be altered. Namely, instead of just looking up fields from the configuration or looping through only the top level of the message, both the schema update and the value update will now need to call recursive methods which build the new schema and value from top to bottom, based on the types of fields that are encountered.
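As a rough sketch of the recursive pattern for the schema side (illustrative only, not the code from PR #9493: maybeConvert is a hypothetical stand-in for each transform's existing per-field logic, and optional/default handling is omitted):

    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    // Walk the schema from top to bottom and rebuild it, descending into
    // Structs, Maps, and Arrays rather than stopping at the top level.
    private Schema buildUpdatedSchema(Schema schema) {
        switch (schema.type()) {
            case STRUCT: {
                SchemaBuilder builder = SchemaBuilder.struct();
                for (Field field : schema.fields()) {
                    builder.field(field.name(), buildUpdatedSchema(field.schema()));
                }
                return builder.build();
            }
            case ARRAY:
                return SchemaBuilder.array(buildUpdatedSchema(schema.valueSchema())).build();
            case MAP:
                return SchemaBuilder.map(buildUpdatedSchema(schema.keySchema()),
                        buildUpdatedSchema(schema.valueSchema())).build();
            default:
                // Leaf field: apply the transform's existing per-field logic.
                return maybeConvert(schema);  // hypothetical helper
        }
    }

The value update would mirror this with a matching recursive method (for example a buildUpdatedValue(Schema, Object)) that walks Structs, Maps, and Arrays in the same way.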

Also, to add the ability to cast a complex value to a JSON-formatted string, we will need to introduce some new dependencies for the transforms project. The initial thought is to use an instance of Connect's own JsonConverter and JsonDeserializer, so that the usage and dependencies stay self-contained within Kafka and the code required to handle this can be significantly minimized.
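A minimal sketch of that idea, assuming the complex value is serialized without the JSON schema envelope (castToJsonString is just an illustrative helper name, and the exact wiring in PR #9493 may differ):

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.json.JsonConverter;

    // Serialize a complex Connect value (Struct, Map, or Array) to a
    // JSON-formatted string using Connect's own JsonConverter.
    private String castToJsonString(String topic, Schema schema, Object complexValue) {
        JsonConverter converter = new JsonConverter();
        converter.configure(Collections.singletonMap("schemas.enable", "false"), false);
        byte[] json = converter.fromConnectData(topic, schema, complexValue);
        return new String(json, StandardCharsets.UTF_8);
    }

In the transform itself the converter would of course be configured once (in configure()) rather than per record.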

In my own experimentation (and in the changes you can see in PR #9493), this has included the following for both transforms:

Compatibility, Deprecation, and Migration Plan

The only impact should be that new functionality is added, which users must update their configuration in order to use.  Existing functionality should not be impacted, and no updates should be needed in order to keep using these transforms the same as before.

Rejected Alternatives

One possible alternative to the "recursive" idea is support for nested field selection via some kind of dotted or path-like notation in the configuration.  This could potentially be added as well, but at this time the proposal is only to search out child field names that match what is given in the configuration, no matter at which level or how many times they appear within the schema or structure of the message.

Casting complex values to JSON-formatted strings is a bit of a departure from what the Cast transform already does; however, the value it adds is quite large, and the data becomes much easier to use downstream when it can be given in a more standardized format like this.  It could be done in other ways (such as traversing the message in a loop and using some of the Jackson classes to build new JSON objects), which would remove the dependency on the Kafka Connect JSON project; however, I feel that this would duplicate effort that has already gone into that project, and it is easier to implement and support if we just use what is already available in Kafka.