Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

kafka.common.MessageReader is a input argument of kafka-console-producer and we expect users can have their custom reader to produce custom records. Hence, MessageReader is a public interface and we should offer a java version to replace current scala code. Also, the new MessageReader should be placed at clients module. (kafka.common.MessageReader is in core module)

Public Interfaces

New org.apache.kafka.common.

...

RecordReader interface


Code Block
languagejava
/**
 * Typical implementations of this interface convert data from an `InputStream` received via `init` into a
 * `ProducerRecord` instance on each invocation of `readMessage``readRecord`.
 *
 * This is used by the `kafka.tools.ConsoleProducer`.
 */
public interface MessageReaderRecordReader extends Closeable {

    /**
     * Initialises the MessageReader
     * @param inputStream of message
     * @param props Properties to configure the reader
     * @deprecated Use {@link #configure(InputStream, Map)} instead, this method is for backward compatibility with
     * the older reader interface
     */
    @Deprecated
    default void init(InputStream inputStream, Properties props) {}

    /**
     * Configures the MessageReaderRecordReader
     * @param inputStream of message
     * @param configs Map to configure the reader
     */
    default void configure(InputStream inputStream, Map<String, ?> configs) {
        Properties properties = new Properties();
        properties.putAll(configs);
        init(inputStream, properties);
    }

    /**
     * read byte array from input stream and then generate a producer record
     * @return a producer record
     */
    ProducerRecord<byte[], byte[]> readMessagereadRecord();


    /**
     * Closes this reader
     */
    default void close() {}
}

...

Code Block
languagescala
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageReader.", "2.7.0")
trait MessageReader extends org.apache.kafka.common.MessageReader


Compatibility, Deprecation, and Migration Plan

  1. backward compatibility
    kafka.common.MessageReader implementations can keep working without recompilation. 
  2. deprecation
    1. kafka.common.MessageReader is deprecated
    2. the method init(InputStream inputStream, Properties props) is deprecated and replacement is configure(InputStream inputStream, Map<String, ?> configs)
  3. migration plan: users have address following changes to complete code migration
    1. change inheritance from kafka.common.MessageReader to org.apache.kafka.common.MessageReaderRecordReader
    2. change method signature from init(InputStream inputStream, Properties props) to configure(InputStream inputStream, Map<String, ?> configs)
    3. change method signature from readMessage to readRecord

Rejected Alternatives

...

  1. support the usage of Serializers (from Juma): this support could complicate the configs, since users have to define both MessageReader and serializer. It seems to me the mechanism of MessageReader should include serialization.