...

Both of these approaches have downsides. The downside of the first option is that the producer and consumer APIs used for writing transformations are fairly low level: simple examples are relatively simple, but any more complex transformation tends to get complicated (a hand-written example follows this list). A richer client could add value beyond what the producer and consumer provide in the following ways:

 1. Manage multi-threading and parallelism within a process.

 2. Manage partition assignment to processes / threads.

 3. Manage journaled local state storage.

 4. Manage offset commits and "exactly-once" guarantees as appropriate features are added in Kafka to support this.
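For illustration, a hand-written consume-transform-produce loop against the existing clients might look like the following sketch (the topic names, group id, and the trivial transformation are all placeholders). Note that every responsibility in the list above is left entirely to the application code:

Code Block
languagejava
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HandWrittenTransform {

  public static void main(String[] args) {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "transform-group");
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
    KafkaProducer<String, String> producer = new KafkaProducer<>(config);
    consumer.subscribe(Collections.singletonList("input-topic"));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        // the transformation itself is trivial here; real logic quickly
        // needs threading, local state, and careful commit handling
        String transformed = record.value().toUpperCase();
        producer.send(new ProducerRecord<>("output-topic", record.key(), transformed));
      }
      // synchronous commit after each batch: at-least-once semantics at best
      consumer.commitSync();
    }
  }
}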

...

The second option, i.e. using a full stream processing framework, can be a good solution, but a few things tend to make it heavy-weight:

1. These frameworks are poorly integrated with Kafka (different concepts, configuration, monitoring, and terminology). For example, these frameworks only use Kafka as the stream data source / sink of the whole processing topology, while storing intermediate data in their own in-memory formats (RDDs, Bolt memory maps, etc.). If users want to persist these intermediate results to Kafka as well, they have to break their processing into multiple topologies that are deployed separately, increasing operational and management costs.

2. These frameworks either duplicate or force the adoption of a packaging, deployment, and clustering solution. For example, with Storm you need to run a Storm cluster, which is a separate system that has to be monitored and operated. In an elastic environment like AWS, Mesos, or YARN this is redundant: you end up running a Storm cluster inside the YARN or Mesos cluster instead of just running the jobs directly; similarly, Samza is tied to YARN.

3. These frameworks can't be integrated into existing services or applications. You can't just embed a light transformation library inside an existing app; instead you must deploy the entire framework, which runs as a separate service.

...

 

Processor Client Proposal

We propose to add another standalone client, alongside the existing producer and consumer clients, for processing data consumed from Kafka and storing the results back to Kafka. We call it the "processor" client.

The processor client would provide the following functionalities:

Data Processing APIs

 1. Like the producer / consumer clients, the processor client should be able to specify the Kafka cluster(s) and topic(s) to consume data from and send data to (see the configuration sketch after this list).

 2. Per-message processing: this is the basic function that is triggered once a new message has been consumed from Kafka.

 3. Time-triggered processing: this function is triggered whenever a specified time period has elapsed. It can be used for windowed computations, for example.

 4. Local state storage: users can create state stores that can be accessed locally within the previous two functions. This can be used to compute aggregates / non-monolithic operations, for example.
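As a rough sketch of the first item (the exact property names are hypothetical, since the configuration surface of the processor client is not yet specified), such configs might look like:

Code Block
languagejava
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");  // Kafka cluster(s) to consume from / send to
configs.put("topics", "input-topic");                // hypothetical property: topic(s) to consume from
configs.put("client.id", "my-processor");            // client identifier, as in the existing clients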

 

Following the above requirements, an example processing interface looks like the following:

Code Block
languagejava
public interface KafkaProcessorContext {

  void send(String topic, Object key, Object value);  // send the key-value pair to a Kafka topic


  void schedule(long interval);                       // schedule the punctuation function to be triggered repeatedly with the given period (ms)

  void commit();                                      // commit the current state, along with the upstream offsets and the downstream sent data

  String topic();                                     // return the topic of the Kafka record currently being processed

  int partition();                                    // return the partition id of the Kafka record currently being processed

  long offset();                                      // return the offset of the Kafka record currently being processed
}
 
public interface KafkaProcessor<K, V>  {

  void init(KafkaProcessorContext context);           // initialize the processor

  void process(K key, V value);                       // process a key-value pair

  void punctuate(long streamTime);                    // called when the scheduled time has been reached
}
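As a minimal usage sketch of these two interfaces, a stateless pass-through processor that forwards each consumed record to an output topic (the topic name is a placeholder) would only need to implement process() and can leave punctuate() empty:

Code Block
languagejava
public class PassThroughProcessor implements KafkaProcessor<String, String> {

  private KafkaProcessorContext processorContext;

  @Override
  public void init(KafkaProcessorContext context) {
    this.processorContext = context;      // no punctuation or local state needed
  }

  @Override
  public void process(String key, String value) {
    // per-message processing: forward the record unchanged
    processorContext.send("output-topic", key, value);
  }

  @Override
  public void punctuate(long streamTime) {
    // no time-triggered logic for a stateless pass-through
  }
}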

 

And a simple aggregate processor would look like the following:

Code Block
languagejava
public class MyKafkaProcessor implements KafkaProcessor<String, Integer> {

  private KafkaProcessorContext processorContext;
  private KeyValueStore<String, Integer> store;

  @Override
  public void init(KafkaProcessorContext context) {
    // trigger the punctuation function every 1000ms
    this.processorContext = context;
    this.processorContext.schedule(1000);

    // create and restore a local key-value store
    this.store = new KeyValueStore<>("local-state", context);
    this.store.restore();
  }

  @Override
  public void process(String key, Integer value) {
    // aggregate the values by key in the local store
    Integer oldValue = this.store.get(key);
    if (oldValue == null) {
      this.store.put(key, value);
    } else {
      int newValue = oldValue + value;
      this.store.put(key, newValue);
    }

    processorContext.commit();
  }

  @Override
  public void punctuate(long streamTime) {
    // send the aggregated values to the aggregate Kafka topic
    KeyValueIterator<String, Integer> iter = this.store.all();
    while (iter.hasNext()) {
      Entry<String, Integer> entry = iter.next();
      processorContext.send("aggregate-topic", entry.key(), entry.value());
    }
  }
}
 
// define an entry function to run this processor
public static void main(String[] args) {
  // start a new thread running the user-defined processing logic;
  // the configs contain the Kafka topics to consume from
  KafkaProcessor processor = new MyKafkaProcessor(configs);
  KStreamThread thread = new KStreamThread(processor);
  thread.start();

  // ... and eventually stop the stream thread
  thread.shutdown();
}
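Since the processor client is also meant to manage multi-threading and partition assignment (see the list of value-add opportunities above), one could imagine scaling out within a process simply by starting several stream threads. This is purely a sketch against the proposed, not-yet-final API; in particular, giving each thread its own processor instance is an assumption here:

Code Block
languagejava
// hypothetical scaling sketch: one processor instance per stream thread;
// the client would divide the input topic partitions among the threads
int numThreads = 4;
List<KStreamThread> threads = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
  KStreamThread thread = new KStreamThread(new MyKafkaProcessor(configs));
  threads.add(thread);
  thread.start();
}

// on shutdown, stop all stream threads
for (KStreamThread thread : threads) {
  thread.shutdown();
}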

 

Compared with the consumer / producer client implementations above, this processing API has the following programming benefits:

1. It abstracts away the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.

2. It provides a convenient way to express time-based computation logic together with local state storage, such as aggregates, windowed joins, etc.


 

 

Design Goals

The processor client should include the following functionalities:

...