
In many use cases it is necessary to drop not only a few specific Kafka headers, but a whole set of headers whose names can change dynamically (e.g. when used with end-to-end encryption libraries, tracing, etc.). To prevent headers following a special pattern (which, for example, may not comply with a downstream system's format) from being forwarded or processed further downstream (e.g. header forwarding in HTTP sinks), this KIP proposes to add a wildcard/regexp matching feature to the DropHeaders transform.

Public Interfaces

It is proposed to add a new configuration option to the SMT org.apache.kafka.connect.transforms.DropHeaders to allow regexp-based matching of the headers to be dropped. The apply method will be extended with the new functionality; the existing interface will not be changed.

Name: headers.pattern

Description: List of regular expressions to match the names of the headers to be removed.

Type: list

Default: "$^" (matches no header names)
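
For illustration, a minimal sketch of how the new option could be defined alongside the existing headers option in the DropHeaders CONFIG_DEF (a class fragment; the constant name HEADERS_PATTERN_FIELD follows the Option B code below, while the importance level and the omitted validators are placeholders, not a final definition):
Code Block
languagejava
public static final String HEADERS_FIELD = "headers";
public static final String HEADERS_PATTERN_FIELD = "headers.pattern";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
        // Existing option: exact header names to remove.
        .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
                "The name of the headers to be removed.")
        // Proposed option: regular expressions matching header names to remove.
        // The default "$^" matches no header name, so existing configurations keep their behaviour.
        .define(HEADERS_PATTERN_FIELD, ConfigDef.Type.LIST, "$^", ConfigDef.Importance.MEDIUM,
                "List of regular expressions to match the names of the headers to be removed.");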

Proposed Changes

It is proposed to use regexp matchers inside the apply method, either instead of or in addition to the set-based contains check of the existing implementation. Link to the relevant code lines:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54 

The current implementation stores the header keys in a hash set and performs matching using Set.contains:

Code Block
languagejava
@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!headers.contains(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

Two alternative implementation options are proposed (a usage sketch of the proposed configuration follows the code blocks):

  • Option A - Replacing the existing set-based matching with a configurable regexp matcher:
Code Block
languagejava
@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!headersMatchAnyPattern(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

private boolean headersMatchAnyPattern(String key) {
    return headersMatchers.stream().anyMatch(pattern ->
        pattern.matcher(key).matches()
    );
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    final List<String> headers = config.getList(HEADERS_FIELD);
    headersMatchers = headers.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
}
  • Option B - Adding a configurable regexp matcher in addition to the existing set-based matching, to remain 100% backwards compatible:
Code Block
languagejava
@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!toBeDropped(header)) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

private boolean toBeDropped(Header header) {
    return headers.contains(header.key()) || headersMatchAnyPattern(header.key());
}

private boolean headersMatchAnyPattern(String key) {
    return headersMatchers.stream().anyMatch(pattern ->
        pattern.matcher(key).matches()
    );
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    headers = new HashSet<>(config.getList(HEADERS_FIELD));

    final List<String> headerPatternList = config.getList(HEADERS_PATTERN_FIELD);
    headersMatchers = headerPatternList.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
}
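
For illustration, a hypothetical usage sketch assuming Option B is adopted (the headers.pattern option does not exist yet; topic, header names and values are made up):
Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.DropHeaders;

public class DropHeadersPatternExample {
    public static void main(String[] args) {
        // Exact names via the existing "headers" option,
        // dynamically named headers via the proposed "headers.pattern" option.
        Map<String, String> props = new HashMap<>();
        props.put("headers", "content-type");
        props.put("headers.pattern", "trace\\..*");

        DropHeaders<SourceRecord> transform = new DropHeaders<>();
        transform.configure(props);

        Headers headers = new ConnectHeaders()
                .addString("content-type", "application/json") // dropped by exact match
                .addString("trace.span-id", "abc123")          // dropped by pattern match
                .addString("correlation-id", "42");            // kept

        SourceRecord record = new SourceRecord(null, null, "topic", 0,
                null, null, null, null, null, headers);

        // Only "correlation-id" remains after the transform is applied.
        transform.apply(record).headers().forEach(h -> System.out.println(h.key()));
    }
}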

Compatibility, Deprecation, and Migration Plan

If the regexp matching is added in addition to the existing implementation (Option B), no breaking changes are introduced and no migration is required, since the additional feature comes with a standalone configuration option. If the contains-based implementation is completely replaced (Option A), there are potential breaking changes for existing configurations containing header names with characters that can be interpreted as Java regular expression syntax, e.g. "headers.*". In that case, the configuration needs to be adapted before upgrading so that special characters are escaped and not interpreted as a regular expression.
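
To illustrate the Option A migration concern, a small standalone sketch (class name and header names are made up) showing why a literal header name containing regex metacharacters needs to be escaped, e.g. with Pattern.quote:
Code Block
languagejava
import java.util.regex.Pattern;

public class HeaderNameEscapingExample {
    public static void main(String[] args) {
        // Literal header name that happens to contain regex metacharacters.
        String literalName = "headers.*";

        // Interpreted as a regex, it also matches unrelated header names.
        System.out.println(Pattern.compile(literalName).matcher("headers-internal").matches()); // true

        // Escaped, only the literal name itself matches.
        String escaped = Pattern.quote(literalName); // "\Qheaders.*\E"
        System.out.println(Pattern.compile(escaped).matcher("headers-internal").matches()); // false
        System.out.println(Pattern.compile(escaped).matcher("headers.*").matches());        // true
    }
}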

Rejected Alternatives

None.