Status

Current state: Accepted

Discussion thread: here

JIRA: here

Released: 0.11.0.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect is designed to handle copying structured data between Kafka and other systems. Handling structured data is important since it is often necessary to perform translations and transformations between systems, and in many cases moving data to/from another system doesn't even make much sense without structure (e.g. copying data from a relational database). This is one of the reasons Kafka Connect is an additional layer on top of the otherwise format-agnostic Apache Kafka (serializers are available in the clients for convenience, but in no way affect the rest of the system, most especially not the brokers).

However, there are cases in Kafka Connect where dealing with the raw data directly is preferable, for example when a connector simply moves opaque payloads between Kafka and another system and never needs to interpret their structure.

To address these use cases, where the cost (CPU, memory, garbage collection overhead) of converting data to and from the Connect data API is best avoided, this KIP proposes providing a ByteArrayConverter that supports only a single type, byte[], converted to/from the Kafka Connect data API, much like the ByteArraySerializer/Deserializer already provided for the Java clients.

Similar classes are already being implemented by third parties in their own connectors, so providing a standard implementation would avoid unnecessary duplication of effort.

Public Interfaces

The new public interface is a class implementing the Converter interface that works only with byte[] data. The implementation is trivial enough to be included inline in the KIP:

package org.apache.kafka.connect.converters;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

import java.util.Map;

/**
 * Pass-through converter for raw byte data.
 */
public class ByteArrayConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration options are needed; the converter passes bytes through unchanged.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Accept either no schema or a BYTES schema; anything else cannot be passed through as-is.
        if (schema != null && schema.type() != Schema.Type.BYTES)
            throw new DataException("Invalid schema type for ByteArrayConverter: " + schema.type().toString());

        if (value != null && !(value instanceof byte[]))
            throw new DataException("ByteArrayConverter is not compatible with objects of type " + value.getClass());

        return (byte[]) value;
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Wrap the raw bytes with an optional BYTES schema; null values are permitted.
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
    }

}
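
For illustration only, the following sketch (not part of the proposed interface; the topic name and example payload are arbitrary) shows how the converter behaves when invoked the way the Connect framework would invoke it:

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.SchemaAndValue;

public class ByteArrayConverterExample {
    public static void main(String[] args) {
        ByteArrayConverter converter = new ByteArrayConverter();
        converter.configure(Collections.emptyMap(), false); // no options; isKey=false for a value converter

        byte[] raw = "opaque payload".getBytes(StandardCharsets.UTF_8);

        // Kafka -> Connect: the bytes are wrapped with an optional BYTES schema; nothing is parsed or copied.
        SchemaAndValue connectData = converter.toConnectData("example-topic", raw);

        // Connect -> Kafka: the same bytes are returned unchanged for the producer to write.
        byte[] serialized = converter.fromConnectData("example-topic", connectData.schema(), connectData.value());

        System.out.println(new String(serialized, StandardCharsets.UTF_8)); // prints "opaque payload"
    }
}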


Proposed Changes

We propose adding the new ByteArrayConverter class shown above. The implementation is trivial, but where the class is packaged is worth noting:

Kafka Connect divides its classes across multiple jars; in particular, the provided JSON converter ships in its own jar so that the converter and all of its dependencies can be excluded when they are not needed. This converter will instead be added directly to the runtime jar: it introduces no new dependencies, is broadly useful, and does not warrant a jar of its own.
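
For illustration (this uses the existing Connect converter settings and adds no new configuration), a worker could then select the converter as follows:

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter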

Compatibility, Deprecation, and Migration Plan

Adding ByteArrayConverter has no implications for existing classes, compatibility, deprecation, or migration.

Test Plan

The general Converter interface is already well tested by Connect unit and system tests. Given its simplicity, a small number of unit tests should sufficiently cover this new class.
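
As a rough sketch of what those tests might look like (the test class and method names below are illustrative, not the actual test code), they would verify the schema validation in fromConnectData and the pass-through behavior in both directions:

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.junit.Test;

public class ByteArrayConverterTest {

    private final ByteArrayConverter converter = new ByteArrayConverter();

    @Test
    public void testFromConnectData() {
        byte[] payload = new byte[]{1, 2, 3};
        // Bytes pass straight through when the schema is BYTES (or absent).
        assertArrayEquals(payload, converter.fromConnectData("topic", Schema.OPTIONAL_BYTES_SCHEMA, payload));
        assertArrayEquals(payload, converter.fromConnectData("topic", null, payload));
    }

    @Test
    public void testToConnectData() {
        byte[] payload = new byte[]{1, 2, 3};
        SchemaAndValue result = converter.toConnectData("topic", payload);
        // The raw bytes come back wrapped with the optional BYTES schema.
        assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, result.schema());
        assertArrayEquals(payload, (byte[]) result.value());
    }
}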

Rejected Alternatives

None