Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-3209
Released: 0.10.2.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
**Note:** The framework for Single Message Transforms was released in 0.10.2.0, but only some of the built-in transformations were included with that version. The table below indicates the version each transformation was or will be released with. A few do not have the exact names listed in the KIP because the names were found to be slightly inaccurate during code review. The Kafka documentation also includes references for each transformation.
Motivation
This proposal is for adding a record transformation API to Kafka Connect as well as certain bundled transformations. At the same time, we should not extend Connect's area of focus beyond moving data between Kafka and other systems. We will only support simple 1:{0,1} transformations – i.e. map and filter operations.
...
```java
// Existing base class for SourceRecord and SinkRecord, with a new self-type parameter.
public abstract class ConnectRecord<R extends ConnectRecord<R>> {

    // ...

    // New abstract method:

    /** Generate a new record of the same type as itself, with the specified parameter values. **/
    public abstract R newRecord(String topic, Schema keySchema, Object key,
                                Schema valueSchema, Object value, Long timestamp);
}

public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {

    // Initialize with the provided configuration properties,
    // via the Configurable base interface:
    // void configure(Map<String, ?> configs);

    /**
     * Apply transformation to the {@code record} and return another record object (which may be
     * {@code record} itself) or {@code null}, corresponding to a map or filter operation respectively.
     * The implementation must be thread-safe.
     **/
    R apply(R record);

    /** Configuration specification for this transformation. **/
    ConfigDef config();

    /** Signal that this transformation instance will no longer be used. **/
    @Override
    void close();
}
```
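To illustrate the map/filter contract (each transformation returns either a record or `null`), here is a simplified, self-contained sketch of how a runtime might apply a transformation chain. The `SimpleRecord` type and the single-method `Transformation` interface below are stand-ins for the Connect classes, not the actual API:

```java
import java.util.List;

public class TransformChainDemo {
    // Stand-in for ConnectRecord; the real type also carries schemas, key, and timestamp.
    record SimpleRecord(String topic, Object value) {}

    // Stand-in for the Transformation interface: return a record (map) or null (filter).
    interface Transformation {
        SimpleRecord apply(SimpleRecord record);
    }

    // Apply each transformation in order; a null result drops the record entirely.
    static SimpleRecord applyChain(List<Transformation> chain, SimpleRecord record) {
        for (Transformation t : chain) {
            record = t.apply(record);
            if (record == null) {
                return null;
            }
        }
        return record;
    }

    public static void main(String[] args) {
        Transformation upperCase = r -> new SimpleRecord(r.topic(), r.value().toString().toUpperCase());
        Transformation dropEmpty = r -> r.value().toString().isEmpty() ? null : r;

        SimpleRecord out = applyChain(List.of(upperCase, dropEmpty), new SimpleRecord("t", "hello"));
        System.out.println(out.value()); // HELLO
        SimpleRecord dropped = applyChain(List.of(upperCase, dropEmpty), new SimpleRecord("t", ""));
        System.out.println(dropped); // null
    }
}
```

Because `apply` may return its input unchanged, a pass-through transformation costs nothing beyond the method call.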
Configuration
A transformation chain will be configured at the connector level. The order of transformations is defined by the `transforms` config, which represents a list of aliases. An alias in `transforms` implies that some additional keys are configurable:

- `transforms.$alias.type` – fully qualified class name for the transformation
- `transforms.$alias.*` – all other keys, as defined in `Transformation.config()`, are embedded with this prefix
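For illustration, a connector configuration with a two-transformation chain might look like the following. The aliases (`insertSource`, `router`) and option values are examples only; the class and config names shown reflect the released transformations (e.g. `InsertField$Value`) rather than the KIP's working names:

```properties
transforms=insertSource,router
transforms.insertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertSource.static.field=source
transforms.insertSource.static.value=my-datasource
transforms.router.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.router.topic.format=${topic}-${timestamp}
transforms.router.timestamp.format=yyyyMMdd
```

Transformations are applied in the order the aliases appear in `transforms`, so here the field is inserted before the record is routed.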
...
Name | Functionality | Rationale | Configuration |
---|---|---|---|
Mask{Key,Value} | Mask or replace the specified primitive fields, assuming there is a top-level `Struct`. | Obscure sensitive info like credit card numbers. | |
InsertIn{Key,Value} | Insert specified fields with given names, assuming there is a top-level `Struct`. | Widely applicable to insert certain record metadata. | |
TimestampRouter | Timestamp-based routing. | Useful for temporal data, e.g. application log data being indexed to a search system with a sink connector can be routed to a daily index. | |
RegexRouter | Regex-based routing. | There are too many inconsistent configs to route in different connectors. | See http://docs.oracle.com/javase/7/docs/api/java/util/regex/Matcher.html#replaceFirst(java.lang.String) |
ValueToKey | Create or replace the record key with data from the record value. | Useful when a source connector does not populate the record key but only the value with a `Struct`. | |
Flatten | Flatten a nested `Struct`. | Useful for sink connectors that can only deal with flat `Struct`s. | |
Replace | Filter and rename fields. | Useful for lightweight data munging. | |
NumericCasts | Cast a numeric field to some specified numeric type. | Useful in conjunction with source connectors that don't have enough information and utilize an unnecessarily wide data type. | |
TimestampConverter | Convert the data type of a timestamp field. | Timestamps are represented in many different ways; provide a transformation for converting between strings, epoch times as longs, and Connect date/time types. | |
Hoist{Key,Value}ToStruct | Wrap data in a `Struct`. | Useful when a transformation or sink connector expects a `Struct` but the data is a primitive type. | |
Extract{Key,Value}FromStruct | Extract a specific field from a `Struct`. | The inverse of Hoist{Key,Value}ToStruct. | |
Set{Key,Value}SchemaMetadata | Set/clobber the `Schema` name or version. | Allow setting or overriding the schema name and/or version where necessary. | |
...
Data transformations could be applicable to the key or the value of the record. We will have `*Key` and `*Value` variants for these transformations that reuse the common functionality from a shared base class. Some common utilities for data transformations will shape up:

- Caching of the changes they make to `Schema` objects, possibly only preserving the last-seen one, as the likelihood of the source data `Schema` changing is low.
- Copying of `Schema` objects with the possible exclusion of some fields, which they are modifying. Likewise, copying of a `Struct` object to another `Struct` having a different `Schema`, with the exception of some fields, which they are modifying.
- Where fields are being added and a field name is specified in configuration, we will want a consistent way to convey whether it should be created as a required or optional field. We can use a leading '!' or '?' character for this purpose if the user wants to make a different choice than the default determined by the transformation.
- `ConfigDef` does not provide a `Type.MAP`, but for the time being we can piggyback on top of `Type.LIST` and represent maps as a list of key-value pairs separated by `:`.
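A sketch of that workaround, assuming the list value has already been split into entries of the form `key:value`; the helper name is illustrative:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ListAsMapDemo {
    // Parse a ConfigDef Type.LIST value where each entry is a "key:value" pair,
    // as a stand-in for the missing Type.MAP.
    static Map<String, String> parseMap(List<String> entries) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : entries) {
            int sep = entry.indexOf(':');
            if (sep < 0) {
                throw new IllegalArgumentException("Expected key:value pair, got: " + entry);
            }
            result.put(entry.substring(0, sep), entry.substring(sep + 1));
        }
        return result;
    }

    public static void main(String[] args) {
        // e.g. a hypothetical masking config such as "cc_num:0000,ssn:xxx"
        System.out.println(parseMap(List.of("cc_num:0000", "ssn:xxx")));
    }
}
```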
- Where field names are expected, in some cases we should support getting at nested fields via a dotted syntax, which is common in such usage (and accordingly, we will need some utilities for accessing a field that may be nested).
- There are escaping considerations for several such configs, so we will need utilities that assume a consistent escaping style (e.g. backslashes).
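For instance, a dotted field path could be split on unescaped dots, with a backslash escaping a literal dot inside a field name. This splitter is a sketch of such a utility under those assumptions, not code from the KIP:

```java
import java.util.ArrayList;
import java.util.List;

public class DottedPathDemo {
    // Split "a.b.c" into path segments; a backslash escapes the next character,
    // so "a\.b.c" yields the two segments "a.b" and "c".
    static List<String> splitPath(String path) {
        List<String> segments = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean escaped = false;
        for (char c : path.toCharArray()) {
            if (escaped) {
                current.append(c);
                escaped = false;
            } else if (c == '\\') {
                escaped = true;
            } else if (c == '.') {
                segments.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        segments.add(current.toString());
        return segments;
    }

    public static void main(String[] args) {
        System.out.println(splitPath("address.city")); // [address, city]
        System.out.println(splitPath("a\\.b.c"));      // [a.b, c]
    }
}
```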
...