Status

Current state: Under Discussion Discarded

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Convert the data type of certain pre-identified fields, for records coming from across datasets, only if those fields exist. [Ex: TimestampConverter can be run only if the specified date field exists, irrespective of the record metadata]
  • Skip running a given transform if a particular field does or does not exist. Many built-in transforms raise exceptions (Ex: the InsertField transform if the field already exists), thereby breaking the task. Giving users this control enables them to configure explicitly for such cases.
  • Even where built-in transforms handle these cases explicitly, running them would still be an unnecessary pass-through iteration.
  • Since each connector usually deals with multiple datasets (even hundreds for a database CDC connector), metadata-centric predicate checks are limiting when the condition depends on pre-identified custom metadata fields that are common to records across those datasets.

Public Interfaces

New:

  • org.apache.kafka.connect.transforms.predicates.HasField$Key
  • org.apache.kafka.connect.transforms.predicates.HasField$Value

...

  • The predicate takes a field path as a configuration string (Ex: aabc.b.cxyz) and therefore supports nested field checks.
  • It returns true or false depending on whether the path exists in the record.
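
The nested lookup described above could be sketched roughly as follows. This is only an illustration on schemaless (Map-based) records, not the proposed implementation: the class name FieldPathSketch, the method pathExists, and the sample path a.b.c are hypothetical, and Struct-based records are left out to keep the sketch free of Kafka dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: checks whether a dotted field path exists in a nested Map.
final class FieldPathSketch {

    // Returns true if every segment of the dotted path resolves to a key while
    // walking through nested maps. A key mapped to null still counts as present.
    public static boolean pathExists(Object value, String fieldPath) {
        if (value == null) {
            return false; // null key/value: nothing to look up
        }
        Object current = value;
        for (String segment : fieldPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return false; // cannot descend into a non-map (or null) value
            }
            Map<?, ?> map = (Map<?, ?>) current;
            if (!map.containsKey(segment)) {
                return false; // this segment is missing
            }
            current = map.get(segment);
        }
        return true;
    }

    public static void main(String[] args) {
        // Build the record {a: {b: {c: null}}}
        Map<String, Object> b = new HashMap<>();
        b.put("c", null);
        Map<String, Object> a = new HashMap<>();
        a.put("b", b);
        Map<String, Object> record = new HashMap<>();
        record.put("a", a);

        System.out.println(pathExists(record, "a.b.c")); // true: field exists, value is null
        System.out.println(pathExists(record, "a.x"));   // false: segment "x" is missing
        System.out.println(pathExists(null, "a.b.c"));   // false: null record value
    }
}
```

Using containsKey rather than a null check on get is what lets a field that is present but set to null still count as existing.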

...

  • predicates.<predicateName>.field.path

Examples

  • Converting a timestamp only if the created_at field exists
Code Block
"transforms": "modifyDate",
"transforms.modifyDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.modifyDate.target.format": "yyyy-MM-dd",
"transforms.modifyDate.target.type": "string",
"transforms.modifyDate.field": "created_at",
"transforms.modifyDate.predicate": "hasCreatedAt",
"predicates": "hasCreatedAt",
"predicates.hasCreatedAt.type": "org.apache.kafka.connect.transforms.predicates.HasField$Value",
"predicates.hasCreatedAt.field.path": "created_at"
  • Inserting the field testField only if it doesn't already exist
Code Block
"transforms": "insertTestField",
"transforms.insertTestField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTestField.static.field": "testField",
"transforms.insertTestField.static.value": "test val",
"transforms.insertTestField.predicate": "hasTestField",
"transforms.insertTestField.negate": "true",
"predicates": "hasTestField",
"predicates.hasTestField.type": "org.apache.kafka.connect.transforms.predicates.HasField$Value",
"predicates.hasTestField.field.path": "testField"


Compatibility, Deprecation, and Migration Plan

The proposal fully respects the current predicate interfaces, so there is no need for deprecation or migration.

Test Plan / Cases

  1. The field.path setting is mandatory when the predicate is enabled.
  2. The predicate returns false for records with null keys/values.
  3. The predicate returns true if the field exists in the record, even if the field is set to null.
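
The cases above could be exercised against a small stand-in like the one below. It is a sketch under stated assumptions rather than the actual predicate: the class HasFieldTestPlanSketch is hypothetical, it only checks a top-level field of a Map-based record, and it throws IllegalArgumentException where a real Connect predicate would more likely report a configuration error through Kafka's own config validation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the predicate, limited to top-level fields.
final class HasFieldTestPlanSketch {
    static final String FIELD_PATH_CONFIG = "field.path";

    private final String fieldPath;

    // field.path is mandatory: reject a configuration that omits it.
    HasFieldTestPlanSketch(Map<String, String> config) {
        String path = config.get(FIELD_PATH_CONFIG);
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("Missing required configuration: " + FIELD_PATH_CONFIG);
        }
        this.fieldPath = path;
    }

    boolean test(Map<String, Object> value) {
        if (value == null) {
            return false; // null key/value yields a false predicate
        }
        // containsKey (not get != null) so a field set to null still counts as existing
        return value.containsKey(fieldPath);
    }

    public static void main(String[] args) {
        HasFieldTestPlanSketch predicate =
                new HasFieldTestPlanSketch(Collections.singletonMap(FIELD_PATH_CONFIG, "testField"));

        Map<String, Object> withNullField = new HashMap<>();
        withNullField.put("testField", null);

        System.out.println(predicate.test(null));            // false
        System.out.println(predicate.test(withNullField));   // true
        System.out.println(predicate.test(new HashMap<>())); // false

        try {
            new HasFieldTestPlanSketch(new HashMap<>());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```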

...