
Motivation

A common use case for Kafka is real-time processing that transforms data from input topics to output topics. Today users have a couple of options for processing such data:

  1. Use the Kafka producer and consumer APIs with customized processing logic. For example:

    // create a producer and a consumer
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(configs);
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(configs);

    // start a thread with a producer and consumer client
    // for data I/O and execute the processing logic
    new Thread(new Runnable() {
      @Override
      public void run() {
        while (isRunning) {
          // read some data from up-stream Kafka
          ConsumerRecords<byte[], byte[]> records = consumer.poll(100);

          // do some processing and generate the output records..

          // send the output to the down-stream Kafka
          for (ProducerRecord<byte[], byte[]> output : outputRecords)
            producer.send(output);
        }
      }
    }).start();
  2. Use a full-fledged stream processing system such as Storm, Samza, Spark Streaming, or Flink, with Kafka as the source / sink for stream data.

 

Both of these approaches have downsides. The downside of the first option is that the producer and consumer APIs used for writing transformations are fairly low level: simple cases stay simple, but any more complex transformation quickly becomes complicated to write. The opportunities for a richer client to add value beyond what the producer and consumer already do would be to:

1. Manage multi-threading and parallelism within a process.

2. Manage partition assignment to processes / threads.

3. Manage journaled local state storage.

4. Manage offset commits and "exactly-once" guarantees as appropriate features are added in Kafka to support this.


The second option, i.e. using a full stream processing framework, can be a good solution, but a couple of things tend to make it heavy-weight (a brief and still-ongoing survey can be found here):

1. These frameworks are poorly integrated with Kafka (different concepts, configuration, monitoring, terminology). For example, they use Kafka only as the stream data source / sink of the whole processing topology, while storing intermediate data in their own in-memory formats (RDDs, Bolt memory maps, etc.). If users want to persist these intermediate results to Kafka as well, they have to break their processing into multiple topologies that must be deployed separately, increasing operational and management costs.

2. These frameworks either duplicate or force the adoption of a packaging, deployment, and clustering solution. For example, with Storm you need to run a Storm cluster, which is a separate thing that has to be monitored and operated. In an elastic environment like AWS, Mesos, or YARN this is somewhat redundant, since you then have a Storm cluster running inside the YARN cluster instead of just running the jobs directly on the resource manager; similarly, Samza is tied to YARN.

3. These frameworks cannot be embedded into existing services or applications. For example, you cannot just use a light transformation library inside an existing app; instead you have to deploy and operate the entire framework as a separate service.

 

 

Processor Client Proposal

We propose to add another standalone client, besides the existing producer and consumer clients, for processing data consumed from Kafka and storing the results back to Kafka. We call it the "processor" client.

The processor client would provide the following functionalities:

Data Processing APIs

0. Like the producer / consumer clients, the processor client should also be able to specify the Kafka cluster(s) and topic(s) to consume data from / send data to.

1. Per-message processing: this is the basic function that is triggered once a new message has been consumed from Kafka.

2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowed computations, for example.

 

Local State Storage

Users can create state stores that can be accessed locally from the previous two functions. They can be used, for example, to compute aggregates and other stateful operations.
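For illustration, such a local store could expose an API along the following lines. This is only a sketch written to match how the store is used in the aggregation example further down this page; all of the names and signatures here (StateStore, KeyValueStore, KeyValueIterator, Entry) are assumptions rather than a finalized design, it references the KafkaProcessorContext interface defined below, and the in-memory implementation stands in for one that would journal its writes to a Kafka changelog topic.

// Hypothetical local state storage API -- a sketch only, not a finalized design

interface Entry<K, V> {
  K key();
  V value();
}

interface KeyValueIterator<K, V> extends java.util.Iterator<Entry<K, V>> { }

interface StateStore<K, V> {
  void restore();   // rebuild the store's contents, e.g. by replaying a changelog topic from Kafka
}

// An in-memory key-value store registered under a name with the processor context; a real
// implementation would journal every write to Kafka so that restore() can replay it after a failure.
class KeyValueStore<K, V> implements StateStore<K, V> {

  private final java.util.Map<K, V> map = new java.util.HashMap<>();

  public KeyValueStore(String name, KafkaProcessorContext context) {
    // 'name' and 'context' would be used to hook the store into the changelog machinery
  }

  @Override
  public void restore() {
    // no-op in this sketch; a real store would re-read its changelog topic here
  }

  public V get(K key) {
    return map.get(key);
  }

  public void put(K key, V value) {
    map.put(key, value);
  }

  public KeyValueIterator<K, V> all() {
    final java.util.Iterator<java.util.Map.Entry<K, V>> entries = map.entrySet().iterator();
    return new KeyValueIterator<K, V>() {
      @Override
      public boolean hasNext() { return entries.hasNext(); }

      @Override
      public Entry<K, V> next() {
        final java.util.Map.Entry<K, V> e = entries.next();
        return new Entry<K, V>() {
          public K key() { return e.getKey(); }
          public V value() { return e.getValue(); }
        };
      }
    };
  }
}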

 

Following the above requirements, an example processing interface looks like the following:

public interface KafkaProcessorContext {

  void send(String topic, Object key, Object value);  // send the key-value pair to a Kafka topic

  void schedule(long interval);                        // schedule the punctuation function to be triggered every 'interval' ms

  void commit();                                       // commit the current state, along with the upstream offsets and the downstream sent data

  String topic();                                      // return the topic of the Kafka record currently being processed

  int partition();                                     // return the partition id of the Kafka record currently being processed

  long offset();                                       // return the offset of the Kafka record currently being processed
}
 
public interface KafkaProcessor<K, V> {

  void init(KafkaProcessorContext context);           // initialize the processor

  void process(K key, V value);                       // process a key-value pair

  void punctuate(long streamTime);                    // process when the scheduled time has been reached
}

 

A simple aggregating processor would then look like the following:

public class MyKafkaProcessor implements KafkaProcessor<String, Integer> {

  private KafkaProcessorContext processorContext;
  private KeyValueStore<String, Integer> store;

  @Override
  public void init(KafkaProcessorContext context) {
    // trigger the punctuation function every 1000ms
    this.processorContext = context;
    this.processorContext.schedule(1000);

    // create and restore a local key-value store
    this.store = new KeyValueStore<>("local-state", context);
    this.store.restore();
  }

  @Override
  public void process(String key, Integer value) {
    // aggregate the values by key in the local store
    Integer oldValue = this.store.get(key);
    if (oldValue == null) {
      this.store.put(key, value);
    } else {
      this.store.put(key, oldValue + value);
    }

    processorContext.commit();
  }

  @Override
  public void punctuate(long streamTime) {
    // send the aggregated values to the aggregate Kafka topic
    KeyValueIterator<String, Integer> iter = this.store.all();
    while (iter.hasNext()) {
      Entry<String, Integer> entry = iter.next();
      processorContext.send("aggregate-topic", entry.key(), entry.value());
    }
  }
}
 
// define an entry function to run this processor
public static void main(String[] args) {
  // start a new thread running the user-defined processing logic;
  // the configs contain, among other things, the Kafka topics to consume from
  KafkaProcessor<String, Integer> processor = new MyKafkaProcessor();
  KStreamThread thread = new KStreamThread(processor, configs);
  thread.start();

  // ... and eventually stop the stream thread
  thread.shutdown();
}

 

This example API is only meant to demonstrate how much easier such logic is to program compared with using the consumer / producer clients directly. In other words, it should abstract away the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc., and provide a convenient way to express time-based computation logic along with stateful processing such as aggregates, windowed joins, etc.
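To make this concrete, the run loop inside something like the KStreamThread used in the example above might look roughly as follows. This is purely an illustrative sketch, not a proposed implementation: the constructor signature, the hypothetical 'topics' config key, and all internal details are assumptions, and imports are omitted as in the other snippets on this page.

// A rough sketch of how a stream thread could drive a KafkaProcessor internally (illustrative only)
public class KStreamThread<K, V> extends Thread {

  private final KafkaProcessor<K, V> processor;
  private final KafkaConsumer<K, V> consumer;
  private final KafkaProducer<Object, Object> producer;
  private volatile boolean running = true;
  private volatile long punctuateIntervalMs = -1;   // set by the processor via context.schedule()
  private long lastPunctuateMs = 0;

  public KStreamThread(KafkaProcessor<K, V> processor, Properties configs) {
    this.processor = processor;
    this.consumer = new KafkaConsumer<>(configs);
    this.producer = new KafkaProducer<>(configs);
    // 'topics' is a hypothetical config key listing the topics to consume from
    this.consumer.subscribe(Arrays.asList(configs.getProperty("topics").split(",")));
  }

  @Override
  public void run() {
    // hand the processor a context that wraps the underlying producer / consumer clients
    processor.init(new KafkaProcessorContext() {
      public void send(String topic, Object key, Object value) { producer.send(new ProducerRecord<>(topic, key, value)); }
      public void schedule(long interval) { punctuateIntervalMs = interval; }
      public void commit() { consumer.commitSync(); }   // commit the offsets of the records consumed so far
      public String topic() { return null; }            // record coordinates omitted in this sketch
      public int partition() { return -1; }
      public long offset() { return -1L; }
    });

    while (running) {
      // poll the up-stream topics and hand every record to the user's processing logic
      for (ConsumerRecord<K, V> record : consumer.poll(100))
        processor.process(record.key(), record.value());

      // fire the punctuation function whenever the scheduled period has elapsed
      long now = System.currentTimeMillis();
      if (punctuateIntervalMs > 0 && now - lastPunctuateMs >= punctuateIntervalMs) {
        processor.punctuate(now);
        lastPunctuateMs = now;
      }
    }

    consumer.close();
    producer.close();
  }

  public void shutdown() {
    running = false;
  }
}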

 

In addition, we can provide a set of command-line scripts so that this processor client can be started as a separate process like the other clients. That is, just like the existing bin/kafka-XXX.sh scripts, we can also have a bin/kafka-processor-start.sh that is used as follows:

bin/kafka-processor-start.sh org.apache.kafka.clients.processor.examples.MyKafkaProcessor -configs=...

 

Flexible Partition Distribution

Since Kafka topics are partitioned, to increase parallelism users should be able to run multiple instances of their processor client with each client instance fetching a subset of partitions from the specified Kafka topics. Hence a common requirement is to flexibly specify the distribution of those partitions among all the processor instances. Some example scenarios of partition distribution are listed here:

1. Co-partitioning: for example, suppose we have a processor topology like the following:

Topic-A --> Stream-A -->|
                        |--> Join --> Topic-C --> Stream-C -->|
Topic-B --> Stream-B -->|                                     |
                                                              |--> Join --> Topic-E
Topic-D --> Stream-D ---------------------------------------->|

To support these two joins, users would then like to co-partition topic A and B, and co-partition topic C and D, but not necessarily co-partition A/B with C/D.

2. Sticky partitioning: for stateful processing, users may want a static mapping from stream partitions to processing threads.

3. N-way partitioning: when we have stand-by processor instances, users may want to assign a single stream partition to multiple processing threads.

 

In the current consumer coordinator design the partition rebalancing algorithm is executed on the server side; we would prefer to expose this assignment interface to the client side where necessary. A separate wiki page with this change proposal for the Java consumer will be up soon.
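As an illustration only, a client-side assignment hook could look roughly like the interface below; the name and signature are assumptions, and the actual interface will be defined in that separate consumer proposal.

// Hypothetical client-side partition assignment hook -- a sketch, not the proposed consumer API
public interface PartitionAssignor {

  // Given the full set of subscribed topic partitions and the ids of the live processor instances,
  // decide which partitions each instance should own. Implementations could realize the
  // co-partitioning, sticky, and N-way (stand-by) strategies listed above.
  Map<String, List<TopicPartition>> assign(List<TopicPartition> partitions, List<String> instanceIds);
}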

Metrics Reporting

This processor client should naturally leverage Kafka's existing metrics modules for collecting and reporting its own processing-related metrics, such as processing latency, punctuation frequency, commit latency, restore latency, etc.
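For instance, the org.apache.kafka.common.metrics package already used by the producer and consumer clients could be reused directly; a rough sketch (the sensor and metric names here are made up for illustration):

// Reusing the Kafka client metrics package to record processing latency (illustrative names only)
Metrics metrics = new Metrics();
Sensor processingLatency = metrics.sensor("processing-latency");
processingLatency.add(new MetricName("process-latency-avg", "processor-metrics"), new Avg());
processingLatency.add(new MetricName("process-latency-max", "processor-metrics"), new Max());

// inside the stream thread's loop, around the call to processor.process(...):
long start = System.currentTimeMillis();
// processor.process(record.key(), record.value());
processingLatency.record(System.currentTimeMillis() - start);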

 

High-level Functional APIs

In addition, it would be great if a high-level functional DSL interface could be provided, so that common processing patterns such as map / filter / aggregate / union / join can be expressed easily without re-implementing them on top of the processor APIs each time.
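As a purely hypothetical illustration of what such a DSL could offer (none of these names exist yet; the shape below is just a sketch):

// A purely hypothetical functional DSL on top of the processor client -- all names are illustrative only
public interface KStream<K, V> {

  <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, K1, V1> mapper);                  // transform each record

  KStream<K, V> filter(Predicate<K, V> predicate);                                    // drop records failing the predicate

  <V1, V2> KStream<K, V2> join(KStream<K, V1> other, ValueJoiner<V, V1, V2> joiner);  // windowed join by key

  void sendTo(String topic);                                                          // write the resulting stream back to a Kafka topic

  interface KeyValueMapper<K, V, K1, V1> { Entry<K1, V1> apply(K key, V value); }
  interface Predicate<K, V> { boolean test(K key, V value); }
  interface ValueJoiner<V1, V2, R> { R apply(V1 value1, V2 value2); }
}

With an interface like this, a simple filter-and-forward job could, for example, be expressed as a one-line chain of filter(...).sendTo("output-topic") instead of a hand-written processor with process() and punctuate() methods.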

 

[NOTE] We do not have a full-fledged proposal with all the implementation details for any of the above features yet; it will take some prototyping to work them out. However, we want to start the conversation about which of the above features should be included and, if so, how to implement them. This wiki page will be updated as we have a more complete prototype to show.

 

 

Compatibility, Deprecation, and Migration Plan

This KIP only proposes additions. There should be no compatibility issues.

