...
Discussion thread: here
JIRA: KAFKA-3209
Released: 0.10.2.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Note: The framework for Single Message Transforms was released in 0.10.2.0, but only some of the built-in transformations were included in that version. The table below indicates the version in which each transformation was or will be released. A few do not have the exact names listed in the KIP because those names were found to be slightly inaccurate during code review.
The Kafka documentation also includes references for each transformation.
Motivation
This proposal is for adding a record transformation API to Kafka Connect as well as certain bundled transformations. At the same time, we should not extend Connect's area of focus beyond moving data between Kafka and other systems. We will only support simple 1:{0,1} transformations – i.e. map and filter operations.
...
    // Existing base class for SourceRecord and SinkRecord, new self type parameter.
    public abstract class ConnectRecord<R extends ConnectRecord<R>> {

        // ...

        // New abstract method:

        /** Generate a new record of the same type as itself, with the specified parameter values. **/
        public abstract R newRecord(String topic, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);

    }

    public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {

        // via Configurable base interface:
        // void configure(Map<String, ?> configs);

        /**
         * Apply transformation to the {@code record} and return another record object (which may be {@code record} itself) or {@code null},
         * corresponding to a map or filter operation respectively. The implementation must be thread-safe.
         */
        R apply(R record);

        /** Configuration specification for this transformation. **/
        ConfigDef config();

        /** Signal that this transformation instance will no longer be used. **/
        @Override
        void close();

    }
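As a rough illustration of the map-or-filter contract of `apply`, the sketch below runs a record through an ordered chain and stops as soon as one step returns `null`. It is not Connect code: `TransformChainSketch`, `applyChain`, and the use of `UnaryOperator` over plain strings are hypothetical stand-ins for `Transformation` and `ConnectRecord`, chosen so the example runs with only the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified stand-in for illustration only; not the Connect Transformation interface.
final class TransformChainSketch {

    /** Applies each step in order; a null result drops the record and skips the remaining steps. */
    static <R> R applyChain(List<UnaryOperator<R>> chain, R record) {
        R current = record;
        for (UnaryOperator<R> step : chain) {
            current = step.apply(current);
            if (current == null) {
                return null; // filter: the record is dropped
            }
        }
        return current; // map: possibly a new record, possibly the same object
    }

    public static void main(String[] args) {
        UnaryOperator<String> dropEmpty = s -> s.isEmpty() ? null : s; // filter step
        UnaryOperator<String> upperCase = s -> s.toUpperCase();        // map step
        List<UnaryOperator<String>> chain = Arrays.asList(dropEmpty, upperCase);

        System.out.println(applyChain(chain, "payload")); // prints PAYLOAD
        System.out.println(applyChain(chain, ""));        // prints null
    }
}
```

Because each step yields at most one record, the chain as a whole is also a 1:{0,1} operation.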
Configuration
A transformation chain will be configured at the connector-level. The order of transformations is defined by the transforms config, which represents a list of aliases. An alias in transforms implies that some additional keys are configurable:
- transforms.$alias.type – fully qualified class name for the transformation
- transforms.$alias.* – all other keys as defined in Transformation.config() are embedded with this prefix
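The prefix convention above can be sketched as follows. This is a minimal illustration, not the Connect runtime's parsing code: `TransformConfigSketch`, `configsByAlias`, the `com.example.*` class names, and the `pattern` key are hypothetical, and the sketch assumes the `transforms` list is comma-separated.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TransformConfigSketch {

    /**
     * For each alias listed in "transforms" (in declared order), collects the keys found under
     * "transforms.$alias." with that prefix stripped.
     */
    static Map<String, Map<String, String>> configsByAlias(Map<String, String> connectorProps) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String aliases = connectorProps.getOrDefault("transforms", "");
        for (String rawAlias : aliases.split(",")) {
            String alias = rawAlias.trim();
            if (alias.isEmpty()) {
                continue; // no transformations configured, or a stray comma
            }
            String prefix = "transforms." + alias + ".";
            Map<String, String> scoped = new LinkedHashMap<>();
            for (Map.Entry<String, String> entry : connectorProps.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    scoped.put(entry.getKey().substring(prefix.length()), entry.getValue());
                }
            }
            result.put(alias, scoped);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("transforms", "route,mask");
        props.put("transforms.route.type", "com.example.HypotheticalRouter"); // hypothetical class
        props.put("transforms.route.pattern", "prod-(.*)");                   // hypothetical key
        props.put("transforms.mask.type", "com.example.HypotheticalMask");    // hypothetical class
        System.out.println(configsByAlias(props));
    }
}
```

Each inner map holds the `type` entry plus whatever remaining keys would be handed to that transformation's `configure` call.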
...
Name | Functionality | Rationale | Configuration |
---|---|---|---|
Mask{Key,Value} | Mask or replace the specified primitive fields, assuming there is a top-level Struct. | Obscure sensitive info like credit card numbers. | |
InsertIn{Key,Value} | Insert specified fields with given name, assuming there is a top-level Struct. | Widely applicable to insert certain record metadata. | |
TimestampRouter | Timestamp-based routing. | Useful for temporal data, e.g. application log data being indexed to a search system with a sink connector can be routed to a daily index. | |
RegexRouter | Regex-based routing. | There are too many inconsistent configs to route in different connectors. | See http://docs.oracle.com/javase/7/docs/api/java/util/regex/Matcher.html#replaceFirst(java.lang.String) |
ValueToKey | Create or replace record key with data from record value. | Useful when a source connector does not populate the record key but only the value with a Struct. | |
Flatten | Flatten nested Structs. | Useful for sink connectors that can only deal with flat Structs. | |
Replace | Filter and rename fields. | Useful for lightweight data munging. | |
NumericCasts | Casting of numeric field to some specified numeric type. | Useful in conjunction with source connectors that don't have enough information and utilize an unnecessarily wide data type. | |
TimestampConverter | Convert datatype of a timestamp field. | Timestamps are represented in many different ways; this provides a transformation for converting between strings, epoch times as longs, and Connect date/time types. | |
Hoist{Key,Value}ToStruct | Wrap data in a Struct. | Useful when a transformation or sink connector expects Struct but the data is a primitive type. | |
Extract{Key,Value}FromStruct | Extract a specific field from a Struct. | The inverse of Hoist{Key,Value}ToStruct. | |
Set{Key,Value}SchemaMetadata | Set/clobber Schema name or version. | Allow setting or overriding the schema name and/or version where necessary. | |
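To make the two routing rows more concrete, here is a minimal sketch of the topic rewriting they describe, using only the JDK. It is an assumption-laden illustration rather than the bundled implementation: `RoutingSketch`, `regexRoute`, `timestampRoute`, their parameters, the `topic-yyyyMMdd` naming, and the use of UTC are all hypothetical choices. The regex variant relies on `Matcher.replaceFirst`, the method linked in the table.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RoutingSketch {

    /** Rewrites the topic when the whole name matches the regex; otherwise leaves it unchanged. */
    static String regexRoute(String topic, String regex, String replacement) {
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    /** Appends the record's UTC day to the topic, e.g. to target a daily index. */
    static String timestampRoute(String topic, long timestampMillis) {
        DateTimeFormatter day = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
        return topic + "-" + day.format(Instant.ofEpochMilli(timestampMillis));
    }

    public static void main(String[] args) {
        System.out.println(regexRoute("prod-orders", "prod-(.*)", "$1")); // prints orders
        System.out.println(regexRoute("dev-orders", "prod-(.*)", "$1"));  // prints dev-orders
        System.out.println(timestampRoute("logs", 1483228800000L));       // prints logs-20170101
    }
}
```

In a real transformation the rewritten name would be passed as the `topic` argument of `newRecord`, leaving the key, value, and schemas untouched.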
...