This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: Under Discussion
Discussion thread: TODO
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
kafka.common.MessageReader is a input argument of kafka-console-producer and we expect users can have their custom reader to produce custom records. Hence, MessageReader is a public interface and we should offer a java version to replace current scala code. Also, the new MessageReader should be placed at clients module. (kafka.common.MessageReader is in core module)
Public Interfaces
New org.apache.kafka.common.MessageReader interface
/** * Typical implementations of this interface convert data from an `InputStream` received via `init` into a * `ProducerRecord` instance on each invocation of `readMessage`. * * This is used by the `kafka.tools.ConsoleProducer`. */ public interface MessageReader extends Closeable { /** * Initialises the MessageReader * @param inputStream of message * @param props Properties to configure the reader * @deprecated Use {@link #configure(InputStream, Map)} instead, this method is for backward compatibility with * the older reader interface */ @Deprecated default void init(InputStream inputStream, Properties props) {} /** * Configures the MessageReader * @param inputStream of message * @param configs Map to configure the reader */ default void configure(InputStream inputStream, Map<String, ?> configs) { Properties properties = new Properties(); properties.putAll(configs); init(inputStream, properties); } /** * read byte array from input stream and then generate a producer record * @return a producer record */ ProducerRecord<byte[], byte[]> readMessage(); /** * Closes this reader */ default void close() {} }
Proposed Changes
Deprecate kafka.common.MessageReader
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageReader.", "2.7.0") trait MessageReader extends org.apache.kafka.common.MessageReader
Compatibility, Deprecation, and Migration Plan
- backward compatibility
kafka.common.MessageReader implementations can keep working without recompilation. - deprecation
- kafka.common.MessageReader is deprecated
- the method init(InputStream inputStream, Properties props) is deprecated and replacement is configure(InputStream inputStream, Map<String, ?> configs)
- migration plan: users have address following changes to complete code migration
- change inheritance from kafka.common.MessageReader to org.apache.kafka.common.MessageReader
- change method signature from init(InputStream inputStream, Properties props) to configure(InputStream inputStream, Map<String, ?> configs)
Rejected Alternatives
N/A