
...

Code Block
/**
* A condition on ConnectRecords.
* Implementations of this interface can be used for filtering records and conditionally applying Transformations.
* Implementations must be public and have a public constructor with no parameters.
*/
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, Closeable {

    /** 
     * Configuration specification for this predicate. 
     */
    ConfigDef config();

    /**
     * Validate the predicate configuration values against configuration definitions.
     * @param predicateConfigs the provided configuration values
     * @return a Config containing the updated configuration information given
     * the current configuration values.
     */
    default Config validate(Map<String, String> predicateConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(predicateConfigs);
        return new Config(configValues);
    }

    /**
     * Returns whether the given record satisfies this predicate.
     */
    boolean test(R record);

    @Override
    void close();
}

...

A new Filter transformation will be added in the existing org.apache.kafka.connect.transforms package. Its apply(ConnectRecord) method will return null when the configured Predicate.test(ConnectRecord) returns true (or when it returns false, if the negate config is true). This is not of much use on its own, but is intended to be applied conditionally as described above. This will allow messages to be filtered according to the predicate.
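The drop-or-keep rule above can be sketched in a few lines. This is only an illustration under stated assumptions, not the proposed implementation: SketchRecord and FilterSketch are hypothetical stand-ins so the example compiles without the Connect API, and java.util.function.Predicate stands in for the Predicate interface proposed in this KIP.

```java
import java.util.function.Predicate;

// Hypothetical sketch of the Filter semantics; not the actual SMT class.
final class FilterSketch {
    private final Predicate<SketchRecord> predicate;
    private final boolean negate;

    FilterSketch(Predicate<SketchRecord> predicate, boolean negate) {
        this.predicate = predicate;
        this.negate = negate;
    }

    // Returns null to drop the record, or the unchanged record to keep it.
    SketchRecord apply(SketchRecord record) {
        boolean matches = predicate.test(record);
        // With negate=false a matching record is dropped;
        // with negate=true a non-matching record is dropped.
        boolean drop = negate ? !matches : matches;
        return drop ? null : record;
    }

    public static void main(String[] args) {
        FilterSketch filter = new FilterSketch(r -> r.topic.equals("foo"), false);
        SketchRecord kept = filter.apply(new SketchRecord("baz", "payload"));
        SketchRecord dropped = filter.apply(new SketchRecord("foo", "payload"));
        System.out.println("baz kept: " + (kept != null));
        System.out.println("foo dropped: " + (dropped == null));
    }
}

// Hypothetical stand-in for a ConnectRecord; only the topic name is used here.
final class SketchRecord {
    final String topic;
    final String value;

    SketchRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}
```

Returning null rather than throwing or flagging the record follows the convention described above, where a null result from apply(ConnectRecord) means the record is filtered out.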

Consider the following example of a transformation chain with a single Filter SMT:

Code Block
transforms: filter
transforms.filter.type: Filter
transforms.filter.condition.?type: org.apache.kafka.connect.transforms.predicates.TopicNameMatch
transforms.filter.condition.?pattern: foo|bar
transforms.filter.?negate: false

The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatch and it takes a single configuration parameter, pattern. Records with the topic name "foo" or "bar" match the predicate, so the Filter SMT's apply(ConnectRecord) returns null for them and those records are filtered out.
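To make the matching behaviour in this example concrete, here is a minimal sketch of how a topic-name predicate could evaluate the pattern foo|bar. TopicNameMatchSketch is a hypothetical class, and the use of java.util.regex with whole-name matching is an assumption made for illustration; this proposal does not specify the matching mode.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of a topic-name predicate; not the proposed TopicNameMatch class.
final class TopicNameMatchSketch {
    private final Pattern pattern;

    // 'pattern' corresponds to the single configuration parameter in the example above.
    TopicNameMatchSketch(String pattern) {
        this.pattern = Pattern.compile(pattern);
    }

    // Assumption: the whole topic name must match the pattern, not just a substring.
    boolean test(String topic) {
        return topic != null && pattern.matcher(topic).matches();
    }

    public static void main(String[] args) {
        TopicNameMatchSketch predicate = new TopicNameMatchSketch("foo|bar");
        for (String topic : new String[] {"foo", "bar", "baz"}) {
            System.out.println(topic + " matches: " + predicate.test(topic));
        }
    }
}
```

Under these assumptions, records on topics "foo" and "bar" satisfy the predicate and would be dropped by the Filter SMT, while records on any other topic pass through unchanged.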

If the predicate throws an exception during processing, it will be handled in the same way as errors in transformations.

Changes to Transformation

Currently, Transformation exposes its configuration through a public ConfigDef config() method. This is not flexible enough for the Filter SMT, which needs to support arbitrary configuration parameters in order to configure the predicate. To support this, we will add a validate() method to Transformation.

...

Compatibility, Deprecation, and Migration Plan

...