Java API

public interface TransformableRecord<R extends TransformableRecord<R>> { // Implemented by SourceRecord and SinkRecord

    String topic();

    Schema keySchema();

    Object key();

    Schema valueSchema();

    Object value();

    Long timestamp();

    // Create a record of the same concrete type with the given updated properties
    R newRecord(String topic, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);

}

public interface Transformation<R extends TransformableRecord<R>> {

    void init(Map<String, String> config);

    // Apply the transformation: return the record (possibly modified via newRecord()),
    // or null to filter it out of the stream (the 1:{0,1} mapping)
    R apply(R record);

    void close();

    ConfigDef config();

}


Configuration

A transformation chain will be configured at the connector level. The order of transformations is defined by the transforms config, which is a list of aliases.
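
For illustration only - the exact property names are not settled by this proposal - a chain of two transformations might be configured along these lines, with each alias scoped under a transforms.<alias>. prefix and a type key naming the implementation class:

transforms=maskCard,tsRouter
transforms.maskCard.type=org.apache.kafka.connect.transforms.Mask
transforms.maskCard.fields=card_number
transforms.tsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.tsRouter.topic.format=${topic}-${timestamp}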

...


It may make sense to internally model the full transformation chain as a single composite Transformation.
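
A minimal sketch of such a composite - the class below is hypothetical, not itself part of the proposal - which applies each transformation in the configured order and short-circuits when one of them filters the record out by returning null:

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

public class TransformationChain<R extends TransformableRecord<R>> implements Transformation<R> {

    private final List<Transformation<R>> transformations;

    // Assumes each transformation has already been init()'d with its alias-scoped config
    public TransformationChain(List<Transformation<R>> transformations) {
        this.transformations = transformations;
    }

    @Override
    public void init(Map<String, String> config) {
        // no chain-level configuration in this sketch
    }

    @Override
    public R apply(R record) {
        for (Transformation<R> transformation : transformations) {
            record = transformation.apply(record);
            if (record == null)
                return null; // record was filtered out; stop applying the rest of the chain
        }
        return record;
    }

    @Override
    public void close() {
        for (Transformation<R> transformation : transformations)
            transformation.close();
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no chain-level configs in this sketch
    }
}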

Application

For source connectors, transformations are applied on results from SourceTask.poll().

...

 Kafka        →        Converter.toConnectData()        →        Transformation.apply()*        →       SinkTask.put()
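
Concretely, the sink-side flow might look like the following sketch (all names here are illustrative, and it assumes SinkRecord implements TransformableRecord as proposed):

import java.util.ArrayList;
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;

public class SinkDelivery {

    // Convert each consumed message, run it through the transformation chain,
    // and drop it if the chain returns null (the 1:{0,1} mapping), before
    // handing the surviving records to SinkTask.put().
    static void deliver(Collection<ConsumerRecord<byte[], byte[]>> consumed,
                        Converter keyConverter, Converter valueConverter,
                        Transformation<SinkRecord> chain, SinkTask task) {
        Collection<SinkRecord> transformed = new ArrayList<>();
        for (ConsumerRecord<byte[], byte[]> msg : consumed) {
            SchemaAndValue key = keyConverter.toConnectData(msg.topic(), msg.key());
            SchemaAndValue value = valueConverter.toConnectData(msg.topic(), msg.value());
            SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
                    key.schema(), key.value(), value.schema(), value.value(),
                    msg.offset(), msg.timestamp(), msg.timestampType());
            record = chain.apply(record);
            if (record != null)
                transformed.add(record);
        }
        task.put(transformed);
    }
}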


Features

  • Pluggable - initialized and configured somewhat similarly to Converter
  • Stackable - can be chained in a defined order
  • Fairly flexible - within the constraints of the TransformableRecord API and 1:{0,1} mapping
    • Any kind of filtering, renaming, or masking operations on the data, adding fields, etc.
    • Filtering of records from the stream.
    • Routing for both source and sink - sink connectors can also just operate on TransformableRecord.topic, since the target 'bucket' is always a function of that.
    • Any transformation that requires access to fields not exposed on TransformableRecord - i.e. {SourceRecord,SinkRecord}.kafkaPartition, SinkRecord.kafkaOffset, or SinkRecord.timestampType - can set the R type parameter to specifically be SourceRecord or SinkRecord and use the relevant constructors instead of newRecord(). It can also just cast internally if some optional functionality requires access to such a field (see the sketch after this list).
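
As a sketch of that last point, a hypothetical transformation bound specifically to SinkRecord so that it can read kafkaOffset() and timestampType() and use the full constructor rather than newRecord():

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical example: routes each record to "<topic>-<offset bucket>", which
// requires SinkRecord.kafkaOffset() and so fixes the type parameter to SinkRecord.
public class OffsetRouter implements Transformation<SinkRecord> {

    @Override
    public void init(Map<String, String> config) {}

    @Override
    public SinkRecord apply(SinkRecord record) {
        String newTopic = record.topic() + "-" + (record.kafkaOffset() / 1_000_000);
        return new SinkRecord(newTopic, record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.kafkaOffset(), record.timestamp(), record.timestampType());
    }

    @Override
    public void close() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }
}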

Example transformations

List of example transformations to demonstrate broad applicability - not in any particular order, and some more thought-through than others. We may want to include some of these with Connect itself to provide useful functionality out of the box (a sketch of one follows the list).

  • Mask
    • Masks primitive fields: obscure sensitive info like credit card numbers
    • Configure with list of fields to randomize or clobber
  • Flatten
    • Flatten nested Structs inside a top-level Struct, omitting all other non-primitive fields. Useful for connectors that can only deal with flat Structs like Confluent's JDBC Sink.
    • Configure with delimiter to use when flattening field names.
  • Replace
    • Filter and rename fields. Useful for lightweight data munging.
    • Configure with whitelist and/or blacklist, map of fields to rename.
  • NumericCasts
    • Casting of numeric fields to a different numeric type, useful in conjunction with source connectors that don't have enough information to pick the right type.
    • Configure with map of field to type (i.e. boolean, int8, int16, int32, int64, float32, float64).
  • TimestampRouter
    • Useful for temporal data: e.g. application log data being indexed to Elasticsearch with a sink connector can be routed to a daily index.
    • Configure with SimpleDateFormat-compatible timestamp format string, and a format string for the renamed topic that can have placeholders for original topic and the timestamp.
  • Insert
    • Allow inserting record-level fields like the topic/partition, offset, and timestamp into a top-level Struct. Can also allow a UUID field to be inserted.
  • RegexRouter
    • Regex-based routing. Too many connectors currently implement routing with their own inconsistent configs.
    • Configure with matcher regex and replacement that can contain capture references.
  • TimestampConverter
    • Timestamps are represented in many different ways; provide a transformation for converting between strings, epoch times as longs, and Connect date/time types.
  • HoistToStruct
    • Wrap data in a Struct.
    • Configure with field name to insert the original data as.
  • ExtractFromStruct
    • Extract a specific field from a Struct.
    • Configure with field name.
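
To make the shape of these concrete, here is a rough sketch of what HoistToStruct might look like against the proposed API (the "field" config key and the schema caching are assumptions, not settled design):

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class HoistToStruct<R extends TransformableRecord<R>> implements Transformation<R> {

    private String fieldName;
    private Schema lastValueSchema;   // cache the derived schema, since the
    private Schema lastWrappedSchema; // source schema rarely changes

    @Override
    public void init(Map<String, String> config) {
        fieldName = config.get("field");
    }

    @Override
    public R apply(R record) {
        if (record.valueSchema() != lastValueSchema) {
            lastValueSchema = record.valueSchema();
            lastWrappedSchema = SchemaBuilder.struct().field(fieldName, record.valueSchema()).build();
        }
        Struct wrapped = new Struct(lastWrappedSchema).put(fieldName, record.value());
        return record.newRecord(record.topic(), record.keySchema(), record.key(),
                lastWrappedSchema, wrapped, record.timestamp());
    }

    @Override
    public void close() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("field", Type.STRING, Importance.HIGH,
                "Field name to insert the wrapped value as.");
    }
}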

Patterns for data transformations

  • Data transformations could be applicable to the key or the value of the record. We could have *Key and *Value variants for these transformations that reuse the common functionality.
  • Some common utilities for data transformations will probably shape up:
    • Cache the changes they make to Schema objects, possibly preserving only the last-seen one, since the source data's Schema is unlikely to change often (this pattern appears in the HoistToStruct sketch above).
    • Copying of a Schema object while excluding some fields - the ones being modified.
    • Likewise, copying of a Struct object into another Struct with a different Schema, except for the fields being modified (a sketch of such copying utilities follows this list).
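
For the last two points, shared utilities might look something like this sketch (names are hypothetical):

import java.util.Set;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public final class TransformationUtil {

    // Copy a Struct schema, skipping the fields the transformation will replace.
    public static Schema copySchemaExcluding(Schema schema, Set<String> excludedFields) {
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field field : schema.fields()) {
            if (!excludedFields.contains(field.name()))
                builder.field(field.name(), field.schema());
        }
        return builder.build();
    }

    // Copy the fields present in the target schema from an existing Struct;
    // the transformation then put()s its modified fields on top.
    public static Struct copyStruct(Struct source, Schema targetSchema) {
        Struct target = new Struct(targetSchema);
        for (Field field : targetSchema.fields()) {
            if (source.schema().field(field.name()) != null) // skip newly added fields
                target.put(field.name(), source.get(field.name()));
        }
        return target;
    }

    private TransformationUtil() {}
}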