Kafka → Converter.toConnectData() → Transformation.apply()* → SinkTask.put()
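
A minimal sketch of what such a Transformation interface might look like (the package, the ConnectRecord bound for the R type parameter, and the exact method set are illustrative stand-ins for the TransformableRecord API discussed below, not a final definition):

import java.io.Closeable;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;

// Sketch of the pluggable hook marked in the pipeline above.
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {

    // Apply the transformation, returning either the (possibly modified) record
    // or null to drop it from the stream (the 1:{0,1} mapping).
    R apply(R record);

    // Configuration specification, mirroring how Converter-style plugins are configured.
    ConfigDef config();

    // Release any resources when the task shuts down.
    void close();
}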

  

Features

  • Backwards compatible - no breaking change in the current APIs is required. Transformation is an additional layer at the edge of record exchange between the framework and connectors.
  • Pluggable - initialized and configured somewhat similarly to Converter
  • Stackable - can be chained in a defined order (see the sketch following this list)
  • Fairly flexible - within the constraints of the TransformableRecord API and 1:{0,1} mapping
    • Any kind of filtering, renaming, masking operations on the data, adding fields, etc.
    • Filtering of records from the stream.
    • Routing for both source and sink - sink connectors can also just operate on the TransformableRecord.topic since the target 'bucket' (table, index, etc.) is always a function of that.
    • For any transformation that requires access to certain fields not exposed on the TransformableRecord, i.e. {SourceRecord,SinkRecord}.kafkaPartition, SinkRecord.kafkaOffset, or SinkRecord.timestampType – it can set the R type parameter to specifically be SourceRecord or SinkRecord and use the relevant constructors instead of newRecord(). It can also just cast internally if some optional functionality requires access to such a field.
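
The pluggable and stackable behavior described above could be realized by the framework applying the configured transformations in order to each record, short-circuiting as soon as one of them filters the record out. A rough sink-side illustration (the class and method names here are hypothetical, not actual framework code):

import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical helper that applies a configured chain of transformations in order,
// sitting between Converter.toConnectData() and SinkTask.put().
final class TransformationChain {
    private final List<Transformation<SinkRecord>> transformations;

    TransformationChain(List<Transformation<SinkRecord>> transformations) {
        this.transformations = transformations;
    }

    SinkRecord apply(SinkRecord record) {
        for (Transformation<SinkRecord> transformation : transformations) {
            record = transformation.apply(record);
            if (record == null)   // a transformation filtered the record out
                return null;
        }
        return record;
    }
}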

Example transformations

List of example transformations to demonstrate broad applicability - not in any particular order, and some more thought-through than others. We may want to include some of these with Connect itself to provide useful functionality out of the box. A sketch of one such transformation follows the list.

  • Mask
    • Masks primitive fields: obscure sensitive info like credit card numbers.
    • Configure with list of fields to randomize or clobber.
  • Flatten
    • Flatten nested Structs inside a top-level Struct, omitting all other non-primitive fields. Useful for connectors that can only deal with flat Structs like Confluent's JDBC Sink.
    • Configure with delimiter to use when flattening field names.
  • Replace
    • Filter and rename fields. Useful for lightweight data munging.
    • Configure with whitelist and/or blacklist, map of fields to rename.
  • NumericCasts
    • Casting of a numeric field to some other numeric type, useful in conjunction with source connectors that don't have enough information to determine the precise numeric type.
    • Configure with map of field to type (i.e. boolean, int8, int16, int32, int64, float32, float64).
  • TimestampRouter
    • Useful for temporal data e.g. application log data being indexed to Elasticsearch with a sink connector can be routed to a daily index.
    • Configure with SimpleDateFormat-compatible timestamp format string, and a format string for the renamed topic that can have placeholders for original topic and the timestamp.
  • Insert
    • Allow inserting record-level fields like the topic, partition, offset, or timestamp into a top-level Struct. Can also allow a UUID field to be inserted.
    • Configure with names for desired fields.
  • RegexRouter
    • Regex-based routing. Different connectors currently expose their own inconsistent configs for routing; this provides a single mechanism.
    • Configure with matcher regex and replacement that can contain capture references.
  • TimestampConverter
    • Timestamps are represented in many different ways; provide a transformation for converting between strings, epoch times as longs, and Connect date/time types.
    • Configure with field name and desired type.
  • HoistToStruct
    • Wrap data in a Struct.
    • Configure with field name to insert the original data as.
  • ExtractFromStruct
    • Extract a specific field from a Struct.
    • Configure with field name.
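
As a concrete illustration, the HoistToStruct example might be implemented roughly as follows (value variant; the class name and config key are hypothetical, and schema caching and schemaless handling are omitted for brevity):

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Rough sketch of the HoistToStruct example (value variant).
public class HoistToStructValue<R extends ConnectRecord<R>> implements Transformation<R> {
    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = (String) configs.get("field");   // name to insert the original value as
    }

    @Override
    public R apply(R record) {
        // Wrap the original value schema and value in a single-field Struct.
        Schema wrappedSchema = SchemaBuilder.struct()
                .field(fieldName, record.valueSchema())
                .build();
        Struct wrappedValue = new Struct(wrappedSchema).put(fieldName, record.value());
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                wrappedSchema, wrappedValue,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("field", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "Field name to insert the original value as.");
    }

    @Override
    public void close() {
    }
}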

Patterns for data transformations

  • Data transformations could be applicable to the key or the value of the record. We could have *Key and *Value variants for these transformations that reuse the common functionality.
  • Some common utilities for data transformations will probably take shape (sketched below):
    • Caching the changes they make to Schema objects, possibly preserving only the last-seen one since the likelihood of the source data Schema changing is low.
    • Copying Schema objects while excluding some fields, which the transformation is modifying.
    • Likewise, copying a Struct to another Struct with a different Schema, except for some fields, which the transformation is modifying.
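
For instance, the schema caching and copying utilities might look roughly like this (names hypothetical):

import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Illustrative helpers for the patterns above: a last-seen schema cache, and a
// schema copy that leaves out the fields the transformation will re-add in modified form.
final class SchemaUtil {
    private Schema lastInput;
    private Schema lastOutput;

    // Recompute the derived output schema only when the input schema changes,
    // since upstream schemas rarely change.
    Schema cached(Schema input, Function<Schema, Schema> derive) {
        if (lastInput == null || !lastInput.equals(input)) {
            lastInput = input;
            lastOutput = derive.apply(input);
        }
        return lastOutput;
    }

    // Copy a Struct schema, excluding the fields the transformation is modifying.
    static Schema copyExcluding(Schema schema, Set<String> excludedFields) {
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field field : schema.fields()) {
            if (!excludedFields.contains(field.name()))
                builder.field(field.name(), field.schema());
        }
        return builder.build();
    }
}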