...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

If we have offline log files, we need to load their data into Kafka for analysis and for use in subsequent processes.

This situation may not be frequent, but it does exist.

Currently, the producer command line tool can produce data from a file by redirecting the file to standard input:

e.g. kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt (only a single file can be read this way)

Otherwise, users have to write a more complex shell script to send the contents of multiple files to Kafka.

If the Kafka producer command line tool itself supported reading multiple files, it would be more convenient for users.

Even when only a single file needs to be read, supporting this in the tool gives users one more option and does no harm.

So we want to introduce a --files option to the producer command line tool to support reading data from multiple given files.

Multiple files are separated by the --files-separator value; the default separator is a comma. We will add this information to the option's description.
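The separator handling described above can be sketched as follows. This is a minimal sketch under stated assumptions, not the KIP's implementation: `FileListSketch` and `splitFiles` are hypothetical names, and the comma default mirrors the default described for --files-separator. It quotes the separator because `String.split` interprets its argument as a regular expression, so a separator such as `|` would otherwise not be matched literally.

```scala
import java.util.regex.Pattern

object FileListSketch {
  // Hypothetical helper: split the --files value on a literal separator.
  // Pattern.quote prevents characters such as '|' or '.' from being
  // interpreted as regular-expression syntax.
  def splitFiles(files: String, separator: String = ","): Array[String] =
    files.split(Pattern.quote(separator))
}
```

For example, `FileListSketch.splitFiles("1.txt|2.txt", "|")` yields the two file names, whereas an unquoted `split("|")` would break the string into single characters.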

Public Interfaces

kafka-console-producer.sh:
The producer command will accept the --files and --files-separator options. FileMessageReader will implement the MessageReader interface; the original LineMessageReader will remain the default reader.
When the --files option is used, the tool will switch to FileMessageReader.

E.g.
1. The content of the 1.txt file is:
a
b
c

...

4. Execute the consumer command line tool:
./bin/kafka-console-consumer.sh --bootstrap-server `hostname`:9092 --topic test-01 --from-beginning
console output:
a
b
c
1
2
3
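The ordering shown in the console output (all lines of the first file, then all lines of the next) can be reproduced with a small standalone sketch. It does not use Kafka at all; `SequentialReadSketch` and `readAll` are hypothetical names, temporary files stand in for the input files, and the contents of the second file are an assumption inferred from the consumer output above.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.io.Source

object SequentialReadSketch {
  // Read every line of each file, in the order the files are listed.
  def readAll(paths: Seq[Path]): Seq[String] =
    paths.flatMap { p =>
      val source = Source.fromFile(p.toFile, "UTF-8")
      try source.getLines().toList finally source.close()
    }

  def main(args: Array[String]): Unit = {
    // Assumed inputs: temporary files stand in for the two input files.
    val first = Files.createTempFile("first", ".txt")
    val second = Files.createTempFile("second", ".txt")
    Files.write(first, "a\nb\nc\n".getBytes(StandardCharsets.UTF_8))
    Files.write(second, "1\n2\n3\n".getBytes(StandardCharsets.UTF_8))
    try readAll(Seq(first, second)).foreach(println)
    finally { Files.delete(first); Files.delete(second) }
  }
}
```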



Proposed Changes

Implement FileMessageReader for the MessageReader interface. The original LineMessageReader will still be the default reader.

When the --files option is used, the tool will switch to FileMessageReader.

The current implementation is not complicated and differs little from LineMessageReader, but to keep file reading distinct from line reading I extracted it into a separate FileMessageReader.

MessageReader

MessageReader gains a close method so that readers can release the resources they hold.

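```scala
import java.io.InputStream
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerRecord

trait MessageReader {

  // Called once before reading starts; the default is a no-op.
  def init(inputStream: InputStream, props: Properties): Unit = {}

  // Returns the next record to send.
  def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]

  // Releases resources held by the reader; the default is a no-op.
  def close(): Unit = {}
}
```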
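How a caller might use close can be sketched as below. This is a minimal sketch under stated assumptions, not code from the KIP: `SimpleReader` is a hypothetical, simplified stand-in for MessageReader that returns plain strings, `drain` is a hypothetical helper, and a null return is assumed to mark the end of input, as FileMessageReader does.

```scala
object ReaderLifecycleSketch {
  // Hypothetical stand-in for MessageReader: String records instead of
  // ProducerRecord, and null marks the end of input.
  trait SimpleReader {
    def readMessage(): String
    def close(): Unit = {}
  }

  // Pass every record to `send`, then close the reader.
  // The finally block runs even if `send` or `readMessage` throws.
  def drain(reader: SimpleReader)(send: String => Unit): Unit =
    try {
      var record = reader.readMessage()
      while (record != null) {
        send(record)
        record = reader.readMessage()
      }
    } finally reader.close()
}
```

Placing close in a finally block means that file handles are released even when sending fails partway through the input.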

FileMessageReader

```scala
import java.io.{BufferedReader, File, FileInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.util.regex.Pattern

import kafka.common.MessageReader
import kafka.utils.Exit
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.KafkaException

class FileMessageReader extends MessageReader {
  var topic: String = null
  var reader: BufferedReader = null
  var parseKey = false
  var keySeparator = "\t"
  var ignoreError = false
  var lineNumber = 0
  var filePaths = Array.empty[String]
  var next = 0

  // Splits the file list and exits if any listed file does not exist.
  def checkFilesPath(filesPath: String, filesSeparator: String): Array[String] = {
    // Quote the separator so that it is matched literally, not as a regex.
    val files = filesPath.split(Pattern.quote(filesSeparator))
    val nonExistentFiles = files.filterNot(fileName => new File(fileName).exists())
    if (nonExistentFiles.nonEmpty) {
      System.err.println(s"ERROR: The given file paths ${nonExistentFiles.mkString(", ")} do not exist. Please check the file paths!")
      Exit.exit(1)
    }
    files
  }

  // Opens a file as UTF-8, matching the encoding used for the record bytes.
  private def open(path: String): BufferedReader =
    new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))

  override def init(inputStream: InputStream, props: Properties): Unit = {
    topic = props.getProperty("topic")
    if (props.containsKey("parse.key"))
      parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
    if (props.containsKey("key.separator"))
      keySeparator = props.getProperty("key.separator")
    if (props.containsKey("ignore.error"))
      ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
    // Fall back to a comma when no separator is configured.
    filePaths = checkFilesPath(props.getProperty("filesPath"), props.getProperty("files.separator", ","))
    reader = open(filePaths(next))
  }

  // The explicit result type is required because the method calls itself.
  override def readMessage(): ProducerRecord[Array[Byte], Array[Byte]] = {
    lineNumber += 1

    (reader.readLine(), parseKey) match {
      case (null, _) =>
        // End of the current file: move on to the next one, if any.
        next += 1
        if (next >= filePaths.length)
          null
        else {
          // Close the finished file before opening the next to avoid leaking handles.
          reader.close()
          reader = open(filePaths(next))
          lineNumber = 0
          readMessage()
        }
      case (line, true) =>
        line.indexOf(keySeparator) match {
          case -1 =>
            if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
            else throw new KafkaException(s"No key found on line $lineNumber of ${filePaths(next)}: $line")
          case n =>
            val value = line.substring(n + keySeparator.length).getBytes(StandardCharsets.UTF_8)
            new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
        }
      case (line, false) =>
        new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
    }
  }

  // Closes the file that is currently open, if any.
  override def close(): Unit = {
    if (reader != null)
      reader.close()
  }
}
```
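The key parsing used when parse.key is enabled can be looked at in isolation. The following is a minimal standalone sketch, not part of the proposed patch: `KeyValueSketch` and `parse` are hypothetical names, and it returns strings rather than a ProducerRecord so that it runs without Kafka on the classpath. It splits a line at the first occurrence of the separator and reports a missing separator as `None`, which corresponds to the "No key found" case above.

```scala
object KeyValueSketch {
  // Split a line into (key, value) at the first occurrence of the separator.
  // Returns None when the separator does not occur in the line.
  def parse(line: String, keySeparator: String = "\t"): Option[(String, String)] =
    line.indexOf(keySeparator) match {
      case -1 => None
      case n  => Some((line.substring(0, n), line.substring(n + keySeparator.length)))
    }
}
```

Because only the first separator is used, `KeyValueSketch.parse("a:b:c", ":")` gives the key `a` and the value `b:c`; a line that ends with the separator gives an empty value.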


Compatibility, Deprecation, and Migration Plan

...