Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under DiscussionVoting

Discussion thread: here

JIRA: here

Jira
serverASF JIRA
columnIdsissuekey,summary,issuetype,created,updated,duedate,assignee,reporter,priority,status,resolution
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-13656

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Single Message Transforms (SMT),  introduced withKIP-66, have greatly improved Connector's usability by enabling processing the transformation of input/output data formats and contents without the need for additional streaming applications. 

ThoughHowever, these benefits have been limited by SMTs limited to SMT's limitation to only act on fields available on at the root of the data structure.

Here are some tickets/comments related to this limitation:

This KIP is aimed to include support for nested structures on the existing SMTs where nested structures are used.

Proposed Changes

Nested notation

This KIP aims to include support for nested structures on the existing SMTs.

Proposed Changes

Nested notation

Dotted notation Using dots tends to be the most intuitive way to access the nested record structures, describe paths to nested fields in a record structure and will cover most of the scenarios. e.g. jq tooling already uses it[1] and will cover most of the scenarios. .

However, field names in JSON could include dots(e.Dots are already allowed as part of element names on JSON (i.e. Schemaless) records(e.g. {'nested.keyfield': {'valvalue':42}}).

Therefore, the nested notation must support escaping dots that could be valid field names.

Instead of escaping them dots with backslashes , which in JSON configurations will lead leads to unfriendly configurations , it's proposed to follow a similar approach as CSV to escape double quotes by preceding it with the same character (double quotes in this case).Then, for transform configuration, double dots can be used to escape existing dots that are part of the field name.the JSONata[2] where backticks are used to define field names with dots, e.g. `nested.field`


[1] https://stedolan.github.io/jq/manual/#Basicfilters

[2] https://datatrackerdocs.ietfjsonata.org/doc/html/rfc4180 2.7

> If double quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.

Accessing multiple values with deep-scan

There are scenarios where either we want to target multiple fields with the same name at different levels, e.g. arrays or dynamic structures.

For these cases, an asterisk can be used to search all elements within a path:

  • a.*.b  will access a  and then search all child objects/arrays for the field b .

If deep-scan is used, it must have only one field after the asterisk level.

Deep scans are expected to return multiple values. The SMT has to define how to proceed when multiple fields are found.

Accessing Arrays

Arrays can be accessed in different ways and at different levels.

  • Accessing the whole array: if a path points to an array and the SMT supports it as input, then a.b  can be used where b  is an array.
  • Accessing all elements of the array: if a path points to an array, and its elements are not objects, e.g. string. then the SMT can access all the elements of the array at once using a.b  where b is an array.
  • Accessing child elements on all array objects: if a path access an array and its elements are objects, we can access all the objects by providing a path of its child elements, e.g. a.b.c  access array b  and element c in all the items of the array.
  • Accessing a single item by index: if a path points to an array and then uses an index, then it gets that specific element. if no additional child element is provided, then it accesses the whole object/element.e.g. a.b.1  accesses the second item of the array.
  • Accessing elements within a single item by index. If the item of the array is an object, we can access its elements, e.g. a.b.1.c  to access the second item of the array, and access the field c  

//TODO add examples to SMTs

Public Interfaces

From the existing list of SMTs, there are the following to be impacted by this change:

New configuration flags

...

Permitted values: v1 , v2 . Defines the version of the syntax to access fields. If set to "v1", then the field paths are limited to access the elements at the root level of the struct or map. If set to "v2", the syntax will support accessing nested elements. o access nested elements, dotted notation is used. If dots are already included in the field name, then dots themselves can be used to represent dots part of the field name. e.g. to access elements from a struct/map named "same.field", the following format can be used to access its elements: "same..field.element".

This configuration will affect all the field paths used by the transform.

simple#examples

  > Field references containing whitespace or reserved tokens can be enclosed in backticks

Rules

  • 1. If field names do not contain dots (.), then only use dots to represent nested field paths.
  • 2. If field names contain dots, then:
    • wrap the field name with a backtick pair (`...`) by
      • adding an opening backtick at the beginning of the field name (beginning of a path, or after a dot)
      • adding a closing backtick at the end of the field name (end of the path, or before the next dot)
    • If a field is wrapped and doesn't contain dots, is processed the same way: field name within the wrapping backticks is used
  • 3. If a field name includes backticks, then:
    • If a backtick is followed by a dot in the field name, then the backtick should be escaped with a backslash to signal that the backtick is part of the name and not closing a backtick pair.
      • Backslashes (\) do not need to be escaped. If the backslash happens to be part of the field name and before a backtick is to be escaped, then add another backslash.
    • else, backticks do not require escape
  • 4. If wrapping backtick pairs are incomplete, the Connect configuration must fail fast to avoid getting ambiguous paths deployed.

Examples

ScenarioNested structPath
  1. Normal (no dots or backticks on field names)
foo:
  bar:
    baz: val
OK: foo.bar.baz
2. Field names including dots
foo:
  bar.baz: val
OK: foo.`bar.baz`
2.1 Using backticks within a field name without dots
foo:
  bar:
    baz: val
OK: foo.bar.baz
ERROR: foo.`bar.baz: no pair
ERROR: foo.bar`.baz: no pair
3. Field names including backticks
foo:
  ba`r:
    baz: val
OK: foo.ba`r.baz
OK: foo.`ba`r`.baz
3.1. Field names including backticks at the wrapping position
foo:
  bar`.`baz: val
OK: foo.`bar\`.\`baz`
ERROR: foo.`bar\`.`baz`: no pair
ERROR: foo.`bar`.`baz`: valid but different path (see 2.1)
3.2. Field names including dots and backticks between a backtick pair
foo:
  b`ar.baz: val
OK: foo.`b`ar.baz`
3.3. Field names including backslash and backticks at the wrapping position
foo:
  bar\`.`baz: val
OK: foo.`bar\\`.\`baz`
3.4. Field names wrapped by backticks
foo:
  `bar`:
      baz: val
OK: foo.`\`bar\``.baz
ERROR: foo.`bar`.baz: valid but different path (see 2.1)

Affected SMTs

These SMTs will include support for nested structures:

  • Cast
  • ExtractField
  • HeaderFrom
  • MaskField
  • ReplaceField
  • TimestampConverter
  • ValueToKey
  • InsertField
  • HoistField

Non-affected SMTs

These SMTs do not require nested structure support:

  • DropHeaders: Drop one or multiple headers.
  • Filter: Drops the whole message based on a predicate.
  • InsertHeader: Insert a specific message to the header.
  • RegexRouter: Acts on the topic name.
  • SetSchemaMetadata: Acts on root schema.
  • TimestampRouter: Acts on timestamp.
  • Flatten: Acts on the whole key or message. 

Public Interfaces

New configuration flags

NameTypeDefaultImportanceDocumentation
field.syntax.version STRING V1HIGH 

Permitted values: V1 , V2 . Defines the version of the syntax to access fields. If set to "V1", then the field paths are limited to access the elements at the root level of the struct or map. If set to "V2", the syntax will support accessing nested elements. To access nested elements, dotted notation is used. If dots are already included in the field name, then backtick pairs can be used to wrap field names containing dots. e.g. to access elements from a struct/map named "foo.bar", the following format can be used to access its elements: "`foo.bar`.baz".

This configuration will affect all the field paths used by the transform.

This flag will be added conditionally to some SMTs, as described below.

Affected SMTs

Cast

Changes:

  • Extend spec to support nested notation.

Examples:

scenarioinputSMToutput
1. Nested field.

These flags will be added conditionally to some SMTs, as described below.

Affected SMTs

Cast

Changes:

  • Extend spec to support nested notation.
  • Supports arrays and deep-scan to access multiple fields.

Examples:

scenarioinputsmtoutput1. Nested field.
Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}
Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,parent.child.k2:int64"
}
Code Block
languagejs
{
  "k1": "123",
  "parent": {
    "child": {
      "k2": 123    
    }
  }
}
2. Nested field, when field names include dots Code Block
languagejs
{ "k1": 123, "parent.child": { "k2": "123" } }


