NOTE: Rejected, see KIP-821: Connect Transforms support for nested structures

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Single Message Transforms (SMTs), introduced in KIP-66, have greatly improved Kafka Connect's usability by allowing input and output data to be processed without additional streaming applications.

...

This KIP aims to add support for nested structures to the existing SMTs where nested structures are used.

Public Interfaces

Of the existing SMTs, the following are impacted by this change:

New configuration flags

Name Type Default Importance Documentation
field.style STRING plain HIGH

Permitted values: plain, nested. Defines how to traverse a record structure to apply a transformation. If set to "plain", the transformation applies only to elements located at the root of the message. If set to "nested", nested elements can be affected as well, and dotted notation is used to access them. A dot that is part of a field name is escaped by preceding it with another dot; e.g. to access the elements of a struct/map named "same.field", use "same..field.element".

These flags will be added conditionally to some SMTs, described below.
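As an illustration of the dotted notation and the double-dot escape described for field.style, the following is a minimal sketch in Python (not the Connect implementation). The function name split_dotted_path is hypothetical, and the handling of three or more consecutive dots is an assumption, since the text does not define it.

```python
def split_dotted_path(path):
    """Split a dotted path into steps; '..' stands for a literal dot.

    Hypothetical helper: a single dot separates steps, and a doubled dot
    is kept as one dot inside the current field name.
    """
    steps = []
    current = []
    i = 0
    while i < len(path):
        if path[i] == ".":
            if i + 1 < len(path) and path[i + 1] == ".":
                current.append(".")  # escaped dot stays in the field name
                i += 2
                continue
            steps.append("".join(current))  # single dot ends the step
            current = []
        else:
            current.append(path[i])
        i += 1
    steps.append("".join(current))
    return steps


print(split_dotted_path("same..field.element"))
```

With the example from the documentation column, the path resolves to the field "same.field" followed by its child "element".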

Affected SMTs

Cast

Changes:

  • Extend spec to support nested notation.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.spec.paths": "$[k1]:string,$[parent][child][k2]:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent": {
    "child": {
      "k2": 123    
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.smt1.spec.paths": "$['k1']:string,$['parent.child']['k2']:int64"
}



Code Block
languagejs
{
  "k1": "123",
  "parent.child": {
    "k2": 123
  }
}
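The two Cast scenarios above can be sketched in Python as follows. This is only an illustration under stated assumptions: parse_path, cast_fields, and the CASTS table are hypothetical names, the bracket grammar is inferred from the example paths, the spec is assumed to be comma-separated with no commas inside field names, and only the two target types from the examples are covered.

```python
import re

# Assumed grammar: '$' followed by one [...] group per step,
# where a step may optionally be wrapped in single quotes.
_STEP = re.compile(r"\[(?:'([^']*)'|([^\[\]']+))\]")

# Hypothetical subset of cast targets, just enough for the examples.
CASTS = {"string": str, "int64": int}


def parse_path(path):
    """Turn "$[parent][child][k2]" into ["parent", "child", "k2"]."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$': %r" % path)
    steps = []
    pos = 1
    while pos < len(path):
        match = _STEP.match(path, pos)
        if match is None:
            raise ValueError("malformed path: %r" % path)
        quoted, bare = match.group(1), match.group(2)
        steps.append(quoted if quoted is not None else bare)
        pos = match.end()
    return steps


def cast_fields(record, spec):
    """Apply each "path:type" entry of the spec to the record, in place."""
    for entry in spec.split(","):
        path, type_name = entry.rsplit(":", 1)
        *parents, leaf = parse_path(path)
        node = record
        for step in parents:  # walk down to the container of the leaf
            node = node[step]
        node[leaf] = CASTS[type_name](node[leaf])
    return record


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
print(cast_fields(record, "$[k1]:string,$[parent][child][k2]:int64"))
```

Quoting a step, as in $['parent.child'], keeps the dot inside a single field name instead of treating it as a separator.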


ExtractField

Changes:

  • Extend field to support nested notation.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field.path": "$[parent][child][k2]"
}



Code Block
languagejs
"123"


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.smt1.field": "$['parent.child'][k2]"
}



Code Block
languagejs
"123"

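The ExtractField scenarios above amount to walking the value one step at a time. A minimal Python sketch, assuming the path has already been parsed into a list of steps; extract_field is a hypothetical name, and returning None for a missing step is an assumption rather than behavior stated in this KIP.

```python
def extract_field(value, steps):
    """Return the element found by following the steps, or None if absent."""
    node = value
    for step in steps:
        if not isinstance(node, dict) or step not in node:
            return None  # assumed behavior for a missing step
        node = node[step]
    return node


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
print(extract_field(record, ["parent", "child", "k2"]))
```

Because each step is a complete field name, a name that contains a dot ("parent.child") is just one step in the list.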

HeaderFrom

Changes:

  • Extend fields to support nested notation.
  • As this SMT affects only existing fields, additional configurations will not be required.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.fields.paths": "$[k1],$[parent][child][k2]",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
headers:
- k1=123
- k2="123"


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.smt1.fields": "$[k1],$[parent.child][k2]",
"transforms.smt1.headers": "k1,k2"
}



Code Block
languagejs
headers:
- k1=123
- k2="123"
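The pairing of field paths with header names in the HeaderFrom scenarios can be sketched in Python as below. This is an assumption-laden illustration: headers_from is a hypothetical name, paths are taken as pre-parsed step lists, and only copying values into headers is shown.

```python
def headers_from(value, field_paths, header_names):
    """Pair each nested field value with the header name at the same index."""
    if len(field_paths) != len(header_names):
        raise ValueError("fields and headers must have the same length")
    headers = []
    for steps, name in zip(field_paths, header_names):
        node = value
        for step in steps:  # follow the path down to the field value
            node = node[step]
        headers.append((name, node))
    return headers


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
print(headers_from(record, [["k1"], ["parent", "child", "k2"]], ["k1", "k2"]))
```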


MaskField

Changes:

  • Extend fields to support nested notation.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.fields.paths": "$[parent][child][k2]"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": ""    
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.smt1.fields": "$[parent.child][k2]"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": ""
  }
}
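A minimal Python sketch of the MaskField scenarios above, assuming pre-parsed path steps. mask_field is a hypothetical name; the "empty value of the same type" rule is approximated here by calling the value's own Python type with no arguments, which is an illustration and not the Connect type mapping.

```python
def mask_field(value, steps):
    """Replace the field at the path with the empty value of its type."""
    *parents, leaf = steps
    node = value
    for step in parents:  # walk down to the container of the leaf
        node = node[step]
    # str() is "", int() is 0, bool() is False, and so on.
    node[leaf] = type(node[leaf])()
    return value


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
print(mask_field(record, ["parent", "child", "k2"]))
```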


ReplaceField

Changes:

  • Extend the include and exclude lists to support nested notation.

...

Scenario Input SMT Output
1. Nested field. Drop field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.exclude.path": "$[parent][child][k2]"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
    }
  }
}


2. Nested field. Drop struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.exclude": "$[parent][child]"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
  }
}


3. Nested field. Include field


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.include": "$[parent][child][k2]"
}



Code Block
languagejs
{
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}


4. Nested field. Include struct


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.include": "$[parent][child]"
}



Code Block
languagejs
{
  "parent": {
    "child": {
      "k2": "123",
      "k3": "234"    
    }
  }
}


5. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.smt1.renames.paths": "$[parent.child][k2]:field2"
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
   "field2": "123"
  }
}
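The five ReplaceField scenarios above can be sketched in Python as three small operations. The names exclude, include, rename, and _parent_of are hypothetical, and paths are assumed to be pre-parsed step lists; how include interacts with sibling fields is inferred from the example outputs only.

```python
def _parent_of(value, steps):
    """Return (container, leaf_name) for the last step of a path."""
    *parents, leaf = steps
    node = value
    for step in parents:
        node = node[step]
    return node, leaf


def exclude(value, steps):
    """Drop the field or struct at the path, keeping its parent."""
    node, leaf = _parent_of(value, steps)
    del node[leaf]
    return value


def include(value, paths):
    """Build a new value containing only the branches named by the paths."""
    result = {}
    for steps in paths:
        src, dst = value, result
        *parents, leaf = steps
        for step in parents:
            src = src[step]
            dst = dst.setdefault(step, {})  # recreate parents on demand
        dst[leaf] = src[leaf]
    return result


def rename(value, steps, new_name):
    """Rename the field at the path without moving it."""
    node, leaf = _parent_of(value, steps)
    node[new_name] = node.pop(leaf)
    return value


record = {"k1": 123, "parent": {"child": {"k2": "123", "k3": "234"}}}
print(include(record, [["parent", "child", "k2"]]))
```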


TimestampConverter

Changes:

  • Extend fields to support nested notation.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": 1556204536000
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.smt1.field": "$[parent][child][k2]",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "2019-04-25"
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": 1556204536000
  }
}



Code Block
languagejs


{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.field": "$[parent.child][k2]",
"transforms.smt1.format": "yyyy-MM-dd",
"transforms.smt1.target.type": "string"
}




Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "2019-04-25"
  }
}
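A minimal Python sketch of the TimestampConverter scenarios above, assuming pre-parsed path steps, epoch milliseconds as input, and UTC for formatting. convert_timestamp is a hypothetical name, and the strftime pattern "%Y-%m-%d" is used as a stand-in for the Java pattern "yyyy-MM-dd". The epoch value 1556204536000 falls on 2019-04-25 in UTC.

```python
from datetime import datetime, timezone


def convert_timestamp(value, steps, fmt="%Y-%m-%d"):
    """Format the epoch-millisecond field at the path as a string, in place."""
    *parents, leaf = steps
    node = value
    for step in parents:  # walk down to the container of the leaf
        node = node[step]
    instant = datetime.fromtimestamp(node[leaf] / 1000, tz=timezone.utc)
    node[leaf] = instant.strftime(fmt)
    return value


record = {"k1": 123, "parent": {"child": {"k2": 1556204536000}}}
print(convert_timestamp(record, ["parent", "child", "k2"]))
```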


ValueToKey

Changes:

  • Extend fields to support nested notation.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.fields.paths": "$[parent][child][k2]"
}



Code Block
"123"



2. Nested struct to Key.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.fields.paths": "$[parent][child]"
}



Code Block
{
  "k2": "123"    
}



3. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.smt1.fields.paths": "$[parent.child][k2]"
}



Code Block
languagejs
"123"

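The ValueToKey scenarios above can be sketched in Python as producing a new key from a single nested path while leaving the value untouched. value_to_key is a hypothetical name, the path is assumed to be pre-parsed, and the behavior with several paths is not covered because the examples only show one.

```python
def value_to_key(value, steps):
    """Return (key, value) where the key is the element found at the path."""
    key = value
    for step in steps:
        key = key[step]
    return key, value


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
key, value = value_to_key(record, ["parent", "child"])
print(key)
```

Pointing the path at a struct rather than a leaf field yields the whole struct as the key, as in scenario 2.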

InsertField

Changes:

  • Extend *.field to support nested notation.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.static.field.path": "$[parent][child][k3]",
"transforms.smt1.static.value": "v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123",
      "k3": "v3"   
    }
  }
}


2. Nested field, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.static.field.path": "$[parent.child][k3]",
"transforms.smt1.static.value": "v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123",
    "k3": "v3"
  }
}


3. Nested field with the parent missing


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.static.field.path": "$[parent][other][k3]",
"transforms.smt1.static.value": "v3" 
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    },
    "other": {
      "k3": "v3"  
    }
  }
}


4. Nested field with the parent missing, and ignore is set


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.static.field.path": "$[parent][other][k3]",
"transforms.smt1.static.value": "v3",
"transforms.smt1.field.on.missing.parent": "ignore"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    }
  }
}


5. Nested field that already exists


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.static.field.path": "$[parent][child][k2]",
"transforms.smt1.static.value": "456"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "456"  
    }
  }
}


6. Nested field that already exists, and ignore is set


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.smt1.static.field.path": "$[parent][child][k2]",
"transforms.smt1.static.value": "456",
"transforms.smt1.field.on.existing.field": "ignore"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"  
    }
  }
}
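The InsertField scenarios above differ only in how a missing parent and an existing field are handled. A minimal Python sketch, assuming pre-parsed path steps: insert_static is a hypothetical name, "ignore" comes from the example configs, and the default values "create" and "overwrite" are placeholder names inferred from the outputs of scenarios 3 and 5.

```python
def insert_static(value, steps, static_value,
                  on_missing_parent="create", on_existing_field="overwrite"):
    """Insert a static value at a nested path, in place."""
    *parents, leaf = steps
    node = value
    for step in parents:
        if step not in node:
            if on_missing_parent == "ignore":
                return value  # leave the record untouched
            node[step] = {}  # assumed default: create the missing parent
        node = node[step]
    if leaf in node and on_existing_field == "ignore":
        return value  # keep the existing field value
    node[leaf] = static_value
    return value


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
print(insert_static(record, ["parent", "other", "k3"], "v3"))
```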


HoistField

Changes:

  • Add a hoisted config to point to a specific path to hoist.

...

Scenario Input SMT Output
1. Nested field.


Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "k2": "123"    
    }
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.smt1.hoisted": "$[parent][child][k2]",
"transforms.smt1.field": "other"
}



Code Block
languagejs
{
  "k1": 123,
  "parent": {
    "child": {
      "other": {
        "k2": "123"
      }    
    }
  }
}


2. Nested struct, when field names include dots


Code Block
languagejs
{
  "k1": 123,
  "parent.child": {
    "k2": "123"
  }
}



Code Block
languagejs
{
"transforms": "smt1",
"transforms.smt1.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.smt1.field.style": "nested",
"transforms.smt1.hoisted": "$[parent.child]",
"transforms.smt1.field": "other"
}



Code Block
languagejs
{
  "k1": 123,
  "other": {
    "parent.child": {
      "k2": "123"
    }
  }
}
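Judging from the two HoistField outputs above, the element at the hoisted path is wrapped, in place, in a new struct named by the field config. A minimal Python sketch of that reading, assuming pre-parsed path steps; hoist is a hypothetical name and the semantics are inferred from the examples only.

```python
def hoist(value, steps, field):
    """Wrap the element at the path in a new struct called `field`."""
    *parents, leaf = steps
    node = value
    for step in parents:  # walk down to the container of the leaf
        node = node[step]
    # Remove the element from its parent and re-attach it under the wrapper.
    node[field] = {leaf: node.pop(leaf)}
    return value


record = {"k1": 123, "parent": {"child": {"k2": "123"}}}
print(hoist(record, ["parent", "child", "k2"], "other"))
```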



Non-affected SMTs

These SMTs do not require nested structure support:

  • DropHeaders: Drop one or multiple headers.
  • Filter: Drops the whole message based on a predicate.
  • InsertHeader: Insert a specific message to the header.
  • RegexRouter: Acts on the topic name.
  • SetSchemaMetadata: Acts on root schema.
  • TimestampRouter: Acts on timestamp.
  • Flatten: Acts on the whole key or message. 

Proposed Changes

Nested notation

Dotted notation tends to be the most intuitive way to access nested record structures (for example, jq already uses it [1]), and it covers most scenarios.

...

> If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote.

Examples:
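As a hedged illustration of the quoting rule quoted above (a double quote inside a quoted field is escaped by doubling it), Python's standard csv module follows the same convention. The helper names quote_field and parse_fields are hypothetical, and how this KIP would apply the rule to field paths is not shown here.

```python
import csv


def quote_field(name):
    """Enclose a field in double quotes, doubling any embedded quote."""
    return '"' + name.replace('"', '""') + '"'


def parse_fields(line):
    """Split a comma-separated line, honoring quoted fields."""
    return next(csv.reader([line]))


print(quote_field('a"b'))
print(parse_fields('"a""b",c'))
```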


Compatibility, Deprecation, and Migration Plan

Existing SMT configurations will not be affected by these changes, as the default field.style is plain, which represents the current behavior.

Rejected Alternatives

Keep ExtractField as it is and use it multiple times until reaching nested fields

This KIP proposes to simplify this configuration by replacing multiple invocations with only one nested one.

Use dots as the only separator and escape with backslashes when they collide

To keep a single separator, one alternative is to separate with dots and, when a dot collides with an existing field name, use a backslash ("\") to mark dots that are part of the name, e.g. "this.field" (which would refer to the nested field "field" under the top-level "this" field) and "this\.field" (which would refer to the field named "this.field").

However, backslashes are also the escape character in JSON. This could lead to unfriendly configurations like "this\\\\.is\\\\.not\\\\.very\\\\.readable".
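The readability problem can be reproduced with Python's json module. The sketch below assumes the path is serialized to JSON once (two backslashes per dot) and then embedded in a second JSON string (four backslashes per dot); that second layer is an assumption about where the quadruple backslashes in the example come from.

```python
import json

# The path as the SMT would need to see it: one backslash per literal dot.
path = r"this\.is\.not\.very\.readable"

once = json.dumps(path)   # JSON escapes each backslash: two per dot
twice = json.dumps(once)  # a second layer doubles them again: four per dot

print(once)
print(twice)
```

Decoding both layers with json.loads returns the original path, so nothing is lost, but the written configuration becomes hard to read.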

Use custom separators for edge cases

Using double dots to escape separators is another way to keep dots as the only field separator.

...

To avoid this, this KIP proposes escaping a dot by preceding it with another dot.

Potential KIPs

Future KIPs could extend this support for:

...