Motivation

A common use case for Kafka is real-time processes that transform data from input topics to output topics. Today there are a couple of options available for users to process such data:

  1. Use the Kafka producer and consumer APIs with customized processing logic. For example:

    // create a producer and a consumer
    KafkaProducer producer = new KafkaProducer(configs);
    KafkaConsumer consumer = new KafkaConsumer(configs);
     
    // start a thread with a producer and consumer client
    // for data IO and execute processing logic
    new Thread(new Runnable() {
      @Override
      public void run() {
        while (isRunning) {
          // read some data from up-stream Kafka
          List<Message> inputMessages = consumer.poll();
     
          // do some processing..
    
          // send the output to the down-stream Kafka
          producer.send(outputMessages);
        }
      }
    }).start();
  2. Use a full-fledged stream processing system such as Storm, Samza, Spark Streaming, or Flink with Kafka as their source / sink stream data storage.

 

Both approaches have downsides. With the first option, the producer and consumer APIs are fairly low level for writing transformations: simple cases stay simple, but more complex transformations quickly become complicated. A richer client could add value beyond what the producer and consumer provide by doing the following:

1. Manage multi-threading and parallelism within a process.

2. Manage partitioning assignment to processes / threads.

3. Manage journaled local state storage.

4. Manage offset commits and "exactly-once" guarantees as appropriate features are added in Kafka to support this.

 

The second option, i.e. using a full stream processing framework, can be a good solution, but a few things tend to make it a bit heavy-weight (a brief and still-going survey can be found here):

1. These frameworks are poorly integrated with Kafka (different concepts, configuration, monitoring, terminology). For example, these frameworks only use Kafka as the stream data source / sink of the whole processing topology, while using their own in-memory format for storing intermediate data (RDD, Bolt memory map, etc). If users want to persist these intermediate results to Kafka as well, they need to break their processing into multiple topologies that must be deployed separately, increasing operational and management costs.

2. These frameworks either duplicate or force the adoption of a packaging, deployment, and clustering solution. For example, in Storm you need to run a Storm cluster, which is a separate system that has to be monitored and operated. In an elastic environment like AWS, Mesos, or YARN this is redundant, since you end up with a Storm cluster inside the YARN cluster instead of just running the jobs directly in Mesos; similarly, Samza is tied to YARN.

3. These frameworks can't be integrated with existing services or applications. For example, you can't just embed a light transformation library inside an existing app; you have to adopt the entire framework, which runs as a service.

 

Processor Client Proposal

We want to propose another standalone "processor" client besides the existing producer and consumer clients for processing data consumed from Kafka and storing results back to Kafka. 

API Design

The processor client would provide the following APIs:

Data Processing

A processor computes on a stream of messages, with each message composed of a key-value pair.

A processor receives one message at a time and does not have access to the whole data set at once.

1. Per-message processing: this is the basic function that can be triggered once a new message has arrived from the stream.

2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.

Chained Processing

Multiple processors should be able to be chained up to form a DAG (i.e. the processor topology) for complex processing logic.
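
As a minimal sketch of what chaining could look like, the example below wires a filter node to a terminal node and has forward() call the downstream node's process(). Everything here is hypothetical and simplified for illustration: the names ChainedNode, ThresholdFilter and CollectingSink are not part of the proposal, and the whole chain shares one key / value type pair.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified node type: one key/value type pair for the whole chain.
abstract class ChainedNode<K, V> {
    private final List<ChainedNode<K, V>> children = new ArrayList<>();

    // Registers a downstream node; returns the child so calls can be chained.
    ChainedNode<K, V> addChild(ChainedNode<K, V> child) {
        children.add(child);
        return child;
    }

    abstract void process(K key, V value);

    // Hands the pair to every downstream node's process().
    protected void forward(K key, V value) {
        for (ChainedNode<K, V> child : children) {
            child.process(key, value);
        }
    }
}

// Forwards only values greater than a threshold.
class ThresholdFilter extends ChainedNode<String, Integer> {
    private final int threshold;

    ThresholdFilter(int threshold) {
        this.threshold = threshold;
    }

    @Override
    void process(String key, Integer value) {
        if (value > threshold) {
            forward(key, value);
        }
    }
}

// Terminal node that records what reaches it.
class CollectingSink extends ChainedNode<String, Integer> {
    final List<String> seen = new ArrayList<>();

    @Override
    void process(String key, Integer value) {
        seen.add(key + "=" + value);
    }
}
```

Because a node may have several children, the same structure also covers fan-out, which is what turns a chain into a DAG.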

Local State Storage

Users can create state storage inside a processor that can be accessed locally.

For example, a processor may retain a (usually most recent) subset of data for a join, aggregation / non-monolithic operations.

 

The proposed processing interface is as follows:

public interface ProcessorContext {

    void send(String topic, Object key, Object value);  // send the key value-pair to a Kafka topic

    void schedule(Processor<?, ?, ?, ?> processor, long interval);  // repeatedly schedule the processor's punctuation function at the given interval

    void commit();                                      // commit the current state, along with the upstream offset and the downstream sent data

    String topic();                                     // return the Kafka record's topic of the current processing key-value pair

    int partition();                                    // return the Kafka record's partition id of the current processing key-value pair

    long offset();                                      // return the Kafka record's offset of the current processing key-value pair
}
 
public interface Processor<K1, V1, K2, V2>  {

    void init(ProcessorContext context);           // initialize the processor

    void process(K1 key, V1 value);                // process a key-value pair
 
    void forward(K2 key, V2 value);                // forward a key-value pair to the downstream chained processor

    void punctuate();                              // called when the scheduled time has been reached

    void close();                                  // release any resources held by the processor
}
 
public abstract class KafkaProcessor<K1, V1, K2, V2> implements Processor<K1, V1, K2, V2> {
 
    @Override
    public final void forward(K2 key, V2 value) { 
        .... // implemented to trigger the downstream chained processor's process()
    }
 
    @Override
    public void punctuate() {
        // do nothing, can be overridden by users
    }

    @Override
    public void init(ProcessorContext context) {
        // do nothing, can be overridden by users
    }

    @Override
    public void close() {
        // do nothing, can be overridden by users
    }
}

 

Users can then create their processing job with the processor topology as follows:

public class StatefulProcessJob {

    private static class AggregateProcessor extends KafkaProcessor<String, Integer, String, Integer> {
        private ProcessorContext context;
        private KeyValueStore<String, Integer> kvStore;

        public AggregateProcessor(String name) {
            super(name);
        }

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(this, 1000);

            this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
        }

        @Override
        public void process(String key, Integer value) {
            Integer oldValue = this.kvStore.get(key);
            if (oldValue == null) {
                this.kvStore.put(key, value);
            } else {
                int newValue = oldValue + value;
                this.kvStore.put(key, newValue);
            }

            context.commit();
        }

        @Override
        public void punctuate() {
            KeyValueIterator<String, Integer> iter = this.kvStore.all();
            while (iter.hasNext()) {
                Entry<String, Integer> entry = iter.next();
                forward(entry.key(), entry.value());
            }
            this.kvStore.clear();
        }

        @Override
        public void close() {
            this.kvStore.close();
        }
    }
 
    private static class FilterProcessor extends KafkaProcessor<String, Integer, String, Integer> {
        public FilterProcessor(String name) {
            super(name);
        }

        @Override
        public void process(String key, Integer value) {
            if (value > 10) {
                forward(key, value);
            }
        }
    }

    // create the topology
    private static class MyPTopology extends ProcessorTopology {

        @Override
        public void build() {
            KafkaProcessor<String, Integer, String, Integer> source;
            KafkaProcessor<String, Integer, String, Integer> aggregate;
            KafkaProcessor<String, Integer, String, Integer> filter;
            source = addSource(new StringDeserializer(), new IntegerDeserializer(), "topic-source");
            aggregate = new AggregateProcessor("aggregate");
            filter = new FilterProcessor("filter");
            addProcessor(aggregate, source);
            addProcessor(filter, aggregate);
        }
    }

    public static void main(String[] args) throws Exception {
        KafkaProcess process = new KafkaProcess(MyPTopology.class, new ProcessorProperties(new Properties()));
        process.start();
    }
}

 

This example API demonstrates how the processor client abstracts away the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.

 

High-level Stream DSL

In addition to the processor API, we would also like to introduce a higher-level stream DSL that covers the most common processor implementations.

public interface KStream<K, V> {

    /**
     * Creates a new stream consisting of all elements of this stream which satisfy a predicate
     */
    KStream<K, V> filter(Predicate<K, V> predicate);

    /**
     * Creates a new stream by applying a key-value mapper to all elements of this stream
     */
    <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, K1, V1> mapper);

    /**
     * Creates a new stream by applying a value mapper to all values of this stream
     */
    <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all elements of this stream
     */
    <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, K1, ? extends Iterable<V1>> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all values of this stream
     */
    <V1> KStream<K, V1> flatMapValues(ValueMapper<V, ? extends Iterable<V1>> processor);

    /**
     * Creates a new windowed stream using a specified window instance.
     */
    KStreamWindowed<K, V> with(Window<K, V> window);

    /**
     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
     * the supplied predicates, in the same order.
     */
    KStream<K, V>[] branch(Predicate<K, V>... predicates);

    /**
     * Sends key-value to a topic.
     */
    void sendTo(String topic);

    /**
     * Sends key-value to a topic, also creates a new stream from the topic.
     * This is mostly used for repartitioning and is equivalent to calling sendTo(topic) and from(topic).
     */
    KStream<K, V> through(String topic);

    /**
     * Processes all elements in this stream by applying a processor.
     */
    <K1, V1> KStream<K1, V1> process(KafkaProcessor<K, V, K1, V1> processor);
 
    // .. more operators
}

public interface KStreamWindowed<K, V> extends KStream<K, V> {

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream
     * if the element from the other stream has an older timestamp.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> joinPrior(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
}

 

With this high-level interface, the user's program can be simplified as follows (using lambda expressions):

public class StatefulProcessJob {

    private static class MyKStreamTopology extends KStreamTopology {

        @Override
        public void build() {
            // create a source stream
            KStream<String, String> stream1 = from(new StringDeserializer(), new StringDeserializer(), "topic-source");

            // parse the value string to integer
            KStream<String, Integer> stream2 =
                stream1.map((String key, String value) -> new KeyValue<>(key, Integer.valueOf(value)))
                       .filter((String key, Integer value) -> value > 10);

            // branch two streams with odd / even values
            KStream<String, Integer>[] streams = stream2.branch(
                (String key, Integer value) -> value % 2 != 0,
                (String key, Integer value) -> value % 2 == 0);

            // send result stream to Kafka topics
            streams[0].sendTo("topic-odd");
            streams[1].sendTo("topic-even");
        }
    }

    public static void main(String[] args) throws Exception {
        KafkaProcess process = new KafkaProcess(MyKStreamTopology.class, new ProcessorProperties(new Properties()));
        process.start();
    }
}

 

Architecture Design

We summarize some key architecture design points in the following sub-sections.

Partition Distribution

Each KafkaProcess instance can contain multiple threads (the number of threads is configurable in the properties), and users can start multiple instances of their process job. The first question is therefore how to distribute the partitions of the subscribed topics in the source processor among all the processes / threads.

There are a couple of common cases for partition management in KStream:

1. Co-partitioning: for windowed-joins.

 2. Sticky partitioning: for stateful processing, users may want to have a static mapping from stream partitions to process threads.

 3. N-way partitioning: when we have stand-by processor instances, users may want to assign a single stream partition to multiple process threads.

 

These use cases require more flexible assignments than today's server-side strategies, so we need to extend the consumer coordinator protocol so that:

1. Consumers send JoinGroup with their subscribed topics, and receive the JoinGroup responses with the list of members in the group and the list of topic-partitions.

 2. All consumers will get the same lists, and they can execute the same deterministic partition assignment algorithm to get their assigned topic-partitions.
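
The client-side step can be sketched as follows. This is only an illustration under stated assumptions: the class DeterministicAssignor, its assign method, and the round-robin policy are hypothetical and are not part of the proposed protocol. The point is that every member sorts the same two lists and therefore computes the same mapping without further coordination.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: not part of any Kafka API.
class DeterministicAssignor {

    /**
     * Assigns partitions to members round-robin after sorting both lists,
     * so the result does not depend on the order in which the lists arrive.
     */
    static Map<String, List<String>> assign(List<String> members, List<String> topicPartitions) {
        List<String> sortedMembers = new ArrayList<>(members);
        List<String> sortedPartitions = new ArrayList<>(topicPartitions);
        Collections.sort(sortedMembers);
        Collections.sort(sortedPartitions);

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : sortedMembers) {
            assignment.put(member, new ArrayList<>());
        }
        if (sortedMembers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int i = 0; i < sortedPartitions.size(); i++) {
            String owner = sortedMembers.get(i % sortedMembers.size());
            assignment.get(owner).add(sortedPartitions.get(i));
        }
        return assignment;
    }
}
```

Each consumer would then look up its own member id in the returned map. Co-partitioning or sticky policies would replace the round-robin loop but keep the same "sort, then compute locally" shape.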

 

With this new assignment protocol (details of this change can be found here), we distribute the partitions among worker threads as follows:

1. Each worker thread in the process maintains a separate consumer and producer.

2. Upon startup, each thread's consumer executes the partition assignment algorithm and gets the allocated partitions.

 

Stream Time and Sync.

Time is very important in stream processing: windowing operations (join and aggregation) are defined by time. Since Kafka can replay a stream, wall-clock time (system time) may not make sense because of delayed or out-of-order messages. Hence we need to define a "time" for each stream according to its progress. We call it stream time.

Stream Time

Each message in a stream must have a timestamp in order to perform window-based operations and punctuations. Since Kafka messages do not carry a timestamp in the message header, users can define a timestamp extractor based on message content, which is used in the source processor when deserializing the messages. This extractor can be as simple as always returning the current system time (or wall-clock time), or it can be an Avro decoder that gets the timestamp field specified in the record schema.

In addition, since Kafka supports multiple producers sending messages to the same topic, brokers may receive messages in an order that does not strictly follow their timestamps (i.e. out-of-order messages). Therefore, we cannot simply define the "stream time" as the timestamp of the currently processed message in the stream, since that time could move back and forth.

 

We define the "stream time" as a monotonically increasing value as follows:

1. For each assigned partition, the thread maintains a record queue for buffering the fetched records from the consumer.

2. Each message has an associated timestamp that is extracted from the message content by the timestamp extractor.

3. The partition's timestamp is defined as the lowest message timestamp value in its buffer.

    a. When the record with the lowest timestamp is processed by the thread, the partition time may advance.

    b. The partition time will NOT be reset to a lower value, even if a message with a lower timestamp is later put into the buffer.

4. The stream time is defined as the lowest partition timestamp value across all its partitions in the process:

    a. Since partition times are monotonically increasing, stream times are also monotonically increasing.

5. Any newly created stream produced by upstream processors inherits the stream time of its parent; for joins, the larger of the parents' stream times is taken.
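
The rules above can be sketched in a few lines. This is a simplified illustration, not the proposed implementation: PartitionTimeTracker and StreamClock are hypothetical names, the buffer holds only timestamps rather than full records, and records are assumed to leave the buffer in arrival order.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical per-partition buffer that tracks a monotonic partition time.
class PartitionTimeTracker {
    private final Deque<Long> bufferedTimestamps = new ArrayDeque<>();
    private long partitionTime = Long.MIN_VALUE; // never moves backwards

    // Buffers the timestamp of a newly fetched record.
    void enqueue(long timestamp) {
        bufferedTimestamps.addLast(timestamp);
        advance();
    }

    // Removes the oldest buffered record, as if it had been processed.
    void processNext() {
        bufferedTimestamps.pollFirst();
        advance();
    }

    long partitionTime() {
        return partitionTime;
    }

    // Partition time = lowest buffered timestamp, but only ever increasing.
    private void advance() {
        if (!bufferedTimestamps.isEmpty()) {
            partitionTime = Math.max(partitionTime, Collections.min(bufferedTimestamps));
        }
    }
}

// Hypothetical helper computing the stream time over a set of partitions.
class StreamClock {
    // Lowest partition time across partitions; Long.MIN_VALUE if there are none.
    static long streamTime(List<PartitionTimeTracker> partitions) {
        long lowest = Long.MAX_VALUE;
        for (PartitionTimeTracker partition : partitions) {
            lowest = Math.min(lowest, partition.partitionTime());
        }
        return partitions.isEmpty() ? Long.MIN_VALUE : lowest;
    }
}
```

In this sketch a late record with a low timestamp is still buffered and processed, but it cannot pull the partition time, and therefore the stream time, backwards.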

 

Stream Synchronization

When joining two streams, their progress needs to be synchronized. If they are out of sync, a time-window-based join becomes faulty. For example, if the delay of one stream is negligible and the delay of the other is one day, a join over a 10-minute window does not make sense. To handle this case, we define a stream group as a set of streams whose rates of consumption need to be synchronized. Within a process instance, such stream groups are instantiated as partition groups from the assigned topic partitions. Each worker thread may have one or more partition groups.

 

Each worker thread synchronizes the consumption within each such group through the consumer's pause / resume APIs as follows:

1. When an un-paused partition is ahead in time (partition time as defined above) of the other partitions by more than some defined threshold, notify the corresponding consumer to pause it.

 2. When a paused partition is ahead in time of the other partitions by less than the defined threshold, notify the corresponding consumer to un-pause it.
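
The pause / un-pause decision can be sketched as a pure function over the current partition times. The names below (PartitionSynchronizer, partitionsToPause, thresholdMs) are hypothetical, and measuring "ahead" against the slowest partition in the group is an assumption made for this example; the text above leaves the exact comparison open.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: decides which partitions in one group should be paused.
class PartitionSynchronizer {

    /**
     * Returns the partitions whose time is ahead of the slowest partition
     * in the group by more than thresholdMs. All others should be un-paused.
     */
    static Set<String> partitionsToPause(Map<String, Long> partitionTimes, long thresholdMs) {
        Set<String> toPause = new TreeSet<>();
        if (partitionTimes.isEmpty()) {
            return toPause;
        }
        long slowest = Collections.min(partitionTimes.values());
        for (Map.Entry<String, Long> entry : partitionTimes.entrySet()) {
            if (entry.getValue() - slowest > thresholdMs) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }
}
```

A worker thread would re-evaluate this after each batch of processed records and hand the result to the consumer's pause / resume calls, which are not shown here.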

 

Users can instantiate a "grouping function" that maps the assigned partitions from the consumer into partition groups; they act as the stream groups in which the stream synchronization mechanism above is applied. The default grouping function maps partitions with the same id across topics into a group (i.e. co-partitioning). Two streams that are joined together have to be in the same stream group.
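
A default grouping function of this kind might look like the sketch below. The class CoPartitionGrouping and the choice of plain maps and "topic-id" strings for partitions are assumptions made to keep the example self-contained; they are not the proposed interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical default grouping: one group per partition id, across all topics.
class CoPartitionGrouping {

    /**
     * @param assigned topic name -> partition ids assigned to this consumer
     * @return partition id -> the "topic-id" partitions that form that group
     */
    static Map<Integer, List<String>> group(Map<String, List<Integer>> assigned) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        // Iterate topics in sorted order so group contents are deterministic.
        for (Map.Entry<String, List<Integer>> entry : new TreeMap<>(assigned).entrySet()) {
            for (int partitionId : entry.getValue()) {
                groups.computeIfAbsent(partitionId, id -> new ArrayList<>())
                      .add(entry.getKey() + "-" + partitionId);
            }
        }
        return groups;
    }
}
```

With two topics of two partitions each, this yields two groups, each holding the same-numbered partition from both topics, which is what a windowed join between the two topics needs.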

 

Local State Management

Users can create one or more state stores during their processing logic, and a store instance will be created for each of their specified partition groups. Since a single store instance is never shared across multiple partition groups, and each partition group is only processed by a single thread, no store will be accessed concurrently by multiple threads at any given time.

 

Log-backed State Storage

Each state store will be backed up by a different Kafka change log topic, such that:

#.partitions of the change log == #. state store instances == #. partition groups

For example, if a processor instance consumes from upstream Kafka topic "topic-A" with 4 partitions and creates two stores, namely store1 and store2, and the user groups the 4 partitions into {topic-A-p1, topic-A-p2} and {topic-A-p3, topic-A-p4}, then two change log topics, for example "topic-store1-changelog" and "topic-store2-changelog", need to be created beforehand, each with two partitions.

 

After a processor writes to a store instance, it sends the change message to the corresponding changelog topic partition. When the user calls commit() in their processor, KStream needs to flush the store instance, flush the producer sending to the changelog, and commit the offset in the upstream Kafka. If these three operations cannot be done atomically, a crash between them could generate duplicates, since committing the upstream offset is the last step; if the three operations can be done atomically, then we can guarantee "exactly-once" semantics.
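
The ordering described above can be sketched as follows. CommitSequence is a hypothetical helper, and the three Runnable arguments merely stand in for the real store, producer, and consumer calls; the sketch only shows why committing offsets last leads to duplicates rather than data loss when the steps are not atomic.

```java
// Hypothetical helper; the Runnables stand in for the real store, producer and consumer calls.
class CommitSequence {

    static void commit(Runnable flushStore,
                       Runnable flushChangelogProducer,
                       Runnable commitUpstreamOffsets) {
        flushStore.run();
        flushChangelogProducer.run();
        // Offsets go last: if anything above fails, the uncommitted input is
        // re-read after a restart, which may produce duplicates but loses no data.
        commitUpstreamOffsets.run();
    }
}
```

If the second step throws, the third never runs, so the upstream offsets still point at the last successful commit.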

 

Persisting and Restoring State

When we close a KStream instance, the following steps are executed:

1. Flush the state of all stores as mentioned above.

2. Write the change log offsets for all stores into a local offset checkpoint file. The existence of the offset checkpoint file indicates whether the instance was shut down cleanly.

 

Upon (re-)starting the KStream instance:

1. Try to read the local offset checkpoint file into memory, and delete the file afterwards.

2. Check the offset of the corresponding change log partition read from the checkpoint file.

    a. If the offset is read successfully, load the previously flushed state and replay the change log from the read offset up to the log-end-offset.

    b. Otherwise, do not load the previously flushed state and replay the change log from the beginning up to the log-end-offset.
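
The checkpoint handling above can be sketched with plain file operations. The class OffsetCheckpoint and its single-number file format are assumptions made for illustration (a real checkpoint would hold one offset per store partition), and replaying from offset 0 stands in for "from the beginning".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

// Hypothetical checkpoint helper; the file holds a single change log offset.
class OffsetCheckpoint {

    // Written on clean shutdown, after the store has been flushed.
    static void write(Path file, long changelogOffset) {
        try {
            Files.write(file, Long.toString(changelogOffset).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the checkpoint and deletes it; empty if the file is missing or malformed.
    static OptionalLong readAndDelete(Path file) {
        try {
            if (!Files.exists(file)) {
                return OptionalLong.empty();
            }
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            Files.delete(file);
            try {
                return OptionalLong.of(Long.parseLong(content));
            } catch (NumberFormatException e) {
                return OptionalLong.empty();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Offset from which the change log is replayed: the checkpoint, or the beginning.
    static long replayStartOffset(OptionalLong checkpointed) {
        return checkpointed.orElse(0L);
    }
}
```

Deleting the file right after reading it is what makes its presence a reliable signal: if the instance crashes before the next clean shutdown, no checkpoint exists on restart and the full change log is replayed.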

 

Workflow Summary

This section summarizes the KafkaProcess workflow following the above architecture design.

Startup

When the user calls KafkaProcess.start(), the process instance creates the worker threads according to the user-specified #.threads. In each worker thread:

1. Trigger Topology.build() to retrieve the subscription topic names.

2. Construct the consumer, subscribe to the topics, and get the assigned partitions.

3. Trigger the grouping function with the assigned partitions to get the returned list of partition groups with associated ids.

4. Initialize each partition group by:

    a. Create a record queue for buffering the fetched records for each partition.

    b. Initialize the topology with the constructed processor context, in which users may create the local state.

    c. Perform validations on changelog topics if local state gets created and restored following the above steps.

5. Each thread then runs its loop at its own pace; there is no synchronization between these threads. In each iteration of the loop:

     a. Thread checks if the record queues are empty / low, and if yes calls consumer.poll(timeout) / consumer.poll(0) to re-fill the buffer.

     b. Choose one record from the queues and process it through the processor topology.

     c. Check whether the user called commit() during the processing of this record; if so, commit the offset / flush the local state / flush the producer.

 

Packaging Design

It would be best to package Processor / KStream as a separate jar, since it introduces extra external dependencies, such as RocksDB, etc. Under this model:

1. We will let users create their own MyKStream.java class that depends on the kafka-stream.jar.

2. We will let users write their own Main function as the entry point for starting their process instance.

 

Current class / package names can be found in [LINK]. A general summary:

1. All classes are defined in the "stream" folder.

2. Low-level Processor interface is under the "o.a.k.clients.processor" package; high-level KStream interface is under the "o.a.k.stream" package.

3. Important user-facing classes include:

KafkaProcessor: implements Processor, Receiver, Punctuator; used for computation logic.
 
ProcessorContext: passed in KafkaProcessor.init(); provides schedule / send / commit / etc functions, and topic / partition / offset / etc source record metadata.

StateStore: can be created inside KafkaProcessor.init() for storing local state.

PTopology: requires users to implement the build() function, in which addProcessor / addSource can be used to construct the DAG.
 
KStreamTopology: extends PTopology, and in its build() function high-level operators like map / filter / branch / etc can be used.

KStreamProcess: used in main function to take provided Topology class and configs to start the instance.

Some example classes can be found in o.a.k.stream.examples.

4. Important internal classes include:

Ingestor: the wrapped consumer instance for fetching data / managing offsets.
 
KStreamThread: multi-threaded KStreamProcess will create #.KStreamThread specified in configs, each maintaining its own Ingestor.
 
StreamGroup: the unit of processing tasks that are assigned to KStreamThread within the KStreamProcess instance.
 
KStreamFilter/Map/Branch/...: implementations of high-level KStream topology builder operators.

 

 

Shutdown

When the user calls KafkaProcess.shutdown(), the following steps are executed:

1. Commit / flush each partition-group's current processing state as described in the local state management section.

2. Close the embedded producer and consumer clients.

 

Compatibility, Deprecation, and Migration Plan

This KIP only proposes additions. There should be no compatibility issues.