Code Block
languagejs
{
  "
transforms
k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.spec": "k1:string,parent.
.
child.k2:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent
.
": {
    "child": {
      "k2": 123    
    }
  }

ExtractField

Changes:

  • Extend field to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

Example:


}


2. Nested field, when field names include dots
scenarioinputsmtoutput1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent
": { "
.child": {
    "k2": "123"

  
}
}
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
ExtractField$Value
Cast$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
field
spec": "
parent.child
k1:string,`parent.child`.k2:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent.child": {
    "k2"
: 123
  }
}


ExtractField

Changes:

  • Extend field to support nested notation.

Example:

scenarioinputSMToutput
1. Nested field.
2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent
.
": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.field": "parent.
.scenarioinputsmtoutput1. Nested field.
child.k2"
}



Code Block
languagejs
"123"

HeaderFrom

Changes:

  • Extend fields to support nested notation.
  • As this SMT affects only existing fields, additional configurations will not be required.
  • Does not support multiple values (e.g. deep scan or array)

Example:


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent
": { "
.child": {
    
"k2": "123"

  }
}
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
HeaderFrom$Value
ExtractField$Value",
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
fields
field": "
k1,parent
`parent.
child
child`.k2"
,

"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
headers: - k1=123 - k2=
"123"
2


