...
Discussion thread: here
JIRA: KAFKA-13656
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Single Message Transforms (SMTs), introduced with KIP-66, have greatly improved connector usability by enabling the transformation of input/output data formats and contents without the need for additional streaming applications.
However, these benefits have been limited by the SMTs' restriction to act only on fields available at the root of the data structure.
Here are some tickets/comments related to this limitation:
This KIP aims to include support for nested structures in the existing SMTs.
Proposed Changes
Nested notation
Dotted notation tends to be the most intuitive way to describe paths to nested fields in a record structure, and will cover most of the scenarios (e.g. jq already uses it[1]).
However, field names in JSON could include dots (e.g. {'nested.field': {'value': 42}}).
Therefore, the nested notation must support escaping dots that could be valid field names.
Instead of escaping dots with backslashes, which leads to unfriendly JSON configurations, it's proposed to follow an approach similar to JSONata[2], where backticks are used to define field names with dots, e.g. `nested.field`
[1] https://stedolan.github.io/jq/manual/#Basicfilters
[2] https://docs.jsonata.org/simple#examples
> Field references containing whitespace or reserved tokens can be enclosed in backticks
Rules
- 1. If field names do not contain dots (.), then only use dots to represent nested field paths.
- 2. If field names contain dots, then:
- wrap the field name with a backtick pair (`...`) by
- adding an opening backtick at the beginning of the field name (beginning of a path, or after a dot)
- adding a closing backtick at the end of the field name (end of the path, or before the next dot)
- If a wrapped field name doesn't contain dots, it is processed the same way: the field name within the wrapping backticks is used
- 3. If a field name includes backticks, then:
- If a backtick is followed by a dot in the field name, then the backtick should be escaped with a backslash to signal that the backtick is part of the name and not closing a backtick pair.
- Backslashes (\) do not need to be escaped. If a backslash is part of the field name and immediately precedes a backtick that is to be escaped, then add another backslash.
- Otherwise, backticks do not require escaping.
- 4. If wrapping backtick pairs are incomplete, the Connect configuration must fail fast to avoid getting ambiguous paths deployed.
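The rules above can be sketched as a small path parser. This is a minimal illustration in Python, not the Kafka Connect implementation: the names `parse_path` and `PathSyntaxError` are hypothetical, and the handling of edge cases the rules leave open is an assumption chosen to reproduce the example table in this section. In particular, an unescaped backtick right after a dot inside a wrapped name, or at the end of an unwrapped name, is treated as an incomplete pair.

```python
from typing import List, Tuple


class PathSyntaxError(ValueError):
    """Raised when backtick pairs in a field path are incomplete (rule 4)."""


def _read_wrapped(path: str, start: int) -> Tuple[str, int]:
    """Read a backtick-wrapped name starting at path[start] == '`'.

    Returns the decoded name and the index right after the closing backtick.
    """
    n = len(path)
    out = []
    i = start + 1
    while i < n:
        if path.startswith("\\\\`", i):
            # Backslash belonging to the name, followed by an escaped backtick.
            out.append("\\`")
            i += 3
        elif path.startswith("\\`", i):
            # Escaped backtick: part of the name, not a closing backtick.
            out.append("`")
            i += 2
        elif path[i] == "`":
            if i + 1 == n or path[i + 1] == ".":
                return "".join(out), i + 1  # closing backtick found
            if path[i - 1] == ".":
                # Assumption: looks like a second opening backtick.
                raise PathSyntaxError(f"incomplete backtick pair in {path!r}")
            out.append("`")  # backtick in the middle of a name is literal
            i += 1
        else:
            out.append(path[i])
            i += 1
    raise PathSyntaxError(f"incomplete backtick pair in {path!r}")


def parse_path(path: str) -> List[str]:
    """Split a dotted path into field names, honoring backtick wrapping."""
    steps = []
    pos, n = 0, len(path)
    while True:
        if pos < n and path[pos] == "`":
            name, pos = _read_wrapped(path, pos)
        else:
            end = path.find(".", pos)
            end = n if end == -1 else end
            name = path[pos:end]
            if name.endswith("`"):
                # Assumption: a closing backtick without an opening one.
                raise PathSyntaxError(f"incomplete backtick pair in {path!r}")
            pos = end
        steps.append(name)
        if pos >= n:
            return steps
        pos += 1  # skip the dot separating two steps
```

For instance, `parse_path("foo.`bar.baz`")` yields the two steps `foo` and `bar.baz`, while `parse_path("foo.`bar.baz")` raises `PathSyntaxError`, which is where a configuration validator could fail fast.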
Examples
Scenario | Nested struct | Path |
---|
1. Normal (no dots or backticks on field names) | foo: bar: baz: val | OK: foo.bar.baz |
2. Field names including dots | foo: bar.baz: val | OK: foo.`bar.baz` |
2.1. Using backticks within a field name without dots | foo: bar: baz: val | OK: foo.`bar`.baz; ERROR: foo.`bar.baz: no pair; ERROR: foo.bar`.baz: no pair |
3. Field names including backticks | foo: ba`r: baz: val | OK: foo.ba`r.baz; OK: foo.`ba`r`.baz |
3.1. Field names including backticks at the wrapping position | foo: bar`.`baz: val | OK: foo.`bar\`.\`baz`; ERROR: foo.`bar\`.`baz`: no pair; ERROR: foo.`bar`.`baz`: valid but different path (see 2.1) |
3.2. Field names including dots and backticks between a backtick pair | foo: b`ar.baz: val | OK: foo.`b`ar.baz` |
3.3. Field names including backslash and backticks at the wrapping position | foo: bar\`.`baz: val | OK: foo.`bar\\`.\`baz` |
3.4. Field names wrapped by backticks | foo: `bar`: baz: val | OK: foo.`\`bar\``.baz; ERROR: foo.`bar`.baz: valid but different path (see 2.1) |
Affected SMTs
These SMTs will include support for nested structures:
Cast
ExtractField
HeaderFrom
MaskField
ReplaceField
TimestampConverter
ValueToKey
InsertField
HoistField
Non-affected SMTs
These SMTs do not require nested structure support:
DropHeaders: Drop one or multiple headers.
Filter: Drops the whole message based on a predicate.
InsertHeader: Insert a specific message to the header.
RegexRouter: Acts on the topic name.
SetSchemaMetadata: Acts on root schema.
TimestampRouter: Acts on timestamp.
Flatten: Acts on the whole key or message.
Public Interfaces
New configuration flags
Name | Type | Default | Importance | Documentation |
---|
field.syntax.version | STRING | V1 | HIGH | Permitted values: V1, V2. Defines the version of the syntax to access fields. If set to "V1", then the field paths are limited to access the elements at the root level of the struct or map. If set to "V2", the syntax will support accessing nested elements. To access nested elements, dotted notation is used. If dots are already included in the field name, then backtick pairs can be used to wrap field names containing dots. e.g. to access elements from a struct/map named "foo.bar", the following format can be used to access its elements: "`foo.bar`.baz". This configuration will affect all the field paths used by the transform. |
This flag will be added conditionally to some SMTs, as described below.
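To illustrate how the two syntax versions differ, here is a minimal Python sketch of a field lookup. The helper names `steps_for` and `lookup` are hypothetical, the tokenizer is deliberately simplified (it ignores backslash escapes and does no validation), and treating the version value case-insensitively is an assumption.

```python
import re
from typing import Any, List, Mapping

# Simplified tokenizer: a step is either a backtick-wrapped name
# or a run of characters containing neither dots nor backticks.
_STEP = re.compile(r"`([^`]+)`|([^.`]+)")


def steps_for(path: str, syntax_version: str) -> List[str]:
    """Translate a configured field path into lookup steps."""
    version = syntax_version.upper()  # assumption: case-insensitive value
    if version == "V1":
        return [path]  # the whole string is a single root-level field name
    if version == "V2":
        return [wrapped or plain for wrapped, plain in _STEP.findall(path)]
    raise ValueError(f"unknown field.syntax.version: {syntax_version!r}")


def lookup(record: Mapping[str, Any], path: str, syntax_version: str = "V1") -> Any:
    """Return the value at path, or None if any step is missing."""
    current: Any = record
    for step in steps_for(path, syntax_version):
        if not isinstance(current, Mapping) or step not in current:
            return None
        current = current[step]
    return current


record = {"k1": 123, "foo.bar": {"baz": 42}, "parent": {"child": {"k2": "123"}}}
v1_value = lookup(record, "foo.bar", "V1")        # root-level field with a dot in its name
v2_value = lookup(record, "`foo.bar`.baz", "V2")  # nested access through a wrapped name
```

With "V1" the path `parent.child.k2` is looked up as one literal root field and finds nothing, which matches the current behavior that existing configurations rely on.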
Cast
Changes:
- Extend spec to support nested notation.
Examples:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,parent.child.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent": {
"child": {
"k2": 123
}
}
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,`parent.child`.k2:int64"
} |
|
Code Block |
---|
| {
"k1": "123",
"parent.child": {
"k2": 123
}
} |
|
...
- Extend field to support nested notation.
Example:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent.child.k2"
} |
| |
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "`parent.child`.k2"
} |
| |
3. Nested field, an object returned. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent.child"
} |
|
Code Block |
---|
| { "k2": "123" } |
|
...
- Extend fields to support nested notation.
Example:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "k1,parent.child.k2",
"transforms.smt1.headers": "k1,k2"
} |
|
Code Block |
---|
| headers:
- k1=123
- k2="123" |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "k1,`parent.child`.k2",
"transforms.smt1.headers": "k1,k2"
} |
|
Code Block |
---|
| headers:
- k1=123
- k2="123" |
|
...
- Extend fields to support nested notation.
Example:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": ""
}
}
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "`parent.child`.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": ""
}
} |
|
...
- Extend the include and exclude lists to support nested notation.
Example:
scenario | input | SMT | output |
---|
1. Nested field. Drop field |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.exclude": "parent.child.k2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
}
}
} |
|
2. Nested field. Drop struct |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.exclude": "parent.child"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
}
} |
|
3. Nested field. Include field |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123",
"k3": "234"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.include": "parent.child.k2"
} |
|
Code Block |
---|
| {
"parent": {
"child": {
"k2": "123"
}
}
} |
|
4. Nested field. Include struct |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123",
"k3": "234"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.include": "parent.child"
} |
|
Code Block |
---|
| {
"parent": {
"child": {
"k2": "123",
"k3": "234"
}
}
} |
|
5. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.renames": "`parent.child`.k2:field2"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"field2": "123"
}
} |
|
...
- Extend field to support nested notation.
Example:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": 1556204536000
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent.child.k2",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "2019-04-25"
}
}
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": 1556204536000
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "`parent.child`.k2",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "2019-04-25"
}
} |
|
...
- Extend fields to support nested notation.
Example:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.child.k2"
} |
| |
2. Nested struct to Key. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.child"
} |
|
Code Block |
---|
| {
"k2": "123"
} |
|
3. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "`parent.child`.k2"
} |
| |
...
Name | Type | Default | Importance | Documentation |
---|
field.on.missing.parent | STRING | create | MEDIUM | Permitted values: create, ignore. Defines how to react when the field to act on does not have a parent and "field.syntax.version" is "V2". If set to "create", then the SMT will create the parent struct/map when it does not exist. If set to "ignore", then the SMT will have no effect. |
field.on.existing.field | STRING | overwrite | MEDIUM | Permitted values: overwrite, ignore. Defines how to react when the field to act on already exists. If set to "overwrite", then the SMT will be applied to the existing field. If set to "ignore", then the SMT will have no effect. |
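The interaction of these two settings can be sketched as follows. This is a hedged Python illustration on plain dictionaries, not the InsertField implementation: `insert_field` is a hypothetical helper, it takes the path already split into steps, and it assumes every existing intermediate element is a map.

```python
import copy
from typing import Any, Dict, List


def insert_field(
    record: Dict[str, Any],
    steps: List[str],
    value: Any,
    on_missing_parent: str = "create",
    on_existing_field: str = "overwrite",
) -> Dict[str, Any]:
    """Return a copy of record with value set at the nested path given by steps."""
    result = copy.deepcopy(record)
    current = result
    for step in steps[:-1]:
        if step not in current:
            if on_missing_parent == "ignore":
                return record  # no effect: the parent does not exist
            current[step] = {}  # "create": add the missing parent map
        current = current[step]
    leaf = steps[-1]
    if leaf in current and on_existing_field == "ignore":
        return record  # no effect: the field already exists
    current[leaf] = value
    return result


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
created = insert_field(record, ["parent", "other", "k3"], "v3")
skipped = insert_field(record, ["parent", "other", "k3"], "v3", on_missing_parent="ignore")
overwritten = insert_field(record, ["parent", "child", "k2"], "456")
kept = insert_field(record, ["parent", "child", "k2"], "456", on_existing_field="ignore")
```

Returning the input unchanged for the "ignore" cases mirrors the "SMT will have no effect" wording in the table.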
Example:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k3",
"transforms.smt1.static.value": "v3"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123",
"k3": "v3"
}
}
} |
|
2. Nested field, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "`parent.child`.k3",
"transforms.smt1.static.value": "v3"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123",
"k3": "v3"
}
} |
|
3. Nested field with the parent missing |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.other.k3",
"transforms.smt1.static.value": "v3"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
},
"other": {
"k3": "v3"
}
}
} |
|
4. Nested field with the parent missing, and ignore is set |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.other.k3",
"transforms.smt1.static.value": "v3",
"transforms.smt1.field.on.missing.parent": "ignore"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
5. Nested field that already exists |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k2",
"transforms.smt1.static.value": "456"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "456"
}
}
} |
|
6. Nested field that already exists, and ignore is set |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k2",
"transforms.smt1.static.value": "456",
"transforms.smt1.field.on.existing.field": "ignore"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
...
Name | Type | Default | Importance | Documentation |
---|
hoisted | STRING | <empty> | MEDIUM | Path to the element to be hoisted. If empty, the root struct/map is hoisted. |
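One reading of this option, consistent with the examples in this section, is that the element at the hoisted path is wrapped in place under the name given by field. The Python sketch below illustrates that reading on plain dictionaries. `hoist` is a hypothetical helper, the path is assumed to be already split into steps, and a missing path simply raises KeyError.

```python
import copy
from typing import Any, Dict, List


def hoist(record: Dict[str, Any], steps: List[str], field: str) -> Dict[str, Any]:
    """Wrap the element at steps inside a new map named field."""
    if not steps:
        # Empty path: hoist the root struct/map.
        return {field: copy.deepcopy(record)}
    result = copy.deepcopy(record)
    parent = result
    for step in steps[:-1]:
        parent = parent[step]  # assumption: every intermediate step exists
    name = steps[-1]
    # Replace the element with a wrapper map that contains it.
    parent[field] = {name: parent.pop(name)}
    return result


nested = hoist({"k1": 123, "parent": {"child": {"k2": "123"}}}, ["parent", "child", "k2"], "other")
dotted = hoist({"k1": 123, "parent.child": {"k2": "123"}}, ["parent.child"], "other")
```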
Examples:
scenario | input | SMT | output |
---|
1. Nested field. |
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"k2": "123"
}
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "parent.child.k2",
"transforms.smt1.field": "other"
} |
|
Code Block |
---|
| {
"k1": 123,
"parent": {
"child": {
"other": {
"k2": "123"
}
}
}
} |
|
2. Nested struct, when field names include dots |
Code Block |
---|
| {
"k1": 123,
"parent.child": {
"k2": "123"
}
} |
|
Code Block |
---|
| {
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "`parent.child`",
"transforms.smt1.field": "other"
} |
|
Code Block |
---|
| {
"k1": 123,
"other": {
"parent.child": {
"k2": "123"
}
}
} |
|
...
Compatibility, Deprecation, and Migration Plan
Existing SMT configurations will not be affected by these changes, as the default field.syntax.version is V1, which represents the current behavior. Users will need to opt in to the new notation.
Rejected Alternatives
Keep ExtractField as it is and use it multiple times until reaching nested fields
...
However, backslashes are also used by JSON. This could lead to unfriendly configurations like "this\\\\.is\\\\.not\\\\.very\\\\.readable"
...
Even if using custom separators represents a more explicit configuration, there is always the possibility that all the separators are already included as part of the field name, leading to issues and requests for changes.
To avoid this, this KIP proposes wrapping field names that contain dots in backticks.
...
- The JSONPath spec is too extensive for the use cases included in this KIP.
- A sub-set of JSONPath was proposed, but the custom spec ends up being more complex than the notation proposed here.
- A sub-set will imply not using existing dependencies. However, adding an existing dependency would also reduce the chance of the KIP being accepted as the risk for external vulnerabilities will increase.
- The sub-set will require users to learn JSONPath, and then what's covered and what's not by the custom implementation.
Given these cons, the KIP prefers the dotted notation.
[1] https://github.com/json-path/JsonPath
...
field.style with valid values: "plain" and "nested".
Even though this configuration is self-describing, it limits the semantics of the values.
...
Instead of adding a configuration under each field config, e.g. include.syntax.version, the KIP proposes to have a single configuration per SMT, to affect all the input fields.
Use Double-dots to escape dots included on field names
Double dot is often used in JSON Path as a descendant selector, see https://www.ietf.org/id/draft-ietf-jsonpath-base-05.html
This may confuse users. To avoid this, the backtick approach is proposed in this KIP.
Potential Improvements (out of scope)
Support Array access
Adding notation for arrays (e.g. [], or array.<offset>) to access array elements and apply SMTs to fields within the array.
...