...
NOTE: This document is a design draft and does not reflect the state of a stable release, but the work in progress on the current master, and future considerations.
...
State in streaming programs
More complex streaming pipelines generally need to keep some sort of operator state to execute the application logic. Examples include keeping an aggregation or summary of the received elements, or more complex state such as a state machine for detecting patterns in fraudulent financial transactions, or a model for some machine learning application. While in all of these cases we keep some kind of summary of the input history, the concrete requirements vary greatly from one stateful application to another.
This document is intended to serve as a guide for stateful stream processing in Flink and other systems, by identifying some common usage patterns and requirements for implementing stateful operators in streaming systems.
State access patterns
To design a properly functioning, scalable state representation we need to understand the different state access and handling patterns that streaming applications need. We can distinguish three main state types (access patterns): local state, partitioned state, and out-of-core state.
...
```scala
val env: StreamExecutionEnvironment = ...

val stream = env.addSource(new SourceFunction[Int]() {

  // store the current offset as operator state
  var offset: OperatorState[Int] = _
  var isRunning = false

  override def open(config: Configuration): Unit = {
    // get non-partitioned state from the context with a default value of 0
    offset = getRuntimeContext().getState("offset", 0, false)
  }

  override def run(ctx: SourceContext[Int]): Unit = {
    isRunning = true
    while (isRunning) {
      // output the current offset, then increment it
      val currOffset = offset.value
      ctx.collect(currOffset)
      offset.update(currOffset + 1)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
})
```
Alternatively, the user can also use the Checkpointed interface for implementing local state functionality. This interface gives more fine-grained control over the checkpoint/restore process:
```scala
class CheckpointedSource extends SourceFunction[Int] with Checkpointed[Int] {

  var offset: Int = 0
  var isRunning = false

  override def run(ctx: SourceContext[Int]): Unit = {
    isRunning = true
    while (isRunning) {
      ctx.collect(offset)
      offset += 1
    }
  }

  // return the state to be stored in the snapshot
  override def snapshotState(checkpointId: Long, timestamp: Long): Int = offset

  // reset the state from the last completed snapshot after a failure
  override def restoreState(state: Int): Unit = { offset = state }

  override def cancel(): Unit = {
    isRunning = false
  }
}
```
Partitioned state
Partitioned state provides a representation for operator state that is tied to partitions (keys) of the operator input. An independent state is maintained for each partition, and operator states for different keys do not interact. Partitioning is done by a user-defined key extracted from each input element, and inputs with the same key share the state. The big advantages of partitioned state are expressivity and trivial scalability. With partitioned state, the operator instances need not be aware of the partitioning of the inputs: they can only access the state for the current input, and the system guarantees proper partitioning by the selected key. This makes the implementation of per-key operators very simple. Operations using partitioned state can also benefit from the partial ordering guarantees that the Flink runtime provides to implement deterministic behaviour. Furthermore, partitioned state can easily be scaled automatically by moving some partitions to new operators and changing the input partitioning. Typical usage includes any per-key stateful operation, such as group aggregates/summaries/window buffers, or analysis over several distinct groups in the stream, for instance pattern detection.
Partitioned state is only accessible from operators applied on KeyedDataStreams (the state partitioning will be done by the key in the stream):
```scala
val stream: DataStream[String] = ...

val keyed: KeyedDataStream[String] = stream.keyBy(x => x)

val counts = keyed.map(new RichMapFunction[String, (String, Int)]() {

  var countPerID: OperatorState[Int] = _

  override def open(config: Configuration): Unit = {
    // the true flag marks the state as partitioned
    countPerID = getRuntimeContext().getState("count", 0, true)
  }

  override def map(in: String): (String, Int) = {
    // countPerID holds the state for the key of the current input
    val countSoFar = countPerID.value + 1
    countPerID.update(countSoFar)
    (in, countSoFar)
  }
})
```
Out-of-core state
We can think of out-of-core state as an extension of partitioned state where the state is kept outside of the system, in an external storage layer (typically some key-value store). In contrast with partitioned state, these states are not stored and checkpointed with the other operator states but are read from and updated in the external system (with some form of caching to increase performance). There are several reasons why one would use out-of-core state in streaming applications:
While 1. is important as well and we will touch on that issue later, we focus on 2. here. The problem that arises is that we want to maintain a consistent view of the streaming state from the outside despite possible node failures during processing. While our fault-tolerance mechanism can deal with consistency within the streaming job, we need to extend it to handle out-of-core state consistently.
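To make the caching idea above concrete, here is a minimal sketch (not part of the proposal; the store, cache, and all names are hypothetical, and the external database is replaced by an in-memory map) of an out-of-core state access layer that serves reads from a local cache and flushes updates in a batch at checkpoint time:

```scala
import scala.collection.mutable

// Hypothetical stand-in for an external key-value store; in a real job this
// would be a remote database client.
class ExternalStore[K, V] {
  private val backend = mutable.Map.empty[K, V]
  def get(key: K): Option[V] = backend.get(key)
  def put(key: K, value: V): Unit = backend(key) = value
}

// Caches state locally so that most accesses never touch the external system;
// dirty entries are only written out when flush() is called at a checkpoint.
class CachedState[K, V](store: ExternalStore[K, V], default: V) {
  private val cache = mutable.Map.empty[K, V]

  def value(key: K): V =
    cache.getOrElseUpdate(key, store.get(key).getOrElse(default))

  def update(key: K, v: V): Unit = cache(key) = v

  // Called at checkpoint time: push the cached updates out as one batch.
  def flush(): Unit = {
    cache.foreach { case (k, v) => store.put(k, v) }
    cache.clear()
  }
}
```

Note that in this sketch the external store only observes batched updates at checkpoint boundaries, which is exactly the window in which the consistency problem discussed next arises.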
The main problem is that we want to simulate a global transaction (a transaction containing all the state updates since the last checkpoint, i.e. a change-log) against the external storage without actually having such a mechanism. This is hard because once we have written some update to the external storage we cannot roll it back in case of failure, so we need to guarantee that either all states are written or none (in the latter case we cannot even start writing).
Proposed implementation
We can create a global transaction by first storing the change-log in a persistent layer (a write-ahead log) before starting to write to the external database. For this to work we need to use versioning in the external database, so that the new version is only committed once all the local changes have been written. Assumptions:
Proposed algorithm:
This algorithm builds on the current checkpointing mechanism by adding an extra round to commit the changes to the external database. The implementation of step 2 can use the same logic as triggering the checkpoints, and individual tasks can commit their own change-logs. In case of failure, the failed change-logs can be retrieved from the persistent storage and committed by any other task.
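The two-round idea can be sketched in a few lines of plain Scala (a simulation only: all names are hypothetical, and both the "persistent" write-ahead log and the versioned database are in-memory stand-ins). Each task first writes its change-log durably; the new version pointer is advanced, making the updates visible to readers, only once every task's change-log has been written:

```scala
import scala.collection.mutable

// Simulated commit round: phase 1 persists per-task change-logs, phase 2
// applies them under a new version and atomically advances the pointer.
object VersionedCommit {
  type ChangeLog = Map[String, Int]

  val wal = mutable.Map.empty[(Long, Int), ChangeLog] // (checkpointId, taskId) -> log
  val db = mutable.Map.empty[(String, Long), Int]     // (key, version) -> value
  var committedVersion: Long = 0                      // readers only see this version

  // Phase 1: every task durably writes its change-log for the checkpoint.
  def writeLog(checkpointId: Long, taskId: Int, log: ChangeLog): Unit =
    wal((checkpointId, taskId)) = log

  // Phase 2: once all logs for the checkpoint exist, apply them under the
  // new version, then advance the committed version pointer.
  def commit(checkpointId: Long, expectedTasks: Int): Boolean = {
    val logs = wal.collect { case ((cp, _), log) if cp == checkpointId => log }
    if (logs.size < expectedTasks) return false // not all tasks finished phase 1
    for (log <- logs; (k, v) <- log) db((k, checkpointId)) = v
    committedVersion = checkpointId
    true
  }

  // Reads always go against the last committed version, so a failure between
  // the two phases never exposes partial updates.
  def read(key: String): Option[Int] = db.get((key, committedVersion))
}
```

Because values are keyed by version, a reader concurrent with phase 2 still sees the previous consistent version until the pointer moves, mirroring the versioning assumption stated above.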
State requirements
The state interfaces and implementations need to satisfy certain requirements in order to efficiently scale to large data volumes while keeping flexibility for application developers:
- State flexibility for applications
- Job scalability while maintaining correctness
- Scalability to large state sizes
- Efficient snapshots and failure recovery
- Flexible state backends
We will go through each requirement in this section in detail and show how Flink tackles them.
State flexibility for applications
This is probably the only requirement that is hard to capture very precisely. What it means in practice is that the provided state interfaces should cover most of the stateful use cases that can be expected from a distributed system.
To achieve this Flink provides multiple ways to declare and access operator state, giving full flexibility to applications:
OperatorState interface
The OperatorState interface encapsulates both partitioned and non-partitioned state access in streaming operators. In the case of local state access, the value returned by `state.value()` is the same for all inputs to an operator instance, while in the case of partitioned state access the state returned is the state for the key of the current input. The convenient thing about OperatorState is that local and partitioned state are accessed in exactly the same way; the user only needs to declare the access pattern when retrieving the OperatorState instance. OperatorState can be used in any RichFunction and is accessible from the runtime context. The user retrieves an OperatorState from the RuntimeContext using a string identifier (to allow multiple states) and a default value. In addition, the user can also provide a StateCheckpointer instance for fine-grained control over the checkpointing of the state. By default, user states are serialized using default Java serialization.
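As an illustration of the StateCheckpointer hook, the following sketch shows a state that is snapshotted in a compact custom form instead of default Java serialization. This is a sketch against the draft interfaces described above; the exact `StateCheckpointer` signature and the four-argument `getState` variant are assumptions of this draft, not a stable API:

```scala
// Sketch only: signatures follow this design draft and may differ in practice.
// A StateCheckpointer converts between the runtime state type and the form
// that is actually stored in the snapshot.
class ListCheckpointer extends StateCheckpointer[List[Int], Array[Int]] {
  // store a compact array instead of a Java-serialized List
  override def snapshotState(state: List[Int], checkpointId: Long, timestamp: Long): Array[Int] =
    state.toArray

  override def restoreState(snapshot: Array[Int]): List[Int] =
    snapshot.toList
}

class BufferingMap extends RichMapFunction[Int, Int] {

  var buffer: OperatorState[List[Int]] = _

  override def open(config: Configuration): Unit = {
    // identifier, default value, partitioned flag, custom checkpointer
    buffer = getRuntimeContext().getState("buffer", List.empty[Int], true, new ListCheckpointer)
  }

  override def map(in: Int): Int = {
    buffer.update(in :: buffer.value)
    buffer.value.sum
  }
}
```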
Checkpointed interface
Another way to declare stateful operators is to implement the Checkpointed interface in the user function. The system automatically detects operators that implement this interface and will draw (and restore) state snapshots using the `snapshotState` and `restoreState` methods. This approach may be preferred when the user needs to execute additional application logic during snapshotting. The two interfaces can of course be used at the same time, even in the same operator.