...

This KIP aims to add support for nested structures to the existing SMTs where this makes sense, and to introduce abstractions so that future SMTs dealing with nested structures can reuse this support.

Public Interfaces

The following existing SMTs are impacted by this change:

...

Name | Type | Default | Importance | Documentation
transforms.<name>.field.style | STRING | plain | HIGH | Permitted values: plain, nested. Defines how to traverse a record structure to apply a transformation. If set to "plain", transformations apply only to the elements located at the root of the message. If set to "nested", nested elements are affected by the transformations as well. Nested elements are accessed using dotted notation. If a field name itself contains dots, each of those dots is escaped by doubling it. For example, to access the elements of a struct/map named "same.field", use the format "same..field.element".
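To make the escaping rule concrete, the following is a minimal sketch of how a path in this notation could be split into field names. The class `DottedPath` and its `parse` method are hypothetical (not part of Kafka Connect), and reading runs of three or more dots greedily from left to right is an assumption of this sketch, since the rule above does not cover that case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Kafka Connect): splits a path written in the
// proposed dotted notation into field names. A single '.' separates segments;
// a doubled ".." stands for a literal '.' inside a field name. Longer runs of
// dots are read greedily from left to right (an assumption of this sketch).
final class DottedPath {
    private DottedPath() {}

    static List<String> parse(String path) {
        List<String> segments = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int i = 0;
        while (i < path.length()) {
            char c = path.charAt(i);
            if (c == '.' && i + 1 < path.length() && path.charAt(i + 1) == '.') {
                current.append('.'); // escaped dot: part of the current field name
                i += 2;
            } else if (c == '.') {
                segments.add(current.toString()); // separator: close the segment
                current.setLength(0);
                i += 1;
            } else {
                current.append(c);
                i += 1;
            }
        }
        segments.add(current.toString());
        return segments;
    }

    public static void main(String[] args) {
        System.out.println(parse("parent.child.k2"));           // [parent, child, k2]
        System.out.println(parse("parent..child.k2"));          // [parent.child, k2]
        System.out.println(parse("address..personal.country")); // [address.personal, country]
    }
}
```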

Example:

For example, to access an element "country" inside a struct/map named "address.personal" in the Cast transform:

...

These configurations will only be added to the SMTs described below.

Affected SMTs

Cast

Changes:

  • Extend spec to support nested notation.

Examples:

Scenario | Input | SMT configuration | Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.spec": "k1:string,parent.child.k2:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent": {
    "child": {
      "k2": 123    
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.spec": "k1:string,parent..child.k2:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent.child": {
    "k2": 123
  }
}
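The Cast examples above can be sketched for schemaless values, which Kafka Connect represents as `java.util.Map` instances. This is a minimal illustration, not Kafka's Cast implementation: the class `NestedCastSketch` is hypothetical, the path is assumed to be already split into field names (so `parent..child.k2` arrives as `["parent.child", "k2"]`), and only the two target types used in the examples are handled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kafka's Cast implementation): casts one nested field
// of a schemaless value made of nested java.util.Map instances.
final class NestedCastSketch {
    private NestedCastSketch() {}

    @SuppressWarnings("unchecked")
    static void castField(Map<String, Object> root, List<String> path, String targetType) {
        Map<String, Object> parent = root;
        // Walk down to the map that directly contains the last path segment.
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                throw new IllegalArgumentException("No struct/map at segment: " + segment);
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = path.get(path.size() - 1);
        Object value = parent.get(leaf);
        if (value == null) {
            return; // nothing to cast
        }
        switch (targetType) {
            case "string":
                parent.put(leaf, String.valueOf(value));
                break;
            case "int64":
                parent.put(leaf, value instanceof Number
                        ? ((Number) value).longValue()
                        : Long.parseLong(value.toString().trim()));
                break;
            default:
                throw new IllegalArgumentException("Unsupported type in this sketch: " + targetType);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> child = new LinkedHashMap<>();
        child.put("k2", "123");
        Map<String, Object> parent = new LinkedHashMap<>();
        parent.put("child", child);
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("k1", 123);
        value.put("parent", parent);

        // Equivalent of spec "k1:string,parent.child.k2:int64"
        castField(value, List.of("k1"), "string");
        castField(value, List.of("parent", "child", "k2"), "int64");
        System.out.println(value); // {k1=123, parent={child={k2=123}}}
    }
}
```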


ExtractField

Changes:

  • Extend field to support nested notation.

Example:

Scenario | Input | SMT configuration | Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.field": "parent.child.k2"
}



Code Block
languagejs
"123"


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.field": "parent..child.k2"
}



Code Block
languagejs
"123"
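A minimal sketch of the nested lookup behind these ExtractField examples, again over schemaless `java.util.Map` values. The class `NestedExtractSketch` is hypothetical, the path is assumed to be already split into field names, and returning `null` when a segment is missing is a choice made for this sketch rather than behavior specified by the KIP.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kafka's ExtractField implementation): returns the value
// at a nested path inside a schemaless value, or null if any segment is missing.
final class NestedExtractSketch {
    private NestedExtractSketch() {}

    static Object extract(Map<?, ?> root, List<String> path) {
        Object current = root;
        for (String segment : path) {
            if (!(current instanceof Map)) {
                return null; // the path goes through something that is not a struct/map
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> value = Map.of(
                "k1", 123,
                "parent.child", Map.of("k2", "123"));
        // "parent..child.k2" already split into its segments
        System.out.println(extract(value, List.of("parent.child", "k2"))); // 123
    }
}
```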


HeaderFrom

Changes:

  • Extend fields to support nested notation.
  • As this SMT affects only existing fields, additional configurations will not be required.

Example:

Scenario | Input | SMT configuration | Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "k1,parent.child.k2",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
headers:
- k1=123
- k2="123"


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "k1,parent..child.k2",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
headers:
- k1=123
- k2="123"
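The pairing between `fields` and `headers` in these examples is positional: the first field goes to the first header, and so on. The sketch below illustrates that pairing with nested paths. It is hypothetical (the class `NestedHeaderFromSketch` is not Kafka code), models headers as a plain map instead of Kafka's `Headers` API, assumes paths are already split into field names, and only copies values (the fields are left in the record value).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kafka's HeaderFrom implementation): copies the values
// of nested fields into headers, pairing field paths and header names by position.
// Headers are modelled as a plain map rather than Kafka's Headers API.
final class NestedHeaderFromSketch {
    private NestedHeaderFromSketch() {}

    static Map<String, Object> copyToHeaders(Map<?, ?> value,
                                             List<List<String>> fieldPaths,
                                             List<String> headerNames) {
        if (fieldPaths.size() != headerNames.size()) {
            throw new IllegalArgumentException("fields and headers must have the same length");
        }
        Map<String, Object> headers = new LinkedHashMap<>();
        for (int i = 0; i < fieldPaths.size(); i++) {
            Object current = value;
            // Follow the path; a missing segment yields a null header value.
            for (String segment : fieldPaths.get(i)) {
                current = (current instanceof Map) ? ((Map<?, ?>) current).get(segment) : null;
            }
            headers.put(headerNames.get(i), current);
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> value = Map.of(
                "k1", 123,
                "parent", Map.of("child", Map.of("k2", "123")));
        // Equivalent of fields "k1,parent.child.k2" and headers "k1,k2"
        Map<String, Object> headers = copyToHeaders(
                value,
                List.of(List.of("k1"), List.of("parent", "child", "k2")),
                List.of("k1", "k2"));
        System.out.println(headers); // {k1=123, k2=123}
    }
}
```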


MaskField

Changes:

  • Extend fields to support nested notation.

Example:

Scenario | Input | SMT configuration | Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "parent.child.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": ""    
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.fields": "parent..child.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": ""
  }
}
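MaskField replaces a field's value with an "empty" value of the same type, as the `""` outputs above show for strings. The following is a minimal sketch of doing that at a nested path in a schemaless value. The class `NestedMaskSketch` is hypothetical, only a handful of Java types are covered, the path is assumed to be already split into field names, and silently skipping a missing parent is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kafka's MaskField implementation): masks one nested
// field of a schemaless value with an empty value of the same type.
final class NestedMaskSketch {
    private NestedMaskSketch() {}

    // Empty replacement per type; this sketch only covers a few Java types.
    static Object emptyValueFor(Object value) {
        if (value instanceof String) return "";
        if (value instanceof Integer) return 0;
        if (value instanceof Long) return 0L;
        if (value instanceof Double) return 0.0d;
        if (value instanceof Boolean) return false;
        throw new IllegalArgumentException("Unsupported type in this sketch: " + value.getClass());
    }

    @SuppressWarnings("unchecked")
    static void mask(Map<String, Object> root, List<String> path) {
        Map<String, Object> parent = root;
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                return; // parent struct/map missing: nothing to mask
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = path.get(path.size() - 1);
        Object value = parent.get(leaf);
        if (value != null) {
            parent.put(leaf, emptyValueFor(value));
        }
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("k2", "123");
        Map<String, Object> value = new HashMap<>();
        value.put("k1", 123);
        value.put("parent.child", inner);
        // "parent..child.k2" already split into its segments
        mask(value, List.of("parent.child", "k2"));
        System.out.println(value.get("parent.child")); // {k2=}
    }
}
```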


ReplaceField

Changes:

  • Extend the include, exclude, and renames lists to support nested notation.

Example:

Scenario | Input | SMT configuration | Output
1. Nested field. Drop field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.exclude": "parent.child.k2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
    }
  }
}


2. Nested field. Drop struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.exclude": "parent.child"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
  }
}


3. Nested field. Include field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.include": "parent.child.k2"
}



Code Block
languagejs
{
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}


4. Nested field. Include struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.include": "parent.child"
}



Code Block
languagejs
{
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}


5. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.fields.style": "nested",
"transforms.smt1.renames": "parent..child.k2:field2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "field2": "123"
  }
}
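The exclude and renames examples above (scenarios 1 and 5) can be sketched as operations on the struct/map that directly contains the last path segment. This is a hypothetical illustration over schemaless `java.util.Map` values, not Kafka's ReplaceField code: the class `NestedReplaceSketch` is invented for this example, paths are assumed to be already split into field names, and include handling is left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kafka's ReplaceField implementation): exclude and
// rename a nested field of a schemaless value.
final class NestedReplaceSketch {
    private NestedReplaceSketch() {}

    // Returns the map that directly contains the last segment, or null if absent.
    @SuppressWarnings("unchecked")
    private static Map<String, Object> parentOf(Map<String, Object> root, List<String> path) {
        Map<String, Object> parent = root;
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                return null;
            }
            parent = (Map<String, Object>) child;
        }
        return parent;
    }

    static void exclude(Map<String, Object> root, List<String> path) {
        Map<String, Object> parent = parentOf(root, path);
        if (parent != null) {
            parent.remove(path.get(path.size() - 1)); // drops a field or a whole struct
        }
    }

    static void rename(Map<String, Object> root, List<String> path, String newName) {
        Map<String, Object> parent = parentOf(root, path);
        String oldName = path.get(path.size() - 1);
        if (parent != null && parent.containsKey(oldName)) {
            parent.put(newName, parent.remove(oldName));
        }
    }

    public static void main(String[] args) {
        Map<String, Object> child = new LinkedHashMap<>();
        child.put("k2", "123");
        Map<String, Object> parent = new LinkedHashMap<>();
        parent.put("child", child);
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("k1", 123);
        value.put("parent", parent);
        exclude(value, List.of("parent", "child", "k2")); // exclude "parent.child.k2"
        System.out.println(value); // {k1=123, parent={child={}}}

        Map<String, Object> dotted = new LinkedHashMap<>();
        dotted.put("k2", "123");
        Map<String, Object> value2 = new LinkedHashMap<>();
        value2.put("k1", 123);
        value2.put("parent.child", dotted);
        rename(value2, List.of("parent.child", "k2"), "field2"); // renames "parent..child.k2:field2"
        System.out.println(value2); // {k1=123, parent.child={field2=123}}
    }
}
```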


TimestampConverter

Changes:

  • Extend fields to support nested notation.

Example:

ValueToKey

Changes:

  • Extend fields to support nested notation.

Example:

InsertField

Changes:

  • Extend field to support nested notation.

New configurations:

Name | Type | Default | Importance | Documentation
field.on_missing_parent | STRING | create | MEDIUM | Permitted values: create, fail, ignore. Defines how to react when the parent struct/map of the field to act on does not exist. If set to "create", the SMT creates the missing parent struct/map. If set to "fail", the SMT throws an exception. If set to "ignore", the SMT has no effect.
field.on_existing_field | STRING | overwrite | MEDIUM | Permitted values: overwrite, fail, ignore. Defines how to react when the field to act on already exists. If set to "overwrite", the SMT is applied to the existing field. If set to "fail", the SMT throws an exception. If set to "ignore", the SMT has no effect.
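The interaction of the two settings can be sketched as follows. This is a hypothetical illustration over schemaless `java.util.Map` values, not the proposed implementation: the class `NestedInsertSketch` and its enums are invented here, paths are assumed to be already split into field names, and failing when an intermediate segment exists but is not a struct/map is an assumption the table above does not address.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of field.on_missing_parent / field.on_existing_field for
// InsertField on schemaless values (nested java.util.Map instances).
final class NestedInsertSketch {
    private NestedInsertSketch() {}

    enum OnMissingParent { CREATE, FAIL, IGNORE }
    enum OnExistingField { OVERWRITE, FAIL, IGNORE }

    @SuppressWarnings("unchecked")
    static void insert(Map<String, Object> root, List<String> path, Object newValue,
                       OnMissingParent onMissingParent, OnExistingField onExistingField) {
        Map<String, Object> parent = root;
        for (String segment : path.subList(0, path.size() - 1)) {
            Object child = parent.get(segment);
            if (child == null) {
                switch (onMissingParent) {
                    case CREATE:
                        child = new LinkedHashMap<String, Object>(); // create missing parent
                        parent.put(segment, child);
                        break;
                    case FAIL:
                        throw new IllegalStateException("Missing parent: " + segment);
                    case IGNORE:
                    default:
                        return; // leave the value unchanged
                }
            } else if (!(child instanceof Map)) {
                // Assumption of this sketch: a non-struct segment is always an error.
                throw new IllegalStateException("Not a struct/map: " + segment);
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = path.get(path.size() - 1);
        if (parent.containsKey(leaf)) {
            switch (onExistingField) {
                case FAIL:
                    throw new IllegalStateException("Field already exists: " + leaf);
                case IGNORE:
                    return;
                case OVERWRITE:
                default:
                    break; // fall through to the put below
            }
        }
        parent.put(leaf, newValue);
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("k1", 123);
        insert(value, List.of("parent", "child", "k2"), "abc",
                OnMissingParent.CREATE, OnExistingField.OVERWRITE);
        System.out.println(value); // {k1=123, parent={child={k2=abc}}}
    }
}
```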


Example:

Input | SMT configuration | Output

HoistField

Changes:

  • Add a hoisted configuration that points to the path of the element to hoist.

SMTs affected

Extending field configurations to support dotted notation affects the following SMTs:

...

The following SMTs will require additional configurations:

  • HoistField: add a hoisted config to point to a specific path to hoist.  

New configurations:

Name | Type | Default | Importance | Documentation
hoisted | STRING | <empty> | MEDIUM | Path to the element to be hoisted. If empty, the root struct/map is hoisted.


  • For example:

    Code Block
       hoisted = nested.val
       field = line
    
       value (before):
       {
         "nested": {
           "val": 42,
           "other val": 96
         }
       }
    
       value (after):
       {
         "nested": {
           "line": {
             "val": 42
           },
           "other val": 96
         }
       } 
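The HoistField example above can be sketched on schemaless `java.util.Map` values. This mirrors the example as written: the element at the `hoisted` path is replaced by a new struct/map named after `field`, which keeps the hoisted element under its original name; with an empty path the whole value is wrapped. The class `NestedHoistSketch` is hypothetical, the path is assumed to be already split into field names, and leaving the value unchanged when the hoisted element is absent is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed "hoisted" option for HoistField on
// schemaless values, following the before/after example above.
final class NestedHoistSketch {
    private NestedHoistSketch() {}

    @SuppressWarnings("unchecked")
    static Map<String, Object> hoist(Map<String, Object> root, List<String> hoistedPath, String field) {
        if (hoistedPath.isEmpty()) {
            // Empty "hoisted": wrap the whole value under "field".
            Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put(field, root);
            return wrapper;
        }
        Map<String, Object> parent = root;
        for (String segment : hoistedPath.subList(0, hoistedPath.size() - 1)) {
            Object child = parent.get(segment);
            if (!(child instanceof Map)) {
                throw new IllegalArgumentException("No struct/map at segment: " + segment);
            }
            parent = (Map<String, Object>) child;
        }
        String leaf = hoistedPath.get(hoistedPath.size() - 1);
        if (!parent.containsKey(leaf)) {
            return root; // nothing to hoist
        }
        // Replace the element with a new map named "field" that holds it.
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put(leaf, parent.remove(leaf));
        parent.put(field, wrapper);
        return root;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("val", 42);
        nested.put("other val", 96);
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("nested", nested);
        // hoisted = nested.val, field = line
        System.out.println(hoist(value, List.of("nested", "val"), "line"));
        // {nested={other val=96, line={val=42}}}
    }
}
```

Key order differs from the example above only because `LinkedHashMap` appends the new entry; struct/map field order is not significant here.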


Non-affected SMTs

These SMTs do not require nested structure support:

...

[2] https://datatracker.ietf.org/doc/html/rfc4180, section 2.7:

> If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.

Examples:


Compatibility, Deprecation, and Migration Plan

...