Comment: Revised configuration, renamed Conditional to If

...

Single Message Transformations (SMTs) in Kafka Connect provide a convenient, code-free way to modify records from source connectors before they get sent to a Kafka topic. In some more complex use cases it can be desirable to apply a list of SMTs conditionally, depending on the topic the records are destined for. For example, with Debezium there are topics which represent a schema change and topics which represent a data change, and users might want to apply transformations selectively, based on the topic type. SMTs cannot currently do this, since they're applied to all records produced by a source connector, irrespective of their intended topic.

This KIP proposes to add a new SMT to Apache Kafka which can conditionally apply a list of other transformations based on a few simple predicates on records.

Public Interfaces

A new If SMT will be added.

The existing Transformation interface will have a default method added in order to provide the more flexible SMT configuration which this SMT will require.

Code Block
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
    // ...

    /** Configuration specification for this transformation. */
    ConfigDef config(); /* already exists */

    /**
     * Configuration specification for this transformation, which may depend on the given props.
     * The default implementation ignores props and returns config().
     */
    default ConfigDef config(Map<String, String> props) { /* new method */
        return config();
    }

    // ...
}

Proposed Changes

...

Add the If SMT

The If SMT will apply an arbitrary transformation (or transformation chain) according to some predicate on the record:

  • The if.type config parameter names the concrete class implementing RecordPredicate (see below).
  • Other config parameters starting with the if prefix define config parameters for the RecordPredicate.
  • The then config parameter is a list of aliases for the transforms to be applied when the predicate is satisfied.
  • For each of the named aliases, configs can be supplied starting with the then.${alias} prefix. This is entirely analogous to the way transforms themselves are configured, just using the SMT's config prefix.
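The behaviour described in the list above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the KIP's implementation: SketchRecord and IfSketch are hypothetical stand-ins (the real SMT would work with ConnectRecord, RecordPredicate and Transformation), and the sketch assumes that records failing the predicate are passed through unchanged.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a Connect record; only the fields needed here. */
final class SketchRecord {
    final String topic;
    final String value;

    SketchRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

/** Sketch of the If behaviour: one predicate guarding a chain of transformations. */
final class IfSketch {
    private final Predicate<SketchRecord> predicate;
    private final List<UnaryOperator<SketchRecord>> thenChain;

    IfSketch(Predicate<SketchRecord> predicate, List<UnaryOperator<SketchRecord>> thenChain) {
        this.predicate = predicate;
        this.thenChain = thenChain;
    }

    SketchRecord apply(SketchRecord record) {
        if (!predicate.test(record)) {
            return record; // assumption: unmatched records pass through untouched
        }
        SketchRecord current = record;
        // Apply the guarded transformations in the order listed in the "then" config.
        for (UnaryOperator<SketchRecord> transformation : thenChain) {
            current = transformation.apply(current);
            if (current == null) {
                return null; // a null result drops the record, as in a regular SMT chain
            }
        }
        return current;
    }
}
```

The predicate is evaluated once, against the incoming record, so the guarded transformations cannot change whether later members of the same chain are applied.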

Example configuration

The following shows the configuration for applying the ExtractField$Key SMT on topics whose name matches the Java regular expression my-prefix-.*

No Format
transforms: conditionalExtract
transforms.conditionalExtract.type: If
transforms.conditionalExtract.if.type: TopicMatches
transforms.conditionalExtract.if.regex: my-prefix-.*
transforms.conditionalExtract.then: extractInt
transforms.conditionalExtract.then.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.conditionalExtract.then.extractInt.field: c1

Conditions

...

The value of the if.type config of the SMT would be the name of a class implementing the new RecordPredicate interface:

Code Block
interface RecordPredicate<R extends ConnectRecord<R>> extends Configurable, Closeable {
    /**
     * Test the given record against this predicate.
     */
    boolean test(ConnectRecord<R> record);
}

As part of this KIP, two implementations of RecordPredicate would be provided:

TopicMatches condition

The TopicMatches predicate would match a record if the record's topic name matched the regular expression configured with the regex config parameter. An example is shown above.

Config name    Type      Validation
regex          String    Valid according to java.util.regex.Pattern.compile()
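As a rough illustration of the matching rule, the check could look like the sketch below. TopicMatchesSketch is a hypothetical name, not the class proposed by the KIP, and the sketch assumes the whole topic name must match the pattern (Matcher.matches()) rather than only a substring of it.

```java
import java.util.regex.Pattern;

/** Sketch of a TopicMatches-style check on a topic name. */
final class TopicMatchesSketch {
    private final Pattern pattern;

    TopicMatchesSketch(String regex) {
        // Pattern.compile throws PatternSyntaxException for an invalid expression,
        // which corresponds to the validation rule listed for the regex config.
        this.pattern = Pattern.compile(regex);
    }

    boolean test(String topic) {
        // Assumption: the entire topic name has to match the pattern.
        return pattern.matcher(topic).matches();
    }
}
```

Under that assumption, a pattern such as my-prefix-.* needs its trailing .* in order to accept every topic whose name begins with my-prefix-.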

Not condition

This predicate would negate the result of another predicate, named via an alias. For example, the following will apply the transformation on records whose topic name does not start with "my-prefix-":

Code Block
transforms: conditionalExtract
transforms.conditionalExtract.type: If
transforms.conditionalExtract.if.type: Not
transforms.conditionalExtract.if.operand: hasPrefix
transforms.conditionalExtract.if.operand.hasPrefix.type: TopicMatches
transforms.conditionalExtract.if.operand.hasPrefix.regex: my-prefix-.*
transforms.conditionalExtract.then: extractInt
transforms.conditionalExtract.then.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.conditionalExtract.then.extractInt.field: c1


Config name    Type      Validation
operand        String

Other config parameters beginning with the if.operand.${alias} prefix would be used to configure the negated predicate.

The predicate design is flexible, allowing other predicates to be added in the future if necessary. For example, conjunction and disjunction predicates could be supported in a similar way to Not.
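The negation itself is simple to picture. The sketch below models records as plain topic-name strings and uses java.util.function.Predicate in place of RecordPredicate; NotSketch and its methods are hypothetical names chosen for illustration only.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

final class NotSketch {
    /** Wraps a predicate so that it matches exactly when the operand does not. */
    static <T> Predicate<T> not(Predicate<T> operand) {
        return record -> !operand.test(record);
    }

    /** Stand-in for the hasPrefix operand: full match of a topic name against a regex. */
    static Predicate<String> topicMatches(String regex) {
        Pattern pattern = Pattern.compile(regex);
        return topic -> pattern.matcher(topic).matches();
    }
}
```

With these helpers, NotSketch.not(NotSketch.topicMatches("my-prefix-.*")) plays the role of the Not predicate with the hasPrefix operand from the configuration above.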

Changes to Transformation

Currently Transformation has a public ConfigDef config() method, which means that the ConfigDef is static and cannot depend on the configs which are actually provided. In order to support the then config listing the SMT aliases, whose configs are then prefixed by their alias, it is necessary to make this interface more flexible. This will be done by adding a default ConfigDef config(Map<String, String> props) method to Transformation. The default implementation will just call the nullary version of the method.

For the If SMT the ConfigDef built will depend on the configs present, though the following would always be required:
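To see why a static ConfigDef is not enough, it can help to look at how the set of valid config keys depends on the value of then. The following sketch groups the then.${alias}.* entries by alias using only java.util types. ThenConfigSketch and perAliasConfigs are hypothetical names, and the sketch assumes the props passed in have already had the transforms.<name>. prefix removed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ThenConfigSketch {
    /** Groups "then.<alias>.<key>" entries by alias, in the order given by the "then" list. */
    static Map<String, Map<String, String>> perAliasConfigs(Map<String, String> props) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String aliases = props.getOrDefault("then", "");
        for (String rawAlias : aliases.split(",")) {
            String alias = rawAlias.trim();
            if (alias.isEmpty()) {
                continue; // tolerate an empty list or stray commas
            }
            String prefix = "then." + alias + ".";
            Map<String, String> aliasProps = new LinkedHashMap<>();
            for (Map.Entry<String, String> entry : props.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    // Strip the prefix so the nested transformation sees its own key names.
                    aliasProps.put(entry.getKey().substring(prefix.length()), entry.getValue());
                }
            }
            result.put(alias, aliasProps);
        }
        return result;
    }
}
```

Which keys are valid is only known once the then list has been read, which is the information a config(Map<String, String> props) overload has access to and the nullary config() does not.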

...

...

...

The returned ConfigDef would include the configs for the given transformations using the existing ConfigDef.embed() mechanism.

...

  • Adding a new config to all connectors to allow conditionality based on a topic name regex. The new config name could potentially collide with a config used by some existing SMT. It's less flexible (or multiple configs are required to add flexibility). It can be a little awkward to configure conditionality for a whole chain of SMTs.
  • Using an expression language/DSL for specifying the condition. There's no reason to treat the condition differently from the conditionally applied transformations. It's likely if such syntax sugar were desirable for the condition it would also be desirable for configuring the guarded transformations too. If a DSL were to be implemented, it would be better to allow it to configure the whole transformation chain, rather than just conditions.