Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In many use cases it is required to drop not only a few specific Kafka headers but a whole set of headers whose names can change dynamically (e.g. when used with end-to-end encryption libraries, tracing, etc.). To prevent headers following a special naming pattern (which may or may not comply with a downstream system's format) from being forwarded or processed further downstream (e.g. header forwarding in HTTP sinks), this KIP proposes to add a wildcard/regexp matching feature to the DropHeaders transform.
Public Interfaces
The SMT org.apache.kafka.connect.transforms.DropHeaders is to be extended. Depending on the final decision on the implementation, an optional property might be added.
Proposed Changes
A regexp-based matching of header names is to be added to the apply method, instead of (or in addition to) the current set-based contains check. Link to the relevant code:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54
One common example is the forwarding of encryption headers that contain byte arrays to an HTTP server via an HTTP sink; such headers will not be accepted by the HTTP server.
...
The current implementation stores the header keys in a hash set and performs matching using Set.contains:
@Override
public R apply(R record) {
Headers updatedHeaders = new ConnectHeaders();
for (Header header : record.headers()) {
if (!headers.contains(header.key())) {
updatedHeaders.add(header);
}
}
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
headers = new HashSet<>(config.getList(HEADERS_FIELD));
}
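To illustrate why the exact, set-based matching falls short, here is a minimal sketch of the current lookup behaviour outside of Connect. The header names and the enc. prefix are invented for illustration; only Set.contains mirrors the actual implementation:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExactMatchLimitation {
    public static void main(String[] args) {
        // Header names configured to be dropped (exact names only)
        Set<String> toDrop = new HashSet<>(List.of("enc.meta"));

        // Header names as they appear on records; the suffix (e.g. a key id)
        // changes dynamically, so it cannot be listed in the config upfront
        List<String> recordHeaders = List.of("enc.meta", "enc.meta.key-42", "trace-id");

        for (String name : recordHeaders) {
            // Only the literal "enc.meta" is matched; "enc.meta.key-42" is forwarded
            System.out.println(name + " dropped=" + toDrop.contains(name));
        }
    }
}
```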
Two possible alternative implementation options are proposed:
- Option A - Replacing the existing set-based matching with a configurable regexp matcher:
@Override
public R apply(R record) {
Headers updatedHeaders = new ConnectHeaders();
for (Header header : record.headers()) {
if (!headersMatchAnyPattern(header.key())) {
updatedHeaders.add(header);
}
}
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}
private boolean headersMatchAnyPattern(String key) {
return headersMatchers.stream().anyMatch(pattern ->
pattern.matcher(key).matches()
);
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
final List<String> headers = config.getList(HEADERS_FIELD);
headersMatchers = headers.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
}
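Note that Pattern.matcher(key).matches() anchors the pattern at both ends, so a configured pattern must cover the whole header name, not just a substring. A small sketch of this semantics (the pattern strings and header names are illustrative, not part of the proposal):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternMatchingSketch {
    public static void main(String[] args) {
        // Patterns as they might be configured; "enc\\..*" drops all headers
        // whose name starts with "enc.", "trace-id" is an exact match
        List<Pattern> matchers = List.of("enc\\..*", "trace-id").stream()
                .map(Pattern::compile)
                .collect(Collectors.toList());

        for (String key : List.of("enc.meta.key-42", "trace-id", "content-type")) {
            // matches() requires the whole header name to match the pattern
            boolean dropped = matchers.stream().anyMatch(p -> p.matcher(key).matches());
            System.out.println(key + " dropped=" + dropped);
        }
    }
}
```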
- Option B - Adding a configurable regexp matcher in addition to the existing set-based matching, to stay 100% backwards-compatible:
@Override
public R apply(R record) {
Headers updatedHeaders = new ConnectHeaders();
for (Header header : record.headers()) {
if (!toBeDropped(header)) {
updatedHeaders.add(header);
}
}
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}
private boolean toBeDropped(Header header) {
return headers.contains(header.key()) || headersMatchAnyPattern(header.key());
}
private boolean headersMatchAnyPattern(String key) {
return headersMatchers.stream().anyMatch(pattern ->
pattern.matcher(key).matches()
);
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
headers = new HashSet<>(config.getList(HEADERS_FIELD));
final List<String> headerPatternList = config.getList(HEADERS_PATTERN_FIELD);
headersMatchers = headerPatternList.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
}
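Under Option B, a connector configuration might look like the following sketch. The headers property is the existing one; the property name headers.patterns is an assumption for illustration, since the KIP does not fix the name behind HEADERS_PATTERN_FIELD:

```properties
transforms=dropSecrets
transforms.dropSecrets.type=org.apache.kafka.connect.transforms.DropHeaders
# existing behaviour, unchanged: exact header names
transforms.dropSecrets.headers=enc-meta,internal-id
# new optional property (name is illustrative): regex patterns that must
# match the whole header name
transforms.dropSecrets.headers.patterns=enc-.*,trace-.*
```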
An example implementation already exists for the option that replaces the set-based matching (Option A) - link to the PR: https://github.com/apache/kafka/pull/14536
...