...
```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;

// Existing base class for SourceRecord and SinkRecord, new self type parameter.
public abstract class ConnectRecord<R extends ConnectRecord<R>> {

    // ...

    // New abstract method:

    /** Generate a new record of the same type as itself, with the specified parameter values. **/
    public abstract R newRecord(String topic, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);
}

public interface Transformation<R extends ConnectRecord<R>> {

    /** Initialize with the provided configuration properties. **/
    void init(Map<String, String> config);

    /** Apply transformation to the {@code record} and return another record object (which may be {@code record} itself). Must be thread-safe. **/
    R apply(R record);

    /** Signal that this transformation instance will no longer be used. **/
    void close();

    /** Configuration specification for this transformation. **/
    ConfigDef config();
}
```
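The self type parameter is what lets a transformation hand back the same concrete record type it was given. The following is a minimal, self-contained sketch of that pattern, not Connect code: SimpleRecord, SimpleSourceRecord, SimpleTransformation and PrefixTopic are hypothetical stand-ins (schemas, timestamps and config() are omitted), and the "prefix" config key is an assumption made for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for ConnectRecord: schemas and timestamp omitted.
abstract class SimpleRecord<R extends SimpleRecord<R>> {
    final String topic;
    final Object key;
    final Object value;

    SimpleRecord(String topic, Object key, Object value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    // Mirrors the proposed newRecord(): build a record of the same concrete type.
    abstract R newRecord(String topic, Object key, Object value);
}

// Hypothetical stand-in for SourceRecord: source-specific state is carried over by newRecord().
final class SimpleSourceRecord extends SimpleRecord<SimpleSourceRecord> {
    final Map<String, ?> sourceOffset;

    SimpleSourceRecord(Map<String, ?> sourceOffset, String topic, Object key, Object value) {
        super(topic, key, value);
        this.sourceOffset = sourceOffset;
    }

    @Override
    SimpleSourceRecord newRecord(String topic, Object key, Object value) {
        return new SimpleSourceRecord(sourceOffset, topic, key, value);
    }
}

// Hypothetical stand-in for the proposed Transformation interface (config() omitted).
interface SimpleTransformation<R extends SimpleRecord<R>> {
    void init(Map<String, String> config);

    R apply(R record);

    void close();
}

// Hypothetical transformation that prefixes the topic name; works for any record type R.
final class PrefixTopic<R extends SimpleRecord<R>> implements SimpleTransformation<R> {
    private String prefix = "";

    @Override
    public void init(Map<String, String> config) {
        prefix = config.getOrDefault("prefix", "");
    }

    @Override
    public R apply(R record) {
        // No cast needed: newRecord() is declared to return R.
        return record.newRecord(prefix + record.topic, record.key, record.value);
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Because newRecord() returns R, PrefixTopic is written once and can be used as a SimpleTransformation<SimpleSourceRecord> without casts, while source-specific fields such as the offset survive the rewrite.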
Configuration
A transformation chain will be configured at the connector level. The order of transformations is defined by the transforms config, which represents a list of aliases. Each alias in transforms implies that some additional keys are configurable:
- transforms.$alias.type – fully qualified class name for the transformation
- transforms.$alias.* – all other keys as defined in Transformation.config() are embedded with this prefix
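To make the aliasing scheme concrete, here is a minimal sketch of how a flat connector config could be split into an ordered, per-alias view of the chain. TransformAliases and AliasConfig are hypothetical names, not Connect's actual parsing code; the sketch assumes aliases are comma-separated and that a missing transforms.$alias.type is an error.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not Connect's actual parsing code).
final class TransformAliases {

    // Settings for one alias: the class name from transforms.$alias.type plus
    // every other transforms.$alias.* key with the prefix stripped.
    static final class AliasConfig {
        final String type;
        final Map<String, String> config;

        AliasConfig(String type, Map<String, String> config) {
            this.type = type;
            this.config = config;
        }
    }

    static Map<String, AliasConfig> parse(Map<String, String> connectorConfig) {
        // LinkedHashMap keeps aliases in the order they are listed in "transforms".
        Map<String, AliasConfig> chain = new LinkedHashMap<>();
        String aliases = connectorConfig.getOrDefault("transforms", "").trim();
        if (aliases.isEmpty()) {
            return chain;
        }
        for (String rawAlias : aliases.split(",")) {
            String alias = rawAlias.trim();
            String prefix = "transforms." + alias + ".";
            String type = connectorConfig.get(prefix + "type");
            if (type == null) {
                throw new IllegalArgumentException("Missing " + prefix + "type");
            }
            // Collect the remaining keys for this alias, stripping the prefix.
            Map<String, String> config = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
                String key = e.getKey();
                if (key.startsWith(prefix) && !key.equals(prefix + "type")) {
                    config.put(key.substring(prefix.length()), e.getValue());
                }
            }
            chain.put(alias, new AliasConfig(type, config));
        }
        return chain;
    }
}
```

In this sketch, each AliasConfig's stripped config map is the kind of map that would be handed to that transformation's init().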
...
```properties
transforms=tsRouter,insertKafkaCoordinates

transforms.tsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.tsRouter.topic.format=${topic}-${timestamp}
transforms.tsRouter.timestamp.format=yyyyMMdd

transforms.insertKafkaCoordinates.type=org.apache.kafka.connect.transforms.InsertInValue
transforms.insertKafkaCoordinates.topic=kafka_topic
transforms.insertKafkaCoordinates.partition=kafka_partition
transforms.insertKafkaCoordinates.offset=kafka_offset
```
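As an illustration of what the tsRouter settings express, the sketch below expands a topic.format string by substituting the record's topic and its timestamp rendered with timestamp.format. TopicFormatSketch.route is a hypothetical helper, not the TimestampRouter source; it assumes timestamp.format is a java.text.SimpleDateFormat pattern and that timestamps are rendered in UTC.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper, not the TimestampRouter implementation.
final class TopicFormatSketch {

    static String route(String topicFormat, String timestampFormat,
                        String topic, long timestampMillis) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat formatter = new SimpleDateFormat(timestampFormat);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC rendering
        String formatted = formatter.format(new Date(timestampMillis));
        // Literal (non-regex) replacement of both placeholders.
        return topicFormat.replace("${topic}", topic).replace("${timestamp}", formatted);
    }
}
```

Under these assumptions, a record on topic orders with timestamp 1500000000000 (2017-07-14T02:40:00Z) would be routed to orders-20170714.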
Runtime changes
For source connectors, transformations are applied to the collection of SourceRecord objects retrieved from SourceTask.poll().
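The runtime behavior described here amounts to passing every polled record through the configured transformations in order. The following is a minimal sketch under that reading, with java.util.function.UnaryOperator standing in for a configured Transformation's apply(); TransformationChainSketch is a hypothetical name and the sketch deliberately ignores error handling and record filtering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a transformation chain; each UnaryOperator stands in
// for one configured Transformation's apply().
final class TransformationChainSketch<R> {
    private final List<UnaryOperator<R>> transforms;

    TransformationChainSketch(List<UnaryOperator<R>> transforms) {
        this.transforms = List.copyOf(transforms);
    }

    // Applies every transformation, in the configured order, to one record.
    R apply(R record) {
        R current = record;
        for (UnaryOperator<R> transform : transforms) {
            current = transform.apply(current);
        }
        return current;
    }

    // Applies the chain to each record of a batch, e.g. the list returned by poll().
    List<R> applyAll(List<R> batch) {
        List<R> out = new ArrayList<>(batch.size());
        for (R record : batch) {
            out.add(apply(record));
        }
        return out;
    }
}
```

Order matters: with a chain of s -> s + "-a" followed by String::toUpperCase, the record "x" becomes "X-A", whereas the reverse order would give "X-a".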
...
Unit tests for the runtime changes and for each bundled transformation, as well as system tests exercising a few different transformation chains.
Rejected Alternatives
Not including any transformations with Connect
...
Transformation chains as top-level construct
The current proposal is to configure transformation chains in the connector config under the transforms prefix. An alternative would be to reference a transformation chain by name in the connector configuration, with the transformation chain specification managed separately by Connect.
...