...

Single Message Transforms (SMTs), introduced in KIP-66, have greatly improved the usability of connectors by enabling the transformation of input/output data formats and content without the need for additional streaming applications.

However, these benefits have been limited because SMTs can only look up fields available at the root of the data structure.

This KIP aims to add support for nested structures to the existing SMTs where nested structures are used.

Proposed Changes

Nested notation

Using dots tends to be the most intuitive way to describe the path to nested fields in a record structure (e.g. jq already uses it[1]), and it will cover most scenarios.

However, dots are already allowed as part of element names in JSON (i.e. schemaless) records (e.g. {'nested.key': {'val':42}}).

Instead of escaping them with backslashes, which leads to unfriendly JSON configurations, it is proposed to follow an approach similar to the one the CSV format[2] uses for double quotes: a character is escaped by preceding it with the same character.

Then, in transform configurations, double dots can be used to escape dots that are part of a field name.
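As an illustration of the double-dot escaping described above, here is a minimal sketch of how a path could be split into field names. The function name `split_path` and the left-to-right handling of consecutive dots are assumptions made for this example, not part of the proposal.

```python
def split_path(path: str) -> list[str]:
    """Split a dotted path into field names, treating '..' as a literal dot."""
    segments, current, i = [], [], 0
    while i < len(path):
        if path[i] == ".":
            if i + 1 < len(path) and path[i + 1] == ".":
                current.append(".")  # escaped dot: part of the field name
                i += 2
                continue
            segments.append("".join(current))  # single dot: segment boundary
            current = []
        else:
            current.append(path[i])
        i += 1
    segments.append("".join(current))
    return segments


print(split_path("parent.child.k2"))   # ['parent', 'child', 'k2']
print(split_path("parent..child.k2"))  # ['parent.child', 'k2']
```

With this reading, `nested..key.val` addresses the field `val` inside the element named `nested.key`.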

...

> If double quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.

Accessing

...

values by deep-scan

There are scenarios where multiple fields with the same name must be targeted recursively at different levels, e.g. in dynamic or unknown structures.

For these cases, an asterisk can be used to search all elements within a path (similar to JsonPath[3], but using an asterisk instead of ..).

  • a.*.b will access a and then search all child objects for the field b, including arrays.

...

Deep scans are expected to return multiple paths. If only one element is found, a list of one path is returned. If no path is found, an empty list is returned.

Each SMT has to define how to process the resulting list of paths when multiple fields are found.
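The deep-scan rules can be sketched as a small recursive search over schemaless data (maps and arrays). This is only an illustration under stated assumptions: the names `children`, `find_paths` and `deep_scan` are hypothetical, paths are split on single dots without the double-dot escaping, array indexes are matched by their decimal string, and the order of the returned paths is a choice of this sketch rather than something the proposal specifies.

```python
def children(node):
    """Return (key, value) pairs for maps and (index, item) pairs for arrays."""
    if isinstance(node, dict):
        return list(node.items())
    if isinstance(node, list):
        return list(enumerate(node))
    return []  # scalars have no children


def find_paths(node, segments, prefix=()):
    """Return every concrete path (as a tuple) matched by the path segments."""
    if not segments:
        return [prefix]
    head, rest = segments[0], segments[1:]
    if head == "*":
        if not rest:
            # Trailing deep-scan: return the direct children only.
            return [prefix + (key,) for key, _ in children(node)]
        found = []
        for key, child in children(node):
            # Keep scanning below each child ...
            found += find_paths(child, segments, prefix + (key,))
        # ... and also try to match the remainder at the current level.
        return found + find_paths(node, rest, prefix)
    # Two or more asterisks escape a field whose name is made of asterisks.
    name = head[1:] if set(head) == {"*"} else head
    for key, child in children(node):
        if str(key) == name:
            return find_paths(child, rest, prefix + (key,))
    return []  # no match: empty list


def deep_scan(data, path):
    """Render the matched paths in dotted form."""
    return [".".join(map(str, p)) for p in find_paths(data, path.split("."))]


data = {"k1": {"b": "b1"}, "k2": {"b": "b2"}, "k3": {"b": "b3"}, "b": "b4"}
print(deep_scan(data, "*.b"))                         # ['k1.b', 'k2.b', 'k3.b', 'b']
print(deep_scan({"a": ["v1", "v2", "v3"]}, "a.*"))    # ['a.0', 'a.1', 'a.2']
print(deep_scan({"a": {"k1": "v1"}}, "a.k1.*"))       # []
```

A single match still comes back as a list of one path, and no match as an empty list, which mirrors the behaviour described above.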

Scenarios (each with its data, path, and resulting fields):

1.a. Deep-scan as prefix to a field

Data:

Code Block
{
  "k1": { "b": "b1" },
  "k2": { "b": "b2" },
  "k3": { "b": "b3" },
  "b": "b4"
}

Path: *.b

The whole data structure is scanned to find a b field.

Fields:
  • k1.b
  • k2.b
  • k3.b
  • b
1.b. Deep-scan as prefix to a nested field

Data:

Code Block
{
  "k1": { "b": { "c": "c1" } },
  "k2": { "b": { "c": "c2" } },
  "k3": { "b": { "c": "c3" } },
  "b": { "c": "b4" }
}

Path: *.b.c

The whole data structure is scanned to find a b field that contains a c field.

Fields:
  • k1.b.c
  • k2.b.c
  • k3.b.c
  • b.c
2. Deep-scan in the middle of a path expression

Data:

Code Block
{
  "a": {
    "k1": { "b": { "c": "c1" } },
    "k2": { "b": { "c": "c2" } }
  },
  "a2": {
    "k3": { "b": { "c": "c3" } }
  }
}

Path: a.*.b

Find the prefix element (a).

If found, scan the nested structure as in scenarios 1.a and 1.b.

Fields:
  • a.k1.b
  • a.k2.b
3.a. Deep-scan as suffix to an array

Data:

Code Block
{ "a": [ "v1", "v2", "v3" ] }

Path: a.*

If a deep-scan is the last path expression under an array, all the array items are returned.

Fields:
  • a.0
  • a.1
  • a.2
3.b. Deep-scan as suffix to a struct/map

Data:

Code Block
{
  "a": {
    "k1": "v1",
    "k2": "v1",
    "k3": { "b": "c" }
  }
}

Path: a.*

If a deep-scan is the last path expression under a struct/map, all the child fields are returned.

Fields:
  • a.k1
  • a.k2
  • a.k3
3.c. Deep-scan as suffix to a field

Data:

Code Block
{
  "a": {
    "k1": "v1",
    "k2": "v1"
  }
}

Path: a.k1.*

If a deep-scan is the last path expression under a field, an empty list is returned.

Fields: <empty>
4.a. Escaping asterisks

Data:

Code Block
{
  "*": {
    "k1": "v1",
    "k2": "v1"
  }
}

Path: **.k1

If an asterisk is already used as a field name, a double asterisk is used to target that field.

This is only needed when the field's full name consists only of asterisks.

Fields:
  • *.k1
4.b. Escaping multiple asterisks

Data:

Code Block
{
  "**": {
    "k1": "v1",
    "k2": "v1"
  }
}

Path: ***.k1

If the field name consists of more than one asterisk, the escaped expression must include one additional asterisk.

Fields:
  • **.k1


Accessing Arrays

Arrays can be accessed in different ways and at different levels.

...


Scenarios (each with its data, path, and resulting fields):

Accessing an array as a single element

Data:

Code Block
{ "a": [ "a1", "a2", "a3" ] }

Path: a

Fields:
  • a

Accessing all items

Data:

Code Block
{ "a": [ "a1", "a2", "a3" ] }

Path: a.*

Fields:
  • a.0
  • a.1
  • a.2

Accessing an item by index

Data:

Code Block
{ "a": [ "a1", "a2", "a3" ] }

Path: a.<index> (e.g. a.0)

Fields:
  • a.0

Accessing elements within objects

Data:

Code Block
{ "a": [ { "b": "b1" }, { "b": "b2" } ] }

Path: a.*.b

Fields:
  • a.0.b
  • a.1.b

Accessing an item by index, and its elements within an object

Data:

Code Block
{ "a": [ { "b": "b1" }, { "b": "b2" } ] }

Path: a.0.b

Fields:
  • a.0.b

...

Of the existing SMTs, the following are impacted by this change:

...

  • Extend field to support nested notation.
  • Supports arrays and deep-scan to access multiple fields.
    • The result is an array, even when only one field or no field is found.

Example:

Scenarios (each with its input, SMT configuration, and output):

1. Nested field.

Input:

Code Block
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"
    }
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.field": "parent.child.k2"
}

Output:

Code Block
"123"


2. Nested field, when field names include dots

Input:

Code Block
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.field": "parent..child.k2"
}

Output:

Code Block
"123"


3. Nested field, an object returned.

Input:

Code Block
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"
    }
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.field": "parent.child"
}

Output:

Code Block
{ "k2": "123" }


4. Nested field, an array returned.

Input:

Code Block
{
  "k1": 123,
  "parent1": {
    "child": {
      "k2": "123"
    }
  },
  "parent2": {
    "child": {
      "k2": "234"
    }
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.field": "*.child.k2"
}

Output:

Code Block
[ "123", "234" ]
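The output rule illustrated by these scenarios (a single value for a plain path, an array whenever a deep-scan is involved) can be sketched as follows. This is a hypothetical illustration, not the SMT's implementation: it assumes the paths have already been resolved into tuples of keys and indexes, the names `value_at` and `extract_field` and the `deep_scan` flag are made up for the example, and returning `None` for a missing plain path is an assumption.

```python
from functools import reduce


def value_at(record, path):
    """Follow a resolved path (keys and array indexes) down the record."""
    return reduce(lambda node, key: node[key], path, record)


def extract_field(record, paths, deep_scan):
    """Return one value for a plain path, or a list when deep-scan was used."""
    values = [value_at(record, p) for p in paths]
    if deep_scan:
        return values  # always an array, even for one or no match
    return values[0] if values else None  # None on no match is an assumption


record = {
    "k1": 123,
    "parent1": {"child": {"k2": "123"}},
    "parent2": {"child": {"k2": "234"}},
}

# Paths a deep-scan such as "*.child.k2" would resolve to.
paths = [("parent1", "child", "k2"), ("parent2", "child", "k2")]
print(extract_field(record, paths, deep_scan=True))                    # ['123', '234']
print(extract_field(record, [("parent1", "child")], deep_scan=False))  # {'k2': '123'}
```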


...

  • Extend fields to support nested notation.
  • Supports arrays and deep-scan to access multiple fields.
    • The result is an array, even when only one field or no field is found.

Example:

Scenarios (each with its input, SMT configuration, and output):

1. Nested field.

Input:

Code Block
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"
    }
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.fields": "parent.child.k2"
}

Output:

Code Block
"123"



2. Nested struct to Key.

Input:

Code Block
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"
    }
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.fields": "parent.child"
}

Output:

Code Block
{
  "k2": "123"
}



3. Nested field, when field names include dots

Input:

Code Block
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.fields": "parent..child.k2"
}

Output:

Code Block
"123"


4. Multiple values to key

Input:

Code Block
{
  "k1": 123,
  "parent1": {
    "child": {
      "k2": "123"
    }
  },
  "parent2": {
    "child": {
      "k2": "234"
    }
  }
}

SMT:

Code Block
{
  "transforms": "smt1",
  "transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.smt1.field.syntax.version": "v2",
  "transforms.smt1.fields": "*.child.k2"
}

Output:

Code Block
[ "123", "234" ]



...

New configurations (in addition to field.syntax.version described above):

Name | Type | Default | Importance | Documentation
field.on.missing.parent | STRING | create | MEDIUM | Permitted values: create, ignore. Defines how to react when the field to act on does not have a parent and "field.style" is "nested". If set to "create", the SMT will create the parent struct/map when it does not exist. If set to "ignore", the SMT will have no effect.
field.on.existing.field | STRING | overwrite | MEDIUM | Permitted values: overwrite, ignore. Defines how to react when the field to act on already exists. If set to "overwrite", the SMT will be applied to the existing field. If set to "ignore", the SMT will have no effect.
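To make the two options concrete, here is a minimal sketch of how an SMT that writes a nested field might honour them on schemaless (map) records. The helper `set_field` and its keyword arguments are hypothetical names chosen for this example; only the permitted values (create/ignore, overwrite/ignore) come from the table above.

```python
def set_field(record, segments, value,
              on_missing_parent="create", on_existing_field="overwrite"):
    """Set a nested field in place; return True if the record was changed."""
    node = record
    for name in segments[:-1]:
        if name not in node:
            if on_missing_parent == "ignore":
                return False  # parent is missing: leave the record untouched
            node[name] = {}   # "create": build the missing parent map
        node = node[name]
    leaf = segments[-1]
    if leaf in node and on_existing_field == "ignore":
        return False          # field already exists: leave it untouched
    node[leaf] = value
    return True


record = {}
set_field(record, ["parent", "child"], "v")
print(record)  # {'parent': {'child': 'v'}}

set_field(record, ["parent", "child"], "w", on_existing_field="ignore")
print(record)  # {'parent': {'child': 'v'}}
```

In this sketch "ignore" turns the call into a no-op for that record, which is one reading of "the SMT will have no effect".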

...