
Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In many use cases it is necessary to drop not only a few specific Kafka headers but also a set of headers whose names can change dynamically (e.g. when used with end-to-end-encryption libraries, tracing, etc.). To prevent headers following a special pattern (which may or may not comply with a downstream system's format) from being forwarded or processed further downstream (e.g. header forwarding in HTTP sinks), this KIP proposes to add a wildcard/regexp matching feature to the DropHeaders transform.

Public Interfaces

The SMT org.apache.kafka.connect.transforms.DropHeaders is to be extended. Depending on the final decision on the implementation, an optional property might be added.
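
For illustration only, the sketch below shows roughly how the configuration definition could be extended if the additive option (Option B in Proposed Changes) is chosen. The property name headers.pattern and its description are assumptions made for this sketch, not a final decision of this KIP:

import java.util.Collections;

import org.apache.kafka.common.config.ConfigDef;

// Hypothetical sketch of an extended CONFIG_DEF (additive option). The property name
// "headers.pattern" is an assumption for illustration purposes only.
public class DropHeadersConfigSketch {

    public static final String HEADERS_FIELD = "headers";
    public static final String HEADERS_PATTERN_FIELD = "headers.pattern";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
                    "The name of the headers to be removed.")
            .define(HEADERS_PATTERN_FIELD, ConfigDef.Type.LIST, Collections.emptyList(),
                    ConfigDef.Importance.MEDIUM,
                    "Regular expressions; any header whose name matches one of them is removed.");
}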

Proposed Changes

The proposal is to add regexp-based matching to the apply method instead of the current set-based contains check. Link to the relevant code:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54

The current implementation stores the header keys in a hash set and performs matching using Set.contains:


@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!headers.contains(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    headers = new HashSet<>(config.getList(HEADERS_FIELD));
}


Two alternative implementation options are proposed (a usage sketch follows after the list):

  • Option A - Replacing the existing set-based matching with a configurable regexp matcher:
@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!headersMatchAnyPattern(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

private boolean headersMatchAnyPattern(String key) {
    return headersMatchers.stream().anyMatch(pattern ->
        pattern.matcher(key).matches()
    );
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    final List<String> headers = config.getList(HEADERS_FIELD);
    headersMatchers = headers.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
}
  • Option B - Adding a configurable regexp matcher in addition, to stay 100% backwards-compatible:
@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!toBeDropped(header)) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

private boolean toBeDropped(Header header) {
    return headers.contains(header.key()) || headersMatchAnyPattern(header.key());
}

private boolean headersMatchAnyPattern(String key) {
    return headersMatchers.stream().anyMatch(pattern ->
        pattern.matcher(key).matches()
    );
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    headers = new HashSet<>(config.getList(HEADERS_FIELD));

    final List<String> headerPatternList = config.getList(HEADERS_PATTERN_FIELD);
    headersMatchers = headerPatternList.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
}
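
For illustration, the following usage sketch shows how the transform might be configured and applied if the additive option were implemented. The property name headers.pattern is an assumption, and the snippet is a sketch rather than a definitive API proposal:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.DropHeaders;

// Illustrative usage sketch, assuming a hypothetical "headers.pattern" property
// in addition to the existing "headers" property (Option B).
public class DropHeadersUsageSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("headers", "exact-header-to-drop");
        props.put("headers.pattern", "x-internal-.*,trace-.*");

        DropHeaders<SourceRecord> smt = new DropHeaders<>();
        smt.configure(props);

        Headers headers = new ConnectHeaders()
                .addString("x-internal-encryption-key-id", "kid-123")
                .addString("exact-header-to-drop", "value")
                .addString("business-header", "value");

        SourceRecord record = new SourceRecord(null, null, "topic", 0,
                null, null, null, null, null, headers);

        // Headers matching the exact name or one of the configured patterns are removed;
        // "business-header" is kept.
        SourceRecord transformed = smt.apply(record);
        System.out.println(transformed.headers());
    }
}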

An example implementation has already been done for the option that replaces the set-based matching - link to the PR: https://github.com/apache/kafka/pull/14536

With backwards compatibility in mind, we should discuss whether to change the existing implementation and replace it with a regexp matcher, or to add another configuration option specifically for regular expressions.

Compatibility, Deprecation, and Migration Plan

If regexp matching is added as an additional configuration, there are no breaking changes. If the contains-based implementation is replaced, there might be breaking changes in rare cases where existing header names contain characters that have a special meaning in regular expressions.
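
To make this concrete, consider a hypothetical existing configuration that lists a header name containing regex metacharacters, e.g. price[USD]. With today's literal comparison it only matches the exact name; interpreted as a regular expression, the brackets become a character class and the matching semantics change (Pattern.quote could be used to preserve the literal behaviour):

import java.util.regex.Pattern;

// Illustration of the rare incompatibility: a configured header name that contains
// regex metacharacters behaves differently once it is compiled as a pattern.
public class RegexCompatibilityExample {
    public static void main(String[] args) {
        String configuredName = "price[USD]"; // hypothetical header name with '[' and ']'

        // Today: literal comparison, matches only the exact name.
        System.out.println("price[USD]".equals(configuredName));                          // true

        // As a regex: "[USD]" is a character class, so the literal name no longer matches,
        // but names like "priceU" suddenly do.
        System.out.println(Pattern.matches(configuredName, "price[USD]"));                // false
        System.out.println(Pattern.matches(configuredName, "priceU"));                    // true

        // Quoting restores the literal behaviour.
        System.out.println(Pattern.matches(Pattern.quote(configuredName), "price[USD]")); // true
    }
}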

Test Plan

Unit testing of the SMT, with additional tests covering backwards compatibility.
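
As a sketch of what such a test could look like (assuming JUnit 5; names and values are illustrative only), a configuration that uses only the existing headers property must keep today's exact-match behaviour, i.e. a dot in a configured header name must still be treated literally:

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.DropHeaders;
import org.junit.jupiter.api.Test;

// Hedged sketch of a backwards-compatibility test: with only the existing "headers"
// property configured, the transform must behave exactly as it does today.
public class DropHeadersBackwardsCompatTest {

    @Test
    public void exactMatchBehaviourIsUnchanged() {
        Map<String, Object> props = new HashMap<>();
        props.put("headers", "to.drop"); // '.' must not be interpreted as a regex wildcard

        DropHeaders<SourceRecord> smt = new DropHeaders<>();
        smt.configure(props);

        SourceRecord record = new SourceRecord(null, null, "topic", 0, null, null, null, null, null,
                new ConnectHeaders().addString("to.drop", "v").addString("toXdrop", "v"));

        SourceRecord transformed = smt.apply(record);
        assertNull(transformed.headers().lastWithName("to.drop"));
        assertNotNull(transformed.headers().lastWithName("toXdrop"));
    }
}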

Rejected Alternatives

None.