3. Nested field,
when field names include dots
an object returned.


Code Block
languagejs
{
  "k1": 123,
  "parent
.
": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
HeaderFrom$Value
ExtractField$Value",

"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
fields
field": "
k1,
parent.
.
child
.k2
"
, "transforms.smt1.headers": "k1,k2" }
Code Block
languagejs
headers:
- k1=123
- k2="123"

...


}



Code Block
languagejs
{ "k2": "123" }


HeaderFrom

Changes:

  • Extend fields to support nested notation.
  • Supports arrays and deep-scan to access multiple fields.

Example:

scenarioinput
smt
SMToutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
MaskField$Value
HeaderFrom$Value",

"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "k1,parent.child.k2",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
{
headers:
- 
"
k1
":
=123
,

- 
k2="
parent": { "child": { "k2": "" } } }
123"


2
2
. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
MaskField$Value
HeaderFrom$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "
parent
k1,`parent.
.child.
child`.k2",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
{
headers:
- 
"
k1
":
=123
,

- 
"parent.child": { "k2": "" } }

ReplaceField

Changes:

k2="123"


MaskField

Changes:

  • Extend fields to support nested notationExtend theinclude and exclude listsSupports arrays and deep-scan to access multiple fields.

Example:

scenarioinput
smt
SMToutput
1. Nested field.
Drop field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
ReplaceField$Value
MaskField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
exclude
fields": "parent.child.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": ""    
    }
  }
}


2. Nested field
. Drop struct
, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "
child
k2": 
{
"123"
  
"k2": "123" } }
}
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
ReplaceField$Value
MaskField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
exclude
fields": "
parent
`parent.child`.
child
k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": ""
  }
}


ReplaceField

Changes:

  • Extend theinclude and exclude lists

Example:

scenarioinputSMToutput
1
3
. Nested field.
Include
Drop field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"
,
    
  
"k3":
 
"234"   }
 }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
include
exclude": "parent.child.k2"
}



Code Block
languagejs
{
  "
parent
k1": 
{
123,
  
"
child
parent": {
    
"
k2
child": 
"123"  
{
    }
  }
}
4


2. Nested field.
Include
Drop struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"
,
    
"k3": "234"  

    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
include
exclude": "parent.child"
}



Code Block
languagejs
{
  "
parent
k1": 
{
123,
  "
child
parent": {
  
}
}


3. Nested field. Include field


Code Block
languagejs
{
  
"
k2
k1": 
"
123
"
,
  
"parent": {
    "
k3
child": 
"234"  
{
    
} } }5. Nested field, when field names include dots Code Block
languagejs
{
  "
k1
k2": "123",
 
"parent.child":
 
{
    "
k2
k3": "
123
234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
renames
include": "parent.
.
child.k2
:field2
"
}



Code Block
languagejs
{
 
  "
k1
parent": 
123,  
{
    "
parent.
child": {
  
      "
field2
k2": "123"
 
    
    }
  }

TimestampConverter

Changes:

  • Extend fields to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

Example:

}


