Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: WIP Under Discussion

Discussion thread: here

JIRA: KAFKA-3209

...

Name
Functionality
Rationale
Configuration
Mask{Key,Value}Mask or replace the specified primitive fields, assuming there is a top-level Struct.

Obscure sensitive info like credit card numbers.

  • randomize.fields – fields to replace with random data
  • clobber.fields – map of fields to replacement string/number
InsertIn{Key,Value}Insert specified fields with given name, assuming there is a top-level Struct.Widely applicable to insert certain record metadata.
  • topic – the target field name for record topic

  • partition – the target field name for record partition

  • offset – the target field name for record offset

  • timestamp – the target field name for record timestamp

  • uuid – the target field name for a UUID (caveat: not deterministic, not really record metadata so not sure it belongs here, but implementation-wise it makes sense to have here)

TimestampRouter

Timestamp-based routing.

Useful for temporal data e.g. application log data being indexed to a search system with a sink connector can be routed to a daily index.

  • topic.format – format string which can contain ${topic} and ${timestamp} as placeholders for the original topic and the timestamp, respectively

  • timestamp.format – a format string compatible with SimpleDateFormat
RegexRouterRegex-based routing.There are too many inconsistent configs to route in different connectors.
  • regex
  • replacement

See http://docs.oracle.com/javase/7/docs/api/java/util/regex/Matcher.html#replaceFirst(java.lang.String)

ValueToKey
Create or replace record key with data from record value.

Useful when a source connector does not populate the record key but only the value with a Struct.

  • fields – list of field names to hoist into the record key as a primitive (single field ) / Struct (multiple fields)
  • force.struct – force wrapping in a Struct even when it is a single field
Flatten

Flatten nested Structs inside a top-level Struct, omitting all other non-primitive fields.

Useful for sink connectors that can only deal with flat Structs.
  • delimiter – the delimiter to use when flattening field names

TODO: specify escaping

Replace

Filter and rename fields.

Useful for lightweight data munging.
  • whitelist – fields to include
  • blacklist – fields to exclude
  • rename – map of old field names to new field names
NumericCasts

Casting of numeric field to some specified numeric type.

Useful in conjunction with source connectors that don't have enough information and utilize an unnecessarily wide data type.
  • spec – map of field name to type (i.e. boolean, int8, int16, int32, int64, float32, float64)
TimestampConverterConvert datatype of a timestamp field.Timestamps are represented in a ton of different ways, provide a transformation from going between strings, epoch times as longs, and Connect date/time types.
  • field – the field name (optional, can be left out in case of primitive data)
  • type – desired type (i.e. string, long, Date, Time, Timestamp)
  • format – in case converting to or from a string, a SimpleDateFormat-compatible format string
Hoist{Key,Value}ToStruct

Wrap data in a Struct.

 
  • schema.name – name for the new Struct schema
  • field – field name for the original data within this Struct
Extract{Key,Value}FromStruct
Extract a specific field from a Struct. 
  • field – field name to extract

...

  • Data transformations could be applicable to the key or the value of the record. We will have *Key and *Value variants for these transformations that reuse the common functionality from a shared base class.

  • Some common utilities for data transformations will shape up:

    • Cache the changes they make to Schema objects, possibly only preserving last-seen one as the likelihood of source data Schema changing is low.

    • Copying of Schema objects with the possible exclusion of some fields, which they are modifying. Likewise, copying of Struct object to another Struct having a different Schema with the exception of some fields, which they are modifying.

    • Where fields are being added and a field name specified in configuration, we will want a consistent way to convey if it should be created as an optional field. We can use a leading '?' character. TODO: specify escaping

    • ConfigDef does not provide a Type.MAP, but for the time being we can piggyback on top of Type.LIST and represent maps as a list of key-value pairs separated by : TODO: specify escaping.
    • Where field names are expected, in some cases we should allow for getting at nested fields by allowing a dotted syntax which is common in such usage (and accordingly, will need some utilities around accessing a field that may be nested). TODO: specify escaping
    • There are escaping considerations to several such configs, so we will need utilities that that assume a consistent escaping style (e.g. backslashes).

Compatibility, Deprecation, and Migration Plan

...