

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes we have offline log files whose data needs to be loaded into Kafka for analysis and for use in subsequent processes.

This situation may not be frequent, but it does occur.

Currently, the producer command line tool can only produce data from a file via shell input redirection:

e.g. kafka-console-producer.sh --bootstrap-server `hostname`:9092 --topic test-01 < 1.txt (this only supports reading a single file)

Alternatively, users can write a more complex shell command or script to feed the contents of multiple files into Kafka.

In practice, when users run into this problem they first check whether Kafka supports such usage out of the box, and only then look for alternative solutions.

If the producer command line tool itself supports reading data from multiple files, it will be more convenient for users.

Even when only a single file needs to be read, supporting this at the tool level gives users more choices and does no harm.

Moreover, moving multi-file reading out of the shell and into the Kafka tool itself avoids shell-specific workarounds.

Scripts that read multiple files must deal with compatibility across operating systems and interpreters, such as the differences between bash and bat.

If the producer command line tool supports reading multiple files natively, users on the Windows platform can use this feature without having to learn bat scripting.

We therefore want to introduce a --files option to the producer command line tool to support reading data from a given list of files.

Multiple files are separated by commas; the comma is the only delimiter supported. This will be stated in the option's help text.

Public Interfaces

kafka-console-producer.sh:
The producer command will accept a --files option.

E.g.
1. The content of the 1.txt file is:
a
b
c

2. The content of the 2.txt file is:
1
2
3

3. Execute the producer command line tool:
./bin/kafka-console-producer.sh --bootstrap-server `hostname`:9092 --topic test-01 --files 1.txt,2.txt

4. Execute the consumer command line tool:
./bin/kafka-console-consumer.sh --bootstrap-server `hostname`:9092 --topic test-01 --from-beginning
Console output:
a
b
c
1
2
3
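
For reference, a possible declaration of the new option in ConsoleProducer's existing joptsimple parser is sketched below. This is only a sketch assuming --files is wired in like the existing options; the help text shown is illustrative, not final.

import joptsimple.OptionParser

// Sketch only: declaring --files alongside the existing ConsoleProducer options.
val parser = new OptionParser(false)
val filesOpt = parser.accepts("files",
    "Comma-separated list of files to read records from. Comma is the only supported delimiter.")
  .withRequiredArg
  .describedAs("file1,file2,...")
  .ofType(classOf[String])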

Proposed Changes

Implement a FileMessageReader for the MessageReader interface. The original LineMessageReader will still be the default reader.

When the --files option is used, the tool will switch to FileMessageReader.

The current implementation is not complicated and does not differ much from LineMessageReader, but to keep the two readers clearly separated, FileMessageReader is extracted as its own class.
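
A minimal sketch of how the reader could be selected is shown below. The helper name and wiring are assumptions of this sketch, not part of the KIP; "filesPath" and "files.separator" are the reader properties that FileMessageReader.init() reads further down.

import java.io.InputStream
import java.util.Properties

// Sketch only: LineMessageReader stays the default, FileMessageReader is used
// only when --files was supplied on the command line.
def createMessageReader(filesArg: Option[String], props: Properties, in: InputStream): MessageReader = {
  val reader: MessageReader = filesArg match {
    case Some(files) =>
      props.put("filesPath", files)       // value of --files, e.g. "1.txt,2.txt"
      props.put("files.separator", ",")   // comma is the only supported delimiter
      new FileMessageReader()
    case None =>
      new LineMessageReader()             // existing default behaviour is unchanged
  }
  reader.init(in, props)
  reader
}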

MessageReader

MessageReader gains a close method for releasing resources.

MessageReader
trait MessageReader {

  // Called once before reading; receives the console input stream and the reader configuration properties.
  def init(inputStream: InputStream, props: Properties): Unit = {}

  // Returns the next record to produce, or null when the input is exhausted.
  def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]

  // New in this KIP: a default no-op hook for releasing resources such as open files.
  def close(): Unit = {}

}
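
A minimal sketch of how a caller such as ConsoleProducer could drive this contract so that the new close() hook always runs is shown below. The reader is assumed to be already initialized, and the sendRecord function stands in for the existing producer.send(...) call; both are assumptions of this sketch, not part of the KIP.

import org.apache.kafka.clients.producer.ProducerRecord

// Sketch only: read until the reader is exhausted, then always close it.
def produceFrom(reader: MessageReader,
                sendRecord: ProducerRecord[Array[Byte], Array[Byte]] => Unit): Unit = {
  try {
    var record = reader.readMessage()
    while (record != null) {          // readMessage() returns null once the input is exhausted
      sendRecord(record)
      record = reader.readMessage()
    }
  } finally {
    reader.close()                    // releases any resources the reader holds, e.g. open files
  }
}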

FileMessageReader

FileMessageReader
import java.io.{BufferedReader, File, FileReader, InputStream}
import java.nio.charset.StandardCharsets
import java.util.Properties

import kafka.utils.Exit
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.KafkaException

class FileMessageReader extends MessageReader {
  var topic: String = null
  var reader: BufferedReader = null
  var parseKey = false
  var keySeparator = "\t"
  var ignoreError = false
  var lineNumber = 0
  var filePaths = Array.empty[String]
  var next = 0

  // Validates that every file in the given list exists before any data is
  // produced; exits with an error listing the missing files otherwise.
  def checkFilesPath(filesPath: String, filesSeparator: String): Array[String] = {
    val files = filesPath.split(filesSeparator)
    var nonExistentFiles = Set.empty[String]
    for (fileName <- files) {
      val file = new File(fileName)
      if (!file.exists())
        nonExistentFiles += fileName
    }
    if (nonExistentFiles.nonEmpty) {
      System.err.println(s"ERROR: The given file path(s) ${nonExistentFiles.mkString(", ")} do not exist. Please check the file path!")
      Exit.exit(1)
    }
    files
  }

  override def init(inputStream: InputStream, props: Properties): Unit = {
    topic = props.getProperty("topic")
    if (props.containsKey("parse.key"))
      parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
    if (props.containsKey("key.separator"))
      keySeparator = props.getProperty("key.separator")
    if (props.containsKey("ignore.error"))
      ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
    filePaths = checkFilesPath(props.getProperty("filesPath"), props.getProperty("files.separator"))
    reader = new BufferedReader(new FileReader(filePaths(next)))
  }

  override def readMessage(): ProducerRecord[Array[Byte], Array[Byte]] = {
    lineNumber += 1

    (reader.readLine(), parseKey) match {
      case (null, _) =>
        // The current file is exhausted: close it and move on to the next one,
        // or return null once every file has been read.
        reader.close()
        next += 1
        if (next >= filePaths.length)
          null
        else {
          reader = new BufferedReader(new FileReader(filePaths(next)))
          readMessage()
        }
      case (line, true) =>
        line.indexOf(keySeparator) match {
          case -1 =>
            if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
            else throw new KafkaException(s"No key found on line $lineNumber: $line")
          case n =>
            val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
            new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
        }
      case (line, false) =>
        new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
    }
  }

  override def close(): Unit = {
    if (reader != null)
      reader.close()
  }
}
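
For illustration only, the following shows the reader properties a caller would populate for the init() method above, using the property names FileMessageReader reads ("filesPath" carries the value of --files, "files.separator" the fixed comma delimiter); the concrete values are taken from the example in Public Interfaces.

import java.util.Properties

// Illustrative usage of FileMessageReader outside of ConsoleProducer.
val props = new Properties()
props.put("topic", "test-01")          // from --topic
props.put("filesPath", "1.txt,2.txt")  // from --files
props.put("files.separator", ",")      // comma is the only supported delimiter
props.put("parse.key", "false")
props.put("key.separator", "\t")
props.put("ignore.error", "false")

val reader = new FileMessageReader()
reader.init(null, props)               // FileMessageReader opens its own files; the input stream is not used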


Compatibility, Deprecation, and Migration Plan

The new --files option has no effect on existing usage.

Rejected Alternatives

1. Add a --files-separator option (discarded): the comma separator matches most users' habits, and those habits rarely change, so we will use the comma as the only separator and simply follow this convention.
