...
[2] https://datatracker.ietf.org/doc/html/rfc4180, section 2.7:
> If double quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.
Accessing values by deep-scan
There are scenarios, e.g. dynamic or unknown structures, where multiple fields with the same name need to be targeted recursively at different levels.
For these cases, an asterisk can be used to search all elements within a path (similar to the deep-scan operator ".." in JsonPath[3], but using an asterisk instead of "..").
For example, a.*.b accesses a and then searches all of its child objects, including arrays, for the field b.
Deep scans are expected to return multiple paths. If only one element is found, a list with one path is returned. If no path is found, an empty list is returned.
Each SMT has to define how it processes the list of paths.
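The deep-scan rule above can be illustrated with a minimal Python sketch over schemaless records (nested dicts and lists). It is not the KIP's implementation: it assumes the path has already been split into segments, that "*" is always followed by a field name, and that, like JsonPath's "..", the scan visits every nested map and array, including values below an earlier match. The names resolve and Path are hypothetical.

```python
from typing import Any, Iterator, List, Tuple

# A concrete path: map keys (str) and array indices (int), from the root down.
Path = Tuple[Any, ...]


def _descendants(node: Any, prefix: Path) -> Iterator[Tuple[Path, Any]]:
    """Yield (path, value) for every map entry and array element below node."""
    if isinstance(node, dict):
        children = node.items()
    elif isinstance(node, list):
        children = enumerate(node)
    else:
        return  # scalars have no children
    for key, child in children:
        path = prefix + (key,)
        yield path, child
        yield from _descendants(child, path)


def resolve(node: Any, segments: List[str], prefix: Path = ()) -> List[Path]:
    """Return every concrete path matched by segments, or [] if none match."""
    if not segments:
        return [prefix]
    head, rest = segments[0], segments[1:]
    if head == "*":
        if not rest:
            raise ValueError("in this sketch '*' must be followed by a field name")
        target, after = rest[0], rest[1:]
        matches: List[Path] = []
        for path, value in _descendants(node, prefix):
            if path[-1] == target:  # field found at any depth, arrays included
                matches.extend(resolve(value, after, path))
        return matches
    if isinstance(node, dict) and head in node:
        return resolve(node[head], rest, prefix + (head,))
    return []
```

Applied to the "Deep-scan as prefix to a field" record with the segments ["*", "b"], this sketch returns the paths k1.b, k2.b, k3.b and b; including the root-level b is an assumption of the sketch.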
...
- a. Deep-scan as prefix to a field
Code Block |
---|
{
"k1": { "b": "b1" },
"k2": { "b": "b2" },
"k3": { "b": "b3" },
"b": "b4"
} |
...
...
- b. Deep-scan as prefix to a nested field
Code Block |
---|
{
"k1": { "b": { "c": "c1" } },
"k2": { "b": { "c": "c2" } },
"k3": { "b": { "c": "c3" } },
"b": { "c": "b4" }
} |
...
...
Code Block |
---|
{ "a": {
"k1": { "b": { "c": "c1" } },
"k2": { "b": { "c": "c2" } }
}, "a2": {
"k3": { "b": { "c": "c3" } }
}} |
Public Interfaces
The following existing SMTs are impacted by this change:
New configuration flags
Name | Type | Default | Importance | Documentation |
---|
field.syntax.version | STRING | v1 | HIGH | Permitted values: v1, v2. Defines the version of the syntax used to access fields. If set to "v1", field paths are limited to accessing elements at the root level of the struct or map. If set to "v2", the syntax supports accessing nested elements using dotted notation. If a field name already includes dots, each dot that is part of the name is escaped by preceding it with another dot, e.g. to access the elements of a struct/map named "same.field", use "same..field.element". This configuration affects all the field paths used by the transform. |
These flags will be added conditionally to some SMTs, as described below.
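A minimal Python sketch of how a path string could be split into segments under each field.syntax.version value. It assumes the string is scanned left to right, so that ".." always becomes a literal dot before a single "." is treated as a separator; the function name parse_field_path is hypothetical.

```python
from typing import List


def parse_field_path(path: str, syntax_version: str = "v1") -> List[str]:
    """Split a field path into segments following field.syntax.version.

    v1: the whole string names one root-level field (dots are literal).
    v2: a single dot separates segments and ".." stands for a literal dot.
    """
    if syntax_version == "v1":
        return [path]
    if syntax_version != "v2":
        raise ValueError(f"unsupported field.syntax.version: {syntax_version}")
    segments: List[str] = []
    current: List[str] = []
    i = 0
    while i < len(path):
        if path[i] == ".":
            if i + 1 < len(path) and path[i + 1] == ".":
                current.append(".")  # ".." -> literal dot in the field name
                i += 2
                continue
            segments.append("".join(current))  # single dot -> separator
            current = []
        else:
            current.append(path[i])
        i += 1
    segments.append("".join(current))
    return segments
```

For example, parse_field_path("same..field.element", "v2") yields the two segments "same.field" and "element".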
Affected SMTs
Cast
Changes:
- Extend spec to support nested notation.
Examples:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,parent.child.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent": {
"child": {
"k2": 123
}
}
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,parent..child.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent.child": {
"k2": 123
}
} |
|
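The nested Cast behavior in the examples above can be sketched in Python for schemaless records. This is an illustration, not the Cast implementation: CASTERS is a hypothetical subset of target types, split_path is a simplified v2 splitter, and the record is modified in place for brevity.

```python
import re
from typing import Any, Callable, Dict, List


def split_path(path: str) -> List[str]:
    """Simplified v2 split: a lone "." separates, ".." is a literal dot.

    Assumes field names never contain runs of three or more dots.
    """
    return [seg.replace("..", ".") for seg in re.split(r"(?<!\.)\.(?!\.)", path)]


# Hypothetical subset of Cast target types, for illustration only.
CASTERS: Dict[str, Callable[[Any], Any]] = {
    "string": str,
    "int64": int,
    "float64": float,
}


def cast_nested(record: Dict[str, Any], spec: str) -> Dict[str, Any]:
    """Apply a spec such as "k1:string,parent.child.k2:int64" in place."""
    for entry in spec.split(","):
        path, _, target = entry.rpartition(":")
        *parents, leaf = split_path(path)
        node: Any = record
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
        if isinstance(node, dict) and leaf in node:
            node[leaf] = CASTERS[target](node[leaf])
        # Paths that do not resolve are skipped in this sketch.
    return record
```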
ExtractField
Changes:
- Extend field to support nested notation.
Example:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent.child.k2"
} |
| |
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent..child.k2"
} |
| |
3. Nested field, an object returned. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent.child"
} |
|
Code Block |
---|
| { "k2": "123" } |
|
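A minimal Python sketch of nested ExtractField for schemaless records, assuming that a missing segment yields None. The helper names split_path and extract_field are hypothetical, and the splitter is the same simplified v2 rule used for the other sketches.

```python
import re
from typing import Any, List


def split_path(path: str) -> List[str]:
    """Simplified v2 split: a lone "." separates, ".." is a literal dot.

    Assumes field names never contain runs of three or more dots.
    """
    return [seg.replace("..", ".") for seg in re.split(r"(?<!\.)\.(?!\.)", path)]


def extract_field(record: Any, path: str) -> Any:
    """Return the value at path (a scalar or a nested map), or None if missing."""
    node = record
    for key in split_path(path):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node
```

Scenario 3 ("an object returned") corresponds to extract_field(record, "parent.child"), which returns the map {"k2": "123"}.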
HeaderFrom
Changes:
- Extend fields to support nested notation.
Example:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "k1,parent.child.k2",
"transforms.smt1.headers": "k1,k2"
} |
|
Code Block |
---|
| headers:
- k1=123
- k2="123" |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "k1,parent..child.k2",
"transforms.smt1.headers": "k1,k2"
} |
|
Code Block |
---|
| headers:
- k1=123
- k2="123" |
|
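A minimal Python sketch of resolving HeaderFrom's fields list against nested paths and pairing each value with the header at the same position. It only models copying values (not the transform's operation setting) and returns headers as a list of (name, value) tuples; all names are hypothetical.

```python
import re
from typing import Any, List, Tuple


def split_path(path: str) -> List[str]:
    """Simplified v2 split: a lone "." separates, ".." is a literal dot.

    Assumes field names never contain runs of three or more dots.
    """
    return [seg.replace("..", ".") for seg in re.split(r"(?<!\.)\.(?!\.)", path)]


def header_from(record: dict, fields: str, headers: str) -> List[Tuple[str, Any]]:
    """Copy the value at each field path into the header at the same position."""
    field_list, header_list = fields.split(","), headers.split(",")
    if len(field_list) != len(header_list):
        raise ValueError("fields and headers must have the same length")
    result: List[Tuple[str, Any]] = []
    for path, name in zip(field_list, header_list):
        node: Any = record
        for key in split_path(path):
            node = node[key]  # KeyError if the path does not exist
        result.append((name, node))
    return result
```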
...
Find the prefix element (a).
If found, scan the nested structure as in the scenarios 1.a and 1.b.
...
...
Code Block |
---|
{ "a": [ "v1", "v2", "v3"] } |
...
...
Code Block |
---|
{ "a": {
"k1": "v1",
"k2": "v1",
"k3": { "b": "c" }
}} |
...
...
Code Block |
---|
{ "a": {
"k1": "v1",
"k2": "v1"
}} |
...
Code Block |
---|
{ "*": {
"k1": "v1",
"k2": "v1"
}} |
...
If an asterisk is already used as a field name, a double asterisk is used to target that field.
This escaping is only needed when the whole field name consists of asterisks.
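The escaping rule for asterisks can be sketched as a small Python classifier for a single path segment. Mapping "**" to the literal field "*" follows the text above; extending it to longer runs (n asterisks naming a field of n - 1 asterisks) is an assumption, and classify_segment is a hypothetical name.

```python
from typing import Optional, Tuple


def classify_segment(segment: str) -> Tuple[str, Optional[str]]:
    """Return ("wildcard", None) or ("field", name) for one path segment."""
    if segment == "*":
        return ("wildcard", None)  # deep-scan marker
    if segment and set(segment) == {"*"}:
        # "**" -> field "*"; longer runs drop one asterisk (assumption).
        return ("field", segment[1:])
    return ("field", segment)  # names that merely contain "*" are literal
```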
...
...
Code Block |
---|
{ "**": {
"k1": "v1",
"k2": "v1"
}} |
...
Accessing Arrays
Arrays can be accessed in different ways and at different levels.
...
Code Block |
---|
{ "a": [ "a1", "a2", "a3" ] } |
...
...
Code Block |
---|
{ "a": [ "a1", "a2", "a3" ] } |
...
...
Code Block |
---|
{ "a": [ "a1", "a2", "a3" ] } |
...
a.<index>
a.0
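Index access such as a.0 can be sketched in Python as follows, assuming a numeric segment is treated as an array index only when the current element is an array (on a map it stays an ordinary key). get_path is a hypothetical helper, and out-of-range indices raise KeyError in this sketch.

```python
from typing import Any, List


def get_path(node: Any, segments: List[str]) -> Any:
    """Follow map keys, treating numeric segments as indices on arrays."""
    for seg in segments:
        if isinstance(node, dict) and seg in node:
            node = node[seg]  # map key (a numeric name is still a key here)
        elif isinstance(node, list) and seg.isdigit() and int(seg) < len(node):
            node = node[int(seg)]  # array index
        else:
            raise KeyError(f"no element {seg!r} in path")
    return node
```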
...
...
Code Block |
---|
{ "a": [ { "b": "b1" }, { "b": "b2" } ] } |
...
...
Code Block |
---|
{ "a": [ { "b": "b1" }, { "b": "b2" } ] } |
...
Affected SMTs
Cast
Changes:
- Extend spec to support nested notation.
- Support arrays and deep-scan to access multiple fields.
- If a returned path points to a value whose type cannot be converted as requested by spec, that path is ignored.
Examples:
scenario | input | smt | output |
---|
3. Multiple paths found |
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "k2": "123" } },
"parent2": { "child": { "k2": "123" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,*.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent1": { "child": { "k2": 123 } },
"parent2": { "child": { "k2": 123 } }
} |
|
4. Multiple paths found, but some types do not match and are ignored |
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "k2": "123" } },
"parent2": { "child": { "k2": {} } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,*.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent1": { "child": { "k2": 123 } },
"parent2": { "child": { "k2": {} } }
} |
|
ExtractField
Changes:
...
- When deep-scan or array access is used, the result is an array, even if only one field or no field is found.
Example:
scenario | input | smt | output |
---|
4. Nested field, an array returned. |
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "k2": "123" } },
"parent2": { "child": { "k2": "234" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "*.child.k2"
} |
|
Code Block |
---|
| [ "123", "234" ] |
|
TimestampConverter
Changes:
- Extend field to support nested notation.
- Does not support multiple values (e.g. deep-scan or arrays); if multiple paths are found, only the first one is used.
Example:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": 1556204536000 } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent.child.k2",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": "2019-04-25" } }
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": { "k2": 1556204536000 }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent..child.k2",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": { "k2": "2019-04-25" }
} |
|
HeaderFrom
Example:
scenario | input | smt | output |
---|
3. Nested field, an array returned. |
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "k2": "123" } },
"parent2": { "child": { "k2": "234" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "k1,*.child.k2",
"transforms.smt1.headers": "k1,k2"
} |
|
... |
MaskField
Changes:
- Extend fields to support nested notation.
- Support arrays and deep-scan to access multiple fields.
Example:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": "123" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": "" } }
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": { "k2": "123" }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent..child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": { "k2": "" }
} |
|
3. Multiple paths found |
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "k2": "123" } },
"parent2": { "child": { "k2": "234" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "*.child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "k2": "" } },
"parent2": { "child": { "k2": "" } }
} |
|
InsertField
Changes:
- Extend *.field to support nested notation.
New configurations (in addition to field.syntax.version described above):
Name | Type | Default | Importance | Documentation |
---|
field.on.missing.parent | STRING | create | MEDIUM | Permitted values: create, ignore. Defines how to react when the parent of the field to act on does not exist and "field.syntax.version" is "v2". If set to "create", the SMT creates the missing parent struct/map. If set to "ignore", the SMT has no effect. |
field.on.existing.field | STRING | overwrite | MEDIUM | Permitted values: overwrite, ignore. Defines how to react when the field to act on already exists. If set to "overwrite", the SMT is applied to the existing field. If set to "ignore", the SMT has no effect. |
ReplaceField
Changes:
- Extend the include, exclude, and renames lists to support nested notation.
Example:
scenario | input | smt | output |
---|
1. Nested field. Drop field |
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": "123" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.exclude": "parent.child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": { "child": {} }
} |
|
2. Nested field. Drop struct |
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": "123" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.exclude": "parent.child"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {}
} |
|
3. Nested field. Include field |
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": "123" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.include": "parent.child.k2"
} |
|
Code Block |
---|
| {
"parent": { "child": { "k2": "123" } }
} |
|
4. Nested field. Include struct |
Code Block |
---|
| {
"k1": 123,
"parent": { "child": { "k2": "123", "k3": "234" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.include": "parent.child"
} |
|
Code Block |
---|
| {
"parent": { "child": { "k2": "123", "k3": "234" } }
} |
|
5. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": { "k2": "123" }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.renames": "parent..child.k2:field2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": { "field2": "123" }
} |
|
6. Multiple fields |
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "k2": "123" } },
"parent2": { "child": { "k2": "234" } }
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.renames": "*.child.k2:field2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent1": { "child": { "field2": "123" } },
"parent2": { "child": { "field2": "234" } }
} |
|
HoistField
Changes:
- Add a hoisted config to point to a specific path to hoist.
New configurations:
Name | Type | Default | Importance | Documentation |
---|
hoisted | STRING | <empty> | MEDIUM | Path to the element to be hoisted. If empty, the root struct/map is hoisted. |
ValueToKey
Changes:
- Extend fields to support nested notation.
- Support arrays and deep-scan to access multiple fields.
- When deep-scan or array access is used, the result is an array, even if only one field or no field is found.
Example:
scenario | input | smt | output |
---|
1. Nested field. | Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.child.k2"
} |
2. Nested struct to Key. | Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.child"
} |
Code Block |
---|
|
{
"k2": "123"
} |
3. Nested field, when field names include dots | Code Block |
---|
|
{
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent..child.k2"
} |
4. Multiple values to key | Code Block |
---|
|
{
"k1": 123,
"parent1": {
"child": {
"k2": "123"
}
},
"parent2": {
"child": {
"k2": "234"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "*.child.k2"
} |
Code Block |
---|
[ "123", "234" ] |
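The multiple-values example above can be sketched in Python by collecting every value matched by a deep-scan path into a list. This is an illustration under the same assumptions as the deep-scan description (a "*" is followed by a field name and the scan descends into maps and arrays); collect_values is a hypothetical name, and the plain split on dots in the usage ignores ".." escaping.

```python
from typing import Any, Iterator, List, Tuple


def _entries(node: Any) -> Iterator[Tuple[Any, Any]]:
    """Yield (key, value) for every map entry and array element below node."""
    if isinstance(node, dict):
        children = node.items()
    elif isinstance(node, list):
        children = enumerate(node)
    else:
        return
    for key, child in children:
        yield key, child
        yield from _entries(child)


def collect_values(node: Any, segments: List[str]) -> List[Any]:
    """Collect every value matched by segments; "*" deep-scans for the next one."""
    if not segments:
        return [node]
    head, rest = segments[0], segments[1:]
    if head == "*":
        if not rest:
            raise ValueError("in this sketch '*' must be followed by a field name")
        target, after = rest[0], rest[1:]
        return [v for key, child in _entries(node) if key == target
                for v in collect_values(child, after)]
    if isinstance(node, dict) and head in node:
        return collect_values(node[head], rest)
    return []
```

With the input of scenario 4, collect_values(value, "*.child.k2".split(".")) produces the list of both k2 values, matching the expected key.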
InsertField
Changes:
- Extend *.field to support nested notation.
- Does not support multiple values (e.g. deep-scan or arrays).
New configurations (in addition to field.syntax.version described above):
...
Name
...
Example:
...
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k3",
"transforms.smt1.static.value": "v3"
} |
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123",
"k3": "v3"
}
}
} |
...
Code Block |
---|
|
{
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent..child.k3",
"transforms.smt1.static.value": "v3"
} |
Code Block |
---|
|
{
"k1": 123,
"parent.child": {
"k2": "123",
"k3": "v3"
}
} |
...
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.other.k3",
"transforms.smt1.static.value": "v3"
} |
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
},
"other": {
"k3": "v3"
}
}
} |
...
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.other.k3",
"transforms.smt1.static.value": "v3",
"transforms.smt1.field.on.missing.parent": "ignore"
} |
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
...
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k2",
"transforms.smt1.static.value": "456"
} |
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "456"
}
}
} |
...
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k2",
"transforms.smt1.static.value": "456",
"transforms.smt1.field.on.existing.field": "ignore"
} |
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
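A minimal Python sketch of the nested InsertField behavior, including the field.on.missing.parent and field.on.existing.field options, for schemaless records. The helper names are hypothetical; inserting below a non-map value is simply skipped here, which is an assumption the KIP does not specify.

```python
import re
from typing import Any, List


def split_path(path: str) -> List[str]:
    """Simplified v2 split: a lone "." separates, ".." is a literal dot.

    Assumes field names never contain runs of three or more dots.
    """
    return [seg.replace("..", ".") for seg in re.split(r"(?<!\.)\.(?!\.)", path)]


def insert_field(record: dict, path: str, value: Any,
                 on_missing_parent: str = "create",
                 on_existing_field: str = "overwrite") -> dict:
    """Insert value at a nested path; mutates and returns record."""
    *parents, leaf = split_path(path)
    node = record
    for key in parents:
        child = node.get(key)
        if child is None:
            if on_missing_parent == "ignore":
                return record  # missing parent: leave the record unchanged
            child = node[key] = {}  # "create": add an empty parent map
        elif not isinstance(child, dict):
            return record  # assumption: cannot insert below a non-map value
        node = child
    if leaf in node and on_existing_field == "ignore":
        return record  # existing field is kept as-is
    node[leaf] = value
    return record
```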
HoistField
Changes:
- Add a hoisted config to point to a specific path to hoist.
- Does not support multiple values (e.g. deep-scan or arrays).
New configurations:
...
Name
...
Examples:
...
...
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "parent.child.k2",
"transforms.smt1.field": "other"
} |
Code Block |
---|
|
{
"k1": 123,
"parent": {
"child": {
"other": {
"k2": "123"
}
}
}
} |
...
Code Block |
---|
|
{
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
Code Block |
---|
|
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "parent..child",
"transforms.smt1.field": "other"
}
|
Code Block |
---|
|
{
"k1": 123,
"other": {
"parent.child": {
"k2": "123"
}
}
} |
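The hoisted option can be sketched in Python for schemaless records: the element at the hoisted path is wrapped in a new map under field, in place of the original element, and an empty hoisted wraps the whole record as HoistField does today. split_path and hoist_field are hypothetical names, and a missing parent raises KeyError in this sketch.

```python
import re
from typing import Any, List


def split_path(path: str) -> List[str]:
    """Simplified v2 split: a lone "." separates, ".." is a literal dot.

    Assumes field names never contain runs of three or more dots.
    """
    return [seg.replace("..", ".") for seg in re.split(r"(?<!\.)\.(?!\.)", path)]


def hoist_field(record: dict, hoisted: str, field: str) -> Any:
    """Wrap the element at hoisted in a new map keyed by field."""
    if not hoisted:
        return {field: record}  # root hoisting (existing behavior)
    *parents, leaf = split_path(hoisted)
    node = record
    for key in parents:
        node = node[key]  # KeyError if a parent is missing
    # The element is removed and re-added, wrapped, under the new key.
    node[field] = {leaf: node.pop(leaf)}
    return record
```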
Non-affected SMTs
These SMTs do not require nested structure support:
- DropHeaders: drops one or more headers.
- Filter: drops the whole message based on a predicate.
- InsertHeader: inserts a specific value into the headers.
- RegexRouter: acts on the topic name.
- SetSchemaMetadata: acts on the root schema.
- TimestampRouter: acts on the timestamp.
- Flatten: acts on the whole key or value.
Compatibility, Deprecation, and Migration Plan
Existing SMT configurations will not be affected by these changes, as the default field.syntax.version is v1, which represents the current behavior.
Rejected Alternatives
Keep ExtractField as it is and use it multiple times until reaching nested fields
This KIP proposes simplifying this configuration by replacing multiple invocations with a single nested one.
Use dots as the only separator and escape them with backslashes when they collide
To keep a single separator, one alternative is to use dots as separators and, when they collide with existing field names, use backslashes "\" to represent dots that are part of the name, e.g. "this.field" would refer to the nested field "field" under the top-level "this" field, and "this\.field" would refer to the field named "this.field".
However, backslashes are also escape characters in JSON. This could lead to unfriendly configurations like "this\\\\.is\\\\.not\\\\.very\\\\.readable".
Use custom separators for edge cases
Using double dots to escape separators is another way to keep dots as the only field separator.
Comparing:
...
Code Block |
---|
{
"transforms": "cast",
"transforms.cast.field.syntax.version": "v2",
"transforms.cast.type": "...",
"transforms.cast.spec": "address..personal.country:string"
} |
Code Block |
---|
{
"transforms": "cast",
"transforms.cast.field.syntax.version": "v2",
"transforms.cast.field.separator": "/",
"transforms.cast.type": "...",
"transforms.cast.spec": "address.personal/country:string"
} |
Even though custom separators make the configuration more explicit, there is always the possibility that every candidate separator is already part of some field name, leading to issues and change requests.
To avoid this, this KIP proposes escaping a dot by preceding it with another dot.
Use JSONPath notation to access nested elements
JSONPath[1] was a proposed alternative to the nested notation. A drafted version of the KIP with examples using the proposed notation is outlined here: [DRAFT] KIP-821: Connect Transforms support for nested structures (JsonPath-based draft)
The following limitations were found:
- The JSONPath spec is too extensive for the use cases included in this KIP.
- A sub-set of JSONPath was proposed, but the custom spec ends up being more complex than the notation proposed here.
- A sub-set implies not reusing an existing JSONPath dependency. Adding such a dependency, on the other hand, would reduce the chance of the KIP being accepted, as it increases the risk of external vulnerabilities.
- A sub-set would require users to learn JSONPath, and then learn what is and is not covered by the custom implementation.
Given these drawbacks, the KIP prefers the dotted notation.
[1] https://github.com/json-path/JsonPath
Use named styles instead of syntax versions
Using a configuration that names the style used to target fields, field.style, with valid values "plain" and "nested", was considered.
Even though this configuration is self-describing, it limits the semantics of the values.
Instead, the KIP uses a versioned configuration, "field.syntax.version", which avoids affecting the current behavior and makes it easier to extend by including compatible changes in the same version.
Use configuration flag per SMT instead of per-field configuration
Instead of adding a configuration for each field config, e.g. include.syntax.version, the KIP proposes a single configuration per SMT that affects all the input fields.