Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current stateUnder Discussionadopted

Discussion threadhere

VOTE: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

kafka.common.MessageReader is a input argument of kafka-console-producer and we expect users can have their custom reader to produce custom records. Hence, MessageReader is a public interface and we should offer a java version to replace current scala code. Also, the new MessageReader should be placed at clients module. (kafka.common.MessageReader is in core module)

Public Interfaces

New org.apache.kafka.

...

tools.

...

api.RecordReader interface and new module tools-api


Code Block
languagejava
/**
 * Typical implementations of this interface convert data from an `InputStream` received via `configure``readRecords` into a
 * iterator of `ProducerRecord` instance on each invocation of `readRecord`. Noted that the implementations to have a public
 * nullary constructor.
 *
 * This is used by the `kafka.tools.ConsoleProducer`.
 */
public interface RecordReader extends Closeable, Configurable {
    /**
     * read byte array from input stream and then generate a iterator of producer record
     * @param inputStream of message. the implementation does not need to close the input stream.
     * @return a iterator of producer record. It should implement following rules. 1) the hasNext() method must be idempotent.
     *         2) the convert error should be thrown by next() method.
     */
    ProducerRecord<byteIterator<ProducerRecord<byte[], byte[]>>> readRecordreadRecords(InputStream inputStream);


    /**
     * Closes this reader. This method is invoked if the iterator from readRecords either has no more records or throws exception.
     */
    default void close() {}
}

...

Code Block
languagescala
@deprecated("This class has been deprecated and will be removed in 4.0. Please use org.apache.kafka.tools.commonapi.RecordReader instead", "3.5.0")
trait MessageReader

...

  1. backward compatibility
    kafka.common.MessageReader implementations can keep working without recompilation. 
  2. deprecation
    1. kafka.common.MessageReader is deprecated
    2. the method init(InputStream inputStream, Properties props) is deprecated and replacement is configure(InputStream inputStream, Map<String, ?> configs)
  3. migration plan: users have address following changes to complete code migration
    1. import the new dependency: tools-api
    2. change inheritance from kafka
    3. change inheritance from kafka.common.MessageReader to orgto org.apache.kafka.commonclient.tools.api.RecordReader
    4. change method signature from init(InputStream inputStream, Properties props) to configure(InputStream inputStream, Map<String, ?> configs)change method signature from readMessage to readRecord)
    5. remove the implementation of readMessage()
    6. implements the readRecords(InputStream) to return Iterator of records

Rejected Alternatives

  1. support the usage of Serializers (from Juma): this support could complicate the configs, since users have to define both MessageReader and serializer. It seems to me the mechanism of MessageReader should include serialization.
  2. move RecordReader to "org.apache.kafka.common": "org.apache.kafka.common" does not allow to import code from "org.apache.kafka.clients.producer". However, the tool-related interface should be able to access producer, consumer and admin code.
  3. move RecordReader to "org.apache.kafka.clients.tool": client module already has many pluggable interfaces. we should follow the package naming. The server-related pluggable interfaces are located at "org.apache.kafka.server", and thus tools-related interface should be located at "org.apache.kafka.tools"
  4. new RecordReader implements Configurable: Implementing Configurable will change the arguments of configure method from (InputStream, configs) to (configs). That obstructs RecordReader from keeping input stream itself as some of that state.
  5. configurable(InputStream, configs) - diverges from the Configurable interface and it is strange to pass an InputStream to a configure method.
  6. move RecordReader to "org.apache.kafka.tools" (client module): The tools module has same package so we should not create the same package on another module to avoid split package (https://www.logicbig.com/tutorials/core-java-tutorial/modules/split-packages.html)