...
```
transforms=tsRouter,insertKafkaCoordinates

transforms.tsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.tsRouter.topic.format=${topic}-${timestamp}
transforms.tsRouter.timestamp.format=yyyyMMdd

transforms.insertKafkaCoordinates.type=org.apache.kafka.connect.transforms.InsertInValue
transforms.insertKafkaCoordinates.topic=kafka_topic
transforms.insertKafkaCoordinates.partition=kafka_partition
transforms.insertKafkaCoordinates.offset=kafka_offset
```
...
Features
- Backwards compatible - no breaking change in the current APIs is required. Transformation is an additional layer at the edge of record exchange between the framework and connectors.
- Pluggable - initialized and configured somewhat similarly to Converters.
- Stackable - can be chained in a defined order.
- Fairly flexible - within the constraints of the TransformableRecord API and 1:{0,1} mapping:
  - Any kind of filtering, renaming, masking operations on the data, adding fields, etc.
  - Filtering of records from the stream.
  - Routing for both source and sink - sink connectors can also just operate on the TransformableRecord.topic since the target 'bucket' (table, index, etc.) is always a function of that.
  - For any transformation that requires access to certain fields not exposed on the TransformableRecord, i.e. {SourceRecord,SinkRecord}.kafkaPartition, SinkRecord.kafkaOffset, or SinkRecord.timestampType - it can set the R type parameter to specifically be SourceRecord or SinkRecord and use the relevant constructors instead of newRecord() (see the interface sketch after this list). It can also just cast internally if an optional functionality requires access to such a field.
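To make the pluggable and stackable points above concrete, here is a rough sketch of what such a transformation interface could look like, using the record type name from this proposal. The method names and the configuration hook are assumptions for illustration only, not a finalized API:

```
import java.util.Map;

/**
 * Sketch only: a single message transformation parameterized by the record
 * type R (e.g. SourceRecord or SinkRecord). Names here are illustrative,
 * not the finalized API.
 */
public interface Transformation<R extends TransformableRecord<R>> {

    /** Initialize from this transformation's "transforms.<alias>.*" properties, much like a Converter. */
    void configure(Map<String, ?> configs);

    /**
     * Apply the transformation and return a new record (typically built via
     * record.newRecord(...)), or return null to filter the record out - the 1:{0,1} mapping.
     */
    R apply(R record);
}
```

Chaining then amounts to invoking each configured transformation's apply() in the declared order, dropping the record as soon as any of them returns null.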
Example transformations
A list of example transformations to demonstrate broad applicability - not in any particular order, and some more thought through than others. We may want to include some of these with Connect itself to provide useful out-of-the-box functionality and encourage standard ways to perform these transformations.
Mask
- Mask primitive fields to obscure sensitive information such as credit card numbers.
- Configure with a list of fields to randomize or clobber.
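A hypothetical configuration, purely for illustration (the class and property names are not part of this proposal):

```
# Illustrative only: mask two sensitive value fields
transforms=mask
transforms.mask.type=org.apache.kafka.connect.transforms.MaskValue
transforms.mask.fields=card_number,ssn
```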
Flatten
- Flatten nested Structs inside a top-level Struct, omitting all other non-primitive fields. Useful for connectors that can only deal with flat Structs, like Confluent's JDBC sink connector.
- Configure with the delimiter to use when flattening field names.
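For illustration, such a transform might be configured along these lines (names hypothetical):

```
# Illustrative only: flatten nested Structs in the value, joining names with "_"
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.FlattenValue
transforms.flatten.delimiter=_
```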
Replace
- Filter and rename fields. Useful for lightweight data munging.
- Configure with a whitelist and/or blacklist of fields, and a map of fields to rename.
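An illustrative configuration sketch (names hypothetical):

```
# Illustrative only: keep a whitelist of fields and rename one of them
transforms=replace
transforms.replace.type=org.apache.kafka.connect.transforms.ReplaceValue
transforms.replace.whitelist=id,amount,ts
transforms.replace.renames=ts:event_time
```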
NumericCasts
- Cast a numeric field to a different numeric type; useful in conjunction with source connectors that don't have enough type information.
- Configure with a map of field name to target type (i.e. boolean, int8, int16, int32, int64, float32, float64).
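A possible configuration shape, with hypothetical names:

```
# Illustrative only: cast value fields to concrete numeric types
transforms=casts
transforms.casts.type=org.apache.kafka.connect.transforms.NumericCastsValue
transforms.casts.spec=amount:float64,quantity:int32
```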
TimestampRouter
- Useful for temporal data, e.g. application log data being indexed to Elasticsearch with a sink connector can be routed to a daily index.
- Configure with a SimpleDateFormat-compatible timestamp format string, and a format string for the renamed topic that can contain placeholders for the original topic and the timestamp (as in the tsRouter configuration example at the top of this page).
Insert
- Allow inserting record-level fields such as the topic, partition, offset, and timestamp into a top-level Struct. Can also allow a UUID field to be inserted. (See the insertKafkaCoordinates configuration example at the top of this page.)
- Configure with names for the desired fields.
RegexRouter
- Regex-based routing. There are currently too many inconsistent, connector-specific configs for routing.
- Configure with a matcher regex and a replacement string that can contain capture-group references.
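For example, stripping an environment prefix from topic names might look like this (property names hypothetical):

```
# Illustrative only: route prod-orders -> orders
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=prod-(.*)
transforms.route.replacement=$1
```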
TimestampConverter
- Timestamps are represented in many different ways; provide a transformation for converting between strings, epoch times as longs, and Connect date/time types.
- Configure with the field name and the desired type.
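An illustrative configuration (names hypothetical):

```
# Illustrative only: convert a string timestamp field to a Connect Timestamp type
transforms=tsConvert
transforms.tsConvert.type=org.apache.kafka.connect.transforms.TimestampConverterValue
transforms.tsConvert.field=event_time
transforms.tsConvert.target.type=Timestamp
```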
HoistToStruct
- Wrap data in a Struct.
- Configure with the schema name for the Struct schema and the field name to insert the original data as.
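For illustration (names hypothetical):

```
# Illustrative only: wrap a primitive value in a Struct with a single field "line"
transforms=hoist
transforms.hoist.type=org.apache.kafka.connect.transforms.HoistToStructValue
transforms.hoist.schema.name=com.example.LogLine
transforms.hoist.field=line
```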
ExtractFromStruct
- Extract a specific field from a Struct.
- Configure with the field name.
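For illustration (names hypothetical):

```
# Illustrative only: replace the value Struct with just its "payload" field
transforms=extract
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractFromStructValue
transforms.extract.field=payload
```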
ValueToKey
- Useful when a source connector does not populate the SourceRecord key but only the value with a Struct.
- Configure with a list of field names to hoist into the record key as a primitive (single field) or Struct (multiple fields), and a flag to force wrapping in a Struct even when it is a single field.
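An illustrative configuration (names hypothetical):

```
# Illustrative only: build the record key from two value fields, forcing a Struct key
transforms=keyer
transforms.keyer.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.keyer.fields=customer_id,region
transforms.keyer.force.struct=true
```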
Patterns for data transformations
- Data transformations could be applicable to the key or the value of the record. We could have *Key and *Value variants for these transformations that reuse the common functionality.
- Some common utilities for data transformations will probably shape up:
  - Caching the changes they make to Schema objects, possibly only preserving the last-seen one, since the likelihood of the source data Schema changing is low.
  - Copying of Schema objects with the possible exclusion of some fields, which they are modifying.
  - Likewise, copying of a Struct object to another Struct having a different Schema, with the exception of some fields, which they are modifying.
  - Where fields are being added and a field name is specified in configuration, we may want a consistent way to convey whether it should be created as an optional field, e.g. a leading '?' character.
- Where field names are expected, we may want to allow getting at nested fields via a dotted syntax, which is common in such usage (and accordingly, we will need some reusable utilities for accessing a field that may be nested). This also implies that actual dots in field names will need escaping; see the sketch below.
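For example, a hypothetical configuration referencing a nested field (the exact syntax and escaping rules are open details):

```
# Illustrative only: address a nested field with a dotted path
transforms.extract.field=payload.user.id
# A literal dot in a field name would need escaping, e.g. payload\.version
```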