This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Adopted

Discussion thread: here

VOTE: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

kafka.common.MessageReader is an input argument of kafka-console-producer, and we expect users to supply their own custom reader to produce custom records. Hence, MessageReader is a public interface, and we should offer a Java version to replace the current Scala code. Also, the new MessageReader should be placed in the clients module (kafka.common.MessageReader is in the core module).

Public Interfaces

New org.apache.kafka.tools.api.RecordReader interface and new module tools-api


Code Block
languagejava
package org.apache.kafka.tools.api;

import java.io.Closeable;
import java.io.InputStream;
import java.util.Iterator;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;

/**
 * Typical implementations of this interface convert data from an `InputStream` received via `readRecords` into an
 * iterator of `ProducerRecord` instances. Note that implementations must have a public nullary constructor.
 *
 * This is used by the `kafka.tools.ConsoleProducer`.
 */
public interface RecordReader extends Closeable, Configurable {

    /**
     * Reads byte arrays from the input stream and then generates an iterator of producer records.
     * @param inputStream the input stream of messages. The implementation does not need to close the input stream.
     * @return an iterator of producer records. It should follow these rules: 1) the hasNext() method must be idempotent.
     *         2) conversion errors should be thrown by the next() method.
     */
    Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream inputStream);

    /**
     * Closes this reader. This method is invoked if the iterator from readRecords either has no more records or
     * throws an exception.
     */
    default void close() {}
}
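
The iterator rules in the Javadoc above (idempotent hasNext(), conversion errors raised from next()) can be illustrated with a small line-based reader. This is only a sketch: SketchRecord and SketchRecordReader are hypothetical stand-ins for ProducerRecord<byte[], byte[]> and the proposed RecordReader, so that the example compiles without the Kafka jars. The "topic" config key and the tab-separated key/value format are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for ProducerRecord<byte[], byte[]>.
final class SketchRecord {
    final String topic;
    final byte[] key;
    final byte[] value;

    SketchRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in that mirrors the shape of the proposed RecordReader.
interface SketchRecordReader extends Closeable {
    void configure(Map<String, ?> configs);

    Iterator<SketchRecord> readRecords(InputStream inputStream);

    @Override
    default void close() {}
}

// Reads "key<TAB>value" lines; the format and the "topic" key are assumptions.
final class LineRecordReader implements SketchRecordReader {
    private String topic = "test";

    @Override
    public void configure(Map<String, ?> configs) {
        Object configured = configs.get("topic");
        if (configured != null) {
            topic = configured.toString();
        }
    }

    @Override
    public Iterator<SketchRecord> readRecords(InputStream inputStream) {
        BufferedReader lines =
            new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        return new Iterator<SketchRecord>() {
            private String pending;    // line read ahead by hasNext(), not yet converted
            private boolean exhausted; // true once the stream has returned end-of-input

            @Override
            public boolean hasNext() {
                // Repeated calls return the same answer without consuming more input.
                if (pending != null) return true;
                if (exhausted) return false;
                try {
                    pending = lines.readLine();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                exhausted = pending == null;
                return !exhausted;
            }

            @Override
            public SketchRecord next() {
                if (!hasNext()) throw new NoSuchElementException();
                String line = pending;
                pending = null;
                // Conversion happens here, so malformed input fails in next(), not hasNext().
                int tab = line.indexOf('\t');
                if (tab < 0) {
                    throw new IllegalArgumentException("No key separator found in line: " + line);
                }
                return new SketchRecord(
                    topic,
                    line.substring(0, tab).getBytes(StandardCharsets.UTF_8),
                    line.substring(tab + 1).getBytes(StandardCharsets.UTF_8));
            }
        };
    }
}
```

The reader never closes the InputStream it is given, which matches the note on the inputStream parameter. A caller would construct the reader with its nullary constructor, call configure, and then drain the iterator returned by readRecords.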


Proposed Changes

  1. Deprecate kafka.common.MessageReader


Code Block
languagescala
@deprecated("This class has been deprecated and will be removed in 4.0. Please use org.apache.kafka.tools.api.RecordReader instead", "3.5.0")
trait MessageReader extends org.apache.kafka.common.


  2. Log warning messages to users when they use the deprecated MessageReader

Compatibility, Deprecation, and Migration Plan

  1. backward compatibility
    kafka.common.MessageReader implementations can keep working without recompilation. 
  2. deprecation
    1. kafka.common.MessageReader is deprecated
    2. the method init(InputStream inputStream, Properties props) is deprecated and replacement is configure(InputStream inputStream, Map<String, ?> configs)
  3. migration plan: users have to address the following changes to complete the code migration
    1. import the new dependency: tools-api
    2. change the interface from kafka.common.MessageReader to org.apache.kafka.tools.api.RecordReader
    3. change the method signature from init(InputStream inputStream, Properties props) to configure(InputStream inputStream, Map<String, ?> configs)
    4. remove the implementation of readMessage()
    5. implement readRecords(InputStream) to return an Iterator of records
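
For the backward-compatibility point, one conceivable way for a tool to keep driving a legacy reader is to wrap its pull-style readMessage() call in an iterator. The sketch below is not taken from the Kafka code base: LegacyReaderIterator is a hypothetical name, the Supplier stands in for the legacy readMessage() method, and it assumes the legacy call returns null once the input is exhausted.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical adapter: exposes a pull-style "readMessage()" source as an Iterator.
final class LegacyReaderIterator<T> implements Iterator<T> {
    private final Supplier<T> readMessage; // stands in for the legacy readMessage() call
    private T pending;                     // record fetched by hasNext(), not yet handed out
    private RuntimeException failure;      // error captured in hasNext(), rethrown by next()
    private boolean exhausted;             // true once the source has returned null

    LegacyReaderIterator(Supplier<T> readMessage) {
        this.readMessage = readMessage;
    }

    @Override
    public boolean hasNext() {
        // Idempotent: a fetched record or captured failure is kept until next() is called.
        if (pending != null || failure != null) return true;
        if (exhausted) return false;
        try {
            pending = readMessage.get();
        } catch (RuntimeException e) {
            // Defer the error so that it surfaces from next(), as the iterator rules ask.
            failure = e;
            return true;
        }
        exhausted = pending == null;
        return !exhausted;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        if (failure != null) {
            RuntimeException e = failure;
            failure = null;
            throw e;
        }
        T record = pending;
        pending = null;
        return record;
    }
}
```

Deferring the captured exception keeps hasNext() free of conversion errors even though the legacy call reads and converts in a single step.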

Rejected Alternatives

  1. support the usage of Serializers (from Juma): this support could complicate the configs, since users would have to define both a MessageReader and a serializer. It seems to me that the MessageReader mechanism should include serialization.
  2. move RecordReader to "org.apache.kafka.common": code in "org.apache.kafka.common" is not allowed to import code from "org.apache.kafka.clients.producer". However, the tool-related interface should be able to access producer, consumer and admin code.
  3. move RecordReader to "org.apache.kafka.clients.tool": the client module already has many pluggable interfaces, and we should follow the package naming. The server-related pluggable interfaces are located at "org.apache.kafka.server", and thus tools-related interfaces should be located at "org.apache.kafka.tools".
  4. new RecordReader implements Configurable: implementing Configurable would change the arguments of the configure method from (InputStream, configs) to (configs). That prevents RecordReader from keeping the input stream as part of its state.
  5. configure(InputStream, configs): this diverges from the Configurable interface, and it is strange to pass an InputStream to a configure method.
  6. move RecordReader to "org.apache.kafka.tools" (client module): the tools module has the same package, so we should not create the same package in another module, to avoid a split package (https://www.logicbig.com/tutorials/core-java-tutorial/modules/split-packages.html)