Motivation
A common use case for Kafka is a real-time processing job that transforms data from input topics to output topics. Today there are a couple of options available for users to process such data:
- Use the Kafka producer and consumer APIs with customized processing logic. For example:
// create a producer and a consumer
KafkaProducer producer = new KafkaProducer(configs);
KafkaConsumer consumer = new KafkaConsumer(configs);

// start a thread with a producer and a consumer client
// for data IO and execute the processing logic
new Thread(new Runnable() {
    @Override
    public void run() {
        while (isRunning) {
            // read some data from up-stream Kafka
            List<Message> inputMessages = consumer.poll();
            // do some processing..
            // send the output to the down-stream Kafka
            producer.send(outputMessages);
        }
    }
}).start();
- Use a full-fledged stream processing system such as Storm, Samza, Spark Streaming, or Flink, with Kafka as the source / sink of stream data.
Both of these approaches have downsides. With the first option, the producer and consumer APIs used for writing transformations are somewhat low level: simple transformations are straightforward to write, but anything more complex quickly becomes involved. A richer client could add value beyond what the producer and consumer do by handling the following:
1. Manage multi-threading and parallelism within a process.
2. Manage partitioning assignment to processes / threads.
3. Manage journaled local state storage.
4. Manage offset commits and "exactly-once" guarantees as appropriate features are added in Kafka to support this.
The second option, i.e. using a full stream processing framework, can be a good solution, but a couple of things tend to make it a bit heavy-weight:
1. These frameworks are poorly integrated with Kafka (different concepts, configuration, monitoring, terminology). For example, they only use Kafka as the source / sink of the whole processing topology, while storing intermediate data in their own in-memory formats (RDDs, Bolt memory maps, etc.). If users want to persist these intermediate results to Kafka as well, they need to break their processing into multiple topologies that must be deployed separately, increasing operation and management costs.
2. These frameworks either duplicate or force the adoption of a packaging, deployment, and clustering solution. For example, Storm requires running a Storm cluster, which is a separate system that has to be monitored and operated. In an elastic environment like AWS, Mesos, or YARN this is redundant: you end up with a Storm cluster inside the YARN or Mesos cluster instead of just running the jobs directly on the resource manager; similarly, Samza is tied to YARN.
3. These frameworks can't be integrated with existing services or applications. For example, you can't just embed a light transformation library inside an existing app; you have to adopt the entire framework, which runs as a service.
Proposed Changes
We propose another transformation-centric client alongside the existing producer and consumer clients (we can call it the "processor" client). This client can be used as a standalone API for processing stream data stored in Kafka.
Compared with the producer / consumer implementation above, the same logic would look like this:
class MyKafkaProcessor implements KafkaProcessor {
    @Override
    public void process(String topic, K key, V value, Context context) { .. }
}

// starting a new thread running the user-defined processing logic
KafkaProcessor processor = new MyKafkaProcessor(configs);
KStreamThread thread = new KStreamThread(processor);
thread.start();

// stopping the stream thread
thread.shutdown();
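To make the process() body concrete, here is one hypothetical example of what a user-defined processor might look like. The generic parameterization of KafkaProcessor and the context.send() method for emitting records downstream are assumptions for illustration only; the API is not yet fixed:

// Hypothetical sketch of a concrete processor: drop null values and
// forward the rest, upper-cased, to an output topic.
class FilteringProcessor implements KafkaProcessor<String, String> {
    @Override
    public void process(String topic, String key, String value, Context context) {
        if (value != null) {
            // send() is an assumed Context method writing to down-stream Kafka
            context.send("output-topic", key, value.toUpperCase());
        }
    }
}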
Design Goals
The Processor client should include the following functionalities:
- Processing APIs: as shown in the previous section, it should provide a programming interface for users to specify the processing logic. This would at least include 1) per-arrived-message processing, 2) time-triggered processing, called periodically for windowing computations and the like, and 3) a local state representation for aggregates / non-monolithic operations (a hypothetical sketch of 2) and 3) appears after this list). In addition, it would be great if a high-level functional processing interface, such as map / filter / union / etc., could also be supported.
- Flexible usage mode: it should be usable as a standalone library, just like the other producer / consumer clients; similar command line tools could also be provided to start the client as a separate process.
- Leveraging Kafka-specific features such as metrics reporting, metadata refreshing, client-side partition assignment, re-processing, etc.
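To make the first design goal concrete, the sketch below illustrates what time-triggered processing backed by a journaled local store might look like. Everything beyond process() here (the init() and punctuate() callbacks, the KeyValueStore / KeyValue types, and context.getKeyValueStore()) is a hypothetical strawman for discussion, not a settled API:

// Hypothetical sketch only: init(), punctuate(), KeyValueStore, KeyValue
// and getKeyValueStore() are assumed names illustrating the design goals.
class CountingProcessor implements KafkaProcessor<String, String> {
    // local state, journaled to Kafka so it can be restored on failure
    private KeyValueStore<String, Long> counts;

    @Override
    public void init(Context context) {
        // getKeyValueStore() is an assumed accessor for a named local store
        this.counts = context.getKeyValueStore("counts");
    }

    @Override
    public void process(String topic, String key, String value, Context context) {
        // per-arrived-message processing: update the running count per key
        Long old = counts.get(key);
        counts.put(key, old == null ? 1L : old + 1);
    }

    @Override
    public void punctuate(Context context, long timestamp) {
        // time-triggered processing: periodically emit the current counts;
        // the iteration API over the store is likewise assumed
        for (KeyValue<String, Long> entry : counts.all()) {
            context.send("counts-topic", entry.key, entry.value.toString());
        }
    }
}

Journaling the store's changes to a Kafka topic is what would allow the state to be rebuilt on another instance after a failure, which is the intent of the local state goal above.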
We don't have a full-fledged proposal for all the implementation details yet; it will need some prototyping to work out. This KIP is meant as a stub to start the conversation, and it will be updated as we have a more complete prototype to show.
Compatibility, Deprecation, and Migration Plan
This KIP only proposes additions. There should be no compatibility issues.