Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This KIP is aimed to include support for nested structures on the existing SMTs where nested structures are used.

Public Interfaces

From the existing list of SMTs, there are the following to be impacted by this change:

New configuration flags

...

Permitted values: plain , nested. Defines how to traverse a record structure to apply a transformation. If set to "plain", then the transformations will only apply to the elements located at the root of the message. If set to "nested", then nested elements will be affected by the transformations as well. To access nested elements, dotted notation is used. If dots are already included in the field name, then dots themselves can be used to represent dots part of the field name. e.g. to access elements from a struct/map named "same.field", the following format can be used to access its elements: "same..field.element"

Proposed Changes

Nested notation

Using dots tends to be the most intuitive way to access the nested record structures, e.g. jq tooling already uses it[1] and will cover most of the scenarios.

Dots are already allowed as part of element names on JSON (i.e. Schemaless) records(e.g. {'nested.key': {'val':42}}). Instead of escaping them with backslashes, which in JSON configurations will lead to unfriendly configurations, it's proposed to follow a similar approach as CSV to escape double quotes by preceding it with the same character (double quotes in this case).

Then, for transform configuration, double dots can be used to escape existing dots that are part of the field name.


[1] https://stedolan.github.io/jq/manual/#Basicfilters

[2] https://datatracker.ietf.org/doc/html/rfc4180 2.7

> If double quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.

Accessing multiple values with deep-scan

There are scenarios where either we want to target multiple fields with the same name at different levels, e.g. arrays or dynamic structures.

For these cases, an asterisk can be used to search all elements within a path:

  • a.*.b  will access a  and then search all child objects/arrays for the field b .

If deep-scan is used, it must have only one field after the asterisk level.

Deep scans are expected to return multiple values. The SMT has to define how to proceed when multiple fields are found.

Accessing Arrays

Arrays can be accessed in different ways and at different levels.

  • Accessing the whole array: if a path points to an array and the SMT supports it as input, then a.b  can be used where b  is an array.
  • Accessing all elements of the array: if a path points to an array, and its elements are not objects, e.g. string. then the SMT can access all the elements of the array at once using a.b  where b is an array.
  • Accessing child elements on all array objects: if a path access an array and its elements are objects, we can access all the objects by providing a path of its child elements, e.g. a.b.c  access array b  and element c in all the items of the array.
  • Accessing a single item by index: if a path points to an array and then uses an index, then it gets that specific element. if no additional child element is provided, then it accesses the whole object/element.e.g. a.b.1  accesses the second item of the array.
  • Accessing elements within a single item by index. If the item of the array is an object, we can access its elements, e.g. a.b.1.c  to access the second item of the array, and access the field c  


//TODO add examples to SMTs

Public Interfaces

From the existing list of SMTs, there are the following to be impacted by this change:

New configuration flags

NameTypeDefaultImportanceDocumentation
field.syntax.version STRING v1HIGH 

Permitted values: v1 , v2 . Defines the version of the syntax to access fields. If set to "v1", then the field paths are limited to access the elements at the root level of the struct or map. If set to "v2", the syntax will support accessing nested elements. o access nested elements, dotted notation is used. If dots are already included in the field name, then dots themselves can be used to represent dots part of the field name. e.g. to access elements from a struct/map named "same.field", the following format can be used to access its elements: "same..field.element".

This configuration will affect all the field paths used by the transform.

These flags will be added conditionally to some SMTs, as described below.

Affected SMTs

Cast

Changes:

  • Extend spec to support nested notation.
  • Supports arrays and deep-scan to access multiple fields.

Examples:

scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 

These flags will be added conditionally to some SMTs, described below.

Affected SMTs

Cast

Changes:

  • Extend spec to support nested notation.

Examples:

scenarioinputsmtoutput
1. Nested field.
Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.spec": "k1:string,parent.child.k2:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent": {
    "child": {
      "k2": 123    
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.spec": "k1:string,parent..child.k2:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent.child": {
    "k2": 123
  }
}


...

  • Extend field to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

Example:

scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.field": "parent.child.k2"
}



Code Block
languagejs
"123"


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.field": "parent..child.k2"
}



Code Block
languagejs
"123"


...

  • Extend fields to support nested notation.
  • As this SMT affects only existing fields, additional configurations will not be required.
  • Does not support multiple values (e.g. deep scan or array)

Example:

scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.fields": "k1,parent.child.k2",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
headers:
- k1=123
- k2="123"


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.fields": "k1,parent..child.k2",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
headers:
- k1=123
- k2="123"


...

  • Extend fields to support nested notation.
  • Supports arrays and deep-scan to access multiple fields.

Example:

scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.fields": "parent.child.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": ""    
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.fields": "parent..child.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": ""
  }
}


...

  • Extend theinclude and exclude lists
  • Supports arrays and deep-scan to access multiple fields.

Example:

scenarioinputsmtoutput
1. Nested field. Drop field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.exclude": "parent.child.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
    }
  }
}


2. Nested field. Drop struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.exclude": "parent.child"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
  }
}


3. Nested field. Include field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.include": "parent.child.k2"
}



Code Block
languagejs
{
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}


4. Nested field. Include struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.include": "parent.child"
}



Code Block
languagejs
{
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}


5. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.renames": "parent..child.k2:field2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
   "field2": "123"
  }
}


...

  • Extend fields to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

Example:

scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": 1556204536000         }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.field": "parent.child.k2",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "2014-04-25"         }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
      "k2": 1556204536000         }
  }
}



