...
The apply method of the SMT org.apache.kafka.connect.transforms.DropHeaders is proposed to be extended. Depending on the final decision on the implementation, an optional configuration property might be added by the new functionality. The existing interface will not be changed.
Proposed Changes
to the apply method instead of a set-based contains. Link to the relevant code:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54
...
    @Override
    public R apply(R record) {
        // Copy every header that is not selected for dropping.
        Headers updatedHeaders = new ConnectHeaders();
        for (Header header : record.headers()) {
            if (!toBeDropped(header)) {
                updatedHeaders.add(header);
            }
        }
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
    }

    // A header is dropped if its key is listed explicitly or matches any configured pattern.
    private boolean toBeDropped(Header header) {
        return headers.contains(header.key()) || headersMatchAnyPattern(header.key());
    }

    private boolean headersMatchAnyPattern(String key) {
        return headersMatchers.stream().anyMatch(pattern ->
            pattern.matcher(key).matches()
        );
    }

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        headers = new HashSet<>(config.getList(HEADERS_FIELD));

        // Compile the patterns once at configuration time rather than per record.
        final List<String> headerPatternList = config.getList(HEADERS_PATTERN_FIELD);
        headersMatchers = headerPatternList.stream().map(entry -> Pattern.compile(entry)).collect(Collectors.toList());
    }
An example implementation already exists for the option that replaces the set-based method. Link to the PR: https://github.com/apache/kafka/pull/14536
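To try the combined matching in isolation, the following is a minimal sketch in plain Java without any Kafka Connect dependencies. The class name HeaderDropSketch and the sample header names are hypothetical and only serve as illustration; the sketch mirrors the contains-or-pattern check from the snippet above and is not the implementation from the PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical stand-alone sketch of the "exact name OR regex" drop decision.
public class HeaderDropSketch {
    private final Set<String> names;
    private final List<Pattern> patterns;

    public HeaderDropSketch(List<String> names, List<String> regexes) {
        this.names = new HashSet<>(names);
        // Compile once up front, as configure() does in the SMT.
        this.patterns = regexes.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    // True if the key is listed literally or fully matches any pattern.
    public boolean toBeDropped(String key) {
        return names.contains(key)
            || patterns.stream().anyMatch(p -> p.matcher(key).matches());
    }

    public static void main(String[] args) {
        HeaderDropSketch sketch = new HeaderDropSketch(
            Arrays.asList("trace-id"),        // literal header names (assumed sample data)
            Arrays.asList("internal\\..*"));  // regex: keys starting with "internal."

        for (String key : Arrays.asList("trace-id", "internal.retries", "content-type")) {
            System.out.println(key + " -> " + (sketch.toBeDropped(key) ? "dropped" : "kept"));
        }
    }
}
```

Note that matches() requires the whole key to match the pattern, so a pattern such as "internal" alone would not drop "internal.retries".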
...
Compatibility, Deprecation, and Migration Plan
If regexp matching is added alongside the existing set-based matching (Option B), no breaking changes are needed. If the contains-based implementation is replaced (Option A), there might be breaking changes in the rare case of configured header names that contain characters which can be interpreted as Java regular expressions, e.g. "headers.*".
In case of Option B, no migration is needed, as the additional feature comes with a standalone configuration option. In case of Option A, only configuration changes are required, and only if the configured header names contain characters which can be interpreted as Java regular expressions.
Test Plan
Unit testing of the SMT, with additional tests covering backwards compatibility.
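A backwards-compatibility check for Option A could be sketched as follows. This is an illustration only: the class OptionACompatSketch and its helper methods are hypothetical, and escaping names with Pattern.quote is one possible way to keep literal behaviour, assumed here rather than taken from the proposal.

```java
import java.util.Collections;
import java.util.regex.Pattern;

// Hypothetical comparison of set-based matching with regex matching (Option A).
public class OptionACompatSketch {

    // Current behaviour: the configured name must equal the header key.
    static boolean dropsBySet(String configured, String key) {
        return Collections.singleton(configured).contains(key);
    }

    // Option A behaviour: the configured value is compiled as a Java regex.
    static boolean dropsByRegex(String configured, String key) {
        return Pattern.compile(configured).matcher(key).matches();
    }

    public static void main(String[] args) {
        // "headers.*" as a regex also matches keys that the literal name did not.
        System.out.println(dropsBySet("headers.*", "headers-extra"));   // false
        System.out.println(dropsByRegex("headers.*", "headers-extra")); // true

        // "a+b" as a regex no longer matches the literal key "a+b".
        System.out.println(dropsBySet("a+b", "a+b"));   // true
        System.out.println(dropsByRegex("a+b", "a+b")); // false

        // Quoting the configured name restores literal matching.
        System.out.println(dropsByRegex(Pattern.quote("a+b"), "a+b")); // true
    }
}
```

Header names without regex metacharacters behave identically in both variants, which is why only the rare configurations described above would need to be adjusted.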
Rejected Alternatives
None.