Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Single Message Transforms (SMTs), introduced in KIP-66, have greatly improved the usability of connectors by enabling processing of input/output data without the need for additional streaming applications.
However, these benefits have been limited because SMTs can only act on fields available at the root of the record structure.
This KIP aims to add support for nested structures to the existing SMTs where nested structures are relevant.
Proposed Changes
Nested notation
Using dots tends to be the most intuitive way to access nested record structures: tools like jq already use them [1], and they cover most scenarios.
Dots are already allowed as part of element names in JSON (i.e. schemaless) records, e.g. {"nested.key": {"val": 42}}. Escaping them with backslashes would lead to unfriendly configurations, because backslashes are also escape characters in JSON. Instead, it is proposed to follow an approach similar to CSV, where a double quote is escaped by preceding it with another double quote [2].
Therefore, in transform configurations, a double dot ("..") escapes a dot that is part of a field name; e.g. "nested..key.val" refers to the field "val" inside the field "nested.key".
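As a minimal sketch of this escaping rule (not the KIP's implementation), a path can be split left to right, treating a single dot as a separator and a double dot as a literal dot. The class name FieldPathParser is hypothetical, and reading "a...b" as the segments "a." and "b" is an assumption, since the KIP does not define runs of three or more dots.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a v2 field path into its segments.
// A single dot separates segments; a double dot ("..") is an escaped,
// literal dot that belongs to the current field name.
final class FieldPathParser {
    private FieldPathParser() {}

    static List<String> parse(String path) {
        List<String> segments = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c != '.') {
                current.append(c);
            } else if (i + 1 < path.length() && path.charAt(i + 1) == '.') {
                // Escaped dot: keep one literal '.' and skip the second dot.
                current.append('.');
                i++;
            } else {
                // Unescaped dot: the current segment is complete.
                segments.add(current.toString());
                current.setLength(0);
            }
        }
        segments.add(current.toString());
        return segments;
    }
}
```

With this rule, "parent.child.k2" yields [parent, child, k2], while "parent..child.k2" yields [parent.child, k2].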
[1] https://stedolan.github.io/jq/manual/#Basicfilters
[2] https://datatracker.ietf.org/doc/html/rfc4180, section 2.7:
> If double quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.
Accessing multiple values with deep-scan
In some scenarios we want to target multiple fields with the same name at different levels, e.g. within arrays or dynamic structures.
For these cases, an asterisk can be used to search all elements within a path: "a.*.b" will access "a" and then search all of its child objects/arrays for the field "b".
When deep scan is used, the path must contain exactly one field after the asterisk.
Deep scans are expected to return multiple values, so each SMT has to define how to proceed when multiple fields are found.
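A deep scan such as "a.*.b" could be evaluated over schemaless values (nested Maps and Lists) roughly as follows. This is a sketch under assumptions the KIP leaves open: the class DeepScan is hypothetical, "*" is taken to match one or more levels below "a" (so a "b" stored directly under "a" is not returned), and the scan keeps descending after a match. It returns every match, leaving it to each SMT to decide what to do with multiple values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a deep scan "parent.*.field" over schemaless data.
// Assumption: "*" matches one or more levels below "parent".
final class DeepScan {
    private DeepScan() {}

    static List<Object> scan(Map<String, Object> root, String parent, String field) {
        List<Object> found = new ArrayList<>();
        // Start from the children of "parent", not from "parent" itself.
        for (Object child : children(root.get(parent))) {
            collect(child, field, found);
        }
        return found;
    }

    private static void collect(Object node, String field, List<Object> found) {
        if (node instanceof Map && ((Map<?, ?>) node).containsKey(field)) {
            found.add(((Map<?, ?>) node).get(field));
        }
        // Keep descending, so nested matches are returned as well.
        for (Object child : children(node)) {
            collect(child, field, found);
        }
    }

    // Children of an object are its values; children of an array are its items.
    private static List<Object> children(Object node) {
        List<Object> result = new ArrayList<>();
        if (node instanceof Map) {
            result.addAll(((Map<?, ?>) node).values());
        } else if (node instanceof List) {
            result.addAll((List<?>) node);
        }
        return result;
    }
}
```

For example, with "a" holding {"x": {"b": 1}, "items": [{"b": 2}], "b": 0}, scanning for "b" returns 1 and 2 (in map iteration order), but not the 0 stored directly under "a".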
Accessing Arrays
Arrays can be accessed in different ways and at different levels.
- Accessing the whole array: if a path points to an array and the SMT supports arrays as input, then "a.b" can be used, where "b" is an array.
- Accessing all elements of the array: if a path points to an array whose elements are not objects (e.g. strings), then the SMT can access all the elements of the array at once using "a.b", where "b" is an array.
- Accessing child elements of all array objects: if a path accesses an array whose elements are objects, all the objects can be accessed by providing a path to their child elements, e.g. "a.b.c" accesses the array "b" and the element "c" in every item of the array.
- Accessing a single item by index: if a path points to an array and then uses a (zero-based) index, it gets that specific element. If no additional child element is provided, it accesses the whole object/element, e.g. "a.b.1" accesses the second item of the array.
- Accessing elements within a single item by index: if the item of the array is an object, its elements can be accessed, e.g. "a.b.1.c" accesses the second item of the array and then its field "c".
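The array rules above could be evaluated on schemaless values (nested Maps and Lists) as sketched below. It assumes the path has already been split into segments, that a purely numeric segment applied to an array is a zero-based index, and that an out-of-range index matches nothing; ArrayPathResolver is a hypothetical name, and Connect Structs with schemas are not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical resolver for already-split path segments over schemaless data.
// Numeric segments index into arrays; any other segment applied to an array
// is resolved against every object item of that array.
final class ArrayPathResolver {
    private ArrayPathResolver() {}

    static List<Object> resolve(Object root, List<String> segments) {
        List<Object> current = new ArrayList<>();
        current.add(root);
        for (String segment : segments) {
            List<Object> next = new ArrayList<>();
            for (Object node : current) {
                if (node instanceof Map) {
                    Map<?, ?> map = (Map<?, ?>) node;
                    if (map.containsKey(segment)) {
                        next.add(map.get(segment));
                    }
                } else if (node instanceof List) {
                    List<?> list = (List<?>) node;
                    if (segment.matches("\\d{1,9}")) {
                        // Single item by index, e.g. "a.b.1" is the second item.
                        int index = Integer.parseInt(segment);
                        if (index < list.size()) {
                            next.add(list.get(index));
                        }
                    } else {
                        // Child field of every object item, e.g. "a.b.c".
                        for (Object item : list) {
                            if (item instanceof Map && ((Map<?, ?>) item).containsKey(segment)) {
                                next.add(((Map<?, ?>) item).get(segment));
                            }
                        }
                    }
                }
            }
            current = next;
        }
        return current;
    }
}
```

For a root whose "a" holds {"b": [{"c": 1}, {"c": 2}]}, the segments a, b, c resolve to [1, 2], the segments a, b, 1, c resolve to [2], and a, b resolves to the array itself.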
//TODO add examples to SMTs
Public Interfaces
The existing SMTs impacted by this change are described under Affected SMTs.
New configuration flags
Name | Type | Default | Importance | Documentation |
---|---|---|---|---|
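field.syntax.version | STRING | v1 | HIGH | Permitted values: v1, v2. With v1, field names refer to top-level fields only (current behavior); with v2, field paths use the nested notation described above. This configuration will affect all the field paths used by the transform. |
This flag will be added only to the SMTs that accept field paths, as described below.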
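To illustrate what the flag changes, the sketch below interprets the same configured path under both versions. It is hypothetical (FieldSyntax is not a Kafka class) and assumes that v1 keeps today's behavior of treating the whole string as a single top-level field name, consistent with v1 being the default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of how field.syntax.version could change the
// meaning of the same configured path string.
final class FieldSyntax {
    private FieldSyntax() {}

    static List<String> segments(String version, String path) {
        switch (version) {
            case "v1":
                // Current behavior: the whole string is one top-level field name.
                return List.of(path);
            case "v2":
                // Nested notation: dots separate levels, ".." is a literal dot.
                return splitNested(path);
            default:
                throw new IllegalArgumentException("Unknown field.syntax.version: " + version);
        }
    }

    private static List<String> splitNested(String path) {
        List<String> result = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c != '.') {
                current.append(c);
            } else if (i + 1 < path.length() && path.charAt(i + 1) == '.') {
                current.append('.');
                i++; // skip the second dot of the escape
            } else {
                result.add(current.toString());
                current.setLength(0);
            }
        }
        result.add(current.toString());
        return result;
    }
}
```

Under v1, "parent.child.k2" names a single top-level field; under v2, it names "k2" inside "child" inside "parent".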
Affected SMTs
Cast
Changes:
- Extend "spec" to support nested notation.
- Supports arrays and deep-scan to access multiple fields.
Examples:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.spec": "k1:string,parent.child.k2:int64" } | { "k1": "123", "parent": { "child": { "k2": 123 } } } |
2. Nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.spec": "k1:string,parent..child.k2:int64" } | { "k1": "123", "parent.child": { "k2": 123 } } |
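The following hypothetical sketch (NestedCast is not a Kafka class) shows how a v2 "spec" like the ones in the table could be applied to a schemaless value. It only handles the "string" and "int64" types, assumes the target type follows the last ":" of each entry, and leaves the record unchanged when a path does not exist; none of these choices are specified by the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Cast$Value applying a v2 "spec" to schemaless data.
// Only the "string" and "int64" target types are handled here.
final class NestedCast {
    private NestedCast() {}

    @SuppressWarnings("unchecked")
    static void apply(Map<String, Object> value, String spec) {
        for (String entry : spec.split(",")) {
            // The target type follows the last ':'; everything before it is the path.
            int colon = entry.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected <path>:<type> but got: " + entry);
            }
            List<String> path = split(entry.substring(0, colon).trim());
            String type = entry.substring(colon + 1).trim();

            // Walk down to the map that holds the last path segment.
            Map<String, Object> parent = value;
            for (String segment : path.subList(0, path.size() - 1)) {
                Object child = parent == null ? null : parent.get(segment);
                parent = child instanceof Map ? (Map<String, Object>) child : null;
            }
            String leaf = path.get(path.size() - 1);
            if (parent != null && parent.containsKey(leaf)) {
                parent.put(leaf, cast(parent.get(leaf), type));
            }
        }
    }

    private static Object cast(Object v, String type) {
        switch (type) {
            case "string":
                return String.valueOf(v);
            case "int64":
                return Long.parseLong(String.valueOf(v));
            default:
                throw new IllegalArgumentException("Type not covered by this sketch: " + type);
        }
    }

    // Dots separate levels; ".." is a literal dot inside a field name.
    private static List<String> split(String path) {
        List<String> segments = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c != '.') {
                current.append(c);
            } else if (i + 1 < path.length() && path.charAt(i + 1) == '.') {
                current.append('.');
                i++;
            } else {
                segments.add(current.toString());
                current.setLength(0);
            }
        }
        segments.add(current.toString());
        return segments;
    }
}
```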
ExtractField
Changes:
- Extend "field" to support nested notation.
- Does not support multiple values (e.g. deep scan or array).
Example:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.field": "parent.child.k2" } | "123" |
2. Nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.field": "parent..child.k2" } | "123" |
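A schemaless sketch of ExtractField$Value with a v2 path might simply walk the path one level at a time. NestedExtract is hypothetical, the path is assumed to be already split into segments, and returning null when a level is missing (or is not an object, such as an array) is an assumption, in line with this SMT not supporting multiple values.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of ExtractField$Value with a v2 nested path over
// schemaless data: returns the single value found, or null when any level
// is missing or is not an object.
final class NestedExtract {
    private NestedExtract() {}

    static Object extract(Map<String, Object> value, List<String> path) {
        Object current = value;
        for (String segment : path) {
            if (!(current instanceof Map)) {
                return null; // cannot descend into a missing or non-object level
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }
}
```

With the inputs of the table above, the segments parent, child, k2 (or parent.child, k2 for the dotted field name) both extract "123".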
HeaderFrom
Changes:
- Extend "fields" to support nested notation.
- As this SMT affects only existing fields, additional configurations will not be required.
- Does not support multiple values (e.g. deep scan or array).
Example:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.fields": "k1,parent.child.k2", "transforms.smt1.headers": "k1,k2" } | headers: - k1=123 - k2="123" |
2. Nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.fields": "k1,parent..child.k2", "transforms.smt1.headers": "k1,k2" } | headers: - k1=123 - k2="123" |
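A hypothetical sketch of the header values HeaderFrom$Value could produce with v2 paths, pairing the i-th entry of "fields" with the i-th entry of "headers" as in the examples above. NestedHeaderFrom is not a Kafka class; it only computes header name/value pairs for schemaless values, assumes paths are already split into segments, and maps a missing field to null.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: computes header name -> value pairs from nested
// field paths; the record value itself is left untouched.
final class NestedHeaderFrom {
    private NestedHeaderFrom() {}

    static Map<String, Object> headersFor(Map<String, Object> value,
                                          List<List<String>> fields,
                                          List<String> headers) {
        if (fields.size() != headers.size()) {
            throw new IllegalArgumentException("fields and headers must have the same length");
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            Object current = value;
            for (String segment : fields.get(i)) {
                // Missing or non-object levels yield null.
                current = (current instanceof Map) ? ((Map<?, ?>) current).get(segment) : null;
            }
            result.put(headers.get(i), current);
        }
        return result;
    }
}
```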
MaskField
Changes:
- Extend "fields" to support nested notation.
- Supports arrays and deep-scan to access multiple fields.
Example:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.fields": "parent.child.k2" } | { "k1": 123, "parent": { "child": { "k2": "" } } } |
2. Nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.fields": "parent..child.k2" } | { "k1": 123, "parent.child": { "k2": "" } } |
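The sketch below masks a single nested field in a schemaless value by replacing it with an "empty" value of the same type, mirroring the "" used in the examples. NestedMask is hypothetical; only a few types are covered (anything else becomes null here), and deep scan and arrays, which this SMT is meant to support, are left out for brevity.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of MaskField$Value on one nested path (schemaless).
final class NestedMask {
    private NestedMask() {}

    @SuppressWarnings("unchecked")
    static void mask(Map<String, Object> value, List<String> path) {
        Map<String, Object> parent = value;
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                return; // missing parent: nothing to mask
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = path.get(path.size() - 1);
        if (parent.containsKey(leaf)) {
            parent.put(leaf, emptyValueFor(parent.get(leaf)));
        }
    }

    // Simplified "empty" replacements; types not listed here become null.
    private static Object emptyValueFor(Object original) {
        if (original instanceof String) return "";
        if (original instanceof Integer) return 0;
        if (original instanceof Long) return 0L;
        if (original instanceof Boolean) return false;
        return null;
    }
}
```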
ReplaceField
Changes:
- Extend the "include", "exclude" and "renames" lists to support nested notation.
- Supports arrays and deep-scan to access multiple fields.
Example:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. Drop field | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.exclude": "parent.child.k2" } | { "k1": 123, "parent": { "child": { } } } |
2. Nested field. Drop struct | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.exclude": "parent.child" } | { "k1": 123, "parent": { } } |
3. Nested field. Include field | { "k1": 123, "parent": { "child": { "k2": "123", "k3": "234" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.include": "parent.child.k2" } | { "parent": { "child": { "k2": "123" } } } |
4. Nested field. Include struct | { "k1": 123, "parent": { "child": { "k2": "123", "k3": "234" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.include": "parent.child" } | { "parent": { "child": { "k2": "123", "k3": "234" } } } |
5. Rename nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.renames": "parent..child.k2:field2" } | { "k1": 123, "parent.child": { "field2": "123" } } |
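A hypothetical sketch of the "exclude" and "renames" cases above for schemaless values. NestedReplace is not a Kafka class, paths are assumed to be already split into segments (so "parent..child.k2" becomes parent.child and k2), a renamed field is assumed to stay under the same parent, and "include" is omitted because it requires rebuilding the value.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of nested "exclude" and "renames" (schemaless only).
final class NestedReplace {
    private NestedReplace() {}

    // Returns the map that directly holds the last segment, or null if a parent is missing.
    @SuppressWarnings("unchecked")
    private static Map<String, Object> parentOf(Map<String, Object> value, List<String> path) {
        Map<String, Object> parent = value;
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                return null;
            }
            parent = (Map<String, Object>) child;
        }
        return parent;
    }

    // "exclude": drops a nested field or a whole nested struct.
    static void exclude(Map<String, Object> value, List<String> path) {
        Map<String, Object> parent = parentOf(value, path);
        if (parent != null) {
            parent.remove(path.get(path.size() - 1));
        }
    }

    // "renames": moves the value to a new name within the same parent.
    static void rename(Map<String, Object> value, List<String> path, String newName) {
        Map<String, Object> parent = parentOf(value, path);
        String leaf = path.get(path.size() - 1);
        if (parent != null && parent.containsKey(leaf)) {
            parent.put(newName, parent.remove(leaf));
        }
    }
}
```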
TimestampConverter
Changes:
- Extend "field" to support nested notation.
- Does not support multiple values (e.g. deep scan or array).
Example:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": 1556204536000 } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.field": "parent.child.k2", "transforms.smt1.format": "yyyy-MM-dd", "transforms.smt1.target.type": "string" } | { "k1": 123, "parent": { "child": { "k2": "2019-04-25" } } } |
2. Nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": 1556204536000 } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.field": "parent..child.k2", "transforms.smt1.format": "yyyy-MM-dd", "transforms.smt1.target.type": "string" } | { "k1": 123, "parent.child": { "k2": "2019-04-25" } } |
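A sketch of converting a nested epoch-milliseconds field to a formatted string, as in the examples above. NestedTimestampToString is hypothetical; it uses java.time rather than whatever the SMT uses internally, formats in UTC (an assumption), and only handles Long inputs.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: formats the epoch-millis value at a nested path (schemaless).
final class NestedTimestampToString {
    private NestedTimestampToString() {}

    @SuppressWarnings("unchecked")
    static void convert(Map<String, Object> value, List<String> path, String pattern) {
        Map<String, Object> parent = value;
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                return; // missing parent: leave the record unchanged
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = path.get(path.size() - 1);
        Object raw = parent.get(leaf);
        if (raw instanceof Long) {
            String formatted = Instant.ofEpochMilli((Long) raw)
                    .atZone(ZoneOffset.UTC)
                    .format(DateTimeFormatter.ofPattern(pattern));
            parent.put(leaf, formatted);
        }
    }
}
```

For the value 1556204536000 and the pattern "yyyy-MM-dd", this produces "2019-04-25".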
ValueToKey
Changes:
- Extend "fields" to support nested notation.
- Does not support multiple values (e.g. deep scan or array).
Example:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.fields": "parent.child.k2" } | "123" |
2. Nested struct to Key. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.fields": "parent.child" } | { "k2": "123" } |
3. Nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.fields": "parent..child.k2" } | "123" |
InsertField
Changes:
- Extend "*.field" to support nested notation.
- Does not support multiple values (e.g. deep scan or array).
New configurations (in addition to "field.syntax.version" described above):
Name | Type | Default | Importance | Documentation |
---|---|---|---|---|
field.on.missing.parent | STRING | create | MEDIUM | Permitted values: create, ignore. Defines how to react when the parent of the field to act on does not exist and "field.syntax.version" is "v2". If set to "create", the SMT creates the missing parent struct/map. If set to "ignore", the SMT has no effect. |
field.on.existing.field | STRING | overwrite | MEDIUM | Permitted values: overwrite, ignore. Defines how to react when the field to act on already exists. If set to "overwrite", the SMT replaces the value of the existing field. If set to "ignore", the SMT has no effect. |
Example:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.static.field": "parent.child.k3", "transforms.smt1.static.value": "v3" } | { "k1": 123, "parent": { "child": { "k2": "123", "k3": "v3" } } } |
2. Nested field, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.static.field": "parent..child.k3", "transforms.smt1.static.value": "v3" } | { "k1": 123, "parent.child": { "k2": "123", "k3": "v3" } } |
3. Nested field with the parent missing | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.static.field": "parent.other.k3", "transforms.smt1.static.value": "v3" } | { "k1": 123, "parent": { "child": { "k2": "123" }, "other": { "k3": "v3" } } } |
4. Nested field with the parent missing, and ignore is set | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.static.field": "parent.other.k3", "transforms.smt1.static.value": "v3", "transforms.smt1.field.on.missing.parent": "ignore" } | { "k1": 123, "parent": { "child": { "k2": "123" } } } |
5. Nested field that already exists | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.static.field": "parent.child.k2", "transforms.smt1.static.value": "456" } | { "k1": 123, "parent": { "child": { "k2": "456" } } } |
6. Nested field that already exists, and ignore is set | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.static.field": "parent.child.k2", "transforms.smt1.static.value": "456", "transforms.smt1.field.on.existing.field": "ignore" } | { "k1": 123, "parent": { "child": { "k2": "123" } } } |
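The interaction of the two new configurations can be sketched for schemaless values as follows. NestedInsert is hypothetical, the path is assumed to be already split into segments, and leaving the record unchanged when an intermediate field exists but is not a struct/map is an assumption the KIP does not cover.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of InsertField$Value with a nested target path.
// onMissingParent: "create" or "ignore"; onExistingField: "overwrite" or "ignore".
final class NestedInsert {
    private NestedInsert() {}

    @SuppressWarnings("unchecked")
    static void insert(Map<String, Object> value, List<String> path, Object fieldValue,
                       String onMissingParent, String onExistingField) {
        Map<String, Object> parent = value;
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (child == null) {
                if (!"create".equals(onMissingParent)) {
                    return; // "ignore": the SMT has no effect
                }
                child = new LinkedHashMap<String, Object>();
                parent.put(segment, child);
            } else if (!(child instanceof Map)) {
                return; // assumption: never replace a non-object parent
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = path.get(path.size() - 1);
        if (parent.containsKey(leaf) && !"overwrite".equals(onExistingField)) {
            return; // "ignore": keep the existing value
        }
        parent.put(leaf, fieldValue);
    }
}
```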
HoistField
Changes:
- Add a "hoisted" config that points to a specific path to hoist.
- Does not support multiple values (e.g. deep scan or array).
New configurations:
Name | Type | Default | Importance | Documentation |
---|---|---|---|---|
hoisted | STRING | <empty> | MEDIUM | Path to the element to be hoisted. If empty, the root struct/map is hoisted. |
Examples:
scenario | input | smt | output |
---|---|---|---|
1. Nested field. | { "k1": 123, "parent": { "child": { "k2": "123" } } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.hoisted": "parent.child.k2", "transforms.smt1.field": "other" } | { "k1": 123, "parent": { "child": { "other": { "k2": "123" } } } } |
2. Nested struct, when field names include dots | { "k1": 123, "parent.child": { "k2": "123" } } | { "transforms": "smt1", "transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value", "transforms.smt1.field.syntax.version": "v2", "transforms.smt1.hoisted": "parent..child", "transforms.smt1.field": "other" } | { "k1": 123, "other": { "parent.child": { "k2": "123" } } } |
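A hypothetical sketch of the "hoisted" behavior in the examples above: the element at the path is replaced, within its parent, by a new field (named by "field") that wraps it. NestedHoist is not a Kafka class, the path is assumed to be already split into segments, an empty path stands for the root (the current behavior), and a missing path leaves the record unchanged.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of HoistField$Value with a nested "hoisted" path (schemaless).
final class NestedHoist {
    private NestedHoist() {}

    @SuppressWarnings("unchecked")
    static Map<String, Object> hoist(Map<String, Object> value, List<String> hoisted, String field) {
        if (hoisted.isEmpty()) {
            // Empty path: wrap the whole value under "field".
            Map<String, Object> wrapped = new LinkedHashMap<>();
            wrapped.put(field, value);
            return wrapped;
        }
        Map<String, Object> parent = value;
        for (String segment : hoisted.subList(0, hoisted.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                return value; // path not found: leave the record unchanged
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = hoisted.get(hoisted.size() - 1);
        if (parent.containsKey(leaf)) {
            // Replace the element with a new field that wraps it under its original name.
            Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put(leaf, parent.remove(leaf));
            parent.put(field, wrapper);
        }
        return value;
    }
}
```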
Non-affected SMTs
These SMTs do not require nested structure support:
- DropHeaders: drops one or more headers.
- Filter: drops the whole message based on a predicate.
- InsertHeader: inserts a header with a static value.
- RegexRouter: acts on the topic name.
- SetSchemaMetadata: acts on the root schema.
- TimestampRouter: acts on the topic name, using the record timestamp.
- Flatten: acts on the whole key or value.
Compatibility, Deprecation, and Migration Plan
Existing SMT configurations will not be affected by these changes, as the default "field.syntax.version" is "v1", which represents the current behavior.
Rejected Alternatives
Keep ExtractField as it is and use it multiple times until reaching nested fields
This KIP proposes simplifying such configurations by replacing multiple chained ExtractField invocations with a single one that uses a nested path.
Use dots as the only separator and escape them with backslashes when they collide
To keep a single separator, one alternative is to use dots as separators and, when they collide with existing field names, use backslashes ("\") to mark dots that are part of the name, e.g. "this.field" (which would refer to the nested field "field" under the top-level "this" field) and "this\.field" (which would refer to the field named "this.field").
However, backslashes are also the escape character in JSON, which could lead to unfriendly configurations such as "this\\\\.is\\\\.not\\\\.very\\\\.readable".
Use custom separators for edge cases
Another alternative is to configure a custom field separator for edge cases, instead of escaping dots with double dots and sticking to dots as the only field separator.
Comparing:
With double dots | With separator |
---|---|
{ "transforms": "cast", "transforms.cast.field.syntax.version": "v2", "transforms.cast.type": "...", "transforms.cast.spec": "address..personal.country:string" } | { "transforms": "cast", "transforms.cast.field.syntax.version": "v2", "transforms.cast.field.separator": "/", "transforms.cast.type": "...", "transforms.cast.spec": "address.personal/country:string" } |
Even though custom separators make the configuration more explicit, there is always the possibility that the chosen separator is already part of a field name, leading to issues and requests for further changes.
To avoid this, this KIP proposes escaping a dot by preceding it with another dot.
Use JSONPath notation to access nested elements
//TODO
Use named styles instead of syntax versions
//TODO
Potential KIPs
Future KIPs could extend this support with:
- Recursive notation: name a field and apply it to all fields across the schema matching that name, as proposed by
- Access to arrays: Adding notation for arrays (e.g. []) to represent access to arrays and applying SMTs to fields within an array.