4.
scenarioinputsmtoutput1.
Nested field. Include struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": 
1556204536000        
"123",
      "k3": "234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
TimestampConverter$Value
ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
field
include": "parent.child
.k2", "transforms.smt1.format": "yyyy-MM-dd", "transforms.smt1.target.type":
"
string"

}



Code Block
languagejs
{
  "
k1
parent": 
123,
{
    "
parent
child": {
      "
child
k2": 
{
"123",
      "
k2
k3": "
2014-04-25
234"   
 
 
  
 
 
 
 }
  }
}
2


5. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": 
1556204536000         }
"123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
TimestampConverter$Value
ReplaceField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
field
renames": "
parent
`parent.
.child
child`.k2
", "transforms.smt1.format": "yyyy-MM-dd", "transforms.smt1.target.type": "string" }
:field2"
}



Code Block
languagejs
{
  
"k1": 123,
  
"parent.child": {
   

   "
k2
field2": "
2014-04-25"
123"
  }
}

...


TimestampConverter

Changes:

  • Extend fields to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

Example:

scenarioinput
smt
SMToutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": 
"123"
1556204536000       
  }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
ValueToKey
TimestampConverter$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
fields
field": "parent.child.k2"
}
Code Block
"123"
2. Nested struct to Key.
,
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "
123
2014-04-25"       
  
}
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "
transforms
k1": 
"smt1"
123,
  "
transforms.
parent.child": {
      "k2": 1556204536000         }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
ValueToKey
TimestampConverter$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
fields
field": "
parent
`parent.child`.
child
k2"
} Code Block{ "k2
,
"transforms.smt1.format": "
123" }3. Nested field, when field names include dots
yyyy-MM-dd",
"transforms.smt1.target.type": "string"
}




Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
      "k2": "
123
2014-04-25"
   }
}


ValueToKey

Changes:

  • Extend fields to support nested notation.

Example:

scenarioinputSMToutput
1. Nested field.


Code Block
languagejs
{
  "
transforms
k1": 
"smt1"
123,
"transforms.smt1.type": "org.
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.
.
child.k2"
}



Code Block
language
js
"123"

InsertField

Changes:

  • Extend *.field to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

New configurations (additional to field.style  described above):

...

Name

...



2. Nested struct to Key.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.fields": "parent.child"
}



Code Block
{
  "k2": "123"    
}



3. Nested field, when field names include dots

Example:

scenarioinputsmtoutput1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent
": { "
.child": {
    "k2": "123"

  
}
}
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
InsertField$Value
ValueToKey", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.
static.field
fields": "
parent
`parent.
child.k3" "transforms.smt1.static.value": "v3"
child`.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "v3"   
    }
  }
}
2. Nested field, when field names include dots
Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}
"123"


InsertField

Changes:

  • Extend *.field to support nested notation.

New configurations (additional to field.syntax.version described above):

Name

TypeDefaultImportanceDocumentation
field.on.missing.parentSTRINGcreateMEDIUMPermitted values: create, ignore. Defines how to react when the field to act on does not have a parent and "field.style" is "nested". If set to "create", then the SMT will create the parent struct/map when it does not exist. If set to "ignore", then it will SMT have no effect.
field.on.existing.fieldSTRINGoverwriteMEDIUMPermitted values: overwrite, ignore. Defines how to react when the field to act on already exists. If set to "overwrite", then the SMT will be applied to the existing field. If set to "ignore", then it will SMT have no effect.


Example:

scenarioinputSMToutput
1. Nested field.
Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",  "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent..child.k3"
"transforms.smt1.static.value": "v3" 
}
Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123",
    "k3": "v3"
  }
}
3. Nested field with the parent missing


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.
other
child.k3"
"transforms.smt1.static.value": "v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"
}
,
"other": {

      "k3": "v3"   
    }
  }
}
4


2. Nested field
with the parent missing, and ignore is set
, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent
": { "
.child": {
    
"k2": "123"
}

  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",  "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "
parent
`parent.
other
child`.k3"
"transforms.smt1.static.value": "v3"
, "transforms.smt1.field.on.missing.parent":
 
