transforms=tsRouter,insertKafkaCoordinates
 
transforms.tsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.tsRouter.topic.format=${topic}-${timestamp}
transforms.tsRouter.timestamp.format=yyyyMMdd
 
transforms.insertKafkaCoordinates.type=org.apache.kafka.connect.transforms.InsertInValue
transforms.insertKafkaCoordinates.topic=kafka_topic
transforms.insertKafkaCoordinates.partition=kafka_partition
transforms.insertKafkaCoordinates.offset=kafka_offset
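
To make the tsRouter settings above concrete, here is a minimal Java sketch of how a topic format such as ${topic}-${timestamp} might be expanded for a single record. It is an illustration only: the class and method names are hypothetical and not part of the Connect API, and it assumes that the placeholders are substituted literally and that the timestamp is rendered in UTC.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper, not a Connect class: expands a topic format for one record.
class TimestampRouterSketch {
    static String route(String topic, long timestampMs, String topicFormat, String timestampFormat) {
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        // Assumption: timestamps are formatted in UTC so routing does not depend on the worker's locale.
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        String formatted = fmt.format(new Date(timestampMs));
        // String.replace is a literal (non-regex) substitution, so "${...}" needs no escaping.
        return topicFormat.replace("${topic}", topic).replace("${timestamp}", formatted);
    }
}
```

Under these assumptions, route("logs", 0L, "${topic}-${timestamp}", "yyyyMMdd") yields logs-19700101, so a sink connector keyed on the topic name would write to one target per day.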

...

Features

  • Backwards compatible - no breaking change in the current APIs is required. Transformation is an additional layer at the edge of record exchange between the framework and connectors.
  • Pluggable - initialized and configured somewhat similarly to Converter
  • Stackable - can be chained in a defined order
  • Fairly flexible - within the constraints of the TransformableRecord API and 1:{0,1} mapping
    • Any kind of filtering, renaming, masking operations on the data, adding fields, etc.
    • Filtering of records from the stream.
    • Routing for both source and sink - sink connectors can also just operate on the TransformableRecord.topic since the target 'bucket' (table, index, etc.) is always a function of that.
    • For any transformation that requires access to fields not exposed on the TransformableRecord, e.g. {SourceRecord,SinkRecord}.kafkaPartition, SinkRecord.kafkaOffset, or SinkRecord.timestampType, it can set the R type parameter to be specifically SourceRecord or SinkRecord and use the relevant constructors instead of newRecord(). It can also just cast internally if an optional functionality requires access to such a field.
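
The stackable, 1:{0,1} behaviour described above can be sketched as follows. This is not the proposed API: RecordSketch is a hypothetical stand-in for TransformableRecord that models only topic and value, and the sketch assumes a transformation signals "drop this record" by returning null.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the proposed record type; only topic and value are modeled.
class RecordSketch {
    final String topic;
    final Object value;

    RecordSketch(String topic, Object value) {
        this.topic = topic;
        this.value = value;
    }
}

class TransformChainSketch {
    // Applies the transformations in their configured order.
    // Assumption: a null result means the record was filtered out (the "0" in 1:{0,1}),
    // in which case the remaining transformations are skipped.
    static RecordSketch apply(List<UnaryOperator<RecordSketch>> chain, RecordSketch record) {
        RecordSketch current = record;
        for (UnaryOperator<RecordSketch> transformation : chain) {
            current = transformation.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }
}
```

A chain consisting of a null-value filter followed by a topic-renaming step would, for example, pass a record with a value through both steps and drop a record with a null value at the first step.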

Example transformations

List of example transformations to demonstrate broad applicability - not in any particular order, and some more thought-through than others. We may want to include some of these with Connect itself to provide some useful out-of-the-box functionality and encourage standard ways to perform these transformations.

  • Mask
    • Masks primitive fields: obscure sensitive info like credit card numbers.
    • Configure with list of fields to randomize or clobber.
  • Flatten
    • Flatten nested Structs inside a top-level Struct, omitting all other non-primitive fields. Useful for connectors that can only deal with flat Structs like Confluent's JDBC Sink.
    • Configure with delimiter to use when flattening field names.
  • Replace
    • Filter and rename fields. Useful for lightweight data munging.
    • Configure with whitelist and/or blacklist, map of fields to rename.
  • NumericCasts
    • Cast a numeric field to a specified numeric type; useful in conjunction with source connectors that don't have enough information to choose the appropriate type themselves.
    • Configure with map of field to type (i.e. boolean, int8, int16, int32, int64, float32, float64).
  • TimestampRouter
    • Useful for temporal data e.g. application log data being indexed to Elasticsearch with a sink connector can be routed to a daily index.
    • Configure with SimpleDateFormat-compatible timestamp format string, and a format string for the renamed topic that can have placeholders for original topic and the timestamp.
  • Insert
    • Allow inserting record-level fields such as the topic, partition, offset, and timestamp into a top-level Struct. Can also allow a UUID field to be inserted.
    • Configure with names for desired fields.
  • RegexRouter
    • Regex-based routing. Different connectors currently expose too many inconsistent configs for routing.
    • Configure with matcher regex and replacement that can contain capture references.
  • TimestampConverter
    • Timestamps are represented in a ton of different ways; provide a transformation for converting between strings, epoch times as longs, and Connect date/time types.
    • Configure with field name and desired type.
  • HoistToStruct
    • Wrap data in a Struct.
    • Configure with field schema name for the Struct schema and field name to insert the original data as.
  • ExtractFromStruct
    • Extract a specific field from a Struct.
    • Configure with field name.
  • ValueToKey
    • Useful when a source connector does not populate the SourceRecord key but only the value with a Struct.
    • Configure with list of field names to hoist into the record key as a primitive (single field) / Struct (multiple fields), and a flag to force wrapping in a Struct even when it is a single field.
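
As one illustration from the list above, the Flatten idea can be sketched in plain Java. The sketch uses nested Map instances as a stand-in for Connect Structs, which is an assumption made to keep it self-contained; the class name and the set of types treated as primitive are likewise hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: nested Maps stand in for nested Structs.
class FlattenSketch {
    static Map<String, Object> flatten(Map<String, Object> struct, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto(out, "", struct, delimiter);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(Map<String, Object> out, String prefix,
                                    Map<String, Object> struct, String delimiter) {
        for (Map.Entry<String, Object> entry : struct.entrySet()) {
            String name = prefix.isEmpty() ? entry.getKey() : prefix + delimiter + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested struct: recurse, joining field names with the configured delimiter.
                flattenInto(out, name, (Map<String, Object>) value, delimiter);
            } else if (value == null || value instanceof String
                    || value instanceof Number || value instanceof Boolean) {
                // Primitive-like field: copy under its flattened name.
                out.put(name, value);
            }
            // Any other non-primitive value (lists, arrays, ...) is omitted.
        }
    }
}
```

With a delimiter of "_", a top-level struct holding id, a nested address struct with a city field, and a list-valued tags field would flatten to just id and address_city.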

Patterns for data transformations

  • Data transformations could be applicable to the key or the value of the record. We could have *Key and *Value variants for these transformations that reuse the common functionality.
  • Some common utilities for data transformations will probably shape up:
    • Cache the changes they make to Schema objects, possibly only preserving the last-seen one as the likelihood of the source data Schema changing is low.
    • Copying of Schema objects with the possible exclusion of some fields, which they are modifying.
    • Likewise, copying of a Struct object to another Struct having a different Schema with the exception of some fields, which they are modifying.
    • Where fields are being added and a field name specified in configuration, we may want a consistent way to convey if it should be created as an optional field. E.g. a leading '?' character.
  • Where field names are expected, we may want to allow for getting at nested fields by allowing a dotted syntax which is common in such usage (and accordingly, will need some reusable utilities around accessing a field that may be nested). Also implies actual dots in field names will need escaping.
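
The dotted syntax for nested fields could be supported by a small reusable utility along these lines. This is a sketch under stated assumptions: the text above does not say how dots would be escaped, so a backslash escape is assumed here, nested Map instances stand in for Structs, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical utility for dotted field paths; backslash escaping is an assumption.
class FieldPathSketch {
    // Splits a path on unescaped dots; a backslash makes the next character literal.
    static List<String> parse(String path) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '\\' && i + 1 < path.length()) {
                current.append(path.charAt(++i));
            } else if (c == '.') {
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());
        return parts;
    }

    // Walks nested maps (standing in for Structs); returns null if any step is missing.
    static Object get(Map<String, Object> struct, String path) {
        Object current = struct;
        for (String part : parse(path)) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }
}
```

Under this escaping rule, the path a.b\.c addresses a field literally named "b.c" inside the nested struct "a", while a.b.c would descend three levels.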