Status

Current state: Adopted

Discussion thread: here, here

...

2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.

...

Composable Processing

Multiple processors should be able to be chained together to form a DAG (i.e. the processor topology) for complex processing logic.

Users can define such a processor topology in an exploratory, REPL-like manner: make an initial topology, deploy and run it, check the results and intermediate values, then pause the job and edit the topology on-the-fly.

Local State Storage

Users can create state storage inside a processor that can be accessed locally.

...

Code Block
languagejava
public interface ProcessorContext {

    void send(String topic, Object key, Object value);  // send the key-value pair to a Kafka topic

    void schedule(long timestamp);                      // repeatedly schedule the punctuation function for the period

    void commit();                                      // commit the current state, along with the upstream offset and the downstream sent data

    String topic();                                     // return the Kafka record's topic of the key-value pair currently being processed

    int partition();                                    // return the Kafka record's partition id of the key-value pair currently being processed

    long offset();                                      // return the Kafka record's offset of the key-value pair currently being processed
}

public interface Processor<K, V> {

    void init(ProcessorContext context);    // initialize the processor

    void process(K key, V value);           // process a key-value pair

    void punctuate(long streamTime);        // called when the scheduled time has been reached

    void close();                           // close the processor
}

public interface ProcessorDef {

    Processor instance();                   // create a new instance of the processor from its definition
}

public class TopologyBuilder {

    // add a source node to the topology which generates incoming traffic with the specified Kafka topics
    public final TopologyBuilder addSource(String name, String... topics) { ... }

    // add a sink node to the topology with the specified parent nodes that sends outgoing traffic to the specified Kafka topic
    public final TopologyBuilder addSink(String name, String topic, String... parentNames) { ... }

    // add a processor node to the topology with the specified parent nodes
    public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) { ... }
}

 

Users can then create their processing job from the processor topology as follows:

Code Block
languagejava
import java.util.Properties;

public class ProcessorJob {

    private static class MyProcessorDef implements ProcessorDef {

        @Override
        public Processor<String, Integer> instance() {
            return new Processor<String, Integer>() {
                private ProcessorContext context;
                private KeyValueStore<String, Integer> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.context.schedule(this, 1000);
                    this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
                }

                @Override
                public void process(String key, Integer value) {
                    // keep a running sum per key in the local store
                    Integer oldValue = this.kvStore.get(key);
                    if (oldValue == null) {
                        this.kvStore.put(key, value);
                    } else {
                        int newValue = oldValue + value;
                        this.kvStore.put(key, newValue);
                    }

                    context.commit();
                }

                @Override
                public void punctuate(long streamTime) {
                    // print and forward every aggregated entry downstream
                    KeyValueIterator<String, Integer> iter = this.kvStore.all();

                    while (iter.hasNext()) {
                        Entry<String, Integer> entry = iter.next();

                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");

                        context.forward(entry.key(), entry.value());
                    }
                }

                @Override
                public void close() {
                    this.kvStore.close();
                }
            };
        }
    }

    public static void main(String[] args) throws Exception {
        StreamingConfig config = new StreamingConfig(new Properties());

        // build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "topic-source")
               .addProcessor("PROCESS", new MyProcessorDef(), "SOURCE")
               .addSink("SINK", "topic-sink", "PROCESS");

        // start process
        KafkaStreaming streaming = new KafkaStreaming(builder, config);
        streaming.start();
    }
}

 

This example API demonstrates the abstraction of the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.

 

High-level Stream DSL

In addition to the processor API, we would also like to introduce a higher-level stream DSL for users that covers most common processor implementations.

Code Block
languagejava
public interface KStream<K, V> {

    /**
     * Creates a new stream consisting of all elements of this stream that satisfy a predicate.
     */
    KStream<K, V> filter(Predicate<K, V> predicate);

    /**
     * Creates a new stream by applying a key-value mapper to all elements of this stream.
     */
    <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, K1, V1> mapper);

    /**
     * Creates a new stream by applying a value mapper to all values of this stream.
     */
    <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all elements of this stream.
     */
    <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, K1, ? extends Iterable<V1>> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all values of this stream.
     */
    <V1> KStream<K, V1> flatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper);

    /**
     * Creates a new windowed stream using a specified window instance.
     */
    KStreamWindowed<K, V> with(Window<K, V> window);

    /**
     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
     * the supplied predicates, in the same order.
     */
    KStream<K, V>[] branch(Predicate<K, V>... predicates);

    /**
     * Sends key-value pairs to a topic.
     */
    void sendTo(String topic);

    /**
     * Sends key-value pairs to a topic, and also creates a new stream from the topic.
     * This is mostly used for repartitioning and is equivalent to calling sendTo(topic) and from(topic).
     */
    KStream<K, V> through(String topic);

    /**
     * Processes all elements in this stream by applying a processor.
     */
    <K1, V1> KStream<K1, V1> process(KafkaProcessor<K, V, K1, V1> processor);

    // .. more operators
}

public interface KStreamWindowed<K, V> extends KStream<K, V> {

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream
     * if the element from the other stream has an older timestamp.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> joinPrior(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
}

 

With this high-level interface, the user program can be simplified as follows (using lambda expressions in Java 8):

Code Block
languagejava
import java.util.Properties;

public class KStreamJob {

    public static void main(String[] args) throws Exception {
        StreamingConfig config = new StreamingConfig(new Properties());

        // build the topology
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> stream1 = builder.from("topic1");

        // parse the value string to an integer
        KStream<String, Integer> stream2 =
            stream1.map((key, value) -> new KeyValue<>(key, Integer.valueOf(value)))
                   .filter((key, value) -> true);

        // branch into two streams by value
        KStream<String, Integer>[] streams = stream2
            .branch((key, value) -> value > 10,
                    (key, value) -> value <= 10);

        // send the result streams to Kafka topics
        streams[0].sendTo("topic2");
        streams[1].sendTo("topic3");

        // start the process
        KafkaStreaming kstream = new KafkaStreaming(builder, config);
        kstream.start();
    }
}

 

Architecture Design

We summarize some key architecture design points in the following sub-sections.

 

[Figure: architecture diagram of a KStream process]

Partition Distribution

As shown in the diagram above, each KStream process can have multiple threads (#.threads configurable in the properties), with each thread having a separate consumer and producer. So the first question is how to distribute the partitions of the subscribed topics in the source processor among all the processes / threads.

There are a couple of common cases for partition management in KStream:

1. Co-partitioning: for windowed-joins.

 2. Sticky partitioning: for stateful processing, users may want to have a static mapping from stream partitions to process threads.

 3. N-way partitioning: when we have stand-by processor instances, users may want to assign a single stream partition to multiple process threads.

 

These use cases would require more flexible assignments than today's server-side strategies, so we need to extend the consumer coordinator protocol in the way that:

1. Consumers send JoinGroup with their subscribed topics, and receive the JoinGroup responses with the list of members in the group and the list of topic-partitions.

 2. All consumers will get the same lists, and they can execute the same deterministic partition assignment algorithm to get their assigned topic-partitions.
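
The reason step 2 works can be sketched as follows: if every consumer receives the same member list and the same topic-partition counts, any pure function of those inputs yields the same assignment on every consumer. The round-robin strategy and the names `DeterministicAssignor` and `assign` below are illustrative assumptions, not part of the proposed protocol.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: every group member runs the same pure function on the same inputs.
class DeterministicAssignor {

    // Returns memberId -> assigned partitions, each written as "topic-partitionId".
    static Map<String, List<String>> assign(List<String> members, Map<String, Integer> partitionsPerTopic) {
        // Sort both inputs so the result does not depend on the order in which they arrived.
        List<String> sortedMembers = new ArrayList<>(members);
        Collections.sort(sortedMembers);

        List<String> partitions = new ArrayList<>();
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                partitions.add(e.getKey() + "-" + p);
            }
        }

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : sortedMembers) {
            assignment.put(member, new ArrayList<>());
        }
        if (sortedMembers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        // Simple round-robin over the sorted partitions (an assumed strategy).
        for (int i = 0; i < partitions.size(); i++) {
            assignment.get(sortedMembers.get(i % sortedMembers.size())).add(partitions.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = Map.of("topic-A", 3, "topic-B", 1);
        // Two members computing independently, with inputs in different orders, print the same map.
        System.out.println(assign(List.of("c2", "c1"), topics));
        System.out.println(assign(List.of("c1", "c2"), topics));
    }
}
```

Each consumer would then pick its own entry out of the returned map; no further coordination is needed as long as the inputs are identical.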

 

With this new assignment protocol (details of this change can be found here), we distribute the partitions among worker threads as follows:

0. Upon starting the KStream process, the user-specified number of KStream threads will be created. There are no shared variables between threads and no synchronization barriers, so these threads execute completely asynchronously. Hence we describe the behavior of a single thread in all the following steps.

 

1. The thread constructs the user-specified processor topology without initializing it, just in order to extract the list of subscribed topics from the topology.

2. The thread uses its consumer's partitionsFor() to fetch the metadata for each of the subscribed topics and obtain the mapping of topic -> #.partitions.

3. The thread now triggers the consumer's subscribe() with the subscribed topics, which then applies the new rebalance protocol. The join-group request will be instantiated as follows (for example):

Code Block
JoinGroupRequest => 
  GroupId                 => "KStream-[JobName]"
  GroupType               => "KStream"

 

And the assignor interface is implemented as follows:

Code Block
List<TopicPartition> assign(String consumerId,
                            Map<String, Integer> partitionsPerTopic, 
                            List<ConsumerMetadata<T>> consumers) {
 
   // 1. trigger user-customizable grouping function to group the partitions into groups.
   // 2. assign partitions to consumers at the granularity of partition-groups.
   // 3*. persist the assignment result using commit-offset to Kafka.
}

 

The interface of the grouping function is shown below. It is very similar to the assign() interface above; the only difference is that it does not take the list of consumers.

Code Block
interface PartitionGrouper {
 
  /**
   * Group partitions into partition groups
   */
  List<Set<TopicPartition>> group(Map<String, Integer> partitionsPerTopic);
}

 

So after the rebalance completes, each partition-group will be assigned as a whole to a consumer, i.e. no partitions belonging to the same group will be assigned to different consumers. The default grouping function maps partitions with the same id across topics into a group (i.e. co-partitioning).
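
As a minimal sketch of the default co-partitioning behavior described above, the following groups partitions that share the same id across topics. It is only an illustration: the class name `CoPartitionGrouperSketch` is hypothetical, and plain "topic-id" strings stand in for `TopicPartition` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of a co-partitioning grouper; "topic-id" strings stand in for TopicPartition.
class CoPartitionGrouperSketch {

    static List<Set<String>> group(Map<String, Integer> partitionsPerTopic) {
        // partition id -> all topic partitions sharing that id, ordered by id
        TreeMap<Integer, Set<String>> byId = new TreeMap<>();
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            for (int id = 0; id < e.getValue(); id++) {
                byId.computeIfAbsent(id, k -> new TreeSet<>()).add(e.getKey() + "-" + id);
            }
        }
        // One partition group per partition id.
        return new ArrayList<>(byId.values());
    }

    public static void main(String[] args) {
        // topic2 has one more partition than topic1, so the last group holds a single partition.
        System.out.println(group(Map.of("topic1", 2, "topic2", 3)));
    }
}
```

With this grouping, the number of groups equals the largest partition count among the subscribed topics, and the position of a group in the list could serve as its id.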

 

4. Upon getting the partition-groups, the thread creates one task for each partition-group. For each task, it constructs the processor topology AND initializes the topology with the task context.

    a. The initialization process triggers Processor.init() on each processor in the topology, following the topology's DAG order.

    b. All user-specified local states are also created during the initialization process (we will talk about this in later sections).

    c. A record queue is created for each of the partitions in the task's associated partition-group, so that when the consumer fetches new messages, it puts them into the corresponding queue.

 

Hence all tasks' topologies have the same "skeleton" but different instantiated processor / state objects; in addition, partitions are synchronized on a per-task basis (we will talk about partition-group synchronization in the next section).

 

5. When a rebalance is triggered, the consumer reads its last persisted partition assignment from Kafka and checks whether the following hold when comparing it with the new assignment result:

    a. Existing partitions are still assigned to the same partition-groups.

    b. New partitions are assigned to the existing partition-groups instead of creating new groups.

 

For a), since the partition-group's associated task-id is used as the corresponding change-log partition id, if a partition gets migrated from one group to another during the rebalance, its state will no longer be valid; for b), since we cannot (yet) dynamically change the #.partitions from the consumer APIs, dynamically adding more partition-groups (hence tasks) may require change log partitions that do not exist yet.
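
The two checks in step 5 can be sketched as a comparison of the previous and the new partition-to-group mappings. This is an illustration only: the class and method names are hypothetical, partitions are written as "topic-id" strings, and group ids are plain integers.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the rebalance checks a) and b); not an actual Kafka class.
class AssignmentValidatorSketch {

    // previous / current: partition ("topic-id") -> partition-group id
    static boolean isCompatible(Map<String, Integer> previous, Map<String, Integer> current) {
        Set<Integer> knownGroups = new HashSet<>(previous.values());
        for (Map.Entry<String, Integer> e : current.entrySet()) {
            Integer oldGroup = previous.get(e.getKey());
            if (oldGroup != null) {
                // a) an existing partition must stay in the same partition-group
                if (!oldGroup.equals(e.getValue())) {
                    return false;
                }
            } else if (!knownGroups.contains(e.getValue())) {
                // b) a new partition must join an existing group rather than create a new one
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> previous = Map.of("topic-0", 0, "topic-1", 1);
        System.out.println(isCompatible(previous, Map.of("topic-0", 0, "topic-1", 1, "topic-2", 1)));
        System.out.println(isCompatible(previous, Map.of("topic-0", 1, "topic-1", 1)));
    }
}
```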

 

[NOTE] There are some more advanced partitioning settings, such as sticky partitioning / consistent hashing, that we want to support in the future, which may then additionally require:

    c. Partition groups are assigned to specific consumers instead of randomly / round-robin.

 

Stream Time and Sync.

Time is very important in stream processing. Windowing operations (join and aggregation) are defined by time. Since Kafka can replay streams, wall-clock based time (system time) may not make sense due to delayed / out-of-order messages. Hence we need to define a "time" for each stream according to its progress. We call it stream time.

Stream Time

A stream is defined to abstract all the partitions of the same topic within a task, and its name is the same as the topic name. For example, if a task's assigned partitions are {Topic1-P1, Topic1-P2, Topic1-P3, Topic2-P1, Topic2-P2}, then we treat this task as having two streams, "Topic1" and "Topic2", where "Topic1" represents the three partitions P1, P2 and P3 of Topic1, and "Topic2" represents the two partitions P1 and P2 of Topic2.

Each message in a stream has to have a timestamp to perform window-based operations and punctuations. Since Kafka messages do not have a timestamp in the message header, users can define a timestamp extractor based on message content that is used in the source processor when deserializing the messages. This extractor can be as simple as always returning the current system time (or wall-clock time), or it can be an Avro decoder that gets the timestamp field specified in the record schema.

...

2. Each message has an associated timestamp that is extracted from the message content by the timestamp extractor.

3. The partition time is defined as the lowest message timestamp value in its buffer.

...

4. The stream time is defined as the lowest partition timestamp value across all its partitions in the task:

    a. Since partition times are monotonically increasing, stream times are also monotonically increasing.

...
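
The two definitions above (partition time and stream time) can be sketched as nested minimums. This is a minimal illustration: the class and method names are hypothetical, and skipping partitions whose buffer is currently empty is an assumption the text above does not specify.

```java
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch of the time definitions; buffers are modeled as collections of timestamps.
class StreamTimeSketch {

    // Partition time: the lowest timestamp among the messages currently buffered for the partition.
    static OptionalLong partitionTime(Collection<Long> bufferedTimestamps) {
        return bufferedTimestamps.stream().mapToLong(Long::longValue).min();
    }

    // Stream time: the lowest partition time across all partitions of the stream within the task.
    // Assumption: partitions with an empty buffer are ignored.
    static OptionalLong streamTime(Collection<? extends Collection<Long>> partitionBuffers) {
        return partitionBuffers.stream()
                .map(StreamTimeSketch::partitionTime)
                .filter(OptionalLong::isPresent)
                .mapToLong(OptionalLong::getAsLong)
                .min();
    }

    public static void main(String[] args) {
        List<Long> p1 = List.of(30L, 10L, 20L);
        List<Long> p2 = List.of(15L, 40L);
        System.out.println(partitionTime(p1));
        System.out.println(partitionTime(p2));
        System.out.println(streamTime(List.of(p1, p2)));
    }
}
```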

When joining two streams, their progress needs to be synchronized. If they are out of sync, a time-window-based join becomes faulty. Say the delay of one stream is negligible and the delay of the other stream is one day; doing a join over a 10-minute window then does not make sense. To handle this case, we define a stream group as a set of streams whose rates of consumption need to be synchronized. Within a process instance, such stream groups are actually instantiated as partition groups from the assigned topic partitions. Each worker needs to make sure that the consumption rates of all partitions within each task's assigned partition-group are "synchronized". Note that each thread may have one or more tasks, but it does not need to synchronize the partitions across tasks' partition-groups.

 

The worker thread synchronizes the consumption within each of these groups through the consumer's pause / resume APIs as follows:

1. When one un-paused partition is ahead in time (partition time as defined above) of the other partitions by more than some defined threshold, notify the corresponding consumer to pause it.

 2. When one paused partition is ahead in time of the other partitions by less than some defined threshold, notify the corresponding consumer to un-pause it.
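
The pause / un-pause decision above can be sketched as a comparison of each partition time against the slowest partition in the group. This is a sketch under assumptions: the class and method names are hypothetical, a single threshold is used for both pausing and un-pausing, and the actual calls to the consumer's pause / resume APIs are left out.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the synchronization rule within one partition group.
class PartitionSyncSketch {

    // Returns the partitions that should be paused: those whose partition time is ahead of the
    // slowest partition in the group by more than thresholdMs. All others should be un-paused.
    static Set<String> partitionsToPause(Map<String, Long> partitionTimes, long thresholdMs) {
        long slowest = partitionTimes.values().stream().mapToLong(Long::longValue).min().orElse(0L);
        Set<String> toPause = new TreeSet<>();
        for (Map.Entry<String, Long> e : partitionTimes.entrySet()) {
            if (e.getValue() - slowest > thresholdMs) {
                toPause.add(e.getKey());
            }
        }
        return toPause;
    }

    public static void main(String[] args) {
        // topic3-0 is far ahead of the other two partitions and should be paused.
        System.out.println(partitionsToPause(Map.of("topic1-0", 1000L, "topic2-0", 1500L, "topic3-0", 5000L), 1000L));
        // Once the others catch up, nothing needs to stay paused.
        System.out.println(partitionsToPause(Map.of("topic1-0", 4500L, "topic2-0", 4600L, "topic3-0", 5000L), 1000L));
    }
}
```

The worker thread would re-evaluate this set as partition times advance, pausing newly returned partitions and resuming those that dropped out of the set.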

 

Two streams that are joined together have to be in the same task, and their represented partition lists have to match each other. For example, a stream representing P1, P2 and P3 can be joined with another stream also representing P1, P2 and P3.

 

Local State Management

Users can create one or more state stores during their processing logic, and each task will have a state manager that keeps an instance of each specified store inside the task.

Since a single store instance will not be shared across multiple partition groups, and each partition group will only be processed by a single thread, no store will be accessed concurrently by multiple threads at any given time.

...

Each state store will be backed up by a different Kafka change log topic, and each instance of the store correlates to one partition of the topic, such that:

Code Block
#. tasks == #. partition groups == #. store instances for each state store == #. partitions of the change log for each state store


For example, if a processor instance consumes from the upstream Kafka topic "topic-A" with 4 partitions and creates two stores, namely store1 and store2, and the user groups the 4 partitions into {topic-A-p1, topic-A-p2} and {topic-A-p3, topic-A-p4}, then two change log topics, for example "topic-store1-changelog" and "topic-store2-changelog", need to be created beforehand, each with two partitions.

...

    b. Otherwise, do not load the previously flushed state and replay the change log from the beginning up to the log-end-offset.

 

Workflow Summary

We summarize the workflow of a Kafka Streaming process following the above architecture design.

Startup

Upon user calling KafkaStreaming.start(), the process instance creates the worker threads given the user-specified #.threads. In each worker thread:

1. Construct the producer and consumer clients, and extract the subscribed topic names from the topology.

2. Let the consumer subscribe to the topics and get the assigned partitions.

3. Trigger the grouping function with the assigned partitions to get the returned list of partition-groups (hence tasks) with associated ids.

4. Initialize each task by:

    a. Creating a record queue for buffering the fetched records for each partition.

    b. Initializing a topology instance for the task from the builder with a newly created processor context.

    c. Initializing the state manager of the task and constructing / resuming user-defined local states.

5. Run the loop at its own pace until notified to shut down; there is no synchronization between these threads. In each iteration of the loop:

     a. Check if the record queues are empty / low, and if so call consumer.poll(timeout) / consumer.poll(0) to re-fill the buffer.

     b. Choose one record from the queues and process it through the processor topology.

     c. Check if some of the processors' punctuate functions need to be triggered, and if so, execute them.

     d. Check if the user called commit() during the processing of this record; if so, commit the offset / flush the local state / flush the producer.
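
The loop iteration described in steps a to d can be sketched as a small single-threaded simulation. Everything here is an illustrative assumption: no Kafka clients are involved, the `Supplier` stands in for consumer.poll(), the record timestamp is used directly as the time that drives punctuation, and the `userCalledCommit` flag stands in for a commit() call made during processing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical simulation of one worker thread's loop; it only records what it would do.
class WorkerLoopSketch {

    // A buffered record: its extracted timestamp and its payload.
    static final class Rec {
        final long timestamp;
        final String value;
        Rec(long timestamp, String value) { this.timestamp = timestamp; this.value = value; }
    }

    private final Deque<Rec> queue = new ArrayDeque<>();
    private final long punctuateIntervalMs;
    private long nextPunctuateMs;
    final List<String> trace = new ArrayList<>();

    WorkerLoopSketch(long punctuateIntervalMs) {
        this.punctuateIntervalMs = punctuateIntervalMs;
        this.nextPunctuateMs = punctuateIntervalMs;
    }

    // One loop iteration; poll stands in for consumer.poll() and is only called when needed.
    void runOnce(Supplier<List<Rec>> poll, boolean userCalledCommit) {
        if (queue.isEmpty()) {
            queue.addAll(poll.get());                   // a. re-fill the buffer when it is empty
        }
        Rec rec = queue.poll();
        if (rec == null) {
            return;                                     // nothing fetched, nothing to process
        }
        trace.add("process:" + rec.value);              // b. process one record through the topology
        while (rec.timestamp >= nextPunctuateMs) {      // c. trigger punctuations that are due
            trace.add("punctuate:" + nextPunctuateMs);
            nextPunctuateMs += punctuateIntervalMs;
        }
        if (userCalledCommit) {
            trace.add("commit");                        // d. commit offsets / flush state and producer
        }
    }

    public static void main(String[] args) {
        WorkerLoopSketch loop = new WorkerLoopSketch(1000);
        List<Rec> batch = List.of(new Rec(500, "a"), new Rec(1200, "b"));
        loop.runOnce(() -> batch, false);
        loop.runOnce(() -> batch, true);   // queue still holds "b", so poll is not called again
        System.out.println(loop.trace);
    }
}
```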

 

Shutdown

Upon user calling KafkaStreaming.shutdown(), the following steps are executed:

1. Commit / flush each partition-group's current processing state as described in the local state management section.

2. Close the embedded producer and consumer clients.

 

Packaging Design

It would be best to package Processor / KStream as a separate jar, since it introduces extra external dependencies such as RocksDB. Under this model:

1. We will let users create their own MyKStream.java class that depends on the kafka-stream.jar.

2. We will let users write their own Main function as the entry point for starting their process instance.

 

Current class / package names can be found in this PR. A general summary:

1. All classes are defined in the "streams" folder.

2. Low-level Processor interface is under the "o.a.k.streams.processor" package; high-level KStream interface is under the "o.a.k.streams.kstream" package.

3. Some example classes can be found in o.a.k.streams.examples.

 

4. Important internal classes include:

Code Block
PartitionGroup: a set of partitions along with their queuing buffers and timestamp extraction logic.
 
ProcessorStateManager: the manager of the local states within a task.
 
ProcessorTopology: the instance of the topology generated by the TopologyBuilder.
 
StreamTask: the unit of processing, which includes a ProcessorTopology, a ProcessorStateManager and a PartitionGroup.
 
StreamThread: contains multiple StreamTasks, a Consumer and a Producer client.
 
KStreamFilter/Map/Branch/...: implementations of high-level KStream topology builder operators.

...