"ignore"

}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "
child
k2": 
{
"123",
    
"
k2
k3": "
123
v3"

  }
}
}
5


3. Nested field with the parent missing


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", 
 
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.
child
other.
k2
k3"
"transforms.smt1.static.value": "
456
v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "
456
123"  
    },
    "other": {
      "k3": "v3"  
    }
  }
}
6


4. Nested field with the parent missing, and ignore is set


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", 
 
"transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.
child
other.
k2
k3"
"transforms.smt1.static.value": "
456
v3",
"transforms.smt1.field.on.
existing
missing.
field
parent": "ignore"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    }
  }
}

HoistField

Changes:

  • Add a hoisted config to point to a specific path to hoist.

  • Does not support multiple values (e.g. deep scan or array)

New configurations:

...

Name

...


5. Nested field with the parent missing


Code Block
languagejs
{

Examples:

scenarioinputsmtoutput1. Nested field. Code Block
languagejs
{

  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.
HoistFIeld$Value
InsertField$Value",  "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k2"
"transforms.smt1.static.value": "456"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "456"  
    }
  }
}


6. Nested field with the parent missing, and ignore is set


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",  "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.static.field": "parent.child.k2"
"transforms.smt1.static.value": "456",
"transforms.smt1.field.on.existing.field": "ignore"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    }
  }
}


HoistField

Changes:

  • Add a hoisted config to point to a specific path to hoist.

New configurations:

Name

TypeDefaultImportanceDocumentation
hoisted STRING <empty>MEDIUM Path to the element to be hoisted. If empty, the root struct/map is hoisted.

Examples:

scenarioinputSMToutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistFIeld$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "parent.child.k2",
"transforms.smt1.field": "other"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "other": {
        "k2": "123"
      }    
    }
  }
}


2. Nested struct, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistFIeld$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "`parent.child`",
"transforms.smt1.field": "other"
}



Code Block
languagejs
{
  "k1": 123,
  "other": {
    "parent.child": {
      "k2": "123"
    }
  }
}



Compatibility, Deprecation, and Migration Plan

Existing SMT configurations will not be affected by these changes as the default field.style  is plain and users will need to opt-in the the new notation.

Rejected Alternatives

Keep ExtractField as it is and use it multiple times until reaching nested fields

This KIP proposes simplifying this configuration by replacing multiple invocations with only one nested one.

Use dots as the only separator and escape with backslashes when collides

Trying to keep only one separator, one of the alternatives is to use dots to separate; if it collides with the existing field names use backslashes "\" to represent dots that are part of the name e.g.  "this.field" (which would refer to the nested field "field" under the top-level "this" field), and "this\.field" (which would refer to the field named "this.field").

However, backslashes are also used by JSON. This could lead to unfriendly configurations like "this\\\\.is\\\\.not\\\\.very\\\\.readable"

Use custom separators for edge cases

Using double dots to escape separators is another alternative to try sticking to using only dots as a field separator.

Comparing:

"transforms.smt1.hoisted": "parent.child.k2", "transforms.smt1.field": "other" }
With double dotsWith separator


Code Block
{
  "transforms": "cast",
  "transforms.cast
Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "other": {
        "k2": "123"
      }    
    }
  }
}
2. Nested struct, when field names include dots
Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}
Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistFIeld$Value", "transforms.smt1.field.syntax.version": "v2",          
  "transforms.smt1cast.hoistedtype": "parent...child",
  "transforms.smt1cast.fieldspec": "other"address..personal.country:string"
}



js
Code Block
language
{   
  "k1transforms": 123"cast",
  "other": {
    "parent.child": {
      "k2 "transforms.cast.field.syntax.version": "v2",
  "transforms.cast.field.separator": "123"
    }
  }
}

Non-affected SMTs

These SMT do not require nested structure support:

  • DropHeaders: Drop one or multiple headers.
  • Filter: Drops the whole message based on a predicate.
  • InsertHeader: Insert a specific message to the header.
  • RegexRouter: Acts on the topic name.
  • SetSchemaMetadata: Acts on root schema.
  • TimestampRouter: Acts on timestamp.
  • Flatten: Acts on the whole key or message. 

