...

```java
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;

/**
 * A condition on ConnectRecords.
 * Implementations of this interface can be used for filtering records and conditionally applying Transformations.
 * Implementations must be public and have a public constructor with no parameters.
 */
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {

    /**
     * Configuration specification for this predicate.
     */
    ConfigDef config();

    /**
     * Returns whether the given record satisfies this predicate.
     */
    boolean test(R record);

    @Override
    void close();
}
```
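The sketch below shows what a TopicNameMatch-style implementation of this contract might look like. It is runnable without Kafka on the classpath, so it uses hypothetical stand-ins: `SketchRecord` replaces ConnectRecord and `SketchPredicate` mirrors the `configure`/`test`/`close` shape, with `config()`/ConfigDef omitted. The `pattern` parameter name comes from the examples later on this page.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for ConnectRecord: only the topic name matters here.
final class SketchRecord {
    final String topic;
    SketchRecord(String topic) { this.topic = topic; }
}

// Hypothetical stand-in mirroring the proposed Predicate shape (ConfigDef omitted).
interface SketchPredicate<R> extends AutoCloseable {
    void configure(Map<String, ?> configs);
    boolean test(R record);
    @Override
    void close();
}

// Sketch of a TopicNameMatch-like predicate: true when the whole topic name matches "pattern".
final class TopicNameMatchSketch implements SketchPredicate<SketchRecord> {
    private Pattern pattern;

    // Public no-arg constructor, as the interface Javadoc requires of implementations.
    public TopicNameMatchSketch() { }

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get("pattern");
        if (value == null) {
            throw new IllegalArgumentException("Missing required parameter 'pattern'");
        }
        pattern = Pattern.compile(value.toString());
    }

    @Override
    public boolean test(SketchRecord record) {
        // matches() anchors the regex to the entire topic name.
        return record.topic != null && pattern.matcher(record.topic).matches();
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```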

All transformations will gain new implicit configuration parameters. The Connect runtime consumes these parameters itself and does not pass them to the Transformation.configure() method.
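The following is a minimal sketch of how a runtime could separate the implicit parameters from those a transformation receives. It assumes the per-transformation prefix (e.g. `transforms.t2.`) has already been stripped. `ImplicitParams` and `forTransformation` are hypothetical names, not Connect runtime code. The sketch also illustrates the masking concern discussed under compatibility: a transformation's own `predicate` or `negate` parameter would never reach it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class ImplicitParams {
    // The implicit, runtime-consumed parameter names described on this page.
    static final Set<String> IMPLICIT_KEYS = Set.of("predicate", "negate");

    // Returns the parameters that would be passed to Transformation.configure():
    // everything except the implicit keys (so "type" and SMT-specific keys remain).
    static Map<String, String> forTransformation(Map<String, String> transformConfig) {
        Map<String, String> result = new HashMap<>(transformConfig);
        result.keySet().removeAll(IMPLICIT_KEYS);
        return result;
    }
}
```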

A new Filter SMT will be added to enable record filtering.

...

The Predicate interface is described above. The interface will be a worker plug-in, loaded in the same way as other worker plug-ins such as converters, connectors, and REST extensions. This would include aliasing behaviour allowing users to specify predicates using their simple class names as long as no two predicate plug-ins with the same simple name are available on the worker.
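Here is a small sketch of the aliasing rule just described. A fully qualified name is used as-is. A simple class name resolves only when exactly one available predicate class has that simple name, and otherwise it is rejected. `PluginAliases.resolve` is a hypothetical helper. The real worker plug-in scanning is not modelled, and the error messages are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

final class PluginAliases {
    // Resolve a configured predicate name against the fully qualified class names found on the worker.
    static String resolve(String configured, List<String> available) {
        if (available.contains(configured)) {
            return configured; // already a fully qualified class name
        }
        List<String> matches = new ArrayList<>();
        for (String fqcn : available) {
            // Simple name = text after the last '.', or the whole string if there is none.
            String simple = fqcn.substring(fqcn.lastIndexOf('.') + 1);
            if (simple.equals(configured)) {
                matches.add(fqcn);
            }
        }
        if (matches.size() == 1) {
            return matches.get(0);
        }
        throw new IllegalArgumentException(matches.isEmpty()
            ? "No predicate plug-in named " + configured
            : "Ambiguous predicate alias " + configured + ": " + matches);
    }
}
```

In the test below, the two `HasHeaderKey` classes in hypothetical `com.example` and `com.other` packages make that simple name ambiguous, so only the fully qualified form would work for them.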

In order to apply a transformation conditionally, all transformations will implicitly support a String predicate configuration parameter, which names a particular predicate.

To negate the result of a predicate, all transformations will implicitly support a boolean negate configuration parameter, which defaults to false.
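The combined effect of predicate and negate reduces to a single XOR. The transformation runs when `negate ^ predicate.test(record)` is true, and otherwise the record passes through unchanged. The sketch below assumes that reading. It uses `java.util.function.Function` and `java.util.function.Predicate` as stand-ins for Transformation and the Connect Predicate, and `ConditionalTransform` is a hypothetical name.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical wrapper that applies a delegate transformation only when the condition holds.
final class ConditionalTransform<R> implements Function<R, R> {
    private final Function<R, R> delegate;
    private final Predicate<R> predicate;
    private final boolean negate;

    ConditionalTransform(Function<R, R> delegate, Predicate<R> predicate, boolean negate) {
        this.delegate = delegate;
        this.predicate = predicate;
        this.negate = negate;
    }

    @Override
    public R apply(R record) {
        // negate=false: apply when the predicate holds; negate=true: apply when it does not.
        boolean shouldApply = negate ^ predicate.test(record);
        return shouldApply ? delegate.apply(record) : record;
    }
}
```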

...

When a Transformation is configured with the new predicate parameter, its application will happen conditionally. The value of the predicate parameter will be the name of a predicate listed in the top-level predicates parameter. Configuration for the predicate will come from all other configuration parameters starting with the same predicates.<name>. prefix, analogous to how the transformations in a transformation chain are configured. These will be supplied to the Predicate.configure(Map) method with the predicates.<name>. prefix removed.
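Extracting one predicate's configuration from a flat connector configuration can be sketched as a prefix scan. The code checks that the name is declared in the top-level `predicates` list, then collects every `predicates.<name>.` key with that prefix stripped. `PredicateConfigs.forPredicate` is a hypothetical helper, and treating `predicates` as a comma-separated list is an assumption by analogy with `transforms`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class PredicateConfigs {
    // Returns the configuration for predicate `name`, keyed without the "predicates.<name>." prefix.
    static Map<String, String> forPredicate(Map<String, String> connectorConfig, String name) {
        String declared = connectorConfig.getOrDefault("predicates", "");
        boolean listed = Arrays.stream(declared.split(","))
            .map(String::trim)
            .anyMatch(name::equals);
        if (!listed) {
            throw new IllegalArgumentException("Predicate " + name + " is not listed in 'predicates'");
        }
        String prefix = "predicates." + name + ".";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```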

If the predicate throws an exception during processing, it will be handled in the same way as errors in transformations.

Consider the following example of a transformation chain with a single conditionally applied ExtractField$Key SMT:

```properties
transforms=t2
transforms.t2.predicate=has-my-prefix
transforms.t2.negate=true
transforms.t2.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field=c1
predicates=has-my-prefix
predicates.has-my-prefix.type=org.apache.kafka.connect.predicates.TopicNameMatch
predicates.has-my-prefix.pattern=my-prefix-.*
```

The transform t2 is only applied when the predicate has-my-prefix is false, because negate is true. That predicate is configured by the keys with prefix predicates.has-my-prefix. The predicate class is org.apache.kafka.connect.predicates.TopicNameMatch and its pattern parameter has the value my-prefix-.*. Thus the SMT will be applied only to records whose topic name does not start with my-prefix-.
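Evaluating this example over a few topic names makes the negation concrete. The sketch hard-codes the example's `pattern` and `negate` values. It assumes TopicNameMatch matches the whole topic name, which is why `prefix-my-prefix-b` still gets transformed. `T2Example` is a hypothetical class.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class T2Example {
    // Values taken from the example configuration above.
    static final Pattern PATTERN = Pattern.compile("my-prefix-.*");
    static final boolean NEGATE = true;

    // Returns the topics whose records t2 (ExtractField$Key) would transform.
    static List<String> transformedTopics(List<String> topics) {
        return topics.stream()
            .filter(topic -> NEGATE ^ PATTERN.matcher(topic).matches())
            .collect(Collectors.toList());
    }
}
```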

...

Consider the following example of a transformation chain with a single Filter SMT:

```properties
transforms=filter
transforms.filter.type=org.apache.kafka.connect.transforms.Filter
transforms.filter.predicate=foo-or-bar
predicates=foo-or-bar
predicates.foo-or-bar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatch
predicates.foo-or-bar.pattern=foo|bar
```

The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatch and it takes a single configuration parameter, pattern. Records with the topic name "foo" or "bar" match the predicate, so the Filter SMT is applied to them. The SMT returns null, and those records are therefore filtered out.
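In this single-step chain, the Filter SMT drops every record it is applied to by returning null, and the predicate decides which records reach it. The sketch models records as bare topic names. `FilterExample` and its `filter` method are hypothetical stand-ins, not the real Filter class. Because the whole topic name must match `foo|bar`, a topic such as `foobar` survives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class FilterExample {
    // Stand-in for the Filter SMT: it drops every record it is applied to.
    static <R> R filter(R record) {
        return null;
    }

    // Runs the chain: Filter is applied only when the topic name matches foo|bar.
    static List<String> surviving(List<String> topics) {
        Pattern fooOrBar = Pattern.compile("foo|bar");
        List<String> out = new ArrayList<>();
        for (String topic : topics) {
            String result = fooOrBar.matcher(topic).matches() ? filter(topic) : topic;
            if (result != null) { // a null result means the record is dropped
                out.add(result);
            }
        }
        return out;
    }
}
```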

...

Users will need to perform a rolling upgrade of a distributed Connect cluster before they can start using the new Filter SMT or conditional SMTs.

Adding the new implicit predicate and negate parameters to transformations means that an existing transformation which already took configuration parameters with these names could not be configured through them, because the implicit parameters mask the transformation parameters of the same name. Similarly, existing connectors might have configuration parameters prefixed by predicates, which would be masked by the new top-level parameter. The analogous situation arose when support for SMTs was originally added in KIP-66.

Rejected Alternatives

  • Numerous alternative ways to configure conditional SMTs, which reduced or removed the possibility of collision with existing connectors, were considered. They were more verbose and harder to understand.