
Motivation

A common use case for Kafka is real-time processes that transform data from input topics to output topics. Today there are a couple of options available for users to process such data:

  1. Use the Kafka producer and consumer APIs with customized processing logic. For example:

    // create a producer and a consumer
    KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
     
    // start a thread with a producer and consumer client
    // for data IO and execute processing logic
    new Thread(new Runnable() {
      @Override
      public void run() {
        while (isRunning) {
          // read some data from up-stream Kafka
          ConsumerRecords<String, String> inputRecords = consumer.poll(100);

          // do some processing to produce output records..

          // send the output records to the down-stream Kafka
          for (ProducerRecord<String, String> outputRecord : outputRecords)
            producer.send(outputRecord);
        }
      }
    }).start();
  2. Use a full-fledged stream processing system such as Storm, Samza, Spark Streaming, or Flink, with Kafka as the source / sink for stream data.

 

Both of these approaches have downsides. The downside of the first option is that the producer and consumer APIs used for writing transformations are fairly low level; simple transformations are easy to express, but anything more complex quickly becomes complicated. The opportunities for a richer client to add value beyond what the producer and consumer do are (a short sketch of what some of this looks like by hand today follows the list):

1. Manage multi-threading and parallelism within a process.

2. Manage partitioning assignment to processes / threads.

3. Manage journaled local state storage.

4. Manage offset commits and "exactly-once" guarantees as appropriate features are added in Kafka to support this.
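
To make points 2 and 4 concrete: with the plain clients today, the application itself has to react to partition assignment changes and decide when offsets are committed, roughly as in the following sketch (reusing the consumer from the snippet above; the callback bodies and the commit policy shown are only illustrative):

// the plain consumer leaves partition assignment handling and offset commits to the application
consumer.subscribe(Arrays.asList("input-topic"), new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // e.g. (re-)load any local state needed for the newly assigned partitions
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // e.g. flush pending output and commit offsets before giving up the partitions
    consumer.commitSync();
  }
});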


The second option, i.e. using a full stream processing framework, can be a good solution, but a couple of things tend to make it a bit heavyweight:

1. These frameworks are poorly integrated with Kafka (different concepts, configuration, monitoring, terminology). For example, these frameworks only use Kafka as the stream data source / sink of the whole processing topology, while using their own in-memory format for storing intermediate data (RDDs, Bolt memory maps, etc). If users want to persist these intermediate results to Kafka as well, they have to break their processing into multiple topologies that are deployed separately, increasing operational and management costs.

2. These frameworks either duplicate or force the adoption of a packaging, deployment, and clustering solution. For example, with Storm you need to run a Storm cluster, which is a separate thing that has to be monitored and operated. In an elastic environment like AWS, Mesos, or YARN this is redundant, since you end up running, say, a Storm cluster inside the YARN cluster instead of just running the jobs directly on the resource manager; similarly, Samza is tied to YARN.

3. These frameworks can't easily be embedded into existing services or applications. For example, you can't just pull a light transformation library into an existing app; you have to adopt the entire framework, which runs as a separate service.

 

Processor Client Proposal

We propose adding another standalone client, besides the existing producer and consumer clients, for processing data consumed from Kafka and storing the results back in Kafka. We call it the "processor" client.

The processor client would provide the following functionalities:

Data Processing APIs

0. Like the producer / consumer clients, the processor client should also be able to specify the Kafka cluster(s) and topic(s) to consume data from / send data to.

1. Per-message processing: this is the basic function that can be triggered once a new message has been consumed from Kafka.

2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.

3. Local state storage: users can create state storage that can be accessed locally in the previous two functions. It can be used to compute aggregates / non-monolithic operations, for example.

 

Following the above requirements, an example processing interface would look like the following:

public interface KafkaProcessorContext {

  void send(String topic, Object key, Object value);  // send the key-value pair to a Kafka topic

  void schedule(long interval);                       // repeatedly schedule the punctuation function with the given period (in ms)

  void commit();                                      // commit the current state, along with the upstream offset and the downstream sent data

  String topic();                                     // return the topic of the Kafka record currently being processed

  int partition();                                    // return the partition id of the Kafka record currently being processed

  long offset();                                      // return the offset of the Kafka record currently being processed
}
 
public interface KafkaProcessor<K, V>  {

  void init(KafkaProcessorContext context);           // initialize the processor

  void process(K key, V value);                       // process a key-value pair

  void punctuate(long streamTime);                    // called when the scheduled time has been reached
}

 

A simple aggregate processor would then look like the following:

public class MyKafkaProcessor implements KafkaProcessor<String, Integer> {

  private KafkaProcessorContext processorContext;
  private KeyValueStore<String, Integer> store;

  @Override
  public void init(KafkaProcessorContext context) {
    // trigger the punctuation function every 1000ms
    this.processorContext = context;
    this.processorContext.schedule(1000);  

    // create and restore a local key-value store
    this.store = new KeyValueStore<>("local-state", context);
    this.store.restore();
  }

  @Override
  public void process(String key, Integer value) {
    // aggregate the values by key in the local store
    Integer oldValue = this.store.get(key);
    if (oldValue == null) {
      this.store.put(key, value);
    } else {
      int newValue = oldValue + value;
      this.store.put(key, newValue);
    }

    processorContext.commit();
  }

  @Override
  public void punctuate(long streamTime) {
    // send the aggregated values to the aggregate Kafka topic
    KeyValueIterator<String, Integer> iter = this.store.all();
    while (iter.hasNext()) {
      Entry<String, Integer> entry = iter.next();
      processorContext.send("aggregate-topic", entry.key(), entry.value());
    }
  }
}
 
// define an entry function to run this processor
public static void main(String[] args) {
  // start a new thread running the user-defined processing logic;
  // the configs will contain the Kafka topics to consume from
  KafkaProcessor processor = new MyKafkaProcessor(configs);
  KStreamThread thread = new KStreamThread(processor);
  thread.start();
 
  // stop the stream thread when the application shuts down
  thread.shutdown();
}

 

Compared with the consumer / producer client implementation above, this processing API has the following programming benefits:

1. It abstracts away the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.

2. It provides a convenient way to express time-based computation logic along with local state storage, such as aggregates, windowed joins, etc (see the join sketch after this list).

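
As a sketch of the second point, the same interfaces can also express a simple (non-windowed) stream-table join: records from one topic are kept in a local store and used to enrich records from another. This is only an illustration against the proposed interfaces above; the topic names are made up, and the windowing / expiry of old entries is omitted:

public class JoinProcessor implements KafkaProcessor<String, String> {

  private KafkaProcessorContext context;
  private KeyValueStore<String, String> table;

  @Override
  public void init(KafkaProcessorContext context) {
    this.context = context;

    // local store holding the latest value seen per key on the "table" topic
    this.table = new KeyValueStore<>("join-table", context);
    this.table.restore();
  }

  @Override
  public void process(String key, String value) {
    if (context.topic().equals("table-topic")) {
      // remember the latest value for this key
      this.table.put(key, value);
    } else {
      // a record from the stream topic: enrich it with the stored value, if any
      String tableValue = this.table.get(key);
      if (tableValue != null)
        context.send("joined-topic", key, value + "," + tableValue);
    }
    context.commit();
  }

  @Override
  public void punctuate(long streamTime) {
    // no time-triggered logic needed for this processor
  }
}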

 

 

Design Goals

The Processor client should include the following functionalities:

  1. Processing APIs: as shown in the previous section, it should provide a programming interface for users to specify their processing logic. This would at least include 1) per-arrived-message processing, 2) time-triggered processing, called periodically for windowing computations, etc., and 3) local state representation for aggregates / non-monolithic operations, etc. In addition, it would be great if a high-level functional processing interface, such as map / filter / union / etc., could also be supported (a purely hypothetical sketch of such an interface appears after this list).

  2. Flexible usage mode: it should be usable as a standalone library, just like the other producer / consumer clients; similar command line tools could also be provided to start the client as a separate process.

  3. Leveraging Kafka-specific features such as metrics reporting, metadata refreshing, client-side partition assignment, re-processing, etc.
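
The high-level functional interface mentioned in item 1 is not being proposed concretely here. Purely as a hypothetical sketch of the flavor it might have, where every class and method name below is made up for illustration:

// hypothetical: none of these classes or methods exist yet; this only sketches
// what a functional layer on top of the processor client might look like
KStreamBuilder builder = new KStreamBuilder();

builder.from("input-topic")                                    // consume a topic as a record stream
       .filter((key, value) -> value != null)                  // drop records we are not interested in
       .map((key, value) -> new Entry<>(key, value.length()))  // transform each record
       .sendTo("output-topic");                                // produce the results back to Kafka

KStreamThread thread = new KStreamThread(builder, configs);
thread.start();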

 

We don't have a full-fledged proposal for all the implementation details yet; it will need some prototyping to work out. This KIP is meant as a stub to start the conversation; it will be updated as we have a more complete prototype to show.

 

 

Compatibility, Deprecation, and Migration Plan

This KIP only proposes additions. There should be no compatibility issues.

