Status
Current state: Adopted
Discussion thread: here, here, here
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
A common use case for Kafka is real-time processes that transform data from input topics to output topics. Today there are a couple of options available for users to process such data:
...
2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.
...
Composable Processing
Multiple processors should be able to be chained together to form a DAG (i.e. the processor topology) for complex processing logic.
...
Users can define such a processor topology in an exploratory, REPL-like manner: make an initial topology, deploy and run it, check the results and intermediate values, then pause the job and edit the topology on the fly.
Local State Storage
Users can create state storage inside a processor that can be accessed locally.
For example, a processor may retain a (usually most recent) subset of data for a join, aggregation / non-monolithic operations.
...
Code Block | ||
---|---|---|
| ||
public interface ProcessorContext {
    void send(String topic, Object key, Object value);  // send the key-value pair to a Kafka topic
    void forward(Object key, Object value);             // forward the key-value pair to the downstream processors
    void schedule(long interval);                       // repeatedly schedule the punctuation function for the period
    void commit();                                      // commit the current state, along with the upstream offset and the downstream sent data
    String topic();                                     // return the Kafka record's topic of the current processing key-value pair
    int partition();                                    // return the Kafka record's partition id of the current processing key-value pair
    long offset();                                      // return the Kafka record's offset of the current processing key-value pair
}

public interface Processor<K, V> {
    void init(ProcessorContext context);   // initialize the processor
    void process(K key, V value);          // process a key-value pair
    void punctuate(long streamTime);       // process when the scheduled time has been reached
    void close();                          // close the processor
}

public interface ProcessorDef {
    Processor instance();                  // create a new instance of the processor from its definition
}

public class TopologyBuilder {
    // add a source node to the topology which generates incoming traffic from the specified Kafka topics
    public final TopologyBuilder addSource(String name, String... topics) { ... }

    // add a sink node to the topology with the specified parent nodes that sends outgoing traffic to the specified Kafka topic
    public final TopologyBuilder addSink(String name, String topic, String... parentNames) { ... }

    // add a processor node to the topology with the specified parent nodes
    public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) { ... }
} |
And users can create their processing job with the created processor topology as the following:
Code Block | ||
---|---|---|
| ||
public class ProcessorJob {

    private static class MyProcessorDef implements ProcessorDef {

        @Override
        public Processor<String, Integer> instance() {
            return new Processor<String, Integer>() {
                private ProcessorContext context;
                private KeyValueStore<String, Integer> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.context.schedule(1000);  // schedule punctuation with a period of 1000
                    this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
                }

                @Override
                public void process(String key, Integer value) {
                    Integer oldValue = this.kvStore.get(key);
                    if (oldValue == null) {
                        this.kvStore.put(key, value);
                    } else {
                        int newValue = oldValue + value;
                        this.kvStore.put(key, newValue);
                    }
                    context.commit();
                }

                @Override
                public void punctuate(long streamTime) {
                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
                    while (iter.hasNext()) {
                        Entry<String, Integer> entry = iter.next();
                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");
                        context.forward(entry.key(), entry.value());
                    }
                }

                @Override
                public void close() {
                    this.kvStore.close();
                }
            };
        }
    }

    public static void main(String[] args) throws Exception {
        StreamingConfig config = new StreamingConfig(new Properties());

        // build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "topic-source")
               .addProcessor("PROCESS", new MyProcessorDef(), "SOURCE")
               .addSink("SINK", "topic-sink", "PROCESS");

        // start process
        KafkaStreaming streaming = new KafkaStreaming(builder, config);
        streaming.start();
    }
} |
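The job above is hard to picture without a running cluster, so the following is a minimal, self-contained sketch of the same lifecycle (init, process, punctuate, close) driven by hand. `MiniProcessor`, `MiniContext`, `SumProcessor` and `LocalDriver` are hypothetical stand-ins that only mirror the shape of the proposed interfaces. A `TreeMap` plays the role of the `KeyValueStore`, and punctuation is triggered directly rather than through `schedule()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, trimmed-down mirrors of the proposed interfaces; not the real Kafka classes.
interface MiniContext {
    void forward(Object key, Object value); // hand a record to the downstream node
    void commit();                          // request a commit
}

interface MiniProcessor<K, V> {
    void init(MiniContext context);
    void process(K key, V value);
    void punctuate(long streamTime);
    void close();
}

// Same logic as MyProcessorDef above: sum values per key, emit the sums on punctuate().
class SumProcessor implements MiniProcessor<String, Integer> {
    private MiniContext context;
    private Map<String, Integer> store; // stands in for KeyValueStore<String, Integer>

    @Override
    public void init(MiniContext context) {
        this.context = context;
        this.store = new TreeMap<>(); // sorted, so the emitted order is deterministic
    }

    @Override
    public void process(String key, Integer value) {
        store.merge(key, value, Integer::sum); // insert if absent, otherwise add to the old value
        context.commit();
    }

    @Override
    public void punctuate(long streamTime) {
        for (Map.Entry<String, Integer> entry : store.entrySet()) {
            context.forward(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public void close() {
        store.clear();
    }
}

public class LocalDriver {
    // Runs init -> process* -> punctuate -> close and returns what was forwarded downstream.
    static List<String> run(MiniProcessor<String, Integer> processor, String[] keys, int[] values) {
        List<String> forwarded = new ArrayList<>();
        processor.init(new MiniContext() {
            @Override
            public void forward(Object key, Object value) {
                forwarded.add(key + "=" + value);
            }

            @Override
            public void commit() {
                // no-op: there are no offsets or state to flush in this sketch
            }
        });
        for (int i = 0; i < keys.length; i++) {
            processor.process(keys[i], values[i]);
        }
        processor.punctuate(1000L); // triggered by hand instead of by schedule()
        processor.close();
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run(new SumProcessor(), new String[] {"a", "b", "a"}, new int[] {1, 2, 3}));
    }
}
```

With keys a, b, a and values 1, 2, 3 the driver forwards `a=4` and `b=2`, which is what the sink node would receive at the next punctuation.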
This example API demonstrates the abstraction of the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.
High-level Stream DSL
In addition to the processor API, we would also like to introduce a higher-level stream DSL for users that covers most common processor implementations.
Code Block | ||
---|---|---|
| ||
public interface KStream<K, V> {

    /**
     * Creates a new stream consisting of all elements of this stream which satisfy a predicate.
     */
    KStream<K, V> filter(Predicate<K, V> predicate);

    /**
     * Creates a new stream by applying a mapper to all key-value pairs of this stream.
     */
    <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, K1, V1> mapper);

    /**
     * Creates a new stream by applying a mapper to all values of this stream.
     */
    <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all elements of this stream.
     */
    <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, K1, ? extends Iterable<V1>> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all values of this stream.
     */
    <V1> KStream<K, V1> flatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper);

    /**
     * Creates a new windowed stream using a specified window instance.
     */
    KStreamWindowed<K, V> with(Window<K, V> window);

    /**
     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
     * the supplied predicates, in the same order.
     */
    KStream<K, V>[] branch(Predicate<K, V>... predicates);

    /**
     * Sends key-value pairs to a topic.
     */
    void sendTo(String topic);

    /**
     * Sends key-value pairs to a topic, and also creates a new stream from the topic.
     * This is mostly used for repartitioning and is equivalent to calling sendTo(topic) and from(topic).
     */
    KStream<K, V> through(String topic);

    /**
     * Processes all elements in this stream by applying a processor.
     */
    <K1, V1> KStream<K1, V1> process(KafkaProcessor<K, V, K1, V1> processor);

    // .. more operators
}

public interface KStreamWindowed<K, V> extends KStream<K, V> {

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream
     * if the element from the other stream has an older timestamp.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> joinPrior(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
} |
With this high-level interface, the user program can be simplified as follows (using Java 8 lambda expressions):
Code Block | ||
---|---|---|
| ||
public class KStreamJob {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();  // streaming configs go here
        StreamingConfig config = new StreamingConfig(props);

        // build the topology
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> stream1 = builder.from("topic1");

        // parse the value string to an integer
        KStream<String, Integer> stream2 =
            stream1.map((key, value) -> new KeyValue<>(key, Integer.valueOf(value)))
                   .filter((key, value) -> true);

        // branch into two streams: values above 10, and the rest
        KStream<String, Integer>[] streams = stream2
            .branch((key, value) -> value > 10,
                    (key, value) -> value <= 10);

        // send the result streams to Kafka topics
        streams[0].sendTo("topic2");
        streams[1].sendTo("topic3");

        // start the process
        KafkaStreaming kstream = new KafkaStreaming(builder, config);
        kstream.start();
    }
} |
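To make the operator semantics of this job concrete, the sketch below runs the same map / filter / branch pipeline over an in-memory list with `java.util.stream`. `KV` is a hypothetical pair type standing in for `KeyValue`. The proposal only says that each branched stream corresponds to a predicate in order. Routing each record to the first matching predicate and dropping unmatched records is an assumption here, borrowed from how later Kafka Streams releases document `branch`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

public class DslSemantics {
    // Hypothetical immutable key-value pair (plays the role of KeyValue in the job above).
    static final class KV<K, V> {
        final K key;
        final V value;

        KV(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // branch(): each record goes to the first predicate it satisfies (assumed semantics);
    // records matching no predicate are dropped.
    @SafeVarargs
    static <K, V> List<List<KV<K, V>>> branch(List<KV<K, V>> records, BiPredicate<K, V>... predicates) {
        List<List<KV<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (KV<K, V> record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record.key, record.value)) {
                    branches.get(i).add(record);
                    break;
                }
            }
        }
        return branches;
    }

    // The same pipeline as KStreamJob: map -> filter -> branch.
    static List<List<KV<String, Integer>>> pipeline(List<KV<String, String>> topic1) {
        List<KV<String, Integer>> stream2 = topic1.stream()
                .map(r -> new KV<String, Integer>(r.key, Integer.valueOf(r.value))) // parse the value
                .filter(r -> true)                                                  // pass-through filter
                .collect(Collectors.toList());
        // branch 0 would be sent to "topic2", branch 1 to "topic3"
        return branch(stream2, (k, v) -> v > 10, (k, v) -> v <= 10);
    }

    public static void main(String[] args) {
        List<KV<String, String>> input = Arrays.asList(
                new KV<>("a", "5"), new KV<>("b", "42"), new KV<>("c", "10"));
        System.out.println(pipeline(input)); // [[b=42], [a=5, c=10]]
    }
}
```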
Architecture Design
We summarize some key architecture design points in the following sub-sections.
Partition Distribution
As shown in the diagram above, each KStream process can have multiple threads (#.threads is configurable in the properties), with each thread having a separate consumer and producer, and users can start multiple instances of their process job. So the first question is how to distribute the partitions of the subscribed topics in the source processor among all the processes / threads.
...
With this new assignment protocol (details of this change can be found here), we distribute the partitions among worker threads as follows:
1. Each worker thread in the process maintains a separate consumer and producer.
2. Upon startup, each thread's consumer executes the partition assignment algorithm and gets the allocated partitions.
0. Upon starting the KStream process, the user-specified number of KStream threads will be created. There are no shared variables and no synchronization barriers between threads, so these threads execute completely asynchronously. Hence we describe the behavior of a single thread in all the following steps.
1. The thread constructs the user-specified processor topology without initializing it, just in order to extract the list of subscribed topics from the topology.
2. The thread uses its consumer's partitionsFor() to fetch the metadata for each of the subscribed topics, obtaining the topic -> #.partitions mapping.
3. The thread now triggers the consumer's subscribe() with the subscribed topics, which then applies the new rebalance protocol. The join-group request will be instantiated as follows (for example):
Code Block |
---|
JoinGroupRequest =>
GroupId => "KStream-[JobName]"
GroupType => "KStream" |
And the assignor interface is implemented as follows:
Code Block |
---|
List<TopicPartition> assign(String consumerId,
Map<String, Integer> partitionsPerTopic,
List<ConsumerMetadata<T>> consumers) {
// 1. trigger user-customizable grouping function to group the partitions into groups.
// 2. assign partitions to consumers at the granularity of partition-groups.
// 3*. persist the assignment result using commit-offset to Kafka.
} |
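As an illustration of steps 1 and 2, the sketch below hands out whole partition groups to consumers round-robin, so a group is never split. Round-robin is only one possible policy, step 3 (persisting the result) is omitted, and partitions are modeled as plain "topic-partition" strings. `GroupAssignor` and its helpers are hypothetical names, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class GroupAssignor {
    // Returns the partitions owned by consumerId when whole partition groups are dealt out round-robin.
    static List<String> assign(String consumerId, List<String> consumerIds, List<Set<String>> groups) {
        List<String> members = new ArrayList<>(consumerIds);
        Collections.sort(members); // every consumer sorts identically, so all compute the same assignment
        int me = members.indexOf(consumerId);
        if (me < 0) {
            throw new IllegalArgumentException("unknown consumer: " + consumerId);
        }
        List<String> owned = new ArrayList<>();
        for (int g = 0; g < groups.size(); g++) {
            if (g % members.size() == me) {
                owned.addAll(new TreeSet<>(groups.get(g))); // a group is never split across consumers
            }
        }
        return owned;
    }

    // Convenience helper for building a partition group.
    static Set<String> group(String... partitions) {
        return new TreeSet<>(Arrays.asList(partitions));
    }

    public static void main(String[] args) {
        List<Set<String>> groups = Arrays.asList(group("A-0", "B-0"), group("A-1", "B-1"), group("A-2", "B-2"));
        List<String> consumers = Arrays.asList("c2", "c1");
        System.out.println(assign("c1", consumers, groups)); // [A-0, B-0, A-2, B-2]
        System.out.println(assign("c2", consumers, groups)); // [A-1, B-1]
    }
}
```

Because every member sorts the consumer ids the same way, each consumer can run the function independently and still agree on the overall assignment.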
The interface of the grouping function is as follows; it is very similar to the assign() interface above, except that it does not take the list of consumers.
Code Block |
---|
interface PartitionGrouper {
/**
* Group partitions into partition groups
*/
List<Set<TopicPartition>> group(Map<String, Integer> partitionsPerTopic);
} |
So after the rebalance completes, each partition-group is assigned as a whole to a consumer, i.e. no two partitions belonging to the same group are assigned to different consumers. The default grouping function maps partitions with the same id across topics into a group (i.e. co-partitioning).
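The default co-partitioning grouper could be sketched as follows. This is a hypothetical implementation of the `group()` contract above that models partitions as "topic-id" strings rather than `TopicPartition` objects; `CoPartitionGrouper` is not a name from the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CoPartitionGrouper {
    // Default grouping: partition i of every topic goes into group i (co-partitioning).
    // Partitions are modeled as "topic-id" strings instead of TopicPartition objects.
    static List<Set<String>> group(Map<String, Integer> partitionsPerTopic) {
        int maxPartitions = partitionsPerTopic.isEmpty() ? 0 : Collections.max(partitionsPerTopic.values());
        List<Set<String>> groups = new ArrayList<>();
        for (int id = 0; id < maxPartitions; id++) {
            Set<String> group = new TreeSet<>();
            for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
                if (id < topic.getValue()) { // this topic has a partition with this id
                    group.add(topic.getKey() + "-" + id);
                }
            }
            groups.add(group);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("A", 3);
        partitionsPerTopic.put("B", 2);
        System.out.println(group(partitionsPerTopic)); // [[A-0, B-0], [A-1, B-1], [A-2]]
    }
}
```

When topics have different partition counts, the higher-numbered groups contain only some of the topics; streams in such groups cannot be joined, since their partition lists do not match.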
4. Upon getting the partition-groups, the thread creates one task for each partition-group. For each task, it constructs the processor topology AND initializes the topology with the task context.
a. Initialization process will trigger Processor.init() on each processor in the topology following topology's DAG order.
b. All user-specified local states will also be created during the initialization process (described in a later section).
c. Create the record queue for each of the partitions in the task's partition-group, so that when the consumer fetches new messages, it puts them into the corresponding queue.
Hence all tasks' topologies have the same "skeleton" but different processor / state instantiated objects; in addition, partitions are also synchronized on a per-task basis (we will talk about partition-group synchronization in the next section).
5. When a rebalance is triggered, each consumer reads its last persisted partition assignment from Kafka and checks whether the following hold when comparing it with the new assignment result:
a. Existing partitions are still assigned to the same partition-groups.
b. New partitions are assigned to the existing partition-groups instead of creating new groups.
For a), since the partition-group's associated task-id is used as the corresponding change-log partition id, if a partition gets migrated from one group to another during the rebalance, its state will no longer be valid; for b), since we cannot (yet) dynamically change the #.partitions from the consumer APIs, dynamically adding more partition-groups (hence tasks) may require change log partitions that do not exist yet.
[NOTE] There are some more advanced partitioning settings, such as sticky partitioning / consistent hashing, that we want to support in the future, which may then additionally require:
c. Partition groups are assigned to the specific consumers instead of randomly / round-robin.
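Checks a) and b) can be expressed as a small predicate over the old and new assignments. This sketch assumes each assignment is available as a map from partition to partition-group id; `AssignmentCheck` is a hypothetical name, and partitions that disappear from the new assignment are simply ignored.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AssignmentCheck {
    // Both maps go from a partition (e.g. "A-0") to the id of the partition group / task that owns it.
    static boolean isCompatible(Map<String, Integer> previous, Map<String, Integer> current) {
        // (a) a partition that existed before must stay in the same group,
        //     otherwise its task's change-log partition no longer matches its state
        for (Map.Entry<String, Integer> entry : previous.entrySet()) {
            Integer groupNow = current.get(entry.getKey());
            if (groupNow != null && !groupNow.equals(entry.getValue())) {
                return false;
            }
        }
        // (b) new partitions must join existing groups: no new group id (and hence no new
        //     change-log partition) may appear
        Set<Integer> knownGroups = new HashSet<>(previous.values());
        for (Integer groupId : current.values()) {
            if (!knownGroups.contains(groupId)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> previous = new HashMap<>();
        previous.put("A-0", 0);
        previous.put("A-1", 1);

        Map<String, Integer> grown = new HashMap<>(previous);
        grown.put("B-0", 0); // a new topic partition joins existing group 0
        System.out.println(isCompatible(previous, grown)); // true

        Map<String, Integer> newGroup = new HashMap<>(previous);
        newGroup.put("A-2", 2); // would need a third change-log partition
        System.out.println(isCompatible(previous, newGroup)); // false
    }
}
```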
Stream Time and Sync.
Time is very important in stream processing. Windowing operations (join and aggregation) are defined by time. Since Kafka can replay streams, wall-clock based time (system time) may not make sense due to delayed / out-of-order messages. Hence we need to define a "time" for each stream according to its progress. We call it stream time.
Stream Time
A stream is defined to abstract all the partitions of the same topic within a task, and its name is the same as the topic name. For example if a task's assigned partitions are {Topic1-P1, Topic1-P2, Topic1-P3, Topic2-P1, Topic2-P2}, then we treat this task as having two streams: "Topic1" and "Topic2" where "Topic1" represents three partitions P1 P2 and P3 of Topic1, and stream "Topic2" represents two partitions P1 and P2 of Topic2.
Each message in a stream has to have a timestamp to perform window-based operations and punctuations. Since a Kafka message does not have a timestamp in the message header, users can define a timestamp extractor based on message content, which is used in the source processor when deserializing the messages. This extractor can be as simple as always returning the current system time (or wall-clock time), or it can be an Avro decoder that gets the timestamp field specified in the record schema.
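Both extractor styles mentioned above can be sketched against a hypothetical `TimestampExtractor` interface. The proposal does not fix its signature, so the parameters here (topic, key, value) and the `PageView` record type are assumptions for illustration.

```java
public class TimestampExtractors {
    // Hypothetical extractor interface: maps a deserialized record to its timestamp in ms.
    interface TimestampExtractor {
        long extract(String topic, Object key, Object value);
    }

    // Simplest option: ignore the content and use wall-clock (system) time.
    static final TimestampExtractor WALL_CLOCK = (topic, key, value) -> System.currentTimeMillis();

    // Hypothetical record type whose schema carries an event-time field.
    static final class PageView {
        final String user;
        final long eventTimeMs;

        PageView(String user, long eventTimeMs) {
            this.user = user;
            this.eventTimeMs = eventTimeMs;
        }
    }

    // Content-based option: read the timestamp field out of the record.
    static final TimestampExtractor EVENT_TIME = (topic, key, value) -> {
        if (value instanceof PageView) {
            return ((PageView) value).eventTimeMs;
        }
        throw new IllegalArgumentException("no timestamp field in record from " + topic);
    };

    public static void main(String[] args) {
        PageView view = new PageView("alice", 1234L);
        System.out.println(EVENT_TIME.extract("page-views", "alice", view)); // 1234
    }
}
```

The wall-clock variant makes stream time track processing time, so replayed or delayed data would be windowed by when it is read; the content-based variant keeps windows tied to when events happened.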
...
2. Each message has an associated timestamp, extracted from the message content by the timestamp extractor.
3. The partition's timestamp is defined as the lowest message timestamp value in its buffer.
...
4. The stream time is defined as the lowest partition timestamp value across all its partitions in the task:
a. Since partition times are monotonically increasing, stream times are also monotonically increasing.
...
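The definitions above can be sketched for one task as follows. `StreamTimeTracker` is hypothetical. Skipping partitions with empty buffers and clamping the result so it never decreases are assumptions standing in for the elided details, chosen so that stream time stays monotonic as stated in a.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class StreamTimeTracker {
    private final Map<String, Deque<Long>> buffers = new HashMap<>(); // partition -> buffered timestamps
    private long streamTime = Long.MIN_VALUE;

    void buffer(String partition, long timestamp) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(timestamp);
    }

    void consume(String partition) {
        buffers.get(partition).pollFirst(); // the record at the head of the queue was processed
    }

    // Partition time: the lowest timestamp currently in the partition's buffer.
    static long partitionTime(Deque<Long> buffer) {
        return Collections.min(buffer);
    }

    // Stream time: the lowest partition time across the task's partitions. Empty buffers are
    // skipped, and the result is clamped so it never moves backward (both assumptions).
    long streamTime() {
        long lowest = Long.MAX_VALUE;
        for (Deque<Long> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                lowest = Math.min(lowest, partitionTime(buffer));
            }
        }
        if (lowest != Long.MAX_VALUE) {
            streamTime = Math.max(streamTime, lowest);
        }
        return streamTime;
    }

    public static void main(String[] args) {
        StreamTimeTracker tracker = new StreamTimeTracker();
        tracker.buffer("A-0", 3);
        tracker.buffer("A-0", 9);
        tracker.buffer("A-1", 7);
        System.out.println(tracker.streamTime()); // 3
        tracker.consume("A-0");
        System.out.println(tracker.streamTime()); // 7
        tracker.consume("A-1");
        System.out.println(tracker.streamTime()); // 9
    }
}
```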
When joining two streams, their progress needs to be synchronized. If they are out of sync, a time-window-based join becomes faulty. Say the delay of one stream is negligible and the delay of the other stream is one day; doing a join over a 10-minute window does not make sense. To handle this case, we define a stream group as a set of streams whose rates of consumption need to be synchronized. Within a process instance, such stream groups are instantiated as partition groups from the assigned topic partitions. Each worker needs to make sure that the consumption rates of all partitions within each task's assigned partition-group are "synchronized". Note that each thread may have one or more tasks, but it does not need to synchronize the partitions across tasks' partition-groups.
The worker thread synchronizes the consumption within each of these groups through the consumer's pause / resume APIs as follows:
...
2. When a paused partition is no longer ahead of the other partitions by more than some defined threshold, notify the corresponding consumer to un-pause it.
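A sketch of the pause decision for one partition group, assuming each partition's time is already known and the threshold is passed in as a parameter. `ConsumptionSync` is a hypothetical helper. The thread would then pause the returned partitions on its consumer and resume the rest; those consumer calls are deliberately left out here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ConsumptionSync {
    // Returns the partitions of one partition-group that should be paused: those whose time is more
    // than maxLead ahead of the slowest partition. All other partitions should be (or stay) resumed.
    static Set<String> partitionsToPause(Map<String, Long> partitionTimes, long maxLead) {
        Set<String> toPause = new TreeSet<>();
        if (partitionTimes.isEmpty()) {
            return toPause;
        }
        long slowest = Collections.min(partitionTimes.values());
        for (Map.Entry<String, Long> entry : partitionTimes.entrySet()) {
            if (entry.getValue() - slowest > maxLead) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }

    public static void main(String[] args) {
        Map<String, Long> times = new HashMap<>();
        times.put("A-0", 100L);
        times.put("B-0", 1000L);
        times.put("C-0", 150L);
        System.out.println(partitionsToPause(times, 200L)); // [B-0]

        // A-0 and C-0 catch up while B-0 is paused, so B-0 can be resumed.
        times.put("A-0", 900L);
        times.put("C-0", 950L);
        System.out.println(partitionsToPause(times, 200L)); // []
    }
}
```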
Two streams that are joined together have to be in the same task, and the partition lists they represent have to match each other. For example, a stream representing P1, P2 and P3 can be joined with another stream also representing P1, P2 and P3.
Local State Management
Users can create one or more state stores in their processing logic, and each task will have a state manager that keeps an instance of each specified store inside the task.
A store instance will be created for each task's partition group. Since a single store instance is not shared across multiple partition groups, and each partition group is processed by only a single thread, no store is ever accessed concurrently by multiple threads.
...
Each state store will be backed up by a different Kafka change log topic, and each instance of the store correlates to one partition of the topic, such that:
Code Block |
---|
#. tasks == #. partition groups == #. store instances for each state store == #. partitions of the change log for each state store |
For example, if a processor instance consumes from upstream Kafka topic "topic-A" with 4 partitions, creates two stores, namely store1 and store2, and the user groups the 4 partitions into {topic-A-p1, topic-A-p2} and {topic-A-p3, topic-A-p4}, then two change log topics, for example "topic-store1-changelog" and "topic-store2-changelog", need to be created beforehand, each with two partitions.
...
b. Otherwise, do not load the previously flushed state and replay the change log from the beginning up to the log-end-offset.
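Replaying the change log (case b) can be sketched as below. The `Change` entry type is hypothetical, and treating a null value as a deletion is an assumption; the sketch only shows the replay up to the log-end-offset, not the flushed-state check of case a.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogRestore {
    // Hypothetical change-log entry; a null value is treated as a deletion of the key.
    static final class Change {
        final long offset;
        final String key;
        final Integer value;

        Change(long offset, String key, Integer value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Replays the change log from the beginning up to (but excluding) logEndOffset, i.e. the
    // offset the next appended message would get at the time the restore starts.
    static Map<String, Integer> restore(List<Change> changelog, long logEndOffset) {
        Map<String, Integer> store = new HashMap<>();
        for (Change change : changelog) {
            if (change.offset >= logEndOffset) {
                break;
            }
            if (change.value == null) {
                store.remove(change.key);
            } else {
                store.put(change.key, change.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Change> log = Arrays.asList(
                new Change(0, "a", 1), new Change(1, "b", 2), new Change(2, "a", 5),
                new Change(3, "b", null), new Change(4, "c", 9));
        System.out.println(restore(log, 4)); // {a=5}
    }
}
```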
Workflow Summary
We summarize the workflow of a Kafka streaming process following the above architecture design.
Startup
Upon the user calling KafkaStreaming.start(), the process instance creates the worker threads given the user-specified #.threads. In each worker thread:
1. Construct the producer and consumer clients, and extract the subscription topic names from the topology.
2. Let the consumer subscribe to the topics and get the assigned partitions.
3. Trigger the grouping function with the assigned partitions to get the returned list of partition-groups (hence tasks) with associated ids.
4. Initialize each task by:
a. Creating a record queue for buffering the fetched records for each partition.
b. Initializing a topology instance for the task from the builder with a newly created processor context.
c. Initializing the state manager of the task and constructing / resuming user-defined local states.
5. Run the loop at its own pace until notified to be shut down; there is no synchronization between these threads. In each iteration of the loop:
...
b. Choose one record from the queues and process it through the processor topology.
c. Check if some of the processors' punctuate functions need to be triggered, and if yes, execute the function.
d. Check if the user called commit() during the processing of this record; if yes, commit the offset / flush the local state / flush the producer.
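One iteration of the loop can be sketched as follows for a single task. `TaskLoop` and `QueuedRecord` are hypothetical names. The following are assumptions made for illustration: the task picks the buffered record with the lowest timestamp, uses that record's timestamp as an approximation of stream time, and requests a commit after every record (as the example processor does).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TaskLoop {
    static final class QueuedRecord {
        final long timestamp;
        final String key;

        QueuedRecord(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final Map<String, Deque<QueuedRecord>> queues = new LinkedHashMap<>(); // one queue per partition
    final List<String> log = new ArrayList<>(); // records what the loop did, for illustration
    private final long punctuateInterval;
    private long nextPunctuation;
    private boolean commitRequested;

    TaskLoop(long punctuateInterval) {
        this.punctuateInterval = punctuateInterval;
        this.nextPunctuation = punctuateInterval;
    }

    // Stand-in for the consumer putting a fetched record into its partition's queue.
    void add(String partition, QueuedRecord record) {
        queues.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(record);
    }

    // One iteration of the loop; returns false when nothing is buffered.
    boolean runOnce() {
        // b. choose the queue whose head has the lowest timestamp (assumed selection rule)
        Deque<QueuedRecord> chosen = null;
        for (Deque<QueuedRecord> queue : queues.values()) {
            if (!queue.isEmpty()
                    && (chosen == null || queue.peekFirst().timestamp < chosen.peekFirst().timestamp)) {
                chosen = queue;
            }
        }
        if (chosen == null) {
            return false;
        }
        QueuedRecord record = chosen.pollFirst();
        log.add("process " + record.key + "@" + record.timestamp); // stand-in for running the topology
        commitRequested = true; // like the example processor, which calls context.commit() per record

        // c. punctuate for every scheduled time that stream time (approximated by this record's
        //    timestamp) has reached
        while (record.timestamp >= nextPunctuation) {
            log.add("punctuate@" + nextPunctuation);
            nextPunctuation += punctuateInterval;
        }

        // d. honor a commit request: commit offsets, flush local state, flush the producer
        if (commitRequested) {
            log.add("commit");
            commitRequested = false;
        }
        return true;
    }

    public static void main(String[] args) {
        TaskLoop task = new TaskLoop(10);
        task.add("A-0", new QueuedRecord(5, "a"));
        task.add("A-0", new QueuedRecord(20, "c"));
        task.add("A-1", new QueuedRecord(12, "b"));
        while (task.runOnce()) {
            // keep looping until the queues are drained
        }
        System.out.println(task.log);
    }
}
```

Picking the lowest head timestamp across queues interleaves the partitions in (approximate) timestamp order, which is what keeps windowed joins inside one task consistent.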
Shutdown
Upon user calling KafkaStreaming.shutdown(), the following steps are executed:
1. Commit / flush each partition-group's current processing state as described in the local state management section.
2. Close the embedded producer and consumer clients.
Packaging Design
It would be best to package Processor / KStream as a separate jar, since it introduces extra external dependencies, such as RocksDB, etc. Under this model:
...
Current class / package names can be found in this PR. A general summary:
1. All classes are defined in the "streams" folder.
2. The low-level Processor interface is under the "o.a.k.streams.processor" package; the high-level KStream interface is under the "o.a.k.streams.kstream" package.
3. Some example classes can be found in o.a.k.streams.examples.
4. Important internal classes include:
Code Block |
---|
PartitionGroup: a set of partitions along with their queuing buffers and timestamp extraction logic.
ProcessorStateManager: the manager of the local states within a task.
ProcessorTopology: the instance of the topology generated by the TopologyBuilder.
StreamTask: the unit of processing tasks, which includes a ProcessorTopology, a ProcessorStateManager and a PartitionGroup.
StreamThread: contains multiple StreamTasks, a Consumer and a Producer client |
...
Compatibility, Deprecation, and Migration Plan
...