...

The apply method of the SMT org.apache.kafka.connect.transforms.DropHeaders is proposed to be extended. Depending on the final decision on the implementation, an optional property might be added by the new functionality. The existing interface will not be changed.

Proposed Changes

to the apply method instead of a set-based contains. Link to the relevant code:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54 

...

```java
@Override
public R apply(R record) {
    // Copy every header that is not selected for removal into a fresh collection.
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!toBeDropped(header)) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

// A header is dropped if its key is configured by exact name or matches any configured pattern.
private boolean toBeDropped(Header header) {
    return headers.contains(header.key()) || headersMatchAnyPattern(header.key());
}

// Matcher.matches() requires the pattern to match the entire key, not just a part of it.
private boolean headersMatchAnyPattern(String key) {
    return headersMatchers.stream().anyMatch(pattern ->
        pattern.matcher(key).matches()
    );
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    // Exact header names, as in the existing implementation.
    headers = new HashSet<>(config.getList(HEADERS_FIELD));

    // Regular expressions are compiled once at configuration time.
    final List<String> headerPatternList = config.getList(HEADERS_PATTERN_FIELD);
    headersMatchers = headerPatternList.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
}
```

An example implementation already exists for the option that replaces the set-based method (Option A). Link to PR: https://github.com/apache/kafka/pull/14536
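To illustrate which header keys the combined check would select, the following is a minimal stand-alone sketch that mirrors the toBeDropped logic on plain strings. The class HeaderFilterSketch, its method retain, and the sample header names are hypothetical and exist only for this illustration; the sketch does not use any Kafka Connect classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical stand-alone sketch; not part of Kafka Connect.
public class HeaderFilterSketch {
    private final Set<String> exactNames;
    private final List<Pattern> patterns;

    public HeaderFilterSketch(List<String> names, List<String> regexes) {
        this.exactNames = new HashSet<>(names);
        // Compile each regular expression once, up front.
        this.patterns = regexes.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    // True if the key is configured by exact name or is fully matched by any pattern.
    public boolean toBeDropped(String key) {
        return exactNames.contains(key)
                || patterns.stream().anyMatch(p -> p.matcher(key).matches());
    }

    // Returns the keys that would survive the transformation, in their original order.
    public List<String> retain(List<String> keys) {
        List<String> kept = new ArrayList<>();
        for (String key : keys) {
            if (!toBeDropped(key)) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        HeaderFilterSketch filter = new HeaderFilterSketch(
                Arrays.asList("app.id"),              // exact names (sample values)
                Arrays.asList("trace-.*", "debug"));  // patterns (sample values)
        List<String> keys = Arrays.asList(
                "app.id", "trace-id", "trace-span", "debug", "debug-level", "content-type");
        System.out.println(filter.retain(keys));
    }
}
```

Because matches() has to cover the whole key, the pattern "debug" in this sketch selects only the key "debug" and leaves "debug-level" in place; a prefix match would need a pattern such as "debug.*".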

...


Compatibility, Deprecation, and Migration Plan

If regexp matching is added alongside the existing behavior (Option B), no breaking changes are needed. If the contains-based implementation is replaced (Option A), there might be breaking changes in the rare case of header names that contain characters which can be interpreted as Java regular expressions, e.g. "headers.*".

In case of Option B, no migration is needed, as the additional feature comes with a standalone configuration option. In case of Option A, only configuration changes are required, and only if the configured header names contain characters which can be interpreted as Java regular expressions.

Test Plan

Unit testing of the SMT, with additional tests covering backwards compatibility.
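A backwards-compatibility test for Option A could compare literal and regex interpretation of the same configured name. The following is a rough sketch under the assumption that Option A compiles each configured value with Pattern.compile and applies matches(); the class OptionACompatSketch, its helper methods, and the sample names are hypothetical. Quoting a name with Pattern.quote is shown as one possible way to restore literal behavior, not as a decided migration step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Kafka Connect.
public class OptionACompatSketch {

    // Existing behavior: the configured name is compared literally.
    public static boolean dropsLiteral(String configured, String headerKey) {
        return configured.equals(headerKey);
    }

    // Assumed Option A behavior: the configured name is a regex that must match the whole key.
    public static boolean dropsAsRegex(String configured, String headerKey) {
        return Pattern.compile(configured).matcher(headerKey).matches();
    }

    // Possible migration: quote the name so that regex metacharacters are taken literally.
    public static String quoted(String configured) {
        return Pattern.quote(configured);
    }

    public static void main(String[] args) {
        List<String> configuredNames = Arrays.asList("headers.*", "app.id", "a+b");
        List<String> headerKeys = Arrays.asList("headers.*", "headers.foo", "app.id", "appXid", "a+b");
        for (String configured : configuredNames) {
            for (String key : headerKeys) {
                boolean literal = dropsLiteral(configured, key);
                boolean regex = dropsAsRegex(configured, key);
                boolean quotedRegex = dropsAsRegex(quoted(configured), key);
                // Report only the combinations where regex interpretation changes the outcome.
                if (literal != regex) {
                    System.out.println("configured=" + configured + " key=" + key
                            + " literal=" + literal + " regex=" + regex
                            + " quoted=" + quotedRegex);
                }
            }
        }
    }
}
```

In this sketch the differences go in both directions: "headers.*" and "app.id" start matching additional keys such as "headers.foo" and "appXid", while "a+b" stops matching the key "a+b" because "+" is read as a quantifier. The quoted form agrees with the literal comparison in every case shown.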

Rejected Alternatives

None.