Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Users sometimes have offline log files whose contents need to be loaded into Kafka for analysis and for use in downstream processing.
This situation is not frequent, but it does occur.
Currently, the console producer can load a file's contents only through shell input redirection, which handles a single file:
kafka-console-producer.sh --bootstrap-server `hostname`:9092 --topic test-01 < 1.txt
Loading several files requires a more elaborate shell script.
When users run into this need, they typically first check whether Kafka supports it directly, and only then look for alternatives.
If the console producer could read multiple files itself, loading such data would be more convenient.
Users who only need to read a single file lose nothing: the option simply adds a choice.
Moving multi-file reading out of the shell and into the Kafka tool also avoids portability problems.
A script that reads multiple files has to account for the interpreter on each operating system, for example the differences between bash and Windows batch (bat) files.
With built-in support, users on Windows can use this feature without learning bat.
We therefore propose adding a --files option to the console producer so that it can read data from one or more given files.
Multiple files are separated by commas. The comma is the only supported delimiter, and the option's help text will say so.
Public Interfaces
kafka-console-producer.sh:
The producer command will accept a new --files option.
For example:
1. The content of the 1.txt file is:
a
b
c
2. The content of the 2.txt file is:
1
2
3
3. Run the producer command line tool:
./bin/kafka-console-producer.sh --bootstrap-server `hostname`:9092 --topic test-01 --files 1.txt,2.txt
4. Run the consumer command line tool:
./bin/kafka-console-consumer.sh --bootstrap-server `hostname`:9092 --topic test-01 --from-beginning
Console output:
a
b
c
1
2
3
Proposed Changes
Implement FileMessageReader, a new implementation of the MessageReader interface. The original LineMessageReader remains the default reader.
When the --files option is used, the producer switches to FileMessageReader.
The implementation is simple and differs little from LineMessageReader, but it is extracted into its own class to keep the two input modes distinct.
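The switch described above could look roughly like the following. This is only a sketch: the classes are empty stand-ins so that it compiles without Kafka on the classpath, and `ReaderSelectionSketch` and `selectReader` are hypothetical names that do not come from ConsoleProducer.

```scala
object ReaderSelectionSketch {
  // Empty stand-ins for the real classes (assumption: only the choice matters here).
  trait MessageReader
  class LineMessageReader extends MessageReader
  class FileMessageReader extends MessageReader

  // Hypothetical helper: `files` holds the raw value of --files, if it was given.
  def selectReader(files: Option[String]): MessageReader =
    files match {
      case Some(value) if value.trim.nonEmpty => new FileMessageReader
      case _                                  => new LineMessageReader // default reader
    }
}
```

With this shape, existing invocations that never pass --files keep using the line-based reader.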
MessageReader
MessageReader gains a close method so that implementations can release resources such as open file handles.
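import java.io.InputStream
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerRecord

trait MessageReader {
  // Called once before reading starts; the default does nothing.
  def init(inputStream: InputStream, props: Properties): Unit = {}
  // Returns the next record, or null when the input is exhausted.
  def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
  // Releases any resources held by the reader; the default does nothing.
  def close(): Unit = {}
}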
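One way a caller might use the new close method is to drain the reader inside try/finally, so the reader is closed even if sending fails. The sketch below uses a simplified stand-in trait whose records are plain strings; `CloseUsageSketch` and `drain` are hypothetical names, and the source does not show how the producer actually calls close.

```scala
object CloseUsageSketch {
  // Simplified stand-in for the proposed trait (assumption: records are plain strings).
  trait MessageReader {
    def readMessage(): String
    def close(): Unit = {}
  }

  // Hands every record to `send` until readMessage() returns null, then closes the reader.
  def drain(reader: MessageReader)(send: String => Unit): Unit =
    try {
      var record = reader.readMessage()
      while (record != null) {
        send(record)
        record = reader.readMessage()
      }
    } finally {
      reader.close() // runs on normal completion and on failure
    }
}
```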
FileMessageReader
import java.io.{BufferedReader, File, FileReader, InputStream}
import java.nio.charset.StandardCharsets
import java.util.Properties

import kafka.common.MessageReader
import kafka.utils.Exit
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.KafkaException

class FileMessageReader extends MessageReader {
  var topic: String = null
  var reader: BufferedReader = null
  var parseKey = false
  var keySeparator = "\t"
  var ignoreError = false
  var lineNumber = 0
  var filePaths = Array.empty[String]
  var next = 0

  // Splits the --files value and exits if any listed file does not exist.
  def checkFilesPath(filesPath: String, filesSeparator: String): Array[String] = {
    val files = filesPath.split(filesSeparator)
    val nonExistentFiles = files.filterNot(fileName => new File(fileName).exists()).toSet
    if (nonExistentFiles.nonEmpty) {
      System.err.println(s"ERROR: The given file path $nonExistentFiles does not exist. Please check the file path!")
      Exit.exit(1)
    }
    files
  }

  override def init(inputStream: InputStream, props: Properties): Unit = {
    topic = props.getProperty("topic")
    if (props.containsKey("parse.key"))
      parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
    if (props.containsKey("key.separator"))
      keySeparator = props.getProperty("key.separator")
    if (props.containsKey("ignore.error"))
      ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
    // Fall back to the comma so that a missing separator property cannot cause a null split.
    filePaths = checkFilesPath(props.getProperty("filesPath"), props.getProperty("files.separator", ","))
    reader = new BufferedReader(new FileReader(filePaths(next)))
  }

  // The explicit result type is required because the method calls itself.
  override def readMessage(): ProducerRecord[Array[Byte], Array[Byte]] = {
    lineNumber += 1
    (reader.readLine(), parseKey) match {
      case (null, _) =>
        // The current file is exhausted: close it, then move on to the next file, if any.
        reader.close()
        next += 1
        if (next >= filePaths.length) null
        else {
          reader = new BufferedReader(new FileReader(filePaths(next)))
          lineNumber = 0 // line numbers are counted per file
          readMessage()
        }
      case (line, true) =>
        line.indexOf(keySeparator) match {
          case -1 =>
            if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
            else throw new KafkaException(s"No key found on line $lineNumber of ${filePaths(next)}: $line")
          case n =>
            val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
            new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
        }
      case (line, false) =>
        new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
    }
  }

  override def close(): Unit = {
    if (reader != null) reader.close()
  }
}
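The file-chaining behaviour can be tried without a Kafka cluster. The sketch below is an assumption-laden simplification, not the proposed class: it uses only java.io, returns lines as strings instead of producer records, and `ChainedFileReaderSketch` and `readAll` are hypothetical names. It shows that lines come out file by file, in the order the paths are listed.

```scala
import java.io.{BufferedReader, FileReader}

object ChainedFileReaderSketch {
  // Returns every line of every file, in the order the paths are given.
  def readAll(paths: Seq[String]): List[String] = {
    val lines = List.newBuilder[String]
    for (path <- paths) {
      val reader = new BufferedReader(new FileReader(path))
      try {
        var line = reader.readLine()
        while (line != null) {
          lines += line
          line = reader.readLine()
        }
      } finally {
        reader.close() // release each file before opening the next one
      }
    }
    lines.result()
  }
}
```

Calling `ChainedFileReaderSketch.readAll(Seq("1.txt", "2.txt"))` on the two files from the Public Interfaces example would return their lines in the same order as the console output shown there.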
Compatibility, Deprecation, and Migration Plan
The new --files option has no effect on existing usage: LineMessageReader remains the default reader.
Rejected Alternatives
1. A configurable --files-separator option: this option was dropped. The comma separator matches what most users expect, and this convention rarely changes, so the comma is used as the fixed separator.