Code Block
languagejs


{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.field": "parent..child.k2",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
}




Code Block
languagejs
{
  "k1": 123,
  "parent.child": {      "k2": "2014-04-25"   }
}


...

  • Extend fields to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

Example:

scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.fields": "parent.child.k2"
}



Code Block
"123"



2. Nested struct to Key.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.fields": "parent.child"
}



Code Block
{
  "k2": "123"    
}



3. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.fields": "parent..child.k2"
}



Code Block
languagejs
"123"


...

  • Extend *.field to support nested notation.
  • Does not support multiple values (e.g. deep scan or array)

New configurations (additional to field.style  described above):

...

scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.static.field": "parent.child.k3"
"transforms.smt1.static.value": "v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "v3"   
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", 
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.static.field": "parent..child.k3"
"transforms.smt1.static.value": "v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123",
    "k3": "v3"
  }
}


3. Nested field with the parent missing


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.static.field": "parent.other.k3"
"transforms.smt1.static.value": "v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    },
    "other": {
      "k3": "v3"  
    }
  }
}


4. Nested field with the parent missing, and ignore is set


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
 "transforms.smt1.field.syntax.styleversion": "nestedv2",
"transforms.smt1.static.field": "parent.other.k3"
"transforms.smt1.static.value": "v3",
"transforms.smt1.field.on.missing.parent": "ignore"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    }
  }
}


5. Nested field with the parent missing


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", 
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.static.field": "parent.child.k2"
"transforms.smt1.static.value": "456"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "456"  
    }
  }
}


6. Nested field with the parent missing, and ignore is set


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value", 
 "transforms.smt1.field.stylesyntax.version": "nestedv2",
"transforms.smt1.static.field": "parent.child.k2"
"transforms.smt1.static.value": "456",
"transforms.smt1.field.on.existing.field": "ignore"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    }
  }
}


...

  • Add a hoisted config to point to a specific path to hoist.hoist.

  • Does not support multiple values (e.g. deep scan or array)

New configurations:

Name

TypeDefaultImportanceDocumentation
hoisted STRING <empty>MEDIUM Path to the element to be hoisted. If empty, the root struct/map is hoisted.

...

2. Nested struct, when field names include dots
scenarioinputsmtoutput
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistFIeld$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.hoisted": "parent.child.k2",
"transforms.smt1.field": "other"
}
Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "other": {
        "k2": "123"
      }    
    }
  }
}
", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "parent.child.k2",
"transforms.smt1.field": "other"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2child": "123"{
   }
}
Code Block
languagejs
{
"transforms   "other": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistFIeld$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.hoisted": "parent..child",
"transforms.smt1.field{
        "k2": "other"
}
123"
      }    
    }
  }
}


2. Nested struct, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "other": {
    "parent.child": {
      "k2": "123"
    }
  }
}

Non-affected SMTs

These SMT do not require nested structure support:

  • DropHeaders: Drop one or multiple headers.
  • Filter: Drops the whole message based on a predicate.
  • InsertHeader: Insert a specific message to the header.
  • RegexRouter: Acts on the topic name.
  • SetSchemaMetadata: Acts on root schema.
  • TimestampRouter: Acts on timestamp.
  • Flatten: Acts on the whole key or message. 

Proposed Changes

Nested notation

Using dots tends to be the most intuitive way to access the nested record structures, e.g. jq tooling already uses it[1] and will cover most of the scenarios.

...



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistFIeld$Value", "transforms.smt1.field.syntax.version": "v2",
"transforms.smt1.hoisted": "parent..child",
"transforms.smt1.field": "other"
}



Code Block
languagejs
{
  "k1": 123,
  "other": {
    "parent.child": {
      "k2": "123"
    }
  }
}



Non-affected SMTs

These SMT do not require nested structure support:

  • DropHeaders: Drop one or multiple headers.
  • Filter: Drops the whole message based on a predicate.
  • InsertHeader: Insert a specific message to the header.
  • RegexRouter: Acts on the topic name.
  • SetSchemaMetadata: Acts on root schema.
  • TimestampRouter: Acts on timestamp.
  • Flatten: Acts on the whole key or message. 

Then, for transform configuration, double dots can be used to escape existing dots that are part of the field name.

[1] https://stedolan.github.io/jq/manual/#Basicfilters

[2] https://datatracker.ietf.org/doc/html/rfc4180 2.7

> If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.

...

Compatibility, Deprecation, and Migration Plan

...

Keep ExtractField as it is and use it multiple times until reaching nested fields

This KIP proposes to simplify simplifying this configuration by replacing multiple invocations with only one nested one.

...

With double dotsWith separator


Code Block
{
  "transforms": "cast",
   "transforms.cast.field.stylesyntax.version": "nestedv2",          
  "transforms.cast.type": "..."
  "transforms.cast.spec": "address..personal.country:string"
}



Code Block
{   
  "transforms": "cast",
  "transforms.cast.field.stylesyntax.version": "nestedv2",
  "transforms.cast.field.separator": "/", 
  "transforms.cast.type": "..."
  "transforms.cast.spec": "address.personal/country:string",
}


Even if using custom separators represent a more explicit configuration, there is always the possibility that all the separators are already included as part of the field name, leading to issues and request for changes.

To avoid this, this KIP is proposing to use proposes using the approach to precede dots with another do to escape itself.

Use JSONPath notation to access nested elements

//TODO

Use named styles instead of syntax versions

//TODO

Potential KIPs

Future KIPs could extend this support for:

...