Status

Current state: Under Discussion

Discussion thread: here


JIRA: KAFKA-15597

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In many use cases it is required to drop not only a few specific Kafka headers but a set of headers whose names can change dynamically (e.g. when used with some end-to-end encryption libraries, tracing, etc.). To prevent headers that follow a special pattern (which may or may not comply with the downstream system's format) from being forwarded or processed downstream (e.g. header forwarding in HTTP sinks), this KIP proposes to add a wildcard/regexp matching feature to the DropHeaders transform.

One common example is the forwarding of encryption headers that contain byte arrays to an HTTP server via HTTP sinks; the HTTP server will not accept such headers.

Public Interfaces

The SMT org.apache.kafka.connect.transforms.DropHeaders is to be extended. Depending on the final decision on the implementation, an optional property might be added.

Proposed Changes

The proposed change is to use regexp matching in the apply method instead of a set-based contains. Link to the relevant code:
https://github.com/apache/kafka/blob/7b5d640cc656443a078bda096d01910b3edfdb37/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java#L54

The current implementation stores the header keys in a hash set and performs matching using Set.contains:


```java
@Override
public R apply(R record) {
    // Keep every header whose key is not in the configured set.
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!headers.contains(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    // Header names are stored in a hash set for exact-match lookups.
    headers = new HashSet<>(config.getList(HEADERS_FIELD));
}
```
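To illustrate the limitation of exact matching, the following minimal sketch mirrors only the Set.contains check in isolation. The class name ExactHeaderMatchDemo, the method isDropped and the header names are hypothetical and are not part of DropHeaders.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ExactHeaderMatchDemo {

    // Mirrors the set-based check: a header is dropped only if its key
    // equals one of the configured names exactly.
    static boolean isDropped(Set<String> configured, String key) {
        return configured.contains(key);
    }

    public static void main(String[] args) {
        // Hypothetical configuration with a single header name.
        Set<String> configured = new HashSet<>(List.of("trace-id"));

        System.out.println(isDropped(configured, "trace-id"));      // prints true
        System.out.println(isDropped(configured, "trace-id-7f3a")); // prints false
    }
}
```

A header whose name carries a dynamic suffix is therefore retained unless every possible name is listed in advance.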


Two possible alternative implementation options are proposed:

  • Option A - Replacing the existing set-based matching with a configurable regexp matcher:
```java
@Override
public R apply(R record) {
    // Keep every header whose key matches none of the configured patterns.
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!headersMatchAnyPattern(header.key())) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

private boolean headersMatchAnyPattern(String key) {
    // matches() requires the whole key to match the pattern.
    return headersMatchers.stream().anyMatch(pattern ->
        pattern.matcher(key).matches()
    );
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    final List<String> headers = config.getList(HEADERS_FIELD);
    // Every configured entry is compiled once as a regular expression.
    headersMatchers = headers.stream().map(Pattern::compile).collect(Collectors.toList());
}
```


  • Option B - Adding a configurable regexp matcher in addition to the set-based matching, to remain 100% backwards-compatible:
```java
@Override
public R apply(R record) {
    Headers updatedHeaders = new ConnectHeaders();
    for (Header header : record.headers()) {
        if (!toBeDropped(header)) {
            updatedHeaders.add(header);
        }
    }
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}

private boolean toBeDropped(Header header) {
    // Exact names are checked first; patterns are consulted only if no exact name matches.
    return headers.contains(header.key()) || headersMatchAnyPattern(header.key());
}

private boolean headersMatchAnyPattern(String key) {
    // matches() requires the whole key to match the pattern.
    return headersMatchers.stream().anyMatch(pattern ->
        pattern.matcher(key).matches()
    );
}

@Override
public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    // Existing property: exact header names, unchanged behavior.
    headers = new HashSet<>(config.getList(HEADERS_FIELD));

    // Additional property: patterns, compiled once at configuration time.
    final List<String> headerPatternList = config.getList(HEADERS_PATTERN_FIELD);
    headersMatchers = headerPatternList.stream().map(Pattern::compile).collect(Collectors.toList());
}
```
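The behavioral difference between the two options can be sketched outside of Kafka Connect using only java.util.regex. The class DropHeadersOptionsDemo, its method names and the header names below are hypothetical and chosen for illustration; the sketch operates on plain header keys rather than on Connect records.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class DropHeadersOptionsDemo {

    // Compiles each configured entry once, as both options do in configure().
    static List<Pattern> compile(List<String> entries) {
        return entries.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    // Matcher.matches() requires the whole key to match, not just a substring.
    static boolean matchesAny(List<Pattern> patterns, String key) {
        return patterns.stream().anyMatch(p -> p.matcher(key).matches());
    }

    // Option A: every configured entry is interpreted as a regular expression.
    static List<String> retainedOptionA(List<String> entries, List<String> keys) {
        List<Pattern> patterns = compile(entries);
        return keys.stream()
                .filter(k -> !matchesAny(patterns, k))
                .collect(Collectors.toList());
    }

    // Option B: exact names keep Set.contains semantics; patterns are configured separately.
    static List<String> retainedOptionB(List<String> names, List<String> regexes, List<String> keys) {
        Set<String> exact = new HashSet<>(names);
        List<Pattern> patterns = compile(regexes);
        return keys.stream()
                .filter(k -> !(exact.contains(k) || matchesAny(patterns, k)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = List.of("enc.key", "encXkey", "trace-id", "my-trace-id", "content-type");

        // Option A: "enc.key" is a regex, so "." also matches the "X" in "encXkey".
        System.out.println(retainedOptionA(List.of("enc.key", "trace-.*"), keys));
        // prints [my-trace-id, content-type]

        // Option B: "enc.key" stays an exact name, so "encXkey" is retained.
        System.out.println(retainedOptionB(List.of("enc.key"), List.of("trace-.*"), keys));
        // prints [encXkey, my-trace-id, content-type]

        // Option A with the literal name escaped via Pattern.quote behaves like Option B here.
        System.out.println(retainedOptionA(List.of(Pattern.quote("enc.key"), "trace-.*"), keys));
        // prints [encXkey, my-trace-id, content-type]
    }
}
```

Under Option A, an existing configuration whose header names contain regex metacharacters (such as ".") could start matching additional headers unless those names are escaped. Option B avoids this by keeping the exact names in their own property.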


An example implementation already exists for Option A, which replaces the set-based matching. Link to PR: https://github.com/apache/kafka/pull/14536
