...

 

Code Block
// Existing base class for SourceRecord and SinkRecord, new self type parameter.
public abstract class ConnectRecord<R extends ConnectRecord<R>> {
 
    // ...
 
    // New abstract method:
       
    /** Generate a new record of the same type as itself, with the specified parameter values. **/
    public abstract R newRecord(String topic, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);
 
}

public interface Transformation<R extends ConnectRecord<R>> {
    /** Initialize with the provided configuration properties. **/
    void init(Map<String, String> config);
 
    /** Apply transformation to the {@code record} and return another record object (which may be {@code record} itself). Must be thread-safe. **/
    R apply(R record);
 
    /** Signal that this transformation instance will no longer be used. **/
    void close();
 
    /** Configuration specification for this transformation. **/
    ConfigDef config();
 
}
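
To illustrate why the self type parameter matters, here is a minimal sketch of a transformation that rewrites the topic through newRecord() without knowing whether it is handling a source or a sink record. Everything in it is an assumption for illustration: SketchRecord, SketchSinkRecord and SketchTransformation are simplified stand-ins for the types above (schemas and ConfigDef are left out so the example compiles on its own), and TopicPrefixer with its "prefix" key is a hypothetical transformation, not one proposed here.

```java
import java.util.Map;

// Simplified stand-in for ConnectRecord: schemas omitted (assumption for brevity).
abstract class SketchRecord<R extends SketchRecord<R>> {
    final String topic;
    final Object key;
    final Object value;
    final Long timestamp;

    SketchRecord(String topic, Object key, Object value, Long timestamp) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    /** Generate a new record of the same type, with the specified parameter values. */
    abstract R newRecord(String topic, Object key, Object value, Long timestamp);
}

// Stand-in for a concrete record type such as SinkRecord.
final class SketchSinkRecord extends SketchRecord<SketchSinkRecord> {
    SketchSinkRecord(String topic, Object key, Object value, Long timestamp) {
        super(topic, key, value, timestamp);
    }

    @Override
    SketchSinkRecord newRecord(String topic, Object key, Object value, Long timestamp) {
        return new SketchSinkRecord(topic, key, value, timestamp);
    }
}

// Stand-in for the Transformation interface, without config().
interface SketchTransformation<R extends SketchRecord<R>> {
    void init(Map<String, String> config);
    R apply(R record);
    void close();
}

// Hypothetical transformation: prepends a configured prefix to the topic name.
final class TopicPrefixer<R extends SketchRecord<R>> implements SketchTransformation<R> {
    // Written once in init(), only read afterwards, so apply() is safe to call concurrently.
    private volatile String prefix = "";

    @Override
    public void init(Map<String, String> config) {
        prefix = config.getOrDefault("prefix", "");
    }

    @Override
    public R apply(R record) {
        // newRecord() returns R, so no cast to a concrete record type is needed.
        return record.newRecord(prefix + record.topic, record.key, record.value, record.timestamp);
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Because apply() builds its result through newRecord(), the same class would work unchanged for any record subtype that binds R to itself.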

 

Configuration

A transformation chain is configured at the connector level. The order of transformations is defined by the transforms config, which is a list of aliases. Each alias listed in transforms makes the following additional keys configurable:
transforms.$alias.type – fully qualified class name for the transformation
transforms.$alias.* – all other keys as defined in Transformation.config() are embedded with this prefix
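
The key layout described above can be sketched as a small helper that splits a connector config into one map per alias, in chain order, with the transforms.$alias. prefix stripped. This is only an illustration of the naming scheme: TransformConfigSketch and perAliasConfigs are hypothetical names, and the runtime's actual parsing and validation may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TransformConfigSketch {

    /**
     * Returns alias -> scoped config, preserving the order given in "transforms".
     * Each scoped config holds the keys found under "transforms.$alias." with
     * that prefix removed, so it includes "type" alongside the other keys.
     */
    static Map<String, Map<String, String>> perAliasConfigs(Map<String, String> connectorConfig) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String aliases = connectorConfig.getOrDefault("transforms", "");
        for (String raw : aliases.split(",")) {
            String alias = raw.trim();
            if (alias.isEmpty()) {
                continue; // no transformations configured, or a stray comma
            }
            String prefix = "transforms." + alias + ".";
            Map<String, String> scoped = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    scoped.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            result.put(alias, scoped);
        }
        return result;
    }
}
```

The scoped map for an alias, minus its type entry, is what would be handed to that transformation's init().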

...

 

Code Block
transforms=tsRouter,insertKafkaCoordinates

transforms.tsRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.tsRouter.topic.format=${topic}-${timestamp}
transforms.tsRouter.timestamp.format=yyyyMMdd
 
transforms.insertKafkaCoordinates.type=org.apache.kafka.connect.transforms.InsertInValue
transforms.insertKafkaCoordinates.topic=kafka_topic
transforms.insertKafkaCoordinates.partition=kafka_partition
transforms.insertKafkaCoordinates.offset=kafka_offset
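
As a worked example of the tsRouter settings above, the following sketch shows how the topic.format and timestamp.format values could combine into a new topic name. It assumes that the placeholders are filled by plain string substitution and that the timestamp is rendered in UTC; TimestampRouterSketch and route are hypothetical names, not the bundled implementation.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

final class TimestampRouterSketch {

    /** Fills ${topic} and ${timestamp} in topicFormat for one record. */
    static String route(String topicFormat, String timestampFormat, String topic, long timestampMs) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: timestamps are formatted in UTC
        String formatted = fmt.format(new Date(timestampMs));
        return topicFormat.replace("${topic}", topic).replace("${timestamp}", formatted);
    }
}
```

Under these assumptions, route("${topic}-${timestamp}", "yyyyMMdd", "orders", 0L) yields orders-19700101, so records would be routed to one topic per day.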

 

Runtime changes

For source connectors, transformations are applied on the collection of SourceRecord retrieved from SourceTask.poll().
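
A minimal sketch of the source-side step described above: each record returned by poll() is passed through the chain in configured order, and the output of one transformation becomes the input of the next. UnaryOperator stands in for Transformation.apply() so the example is self-contained; ChainSketch and applyChain are hypothetical names, and the sketch covers only the behaviour stated in this paragraph.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class ChainSketch {

    /** Applies every transformation, in order, to each polled record. */
    static <R> List<R> applyChain(List<R> polled, List<UnaryOperator<R>> chain) {
        List<R> out = new ArrayList<>(polled.size());
        for (R record : polled) {
            R current = record;
            for (UnaryOperator<R> transformation : chain) {
                // The result of one step is the input to the next.
                current = transformation.apply(current);
            }
            out.add(current);
        }
        return out;
    }
}
```

With an empty chain the records pass through unchanged, which matches a connector that configures no transforms.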

...

Unit tests for runtime changes and each bundled transformation, as well as a system test exercising a few different transformation chains.

Rejected Alternatives

Not including any transformations with Connect

...

Transformation chains as top-level construct

The current proposal is to have transformation chains be configured in the connector config under the prefix transforms. An alternative would be to reference a transformation chain by a name in the connector configuration, with the transformation chain specification managed separately by Connect.

...