Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
kafka.common.MessageReader is an input argument of kafka-console-producer, and users are expected to supply their own reader to produce custom records. MessageReader is therefore a public interface, and we should offer a Java version to replace the current Scala code. The new interface should also be placed in the clients module (kafka.common.MessageReader is in the core module).
Public Interfaces
New org.apache.kafka.common.RecordReader interface
package org.apache.kafka.common;

import java.io.Closeable;
import java.io.InputStream;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Typical implementations of this interface convert data from an `InputStream` received via `configure` into a
 * `ProducerRecord` instance on each invocation of `readRecord`.
 *
 * This is used by the `kafka.tools.ConsoleProducer`.
 */
public interface RecordReader extends Closeable {

    /**
     * Configures the RecordReader.
     * @param inputStream the stream to read messages from
     * @param configs Map to configure the reader
     */
    void configure(InputStream inputStream, Map<String, ?> configs);

    /**
     * Reads a byte array from the input stream and generates a producer record from it.
     * @return a producer record
     */
    ProducerRecord<byte[], byte[]> readRecord();

    /**
     * Closes this reader.
     */
    default void close() {}
}
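To make the contract concrete, here is a minimal sketch of what a custom reader written against the proposed interface might look like. It is an illustration, not part of the proposal: `LineRecordReader` is a hypothetical name, the `"topic"` config key and the convention of returning `null` at end of input are assumptions, and `ProducerRecord` is replaced by a simplified stand-in so the example compiles without the Kafka clients jar.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.clients.producer.ProducerRecord,
// declared only so this sketch compiles without the Kafka clients jar.
final class ProducerRecord<K, V> {
    private final String topic;
    private final K key;
    private final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    K key() { return key; }
    V value() { return value; }
}

// The interface proposed above, repeated so the sketch is self-contained.
interface RecordReader extends Closeable {
    void configure(InputStream inputStream, Map<String, ?> configs);
    ProducerRecord<byte[], byte[]> readRecord();
    @Override
    default void close() {}
}

// Hypothetical reader: every input line becomes the value of one record with no key.
final class LineRecordReader implements RecordReader {
    private String topic;
    private BufferedReader lines;

    @Override
    public void configure(InputStream inputStream, Map<String, ?> configs) {
        // Assumption: the target topic arrives under the config key "topic".
        Object configured = configs.get("topic");
        if (configured == null) {
            throw new IllegalArgumentException("missing config: topic");
        }
        this.topic = configured.toString();
        this.lines = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
    }

    @Override
    public ProducerRecord<byte[], byte[]> readRecord() {
        try {
            String line = lines.readLine();
            // Assumption: returning null tells the caller the input is exhausted.
            if (line == null) {
                return null;
            }
            return new ProducerRecord<>(topic, null, line.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            if (lines != null) {
                lines.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller would invoke `configure` once, call `readRecord` in a loop until it returns `null`, and then call `close`. Because the reader returns `byte[]` keys and values, serialization stays inside the reader itself.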
Proposed Changes
Deprecate kafka.common.MessageReader
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.RecordReader.", "3.5.0")
trait MessageReader
Compatibility, Deprecation, and Migration Plan
- backward compatibility
  - kafka.common.MessageReader implementations can keep working without recompilation.
- deprecation
  - kafka.common.MessageReader is deprecated
  - the method init(InputStream inputStream, Properties props) is deprecated; its replacement is configure(InputStream inputStream, Map<String, ?> configs)
- migration plan: users have to make the following changes to complete the code migration
  - change inheritance from kafka.common.MessageReader to org.apache.kafka.common.RecordReader
  - change the method signature from init(InputStream inputStream, Properties props) to configure(InputStream inputStream, Map<String, ?> configs)
  - rename the method readMessage to readRecord
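The backward-compatibility point above implies that the console producer must keep accepting readers written against the old trait while it moves to the new interface. One possible way to do that, sketched here under assumptions, is an adapter that wraps a legacy reader and maps each new method onto its old counterpart. `LegacyMessageReader` is a Java stand-in for the Scala trait kafka.common.MessageReader, `LegacyReaderAdapter` is a hypothetical name, and `ProducerRecord` is again a simplified stand-in; the KIP does not prescribe this mechanism.

```java
import java.io.Closeable;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;

// Simplified stand-in for org.apache.kafka.clients.producer.ProducerRecord.
final class ProducerRecord<K, V> {
    private final String topic;
    private final K key;
    private final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    K key() { return key; }
    V value() { return value; }
}

// Java stand-in for the legacy Scala trait kafka.common.MessageReader.
interface LegacyMessageReader {
    default void init(InputStream inputStream, Properties props) {}
    ProducerRecord<byte[], byte[]> readMessage();
    default void close() {}
}

// The interface proposed in this KIP, repeated so the sketch is self-contained.
interface RecordReader extends Closeable {
    void configure(InputStream inputStream, Map<String, ?> configs);
    ProducerRecord<byte[], byte[]> readRecord();
    @Override
    default void close() {}
}

// Hypothetical adapter: exposes a legacy reader through the new interface.
final class LegacyReaderAdapter implements RecordReader {
    private final LegacyMessageReader delegate;

    LegacyReaderAdapter(LegacyMessageReader delegate) {
        this.delegate = delegate;
    }

    @Override
    public void configure(InputStream inputStream, Map<String, ?> configs) {
        // Legacy readers expect java.util.Properties, so copy the map entries over.
        Properties props = new Properties();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            // Properties rejects null values, and getProperty only sees String
            // values, so skip nulls and store everything else as a String.
            if (entry.getValue() != null) {
                props.setProperty(entry.getKey(), entry.getValue().toString());
            }
        }
        delegate.init(inputStream, props);
    }

    @Override
    public ProducerRecord<byte[], byte[]> readRecord() {
        return delegate.readMessage();
    }

    @Override
    public void close() {
        delegate.close();
    }
}
```

The adapter also mirrors the three migration steps one to one: `configure` replaces `init`, `Map<String, ?>` replaces `Properties`, and `readRecord` replaces `readMessage`. A user migrating by hand makes the same substitutions directly in the reader class instead of wrapping it.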
Rejected Alternatives
- support the usage of Serializers (from Juma): this support could complicate the configs, since users would have to define both a MessageReader and a serializer. It seems to me that the MessageReader mechanism should include serialization.