Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread
Vote thread
JIRA


Release



1. Motivation

When users remove a state inside an operator in a new job and restore from a checkpoint of the old job, Flink does nothing. This causes 2 problems:

  • The removed states leak:
    • The leaked states are still maintained in the state backend: they consume TM memory or disk, and they are snapshotted during every checkpoint.
    • The removed states are never accessed again.
  • Unexpected behavior:
    • Flink doesn’t throw any exception and doesn’t write any log, even when execution.savepoint.ignore-unclaimed-state[1] is false.
    • Flink silently skips the state in the savepoint; it’s better to let the user know.



2. For the Flink engine and Flink users, when will state be removed?

2.1 Case 1: The user actively deletes an old state

In the old Flink job, the user defined two states in a UDF: stateA and stateB. After the business logic changes, the DAG stays the same, but stateB in the UDF is no longer needed; only stateA is still needed.

At this point, in the code of the new job, the user actively deletes stateB, as in the sketch below.
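For illustration, a minimal sketch of such a UDF (the class and state names are hypothetical):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // Hypothetical UDF of the old job. In the new job the user deletes the
    // lines marked below; stateB then remains in the restored checkpoint
    // but is never accessed again.
    public class MyUdf implements CheckpointedFunction {

        private transient ValueState<Long> stateA;
        private transient ValueState<String> stateB; // deleted in the new job

        @Override
        public void initializeState(FunctionInitializationContext ctx) {
            stateA = ctx.getKeyedStateStore().getState(
                    new ValueStateDescriptor<>("stateA", Types.LONG));
            stateB = ctx.getKeyedStateStore().getState( // deleted in the new job
                    new ValueStateDescriptor<>("stateB", Types.STRING));
        }

        @Override
        public void snapshotState(FunctionSnapshotContext ctx) {}
    }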

2.2 Case 2: The DAG has not changed, but one of the operators has been replaced (very dangerous)


After the operator is replaced, if the user doesn’t set a uid for it, the operator id won’t change. The operator of the new job will therefore restore the state of the old job, but it never accesses that state.

Since the old state is never accessed again, this is effectively the same as the state being removed.

Upgrading the Kafka source from FlinkKafkaConsumer to KafkaSource without a uid is such a case. It’s very dangerous for Flink users:

  • The FlinkKafkaConsumer before the upgrade uses union list state, while the KafkaSource after the upgrade uses coordinator state.
  • The upgraded KafkaSource no longer uses the union list state, so it leaks.
  • More seriously: Flink doesn’t throw an exception, so users believe the KafkaSource was restored from the checkpoint.


Note:

The Kafka connector documentation[2] describes the upgrade process:

  • Change the assigned uid of your source/sink. This makes sure the new source/sink doesn’t read state from the old source/sink operators.
  • Start the new job with --allow-non-restored-state because we still have the state of the previous connector version in the savepoint.

If the user doesn’t follow this process, Flink silently skips the state in the savepoint; it’s better to let the user know.


3. Public Change

3.1 The semantics are enhanced

The semantics of --allow-non-restored-state and execution.savepoint.ignore-unclaimed-state are enhanced: they now also cover states removed inside an operator.

3.2 Adding the handlerAfterOpenAndInitializeState interface to StreamOperator

After initializeState and open have run, handlerAfterOpenAndInitializeState can check whether any state has been removed.
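A minimal sketch of the new hook, shown standalone (only the method name comes from this FLIP; the default no-op body is an assumption):

    // Sketch of the proposed addition to
    // org.apache.flink.streaming.api.operators.StreamOperator.
    public interface StreamOperatorAddition {

        /**
         * Called once after both initializeState() and open() have finished,
         * i.e. at a point where the operator should have re-registered every
         * state it still needs, so removed states can be detected.
         */
        default void handlerAfterOpenAndInitializeState() throws Exception {}
    }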


4. Proposed solution

4.1 Generic idea

Currently, ignore-unclaimed-state only supports removing a whole stateful operator. When a stateful operator is removed:

  • If execution.savepoint.ignore-unclaimed-state is false, Flink throws an exception (letting the user know).
  • If execution.savepoint.ignore-unclaimed-state is true, Flink discards the states of the removed stateful operator (preventing the state leak).


This solves both problems: letting users know and preventing the state leak.

I propose that ignore-unclaimed-state also support removing a state inside an operator. When a state inside an operator is removed:

  • If execution.savepoint.ignore-unclaimed-state is false, Flink throws an exception (letting the user know).
  • If execution.savepoint.ignore-unclaimed-state is true, Flink discards these states inside the operator (preventing the state leak).


Note: discarding state means discarding the state maintained by the TM; it does not affect the old checkpoint. Subsequent checkpoints of the new job will not contain the discarded state. For reference, the sketch below shows how the existing option is set today.
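A minimal sketch of setting the existing option programmatically (the savepoint path is a placeholder; nothing here is new to this FLIP):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestoreIgnoringUnclaimedState {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Restore from an existing savepoint (placeholder path).
            conf.set(SavepointConfigOptions.SAVEPOINT_PATH, "hdfs:///savepoints/savepoint-xxxx");
            // Equivalent to --allow-non-restored-state on job submission.
            conf.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // ... build the DAG and call env.execute() ...
        }
    }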


Flink has 3 kinds of state: operator state, keyed state, and coordinator state.

To make ignore-unclaimed-state support each type of state, 3 things need to be done:

  1. Detect whether the state in the operator has been removed.
  2. After removal, if the state is not allowed to be discarded, throw an exception.
  3. After removal, if the state is allowed to be discarded, clean up the corresponding state.


Throwing exceptions is easy, so the discussion of each type of state focuses on points 1 and 3.

4.2 Coordinator State

POC commit: https://github.com/1996fanrui/flink/commit/8f698de4049606cae6a945592255bde82cf37ad3

4.2.1 How to detect whether the coordinator state is removed?

When the job is restored from a savepoint, if an operator id has coordinator state in the savepoint/checkpoint, but the operator corresponding to that operator id is not a coordinator operator, then the coordinator state of that operator id is considered removed.

4.2.2 How to clean up the removed Coordinator state?

Call operatorState.setCoordinatorState(null); to ensure that the coordinator state is not distributed to the operator. A simplified sketch of the whole check follows.
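A simplified sketch of the restore-time check, assuming illustrative names for everything except OperatorState and its coordinator-state accessors (see the POC commit for the real change):

    import org.apache.flink.runtime.checkpoint.OperatorState;

    // Illustrative sketch; only getCoordinatorState()/setCoordinatorState()
    // are taken from the POC, the surrounding names are assumptions.
    final class CoordinatorStateCheck {
        static void check(
                OperatorState restoredState,
                boolean operatorHasCoordinator,
                boolean allowNonRestored) {
            if (restoredState.getCoordinatorState() != null && !operatorHasCoordinator) {
                if (!allowNonRestored) {
                    // Let the user know instead of silently skipping the state.
                    throw new IllegalStateException(
                            "Coordinator state of operator " + restoredState.getOperatorID()
                                    + " was removed, and ignore-unclaimed-state is false.");
                }
                // Discard it: the state is neither distributed to the operator
                // nor included in subsequent checkpoints of the new job.
                restoredState.setCoordinatorState(null);
            }
        }
    }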

4.3 Operator State

POC commit: https://github.com/1996fanrui/flink/commit/41b7c3f07c938d0d92b7fcd6a75d813770b1e9e7


4.3.1 How to detect whether the operator state is removed?

Core idea:

  • DefaultOperatorStateBackend maintains all operator states and broadcast states restored from the savepoint/checkpoint.
  • When the new job is initialized, operator states are created by accessing the DefaultOperatorStateBackend in initializeState.
  • After initializeState, the DefaultOperatorStateBackend can run the check.
  • If a state restored from the savepoint was not accessed during initializeState, the operator can no longer obtain it, so this state can be considered removed.



Add OperatorStateBackend#checkStateIsRemoved(boolean allowNonRestored) and call it after initializeState. A simplified sketch follows.
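An illustrative sketch of the detection, with the two registries of DefaultOperatorStateBackend collapsed into a single registered-name set and restoredStateNames assumed as a field recording the state names found in the snapshot:

    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical standalone version of the check; see the POC commit for
    // the real change inside DefaultOperatorStateBackend.
    final class RemovedOperatorStateCheck {
        private final Set<String> restoredStateNames;   // names in the snapshot
        private final Set<String> registeredStateNames; // names accessed in initializeState

        RemovedOperatorStateCheck(Set<String> restored, Set<String> registered) {
            this.restoredStateNames = restored;
            this.registeredStateNames = registered;
        }

        // A state that was restored but never re-registered is removed.
        Set<String> findRemovedStates() {
            Set<String> removed = new HashSet<>(restoredStateNames);
            removed.removeAll(registeredStateNames);
            return removed;
        }

        void checkStateIsRemoved(boolean allowNonRestored) {
            Set<String> removed = findRemovedStates();
            if (!removed.isEmpty() && !allowNonRestored) {
                throw new IllegalStateException(
                        "States " + removed + " exist in the savepoint but were "
                                + "never accessed during initializeState().");
            }
        }
    }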

4.3.2 How to clean up the removed operator state? 


Simply remove these states from registeredOperatorStates and registeredBroadcastStates in the DefaultOperatorStateBackend, as sketched below.

After cleaning, the TM memory is released, and subsequent checkpoints will not contain these removed states.
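Continuing the sketch above, the cleanup itself is just a removal from the two registries (the map names exist in DefaultOperatorStateBackend; the helper shape and the simplified value types are assumptions):

    import java.util.Map;
    import java.util.Set;

    // Illustrative cleanup; the real maps hold PartitionableListState and
    // broadcast-state values, simplified away here.
    final class RemovedOperatorStateCleaner {
        static void discard(
                Set<String> removedStateNames,
                Map<String, ?> registeredOperatorStates,
                Map<String, ?> registeredBroadcastStates) {
            for (String name : removedStateNames) {
                // Dropping the entries releases TM heap memory and keeps the
                // states out of every subsequent checkpoint of the new job.
                registeredOperatorStates.remove(name);
                registeredBroadcastStates.remove(name);
            }
        }
    }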


4.4 Keyed State

4.4.1 How to detect whether the keyed state is removed?

The core idea is similar to operator state:

  • All keyed states are maintained in a KeyedStateBackend:
    • HeapKeyedStateBackend maintains state in the TM's heap memory
    • RocksDBKeyedStateBackend maintains state in RocksDB
  • Keyed state is usually created in initializeState or StreamOperator#open
  • After initializeState and open, the KeyedStateBackend can run the check


Compared with operator state, keyed state has two additional issues:

  • Keyed state may be created even after open, although creating it that way is unreasonable.
    • I suggest that all states be created in initializeState or StreamOperator#open.
    • FLIP-22[3] focuses on this problem.
  • The check cannot run right after initializeState, because keyed state may also be created in open. Therefore:
    • Add the handlerAfterOpenAndInitializeState interface to StreamOperator.
    • Add KeyedStateBackend#checkStateIsRemoved(boolean allowNonRestored).
    • Call it in StreamOperator#handlerAfterOpenAndInitializeState, as in the sketch below.
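A minimal sketch of the wiring; the two method names come from this FLIP, while the surrounding class, the backend interface shown here, and the allowNonRestored field are illustrative stand-ins for the real AbstractStreamOperator plumbing:

    // Hypothetical sketch of where the check is triggered.
    abstract class AbstractStreamOperatorSketch {
        KeyedStateBackendSketch keyedStateBackend;
        boolean allowNonRestored;

        public void handlerAfterOpenAndInitializeState() throws Exception {
            if (keyedStateBackend != null) {
                // initializeState() and open() have both finished, so every
                // keyed state the new job needs has been registered by now.
                keyedStateBackend.checkStateIsRemoved(allowNonRestored);
            }
        }
    }

    interface KeyedStateBackendSketch {
        void checkStateIsRemoved(boolean allowNonRestored) throws Exception;
    }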

4.4.2 How to clean up the removed keyed state? 

  • HeapKeyedStateBackend cleans up the state in heap memory.
  • RocksDBKeyedStateBackend can call RocksDB's dropColumnFamily to delete the corresponding state, as sketched below.
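For the RocksDB case, a minimal standalone sketch of the underlying RocksJava call (the mapping from a registered keyed state to its column family inside RocksDBKeyedStateBackend is simplified away; the helper is hypothetical):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    // Each registered keyed state owns one column family, so dropping the
    // column family removes all of that state's data from RocksDB at once.
    final class RemovedKeyedStateCleaner {
        static void dropRemovedState(RocksDB db, ColumnFamilyHandle handle)
                throws RocksDBException {
            db.dropColumnFamily(handle); // delete the column family's data
            handle.close();              // release the native handle
        }
    }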



Compatibility, Deprecation, and Migration Plan


Test Plan


Rejected Alternatives



[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-savepoint-ignore-unclaimed-state

[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version

[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-22%3A+Eager+State+Declaration
