Status

Current state: Under discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Single Message Transformations (SMTs) in Kafka Connect provide a convenient, code-free way to modify records from source connectors before they get sent to a Kafka topic. In some more complex use cases it is desirable to apply SMTs conditionally, depending on some aspect of the record being processed.

For example, with Debezium there are topics which represent a schema change and topics which represent a data change, and users might want to apply transformations selectively, based on the topic type. SMTs cannot currently do this, since they're applied to all records produced by a source connector, irrespective of their intended topic. This problem would be solved if it were possible to apply an SMT according to the name of the topic (see KAFKA-7052 for further details).

This KIP proposes a way to apply a particular transformation only if the record matches some condition. The condition is defined by a new interface, and implementations for common conditions will be provided. Connector authors and users will also be able to provide their own condition implementations for special cases, but this is not expected to be a common need.

A new Filter SMT will also be implemented. This will filter records which do, or do not, satisfy a given condition. This is advantageous for users who do not want to incur the storage costs of consuming everything from a source connector, for example. Instead they can choose to ingest only the records of interest. Likewise, for sink connectors it will enable exporting a subset of data without needing to resort to a Kafka Streams application to filter it first.


Public Interfaces

A new Predicate interface will be added in the new org.apache.kafka.connect.transforms.predicates package.

Code Block
/**
 * A condition on ConnectRecords.
 * Implementations of this interface can be used for filtering records and conditionally applying Transformations.
 * Implementations must be public and have a public constructor with no parameters.
 */
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, Closeable {

    /**
     * Configuration specification for this predicate.
     */
    ConfigDef config();

    /**
     * Validate the predicate configuration values against configuration definitions.
     * @param predicateConfigs the provided configuration values
     * @return List of Config, each Config contains the updated configuration information given
     * the current configuration values.
     */
    default Config validate(Map<String, String> predicateConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(predicateConfigs);
        return new Config(configValues);
    }

    /**
     * Returns whether the given record satisfies this predicate.
     */
    boolean test(R record);

    @Override
    void close();
}

All transformations will gain new implicit configuration parameters, which will be consumed by the Connect runtime and not passed to the Transformation.configure() method. These parameters will start with a question mark (?), so it is extremely unlikely that they will collide with configuration parameters used by existing connectors.

A new Filter SMT will be added to enable record filtering.

Proposed Changes

Predicates

The Predicate interface is described above.

In order to negate the result of a predicate all predicates will implicitly support a boolean negate configuration parameter, which defaults to false.

In addition to the Predicate interface described above, this KIP will provide the following implementations:

TopicNameMatches

test() will return true when the ConnectRecord.topic() (i.e. the topic's name) matches a given Java regular expression pattern.

Config name    Type      Default    Required
pattern        String    null       yes

HasHeaderKey

test() will return true when the ConnectRecord.headers() has 1 or more headers with a given key.

Config name    Type      Default    Required
name           String    null       yes

RecordIsTombstone

test() will return true when the ConnectRecord represents a tombstone (i.e. has a null value). This predicate has no configuration parameters.
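The three predicates described above can be sketched in plain Java as follows. This is only an illustration, not the proposed implementation: SketchRecord is a hypothetical stand-in for ConnectRecord (it is not part of Kafka Connect), header keys are modelled as a simple list of strings, and matching the whole topic name against the pattern is an assumption, since the text only says that the name "matches" the pattern.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical stand-in for ConnectRecord; holds only what the predicates inspect. */
final class SketchRecord {
    final String topic;
    final List<String> headerKeys;
    final Object value;

    SketchRecord(String topic, List<String> headerKeys, Object value) {
        this.topic = topic;
        this.headerKeys = headerKeys;
        this.value = value;
    }
}

/** Illustrative versions of the three predicates; names are assumptions. */
final class BuiltInPredicateSketch {

    /** TopicNameMatches: assumes the whole topic name must match the pattern. */
    static boolean topicNameMatches(String pattern, SketchRecord record) {
        return Pattern.compile(pattern).matcher(record.topic).matches();
    }

    /** HasHeaderKey: true when one or more headers carry the given key. */
    static boolean hasHeaderKey(String name, SketchRecord record) {
        return record.headerKeys.contains(name);
    }

    /** RecordIsTombstone: true when the record has a null value. */
    static boolean recordIsTombstone(SketchRecord record) {
        return record.value == null;
    }
}
```

A real implementation would compile the pattern once in configure(Map) rather than on every call; the sketch recompiles it only to stay short.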

Conditionally applying an SMT

When a Transformation is configured with the new ?type parameter its application will happen conditionally. The value of the ?type parameter will be the name of a concrete class implementing the Predicate interface. Configuration for the predicate will come from all other configuration parameters which start with '?'. These will be supplied to the configure(Map) method, but with the initial '?' in the parameter name removed.

If the predicate throws an exception during processing, the condition will be treated as not having been matched: the guarded SMT will not be applied and the exception will be logged. Note that this behaviour is independent of the negate parameter.
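The rules in this section can be sketched as follows. This is a minimal illustration under stated assumptions, not the Connect runtime's implementation: ConditionalSketch, predicateConfig and applyIfMatched are hypothetical names, records are modelled as plain strings, and the predicate and transformation are modelled with java.util.function types rather than the Predicate and Transformation interfaces of this KIP. Whether the negate parameter is also passed to the predicate's configure(Map) is not stated in the text; the sketch leaves it in.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class ConditionalSketch {

    /**
     * Collects the '?'-prefixed parameters (other than ?type) for the predicate,
     * with the leading '?' removed from each name.
     */
    static Map<String, String> predicateConfig(Map<String, String> transformationConfig) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : transformationConfig.entrySet()) {
            String key = e.getKey();
            if (key.startsWith("?") && !key.equals("?type")) {
                result.put(key.substring(1), e.getValue());
            }
        }
        return result;
    }

    /** Applies the guarded transformation only when the (possibly negated) predicate matches. */
    static String applyIfMatched(Predicate<String> predicate, boolean negate,
                                 UnaryOperator<String> transformation, String record) {
        boolean matched;
        try {
            matched = predicate.test(record) != negate;
        } catch (RuntimeException e) {
            // Treated as "not matched" regardless of negate; a real runtime would log this.
            matched = false;
        }
        return matched ? transformation.apply(record) : record;
    }
}
```

Placing the negation inside the try block is what makes the exception case independent of negate: a failing predicate never causes the guarded SMT to run.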

Consider the following example of a transformation chain with a single conditionally applied SMT:

Code Block
transforms: t2
transforms.t2.?type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
transforms.t2.?negate: true
transforms.t2.?pattern: my-prefix-.*
transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field: c1

The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches. The ExtractField$Key SMT will be applied only to records where the topic name does not (the negate parameter) start with my-prefix- (the pattern parameter).

The Filter SMT

A new Filter transformation will be added in the existing org.apache.kafka.connect.transforms package. This will return null from apply(ConnectRecord) when a configured Predicate.test(ConnectRecord) returns true (or false when the negate config is true).

Consider the following example of a transformation chain with a single Filter SMT:

Code Block
transforms: filter
transforms.filter.type: Filter
transforms.filter.condition: org.apache.kafka.connect.transforms.predicates.TopicNameMatches

...

filter.condition.pattern: foo|bar
transforms.filter.negate: false

The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches and it takes a single configuration parameter, pattern. Records with the topic name "foo" or "bar" match the predicate, so apply(ConnectRecord) will return null and those records are therefore filtered out.

If the predicate throws an exception during processing, the condition will be treated as not having been matched, the record will be filtered and the exception logged. Note that this behaviour is independent of the negate parameter.
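The Filter behaviour described in this section can be sketched as below. This is a hedged illustration rather than the proposed implementation: FilterSketch is a hypothetical name, the predicate is modelled with java.util.function.Predicate instead of the Predicate interface of this KIP, and the record type is left generic.

```java
import java.util.function.Predicate;

final class FilterSketch {

    /**
     * Returns null (the record is dropped) when the predicate result, after
     * applying negate, is true; otherwise returns the record unchanged.
     */
    static <R> R apply(Predicate<R> predicate, boolean negate, R record) {
        boolean drop;
        try {
            drop = predicate.test(record) != negate;
        } catch (RuntimeException e) {
            // Per the text above: on an exception the record is filtered,
            // independent of negate; a real implementation would also log it.
            drop = true;
        }
        return drop ? null : record;
    }
}
```

With a predicate that matches the topic names "foo" and "bar" and negate set to false, records from those topics yield null and all other records pass through unchanged.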

...


Changes to Transformation

Currently Transformation has a public ConfigDef config() method, which means that the ConfigDef cannot depend on the configs which are present in the provided configs. This is not flexible enough for the Filter SMT, which needs to support arbitrary configuration parameters in order to configure the predicate. To support this we will add a validate() method to Transformation.

Code Block
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {

    // ... existing methods ...

    /**
     * Validate the transformation configuration values against configuration definitions.
     * @param transformationConfigs the provided configuration values
     * @return List of Config, each Config contains the updated configuration information given
     * the current configuration values.
     */
    default Config validate(Map<String, String> transformationConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(transformationConfigs);
        return new Config(configValues);
    }

}

Compatibility, Deprecation, and Migration Plan

The change to Transformation uses a default method so is backwards compatible with existing implementations.

Users will need to perform a rolling upgrade of a distributed Connect cluster before they can start using the new Filter SMT or conditional SMTs.

The ?-prefixing of configuration parameters does not remove the possibility of a collision with existing connectors, but it's unlikely that any existing connectors are using configuration parameters which start with '?'.

Rejected Alternatives

  • Alternative ways to configure conditional SMTs which removed the possibility of collision with existing connectors were considered. They were more verbose and difficult to understand.
  • Adding a new config to all connectors to allow conditionality based on a topic name regex. The new config name could potentially collide with a config used by some existing SMT. It's less flexible (or multiple configs are required to add flexibility). It can be a little awkward to configure conditionality for a whole chain of SMTs.
  • Using an expression language/DSL for specifying the condition. There's no reason to treat the condition differently from the conditionally applied transformations. It's likely if such syntax sugar were desirable for the condition it would also be desirable for configuring the guarded transformations too. If a DSL were to be implemented, it would be better to allow it to configure the whole transformation chain, rather than just conditions.