...
This KIP is aimed to include support for nested structures on the existing SMTs — where this makes sense — and to include the abstractions to reuse this in future SMTswhere nested structures are used.
Public Interfaces
From the existing list of SMTs, there are the following to be impacted by this change:
...
Name | Type | Default | Importance | Documentation |
---|
transforms.<name>.field.style | STRING | plain | HIGH
| Permitted values: plain , nested . Defines how to traverse a record structure to apply a transformation. If set to "plain ", then the transformations will only apply to the elements located at the root of the message. If set to "nested", then nested elements will be affected by the transformations as well. To access nested elements, dotted notation is used. If dots are already included in the field name, then dots itself themselves can be used to represent dots part of the field name. e.g. to access elements from a struct/map named "same.field", the following format can be used to access its elements: "same..field.element" |
Example:
In this Cast transform, to cast access an element "country" inside a struct/map named "address.personal":
...
These flags will be added conditionally to some SMTs, described below.
Affected SMTs
Cast
Changes:
- Extend
spec
to support nested notation.
Examples:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.spec": "k1:string,parent.child.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent": {
"child": {
"k2": 123
}
}
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": " |
|
...
...
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms. |
|
...
smt1.field.style": "nested",
"transforms.smt1.spec": "k1:string,parent..child.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent.child": {
"k2": 123
}
} |
|
Changes:
- Extend
field
to support nested notation.
Example:
scenario | input | smt | output |
---|
1. Nested field. | |
...
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.field": "parent.child.k2"
} |
| |
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
| |
...
": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.field": "parent..child.k2"
} |
| |
Changes:
- Extend
fields
to support nested notation. - As this SMT affects only existing fields, additional configurations will not be required.
Example:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "k1,parent.child.k2",
"transforms.smt1.headers": "k1,k2"
} |
|
Code Block |
---|
| headers:
- k1=123
- k2="123" |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "k1,parent..child.k2",
"transforms.smt1.headers": "k1,k2"
} |
|
Code Block |
---|
| headers:
- k1=123
- k2="123" |
|
MaskField
Changes:
- Extend
fields
to support nested notation.
Example:
scenario | input | smt | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "parent.child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": ""
}
}
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "parent..child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": ""
}
} |
|
ReplaceField
Changes:
- Extend the
include
and exclude
lists
Example:
scenario | input | smt | output |
---|
1. Nested field. Drop field |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.exclude": "parent.child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
}
}
} |
|
2. Nested field. Drop struct |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.exclude": "parent.child"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
}
} |
|
3. Nested field. Include field |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123",
"k3": "234"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.include": "parent.child.k2"
} |
|
Code Block |
---|
| {
"parent": {
"child": {
"k2": "123"
}
}
} |
|
4. Nested field. Include struct |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123",
"k3": "234"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.include": "parent.child"
} |
|
Code Block |
---|
| {
"parent": {
"child": {
"k2": "123",
"k3": "234"
}
}
} |
|
5. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.renames": "parent..child.k2:field2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"field2": "123"
}
}
|
|
TimestampConverter
Changes:
- Extend
fields
to support nested notation.
Example:
ValueToKey
Changes:
- Extend
fields
to support nested notation.
Example:
InsertField
Changes:
- Extend
field
to support nested notation.
New configurations:
Name | Type | Default | Importance | Documentation |
---|
field.on_missing_parent | STRING | create | MEDIUM | Permitted values: create , fail , ingore . Defines how to react when the field to act on does not have a parent. If set to "create", then the SMT will create the parent struct/map when it does not exist. If set to "fail", then the SMT will throw an exception. If set to "ignore", then it will SMT have no effect. |
field.on_existing_field | STRING | overwrite | MEDIUM | Permitted values: overwrite , fail , ingore . Defines how to react when the field to act on already exists. If set to "overwrite", then the SMT will be applied to the existing field. If set to "fail", then the SMT will throw an exception. If set to "ignore", then it will SMT have no effect. |
Example:
HoistField
Changes:
SMTs affected
Extending the support for field configuration for dotted separation:
...
Will require additional configurations:
HoistField
: add a hoisted
config to point to a specific path to hoist.
New configurations:
Name | Type | Default | Importance | Documentation |
---|
hoisted | STRING | <empty> | MEDIUM | Path to the element to be hoisted. If empty, the root struct/map is hoisted. |
For example:
Code Block |
---|
hoisted = nested.val
field = line
value (before):
{
"nested": {
"val": 42,
"other val": 96
}
}
value (after):
{
"nested": {
"line": {
"val": 42,
},
"other val": 96
}
} |
Non-affected SMTs
These SMT do not require nested structure support:
...
[2] https://datatracker.ietf.org/doc/html/rfc4180 2.7
> If > If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.
Examples:
Compatibility, Deprecation, and Migration Plan
...