Status
Current state: Adopted
Discussion thread: here
VOTE: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
kafka.common.MessageReader is an input argument of kafka-console-producer, and we expect users to plug in their own custom readers to produce custom records. Hence, MessageReader is a public interface, and we should offer a Java version to replace the current Scala code. Also, the new MessageReader should be placed in the clients module (kafka.common.MessageReader is in the core module).
Public Interfaces
New org.apache.kafka.tools.api.RecordReader interface and new module tools-api
    /**
     * Typical implementations of this interface convert data from an `InputStream` received via `readRecords` into an
     * iterator of `ProducerRecord` instances. Note that implementations must have a public nullary constructor.
     *
     * This is used by the `kafka.tools.ConsoleProducer`.
     */
    public interface RecordReader extends Closeable, Configurable {
        /**
         * Reads byte arrays from the input stream and generates an iterator of producer records.
         * @param inputStream the input stream of messages. The implementation does not need to close the input stream.
         * @return an iterator of producer records. It should follow these rules: 1) the hasNext() method must be idempotent;
         *         2) conversion errors should be thrown by the next() method.
         */
        Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream inputStream);

        /**
         * Closes this reader. This method is invoked if the iterator from readRecords either has no more
         * records or throws an exception.
         */
        default void close() {}
    }
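To make the iterator contract concrete, here is a minimal sketch of a line-based reader. So that it compiles without the Kafka jars, `ProducerRecord` and `RecordReader` below are local stand-ins that only mirror the shapes described above; `LineRecordReader`, the `"topic"` config key, and the empty-line rule are hypothetical names and choices made for illustration, not part of this proposal.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Stand-in for org.apache.kafka.clients.producer.ProducerRecord (assumption: only topic/key/value are modeled).
final class ProducerRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Stand-in for the proposed RecordReader; configure(Map) mirrors what Configurable would contribute.
interface RecordReader extends Closeable {
    void configure(Map<String, ?> configs);

    Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream inputStream);

    @Override
    default void close() {}
}

// Hypothetical reader: one record per input line, no key.
final class LineRecordReader implements RecordReader {
    private String topic;

    @Override
    public void configure(Map<String, ?> configs) {
        // The "topic" key is an assumption made for this sketch.
        topic = String.valueOf(configs.get("topic"));
    }

    @Override
    public Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream inputStream) {
        BufferedReader lines = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        return new Iterator<ProducerRecord<byte[], byte[]>>() {
            private String buffered;    // line fetched by hasNext() but not yet returned
            private boolean exhausted;  // true once the stream reported end of input

            @Override
            public boolean hasNext() {
                // Idempotent: repeated calls reuse the buffered line instead of reading again.
                if (buffered == null && !exhausted) {
                    try {
                        buffered = lines.readLine();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e); // I/O failure, not a conversion error
                    }
                    exhausted = (buffered == null);
                }
                return buffered != null;
            }

            @Override
            public ProducerRecord<byte[], byte[]> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = buffered;
                buffered = null;
                // Hypothetical validation rule, shown only to illustrate that conversion errors surface in next().
                if (line.isEmpty()) {
                    throw new IllegalArgumentException("an empty line cannot be converted to a record");
                }
                return new ProducerRecord<>(topic, null, line.getBytes(StandardCharsets.UTF_8));
            }
        };
    }
}

final class LineRecordReaderDemo {
    public static void main(String[] args) {
        LineRecordReader reader = new LineRecordReader();
        reader.configure(Collections.singletonMap("topic", "demo"));
        InputStream in = new ByteArrayInputStream("hello\nworld\n".getBytes(StandardCharsets.UTF_8));
        Iterator<ProducerRecord<byte[], byte[]>> records = reader.readRecords(in);
        while (records.hasNext()) {
            ProducerRecord<byte[], byte[]> record = records.next();
            System.out.println(record.topic + ": " + new String(record.value, StandardCharsets.UTF_8));
        }
        reader.close(); // the caller closes the reader once the iterator is drained
    }
}
```

Buffering one line inside `hasNext()` is what keeps it idempotent: calling it twice in a row never consumes two lines. The reader leaves the `InputStream` open, matching the note that implementations do not need to close it.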
Proposed Changes
- Deprecate kafka.common.MessageReader
    @deprecated("This class has been deprecated and will be removed in 4.0. Please use org.apache.kafka.tools.api.RecordReader instead", "3.5.0")
    trait MessageReader
- Log warning messages to users when using the deprecated MessageReader
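The two changes above could be wired into a tool roughly as follows. This is only a sketch under stated assumptions: `LegacyMessageReader` is a local stand-in for the deprecated kafka.common.MessageReader, records are modeled as plain strings, `readMessage()` returning null at end of input is an assumption, and `LegacyReaderSupport` is a hypothetical helper rather than anything this KIP defines.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.logging.Logger;

// Stand-in for the deprecated kafka.common.MessageReader. Records are modeled as
// plain strings, and readMessage() returning null at end of input is an assumption.
interface LegacyMessageReader {
    String readMessage();
}

// Hypothetical helper showing where a tool could warn about and adapt a legacy reader.
final class LegacyReaderSupport {
    private static final Logger LOG = Logger.getLogger(LegacyReaderSupport.class.getName());

    private LegacyReaderSupport() {}

    // Logs a deprecation warning when the user-supplied reader is a legacy one.
    // Returns true if a warning was logged, which keeps the behavior easy to check.
    static boolean warnIfDeprecated(Object reader) {
        if (reader instanceof LegacyMessageReader) {
            LOG.warning(reader.getClass().getName() + " implements the deprecated MessageReader; "
                + "please use org.apache.kafka.tools.api.RecordReader instead");
            return true;
        }
        return false;
    }

    // Exposes a legacy reader through the iterator style used by readRecords.
    static Iterator<String> asIterator(LegacyMessageReader legacy) {
        return new Iterator<String>() {
            private String buffered;    // record fetched by hasNext() but not yet returned
            private boolean exhausted;  // true once the legacy reader returned null

            @Override
            public boolean hasNext() {
                if (buffered == null && !exhausted) {
                    buffered = legacy.readMessage();
                    exhausted = (buffered == null);
                }
                return buffered != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String result = buffered;
                buffered = null;
                return result;
            }
        };
    }
}
```

Wrapping the legacy reader in an iterator lets the calling tool use a single consumption loop for both reader styles while the warning nudges users toward migration.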
Compatibility, Deprecation, and Migration Plan
- backward compatibility
  - kafka.common.MessageReader implementations can keep working without recompilation.
- deprecation
  - kafka.common.MessageReader is deprecated
  - the method init(InputStream inputStream, Properties props) is deprecated; configuration moves to configure(Map<String, ?> configs) and the input stream is passed to readRecords(InputStream)
- migration plan: users have to address the following changes to complete the code migration
  - import the new dependency: tools-api
  - change interface from kafka.common.MessageReader to org.apache.kafka.tools.api.RecordReader
  - change method signature from init(InputStream inputStream, Properties props) to configure(Map<String, ?> configs)
  - remove the implementation of readMessage()
  - implement readRecords(InputStream) to return an Iterator of records
Rejected Alternatives
- support the usage of Serializers (from Juma): this support could complicate the configs, since users would have to define both a MessageReader and a serializer. It seems to me the mechanism of MessageReader should include serialization.
- move RecordReader to "org.apache.kafka.common": code in "org.apache.kafka.common" is not allowed to import code from "org.apache.kafka.clients.producer". However, a tool-related interface should be able to access producer, consumer and admin code.
- move RecordReader to "org.apache.kafka.clients.tool": the client module already has many pluggable interfaces, and we should follow the existing package naming. The server-related pluggable interfaces are located at "org.apache.kafka.server", and thus tools-related interfaces should be located at "org.apache.kafka.tools".
- configure(InputStream, configs): this diverges from the Configurable interface, and it is strange to pass an InputStream to a configure method.
- move RecordReader to "org.apache.kafka.tools" (client module): the tools module already has the same package, so we should not create that package in another module, to avoid a split package (https://www.logicbig.com/tutorials/core-java-tutorial/modules/split-packages.html)