
NOTE: This document is a design draft and does not reflect the state of a stable release, but rather the work in progress on the current master and future considerations.

 

This document is intended to give an overview of how operator state is handled and represented in streaming programs.

State in streaming programs

tbd

State access patterns

To design a properly functioning, scalable state representation, we need to understand the different state access and handling patterns that streaming applications require. We can distinguish three main state types: local state, partitioned state, and out-of-core state.

Local (non-partitioned) state

Local or non-partitioned state is the simplest form of operator state: it represents the current state of a specific operator instance in a parallel streaming operator. Local states stored at different operator instances do not interact with each other. For instance, if we have a mapper with a parallelism of 10, each parallel instance holds its own local state. An important thing to note here is that updates reflected in a particular local state depend only on the input of that specific operator instance. As there is no strict control over the input each operator instance processes (only via stream partitioning), the operator itself needs to deal with this non-deterministic nature of the inputs, which also limits expressivity.

Typical usage of non-partitioned state includes source operators (storing offsets) and any global aggregation/summary or analysis operators where the local results are merged into a global result afterwards.

Scaling out non-partitioned state can in most cases be done by simply starting a new operator instance with a blank state, or a user-supplied splitting function can be used to split the state of an existing instance. For reducing job parallelism, the user should generally provide a function that can merge two local states to maintain correct results, as sketched below.
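The following is only a minimal sketch of what such user-supplied split and merge functions could look like; the StateSplitter and StateMerger traits are hypothetical and not part of the current API.

State split/merge sketch (hypothetical)
// Hypothetical shapes for user-supplied rescaling functions; these traits
// do not exist in the current API and only illustrate the idea.

// Splits the state of one operator instance into `parallelism` pieces when scaling out.
trait StateSplitter[S] extends Serializable {
  def split(state: S, parallelism: Int): Seq[S]
}

// Merges two local states into one when scaling in.
trait StateMerger[S] extends Serializable {
  def merge(a: S, b: S): S
}

// Example: local counts can simply be summed when two instances are merged.
object CountMerger extends StateMerger[Long] {
  override def merge(a: Long, b: Long): Long = a + b
}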

Local state example
val env: StreamExecutionEnvironment = ...

val stream = env.addSource(new RichSourceFunction[Int]() {

    // store the current offset as operator state
    var offset: OperatorState[Int] = _
    var isRunning = false

    override def open(config: Configuration): Unit = {
        // get non-partitioned state from the runtime context with a default value of 0
        offset = getRuntimeContext().getState("offset", 0, false)
    }

    override def run(ctx: SourceContext[Int]): Unit = {
        isRunning = true
        while (isRunning) {
            // output the current offset, then increment it
            val currOffset = offset.value
            ctx.collect(currOffset)
            offset.update(currOffset + 1)
        }
    }

    override def cancel(): Unit = { isRunning = false }
})

Alternatively, the user can implement local state functionality via the Checkpointed interface, which gives more fine-grained control over the checkpoint/restore process:

Checkpointed interface example
class CheckpointedSource extends SourceFunction[Int] with Checkpointed[Int] {

    var offset: Int = 0
    var isRunning = false

    override def run(ctx: SourceContext[Int]): Unit = {
        isRunning = true
        while (isRunning) {
            ctx.collect(offset)
            offset += 1
        }
    }

    // return the state to be stored in the checkpoint
    override def snapshotState(checkpointId: Long, timestamp: Long): Int = offset

    // reset the state from the snapshot on restore
    override def restoreState(state: Int): Unit = { offset = state }

    override def cancel(): Unit = { isRunning = false }
}

Partitioned state

Partitioned state provides a representation for operator states that are tied to partitions (keys) of the operator input. An independent state is maintained for each partition, and operator states for different keys do not interact. Partitioning is done by a user-defined key which is extracted from each input, and inputs with the same key share the state. The big advantages of partitioned state are expressivity and trivial scalability.

In the case of partitioned state, the operator instances need not be aware of the partitioning of the inputs: they can only access the state for the current input, and the system guarantees proper partitioning by the selected key. This makes the implementation of per-key operators very simple. Operations using partitioned state can also benefit from the partial ordering guarantees that the Flink runtime provides to implement deterministic behaviour. Furthermore, partitioned state can easily be scaled automatically by moving some partitions to new operators and changing the input partitioning.

Typical usage includes any per-key stateful operation, such as grouped aggregates/summaries, or analysis over several distinct groups in the stream, for instance pattern detection.

Partitioned state is only accessible from operators applied on KeyedDataStreams (the state partitioning will be done by the key in the stream):

Partitioned state example
val stream: DataStream[String] = ...
val keyed: KeyedDataStream[String] = stream.keyBy(x => x)

val counts = keyed.map(new RichMapFunction[String, (String, Int)]() {

    var countPerID: OperatorState[Int] = _

    override def open(config: Configuration): Unit = {
        // the true flag marks the state as partitioned
        countPerID = getRuntimeContext().getState("count", 0, true)
    }

    override def map(in: String): (String, Int) = {
        // countPerID holds the state for the key of the current input
        val countSoFar = countPerID.value + 1
        countPerID.update(countSoFar)
        (in, countSoFar)
    }
})

Out-of-core state

We can think of out-of-core state as an extension of partitioned state, where the state is kept outside of the system in an external storage layer (some key-value store). In contrast with partitioned state, these states are not stored and checkpointed together with the other operator states, but are read from and updated in the external store (with some sort of caching to increase performance). There are several reasons why one would use out-of-core state in streaming applications:

  1. Storing large states that don't fit in memory (with the restriction that a state for one key needs to fit)
  2. Making operator states available for external systems

While 1. is important as well and we will touch on this issue later, we will focus on 2. here. The problem that arises is that we want to maintain a consistent view of the streaming state from the outside, despite possible node failures during processing. While our fault tolerance mechanism can deal with consistency within the streaming job, we need to extend it to handle out-of-core state consistently.

The main problem here is that we want to simulate a global transaction (a transaction containing all the state updates since the last checkpoint, i.e. a change-log) against the external storage without actually having such a mechanism. This is hard because once we have written some update to the external storage we cannot roll it back in case of failure, so we need to make sure that either all states are written or none at all (in the latter case we cannot even start writing).

Proposed implementation

 

We can create a global transaction by first storing the change-log in a persistent layer (a write-ahead log) before starting to write to the external database. In order for this to work, we need to use versioning in the external database, so that the new version is only committed once all the local changes have been written. The sketch below illustrates the kind of versioned storage interface this assumes.
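This is only an illustration; the VersionedKVStore trait and its methods are hypothetical and not tied to any existing API or connector.

Versioned storage interface sketch (hypothetical)
// Hypothetical interface for the external storage layer assumed by this design.
// Updates are staged under a version number and only become visible to readers
// once that version is committed, which provides the required atomicity.
trait VersionedKVStore[K, V] {
  // stage an update under a version that has not been committed yet
  def put(key: K, value: V, version: Long): Unit
  // atomically make all updates staged under `version` visible to readers
  def commitVersion(version: Long): Unit
  // read the latest committed value for a key
  def get(key: K): Option[V]
}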

Assumptions:

  1. External storage with transaction + versioning support
  2. Persistent in-memory storage for write-ahead log

Proposed algorithm:

 

  1. (Task) Upon receiving a trigger-checkpoint event, each task stores its change-log in the persistent storage layer and sends an ack

  2. (JM) Upon receiving acks from all tasks, start committing the change-logs

  3. (JM) The checkpoint is complete when all change-logs have been committed to the key-value store and the new version has been committed in the external database

This algorithm builds on the current checkpointing mechanism by adding an extra round to commit the changes to the external database. The implementation of step 2 can use the same logic as triggering the checkpoints, and individual tasks can commit their own change-logs. In case of failure, the failed change-logs can be retrieved from the persistent storage and committed by any other task. A task-side sketch of this protocol is given below.
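The sketch reuses the hypothetical VersionedKVStore from above and adds an equally hypothetical PersistentLog for the write-ahead log; acks, the JobManager side and error handling are omitted, so it only illustrates the shape of the algorithm, not an actual implementation.

Commit protocol sketch (hypothetical)
// Hypothetical write-ahead log used in step 1; in case of failure the
// change-logs of other tasks can be retrieved from it and committed.
trait PersistentLog[K, V] {
  def persist(checkpointId: Long, changeLog: Map[K, V]): Unit
  def retrieve(checkpointId: Long): Map[K, V]
}

// Task-side view of the protocol, using the VersionedKVStore sketched earlier.
class OutOfCoreStateTask[K, V](log: PersistentLog[K, V], store: VersionedKVStore[K, V]) {

  // state updates accumulated since the last checkpoint (the change-log)
  private var changeLog = Map.empty[K, V]

  def update(key: K, value: V): Unit = changeLog += (key -> value)

  // step 1: on a trigger-checkpoint event, persist the change-log and ack the JobManager
  def onTriggerCheckpoint(checkpointId: Long): Unit = {
    log.persist(checkpointId, changeLog)
    // the ack for checkpointId would be sent to the JobManager here
  }

  // step 2: once the JobManager has collected all acks it asks the tasks to commit;
  // each task stages its own change-log under the checkpoint's version
  def onCommitCheckpoint(checkpointId: Long): Unit = {
    changeLog.foreach { case (key, value) => store.put(key, value, checkpointId) }
    changeLog = Map.empty
  }

  // step 3 happens once globally: after every task has committed its change-log,
  // store.commitVersion(checkpointId) makes the new version visible to readers
}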
