...

2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.

 

3. Local state storage: users can create state storage that can be accessed locally in the previous two functions. It can be used to compute aggregates / non-monolithic operations, for example.

...

Code Block
languagejava
public interface KafkaProcessorContext {

  void send(String topic, Object key, Object value);  // send the key-value pair to a Kafka topic


  void schedule(long periodMs);                       // repeatedly schedule the punctuation function with the given period (in ms)

  void commit();                                      // commit the current state, along with the upstream offset and the downstream sent data

  String topic();                                     // return the Kafka record's topic of the current processing key-value pair

  int partition();                                    // return the Kafka record's partition id of the current processing key-value pair

  long offset();                                      // return the Kafka record's offset of the current processing key-value pair
}
 
public interface KafkaProcessor<K, V>  {

  void init(KafkaProcessorContext context);           // initialize the processor

  void process(K key, V value);                       // process a key-value pair

  void punctuate(long streamTime);                    // called when the scheduled time has been reached
}

...

Code Block
languagejava
public class MyKafkaProcessor implements KafkaProcessor<String, Integer> {

  private KafkaProcessorContext processorContext;
  private KeyValueStore<String, Integer> store;

  @Override
  public void init(KafkaProcessorContext context) {
    this.processorContext = context;

    // trigger the punctuation function every 1000ms
    this.processorContext.schedule(1000);

    // create and restore a local key-value store
    this.store = new KeyValueStore<>("local-state", context);
    this.store.restore();
  }

  @Override
  public void process(String key, Integer value) {
    // aggregate the values by key in the local store
    Integer oldValue = this.store.get(key);
    if (oldValue == null) {
      this.store.put(key, value);
    } else {
      int newValue = oldValue + value;
      this.store.put(key, newValue);
    }

    processorContext.commit();
  }

  @Override
  public void punctuate(long streamTime) {
    // send the aggregated values to the aggregate Kafka topic
    KeyValueIterator<String, Integer> iter = this.store.all();
    while (iter.hasNext()) {
      Entry<String, Integer> entry = iter.next();
      processorContext.send("aggregate-topic", entry.key(), entry.value());
    }
  }
}
 
// define an entry function to run this processor
public static void main(String[] args) {
  // start a new thread running the user-defined processing logic;
  // the configs will contain the Kafka topics to consume
  KafkaProcessor processor = new MyKafkaProcessor(configs);
  KStreamThread thread = new KStreamThread(processor);
  thread.start();
 
  // stop the stream thread
  thread.shutdown();
}

 

This example API only demonstrates how much easier the processor is to program than the consumer / producer client implementations above. In other words, it should abstract the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc., and provide a convenient way to express stateful processing such as aggregates, windowed joins, etc.
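To make the idea of hiding the low-level interfaces more concrete, here is a minimal, self-contained sketch of a toy in-memory runtime that drives process() and punctuate() on behalf of the user. All names in it (SketchContext, SketchProcessor, SummingProcessor, InMemoryRuntime) are hypothetical simplifications and are not part of the proposed API; a real runtime would poll from and produce to Kafka rather than record strings in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-ins for the proposed interfaces (not a real Kafka API).
interface SketchContext {
  void send(String topic, Object key, Object value);
  void schedule(long periodMs);
}

interface SketchProcessor<K, V> {
  void init(SketchContext context);
  void process(K key, V value);
  void punctuate(long streamTime);
}

// Sums values per key and emits the running totals on every punctuation.
class SummingProcessor implements SketchProcessor<String, Integer> {
  private final Map<String, Integer> store = new TreeMap<>(); // stands in for the local state store
  private SketchContext context;

  @Override
  public void init(SketchContext context) {
    this.context = context;
    context.schedule(1000); // punctuate every 1000 ms of stream time
  }

  @Override
  public void process(String key, Integer value) {
    store.merge(key, value, Integer::sum);
  }

  @Override
  public void punctuate(long streamTime) {
    store.forEach((k, v) -> context.send("aggregate-topic", k, v));
  }
}

// Toy single-threaded driver: it owns the "poll loop" so the processor never sees it.
class InMemoryRuntime implements SketchContext {
  final List<String> sent = new ArrayList<>(); // records what would be produced to Kafka
  private long periodMs = Long.MAX_VALUE;
  private long nextPunctuation = Long.MAX_VALUE;

  @Override
  public void send(String topic, Object key, Object value) {
    sent.add(topic + ":" + key + "=" + value);
  }

  @Override
  public void schedule(long periodMs) {
    this.periodMs = periodMs;
    this.nextPunctuation = periodMs;
  }

  // Feeds one record with its timestamp, firing punctuate() for every period that has elapsed.
  <K, V> void feed(SketchProcessor<K, V> processor, long timestamp, K key, V value) {
    while (timestamp >= nextPunctuation) {
      processor.punctuate(nextPunctuation);
      nextPunctuation += periodMs;
    }
    processor.process(key, value);
  }
}

class ProcessorSketch {
  public static void main(String[] args) {
    InMemoryRuntime runtime = new InMemoryRuntime();
    SummingProcessor processor = new SummingProcessor();
    processor.init(runtime);
    runtime.feed(processor, 100, "a", 1);
    runtime.feed(processor, 200, "b", 5);
    runtime.feed(processor, 300, "a", 2);
    runtime.feed(processor, 1200, "a", 10); // crosses the 1000 ms boundary before being processed
    System.out.println(runtime.sent); // prints [aggregate-topic:a=3, aggregate-topic:b=5]
  }
}
```

The user code in SummingProcessor contains no polling, committing or flushing; only the toy driver knows when records arrive and when a punctuation is due.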

 

 

Design Goals

The Processor client should include the following functionalities:

...

 

In addition, we can provide a set of command-line scripts so that this processor client can be started as a separate process like other clients. That is, just like bin/kafka-XXX.sh, we could also have a bin/kafka-processor-start.sh that is used like the following:

Code Block
bin/kafka-processor-start.sh org.apache.kafka.clients.processor.examples.MyKafkaProcessor -configs=...

 

Flexible Partition Distribution

Since Kafka topics are partitioned, to increase parallelism users should be able to run multiple instances of their processor client with each client instance fetching a subset of partitions from the specified Kafka topics. Hence a common requirement is to flexibly specify the distribution of those partitions among all the processor instances. Some example scenarios of partition distribution are listed here:

1. Co-partitioning: for example, suppose we have a processor topology like the following:

Code Block
Topic-A --> Stream-A --> |
                          Join -->   Topic-C  --> Stream-C   -->  |
Topic-B --> Stream-B --> |
                                                                    Join -->   Topic-E  
Topic-D --> Stream-D ------------------------------------------>  |

To support these two joins, users would like to co-partition topics A and B, and co-partition topics C and D, but not necessarily co-partition A/B with C/D.

2. Sticky partitioning: for stateful processing, users may want to have a static mapping from stream partitions to process threads.

3. N-way partitioning: when we have stand-by processor instances, users may want to assign a single stream partition to multiple process threads.
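The scenarios above can be sketched with a small client-side assignment function. This is only an illustration under stated assumptions: the class and method names are hypothetical and are not part of the Kafka consumer API, all topics are assumed to have the same number of partitions, and the mapping is static only as long as the number of instances does not change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side assignor; none of these names come from the Kafka consumer API.
class PartitionAssignmentSketch {

  // Returns, for each instance id, the "topic-partition" strings it should fetch.
  // Topics in the same group are co-partitioned: partition p of every topic in the
  // group goes to the same instance(s). Each partition is given to `copies` instances
  // (copies = 1 means no stand-by; copies > 1 is the N-way case).
  static Map<Integer, List<String>> assign(List<List<String>> coPartitionGroups,
                                           int numPartitions, int numInstances, int copies) {
    if (copies < 1 || copies > numInstances) {
      throw new IllegalArgumentException("copies must be between 1 and numInstances");
    }
    Map<Integer, List<String>> assignment = new LinkedHashMap<>();
    for (int i = 0; i < numInstances; i++) {
      assignment.put(i, new ArrayList<>());
    }
    for (int g = 0; g < coPartitionGroups.size(); g++) {
      for (int p = 0; p < numPartitions; p++) {
        for (int c = 0; c < copies; c++) {
          // Static mapping: depends only on the group index and the partition number,
          // so all topics of one group agree, while different groups may differ.
          int instance = (g + p + c) % numInstances;
          for (String topic : coPartitionGroups.get(g)) {
            assignment.get(instance).add(topic + "-" + p);
          }
        }
      }
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<List<String>> groups = Arrays.asList(
        Arrays.asList("Topic-A", "Topic-B"),
        Arrays.asList("Topic-C", "Topic-D"));
    // Two partitions per topic, two instances, no stand-by copies.
    System.out.println(assign(groups, 2, 2, 1));
    // prints {0=[Topic-A-0, Topic-B-0, Topic-C-1, Topic-D-1], 1=[Topic-A-1, Topic-B-1, Topic-C-0, Topic-D-0]}
  }
}
```

In this output partition 0 of Topic-A and Topic-B land on the same instance, as do the matching partitions of Topic-C and Topic-D, while the two groups are placed independently of each other.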

 

In the current consumer coordinator design, the partition rebalancing algorithm is executed on the server side; it would be better to expose this assignment interface to the client when necessary. A separate wiki page for this change proposal on the Java consumer will be up soon.

Metrics Reporting

This processor client should naturally leverage Kafka's metrics modules for its own processing-related metrics collection and reporting, such as processing-latency, punctuation-frequency, commit-latency, restore-latency, etc.
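As a rough illustration of what recording such a metric involves, the sketch below times a unit of work and keeps a count, an average and a maximum. LatencySensor is a hypothetical stand-in written in plain Java; the actual client would presumably register its sensors with Kafka's metrics module rather than use a class like this.

```java
// Hypothetical minimal sensor; not part of Kafka's metrics module.
class LatencySensor {
  private final String name;
  private long count;
  private long totalNanos;
  private long maxNanos;

  LatencySensor(String name) {
    this.name = name;
  }

  // Records one observed latency in nanoseconds.
  void record(long nanos) {
    count++;
    totalNanos += nanos;
    maxNanos = Math.max(maxNanos, nanos);
  }

  // Times a unit of work (for example one process() call) and records its latency.
  void time(Runnable work) {
    long start = System.nanoTime();
    try {
      work.run();
    } finally {
      record(System.nanoTime() - start);
    }
  }

  long count() { return count; }
  long max() { return maxNanos; }
  double avg() { return count == 0 ? 0.0 : (double) totalNanos / count; }

  @Override
  public String toString() {
    return name + "{count=" + count + ", avgNanos=" + avg() + ", maxNanos=" + maxNanos + "}";
  }
}

class MetricsSketch {
  public static void main(String[] args) {
    LatencySensor processingLatency = new LatencySensor("processing-latency");
    for (int i = 0; i < 3; i++) {
      processingLatency.time(() -> Math.sqrt(42.0)); // placeholder for real processing work
    }
    System.out.println(processingLatency);
  }
}
```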

 

High-level Functional APIs

In addition, it would be great if a high-level functional DSL interface could be provided, so that common processing patterns like map / aggregate / filter / union / join / etc. can be easily expressed without re-implementing the processor APIs.

...

Flexible usage mode: it can be used as a standalone library just like the other producer / consumer clients; similar command-line tools could also be provided to start the client as a separate process.
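To give a feel for what a high-level functional DSL might look like, here is a minimal push-based sketch in which each operator forwards records to the next stage. StreamSketch and its methods are hypothetical names chosen for illustration; this is not a proposed interface, and it covers only map and filter, not the stateful operators such as aggregate or join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical fluent wrapper: each operator creates a new stage fed by this one.
class StreamSketch<T> {
  private final List<Consumer<T>> downstream = new ArrayList<>();

  // Called for every record, playing the role of process() in the low-level API.
  void accept(T record) {
    for (Consumer<T> next : downstream) {
      next.accept(record);
    }
  }

  // Transforms every record and forwards the result to the returned stage.
  <R> StreamSketch<R> map(Function<T, R> mapper) {
    StreamSketch<R> out = new StreamSketch<>();
    downstream.add(record -> out.accept(mapper.apply(record)));
    return out;
  }

  // Forwards only the records that satisfy the predicate.
  StreamSketch<T> filter(Predicate<T> predicate) {
    StreamSketch<T> out = new StreamSketch<>();
    downstream.add(record -> {
      if (predicate.test(record)) {
        out.accept(record);
      }
    });
    return out;
  }

  // Terminal operation: hands every record that reaches this stage to the sink.
  void forEach(Consumer<T> sink) {
    downstream.add(sink);
  }
}

class DslSketch {
  public static void main(String[] args) {
    StreamSketch<String> source = new StreamSketch<>();
    List<Integer> lengths = new ArrayList<>();
    source.filter(s -> !s.isEmpty()).map(String::length).forEach(lengths::add);

    source.accept("kafka");
    source.accept("");
    source.accept("kip");
    System.out.println(lengths); // prints [5, 3]
  }
}
```

The pipeline is declared once and then driven record by record, so the user never writes a processor class for simple stateless transformations.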

 

[NOTE] We don't have a full-fledged proposal with all the implementation details for any of the above features yet; it will need some prototyping to work out. This KIP is meant as a stub, but we want to start the conversation about which of the above features should be included and, if so, how to implement them. This wiki page will be updated once we have a more complete prototype to show.

...