Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
/**
 * Typical implementations of this interface convert data from an `InputStream` received via `configure` into a
 * `ProducerRecord` instance on each invocation of `readRecord`. Noted that the implementations to have a public
 * nullary constructor.
 *
 * This is used by the `kafka.tools.ConsoleProducer`.
 */
public interface RecordReader extends Closeable {


    default void configure(InputStream input, Map<String, ?> configs) {}
    , Configurable {
    /**
     * read byte array from input stream and then generate a producer record
     * @param inputStream of message 
     * @return a producer record
     */
    ProducerRecord<byteIterator<ProducerRecord<byte[], byte[]>>> readRecordreadRecords(InputStream);


    /**
     * Closes this reader
     */
    default void close() {}
}

...

  1. support the usage of Serializers (from Juma): this support could complicate the configs, since users have to define both MessageReader and serializer. It seems to me the mechanism of MessageReader should include serialization.
  2. move RecordReader to "org.apache.kafka.common": "org.apache.kafka.common" does not allow to import code from "org.apache.kafka.clients.producer". However, the tool-related interface should be able to access producer, consumer and admin code. 
  3. new RecordReader implements Configurable: Implementing Configurable will change the arguments of configure method from (InputStream, configs) to (configs). That obstructs RecordReader from keeping input stream itself as some of that state.
  4. configurable(InputStream, configs) - diverges from the Configurable interface and it is strange to pass an InputStream to a configure method.