You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current stateUnder Discussion

Discussion thread: TODO

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

kafka.common.MessageReader is a input argument of kafka-console-producer and we expect users can have their custom reader to produce custom records. Hence, MessageReader is a public interface and we should offer a java version to replace current scala code. Also, the new MessageReader should be placed at clients module. (kafka.common.MessageReader is in core module)

Public Interfaces

New org.apache.kafka.common.MessageReader interface


/**
 * Typical implementations of this interface convert data from an `InputStream` received via `init` into a
 * `ProducerRecord` instance on each invocation of `readMessage`.
 *
 * This is used by the `kafka.tools.ConsoleProducer`.
 */
public interface MessageReader extends Closeable {

    /**
     * Initialises the MessageReader
     * @param inputStream of message
     * @param props Properties to configure the reader
     * @deprecated Use {@link #configure(InputStream, Map)} instead, this method is for backward compatibility with
     * the older reader interface
     */
    @Deprecated
    default void init(InputStream inputStream, Properties props) {}

    /**
     * Configures the MessageReader
     * @param inputStream of message
     * @param configs Map to configure the reader
     */
    default void configure(InputStream inputStream, Map<String, ?> configs) {
        Properties properties = new Properties();
        properties.putAll(configs);
        init(inputStream, properties);
    }

    /**
     * read byte array from input stream and then generate a producer record
     * @return a producer record
     */
    ProducerRecord<byte[], byte[]> readMessage();


    /**
     * Closes this reader
     */
    default void close() {}
}


Proposed Changes

Deprecate kafka.common.MessageReader


@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageReader.", "2.7.0")
trait MessageReader extends org.apache.kafka.common.MessageReader


Compatibility, Deprecation, and Migration Plan

  1. backward compatibility
    kafka.common.MessageReader implementations can keep working without recompilation. 
  2. deprecation
    1. kafka.common.MessageReader is deprecated
    2. the method init(InputStream inputStream, Properties props) is deprecated and replacement is configure(InputStream inputStream, Map<String, ?> configs)
  3. migration plan: users have address following changes to complete code migration
    1. change inheritance from kafka.common.MessageReader to org.apache.kafka.common.MessageReader
    2. change method signature from init(InputStream inputStream, Properties props) to configure(InputStream inputStream, Map<String, ?> configs)

Rejected Alternatives

N/A

  • No labels