...
In many use cases it is required to drop not only a few specific Kafka headers but also a set of headers whose names can change dynamically (e.g. when used with some end-to-end encryption libraries, tracing, etc.). Headers following such a special pattern may, for example, not comply with the downstream system's format. To prevent them from being forwarded or processed further downstream (e.g. header forwarding in HTTP sinks), this KIP proposes to add a wildcard/regexp matching feature to the DropHeaders transform.
Public Interfaces
It is proposed to extend the SMT org.apache.kafka.connect.transforms.DropHeaders with regexp-based matching of the headers to be dropped. Depending on the final decision on the implementation, an optional configuration property might be added.
Proposed Changes
It is proposed to use regexp matchers inside the apply method, either instead of or in addition to the existing set-based contains check. Link to the relevant code:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54
The current implementation stores the header keys in a hash set and performs matching using Set.contains:
```java
@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        // Only headers whose key is exactly one of the configured names are dropped
        if (!headers.contains(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    headers = new HashSet<>(config.getList(HEADERS_FIELD));
}
```
Two possible alternative implementation options are proposed:
- Option A - Replacing the existing set-based matching with a configurable regexp matcher:
```java
// Compiled patterns; replaces the existing Set<String> headers field
private List<Pattern> headersMatchers;

@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!headersMatchAnyPattern(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

// A header is dropped if its whole key matches any of the configured patterns
private boolean headersMatchAnyPattern(String key) {
    return headersMatchers.stream().anyMatch(pattern -> pattern.matcher(key).matches());
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    // Each entry of the existing headers property is now compiled as a regular expression
    final List<String> headers = config.getList(HEADERS_FIELD);
    headersMatchers = headers.stream().map(Pattern::compile).collect(Collectors.toList());
}
```
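To make the matching semantics of Option A concrete, the following standalone sketch reproduces headersMatchAnyPattern on plain header keys, without any Kafka Connect classes. The class name OptionAMatchingSketch and the example patterns are hypothetical. Note that Matcher.matches() requires the whole key to match, so a pattern such as x-b3-.* is needed to match a name prefix.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of Option A: every configured entry is compiled
// as a regular expression, and a header is dropped if its key fully matches any of them.
public class OptionAMatchingSketch {

    // Compiles the configured entries once, as configure() does in Option A.
    static List<Pattern> compile(List<String> entries) {
        return entries.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    // Mirrors headersMatchAnyPattern: Matcher.matches() requires the whole key to match.
    static boolean headersMatchAnyPattern(List<Pattern> matchers, String key) {
        return matchers.stream().anyMatch(pattern -> pattern.matcher(key).matches());
    }

    public static void main(String[] args) {
        // Illustrative patterns: an exact name, a tracing prefix and an encryption prefix.
        List<Pattern> matchers = compile(List.of("traceparent", "x-b3-.*", "enc\\..*"));

        for (String key : List.of("traceparent", "x-b3-spanid", "enc.keyId", "content-type")) {
            System.out.println(key + " -> " + (headersMatchAnyPattern(matchers, key) ? "dropped" : "kept"));
        }
    }
}
```

Running main reports traceparent, x-b3-spanid and enc.keyId as dropped and content-type as kept.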
- Option B - Adding a configurable regexp matcher in addition to the existing set-based matching, to be 100% backward compatible:
```java
// Existing set of exact header names, kept for backward compatibility
private Set<String> headers;
// Compiled patterns from the additional pattern property
private List<Pattern> headersMatchers;

@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!toBeDropped(header)) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

// A header is dropped if its key is an exact configured name or fully matches any pattern
private boolean toBeDropped(Header header) {
    return headers.contains(header.key()) || headersMatchAnyPattern(header.key());
}

private boolean headersMatchAnyPattern(String key) {
    return headersMatchers.stream().anyMatch(pattern -> pattern.matcher(key).matches());
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    headers = new HashSet<>(config.getList(HEADERS_FIELD));
    final List<String> headerPatternList = config.getList(HEADERS_PATTERN_FIELD);
    headersMatchers = headerPatternList.stream().map(Pattern::compile).collect(Collectors.toList());
}
```
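A comparable standalone sketch for Option B shows that entries of the existing exact-name set keep their literal meaning, while the additional patterns handle dynamically named headers. The class OptionBMatchingSketch, its constructor and the example values are hypothetical; they only mirror the toBeDropped logic above on plain String keys.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of Option B: the exact-name set is kept unchanged
// and an independent list of patterns is consulted in addition.
public class OptionBMatchingSketch {

    private final Set<String> headers;           // exact names, like the existing headers property
    private final List<Pattern> headersMatchers; // regexps, like the proposed pattern property

    OptionBMatchingSketch(List<String> exactNames, List<String> patterns) {
        this.headers = new HashSet<>(exactNames);
        this.headersMatchers = patterns.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    // Same decision as toBeDropped in Option B, applied to a key instead of a Header.
    boolean toBeDropped(String key) {
        return headers.contains(key)
                || headersMatchers.stream().anyMatch(pattern -> pattern.matcher(key).matches());
    }

    public static void main(String[] args) {
        // "trace.id" stays a literal name: its '.' is not treated as a regexp wildcard.
        OptionBMatchingSketch sketch = new OptionBMatchingSketch(List.of("trace.id"), List.of("x-b3-.*"));

        for (String key : List.of("trace.id", "traceXid", "x-b3-traceid", "content-type")) {
            System.out.println(key + " -> " + (sketch.toBeDropped(key) ? "dropped" : "kept"));
        }
    }
}
```

Here trace.id and x-b3-traceid are dropped, while traceXid and content-type are kept, which is the backward-compatible behavior Option B aims for.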
An example implementation of the option replacing the set-based method (Option A) has already been done - link to PR: https://github.com/apache/kafka/pull/14536
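The following optional configuration property (HEADERS_PATTERN_FIELD in Option B) is proposed to allow regexp-based matching of the headers to be dropped:
Name: headers.patterns
Description: List of regular expressions matching the names of the headers to be removed.
Type: String (List)
Default: "$^" (matches only an empty header name, so ordinary headers are not dropped by default)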
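The following sketch illustrates how a value of the proposed headers.patterns property could be turned into matchers, and why the default "$^" leaves ordinary headers untouched. The class name HeaderPatternsConfigSketch is hypothetical, and the comma splitting is a simplified assumption modelled on list-typed Connect configuration values, not Kafka's actual parsing code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: turning a comma-separated headers.patterns value into matchers.
// Assumption: values are split on commas with surrounding whitespace removed.
public class HeaderPatternsConfigSketch {

    static final String DEFAULT_PATTERNS = "$^"; // proposed default value

    static List<Pattern> parse(String value) {
        return Arrays.stream(value.trim().split("\\s*,\\s*"))
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }

    static boolean matchesAny(List<Pattern> matchers, String key) {
        return matchers.stream().anyMatch(p -> p.matcher(key).matches());
    }

    public static void main(String[] args) {
        List<Pattern> defaults = parse(DEFAULT_PATTERNS);
        // "$^" can only fully match an empty key, so ordinary header names are kept.
        System.out.println(matchesAny(defaults, "traceparent")); // false
        System.out.println(matchesAny(defaults, ""));            // true

        List<Pattern> custom = parse("x-b3-.*, enc\\..*");
        System.out.println(matchesAny(custom, "enc.keyId"));     // true
    }
}
```

If comma-separated list parsing is used for this property, as assumed here, a pattern that itself contains a comma (such as a{1,3}) would be split at that comma into separate, unintended entries.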
With backward compatibility in mind, it remains to be discussed whether to replace the existing implementation with a regexp matcher (Option A) or to add another configuration property used specifically for regular expressions (Option B).
Compatibility, Deprecation, and Migration Plan
If the regexp matching is added in addition to the existing implementation (Option B), there are no breaking changes. If the contains-based implementation is replaced (Option A), there might be breaking changes in the rare cases of header names containing characters that have a special meaning in regular expressions.
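The compatibility risk of Option A can be illustrated with a small standalone sketch that compares exact string comparison (the current Set.contains semantics) with compiling the same configured entry as a regular expression. The class and method names and the header names trace.id and key[ are hypothetical examples; Pattern.quote is shown only as one possible way for users to keep literal semantics under Option A, not as part of the proposal.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Illustrative sketch of the Option A compatibility risk: an existing headers entry
// meant as a literal name is now interpreted as a regular expression.
public class LiteralVersusRegexSketch {

    // Current behavior: exact string comparison, as with Set.contains.
    static boolean literalMatch(String configured, String key) {
        return configured.equals(key);
    }

    // Option A behavior: the configured entry is compiled as a regexp.
    static boolean regexMatch(String configured, String key) {
        return Pattern.compile(configured).matcher(key).matches();
    }

    public static void main(String[] args) {
        // '.' is a wildcard in a regexp, so "trace.id" now also drops "trace_id".
        System.out.println(literalMatch("trace.id", "trace_id")); // false
        System.out.println(regexMatch("trace.id", "trace_id"));   // true

        // Pattern.quote restores literal semantics for a name that must match exactly.
        System.out.println(regexMatch(Pattern.quote("trace.id"), "trace_id")); // false

        // A name with an unbalanced '[' is not a valid regexp at all.
        try {
            regexMatch("key[", "key[");
        } catch (PatternSyntaxException e) {
            System.out.println("invalid pattern: " + e.getDescription());
        }
    }
}
```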
Test Plan
Unit tests for the SMT will be added, including tests covering backward compatibility. No migration is required if the additional feature comes with a standalone configuration option.
Rejected Alternatives
None.