Compatibility, Deprecation, and Migration Plan

Existing SMT configurations will not be affected by these changes as the default field.style  is plain, which represents the current behavior.

Rejected Alternatives

Keep ExtractField as it is and use it multiple times until reaching nested fields

This KIP proposes simplifying this configuration by replacing multiple invocations with only one nested one.

Use dots as the only separator and escape with backslashes when collides

Trying to keep only one separator, one of the alternatives is to use dots to separate; if it collides with the existing field names use backslashes "\" to represent dots that are part of the name e.g.  "this.field" (which would refer to the nested field "field" under the top-level "this" field), and "this\.field" (which would refer to the field named "this.field").

However, backslashes are also used by JSON. This could lead unfriendly configurations like "this\\\\.is\\\\.not\\\\.very\\\\.readable"

Use custom separators for edge cases

Using double dots to escape separators is another alternative to try sticking to using only dots as a field separator.

Comparing:

...

/", 
  "transforms.cast.type": "..."
  "transforms.cast.spec": "address.personal/country:string",
}


Even if using custom separators represents a more explicit configuration, there is always the possibility that all the separators are already included as part of the field name, leading to issues and requests for changes.

To avoid this, this KIP proposes using the approach to precede dots with another to escape itself.

Use JSONPath notation to access nested elements

JSONPath[1] was a proposed alternative to the nested notation. A drafted version of the KIP with examples using the proposed notation is outlined here: [DRAFT] KIP-821: Connect Transforms support for nested structures (JsonPath-based draft)

The following limitations were found:

  • The JSONPath spec is too extensive for the use cases included in this KIP.
  • A sub-set of JSONPath was proposed, but the custom spec ends up being more complex than the notation proposed here.
    • A sub-set will imply not using existing dependencies. However, adding an existing dependency would also reduce the chance of the KIP being accepted as the risk for external vulnerabilities will increase.
    • The sub-set will require users to learn JSONPath, and then what's covered and what's not by the custom implementation.

Given these cons, the KIP prefers the dotted notation.

[1] https://github.com/json-path/JsonPath

Use named styles instead of syntax versions

Was considered to use a configuration to name the styles to target fields:

  • field.style  with valid values: "plain", and "nested".

Even though this configuration is self-describing, it limits the semantics of the values.

Instead, the KIP is considering a versioned configuration "field.syntax.version" to avoid affecting current behavior and make it easier to extend by including compatible changes on the same version.

Use configuration flag per SMT instead of per-field configuration

Instead of adding a configuration under each field config, e.g. include.syntax.version , the KIP proposed to have a single configuration per SMT, to affect all the input fields.

Use Double-dots to escape dots included on field names

Double dot is often used in JSON Path as a descendant selector, see https://www.ietf.org/id/draft-ietf-jsonpath-base-05.html

This may confuse users. To avoid this, the backtick approach is proposed in this KIP.

Potential Improvements (out of scope)

Support Array access

Adding notation for arrays (e.g. [], or array.<offset>) to access array elements and apply SMTs to fields within the array.

This has to consider fields that could be including [, ] , or numbers as part of their names and how to escape them.

Support Deep-Scan

Supported by JsonPath, could allow applying SMTs to multiple fields with the same name at different locations of the structure.

At the moment is not clear how to escape the character used for deep-scan. e.g. if using *

Code Block
{
  "transforms": "cast",
  "transforms.cast.field.syntax.version": "v2",          
  "transforms.cast.type": "..."
  "transforms.cast.spec": "address..personal.country:string"
}
Code Block
{   
  "transforms": "cast",
  "transforms.cast.field.syntax.version": "v2",
  "transforms.cast.field.separator": "/", 
  "transforms.cast.type": "..."
  "transforms.cast.spec": "address.personal/country:string",
}

Even if using custom separators represent a more explicit configuration, there is always the possibility that all the separators are already included as part of the field name, leading to issues and request for changes.

To avoid this, this KIP proposes using the approach to precede dots with another to escape itself.

Use JSONPath notation to access nested elements

//TODO

Use named styles instead of syntax versions

//TODO

Potential KIPs

Future KIPs could extend this support for:

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-10640

...

.