...

  • Calling StateBackend.setCurrentKey() before processing each state request in StateRequestHandler
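A minimal sketch of why the key has to be set first. It assumes a simplified model in which the backend scopes every read and write to the last key passed to setCurrentKey(). StateRequestHandlerSketch, StateRequest and SimpleKeyedBackend are hypothetical stand-ins written for illustration. They are not Flink's StateRequestHandler or state backend classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical handler: not Flink's StateRequestHandler, only the ordering it needs.
class StateRequestHandlerSketch {
    private final SimpleKeyedBackend backend;

    StateRequestHandlerSketch(SimpleKeyedBackend backend) {
        this.backend = backend;
    }

    // Set the key carried by the request first, so the access below touches
    // that key's state rather than whatever key the previous request used.
    String handle(StateRequest request) {
        backend.setCurrentKey(request.key);
        if (request.isWrite) {
            backend.put(request.stateName, request.value);
            return null;
        }
        return backend.get(request.stateName);
    }

    public static void main(String[] args) {
        StateRequestHandlerSketch handler = new StateRequestHandlerSketch(new SimpleKeyedBackend());
        handler.handle(StateRequest.write("a", "count", "1"));
        handler.handle(StateRequest.write("b", "count", "7"));
        System.out.println(handler.handle(StateRequest.read("a", "count"))); // 1
        System.out.println(handler.handle(StateRequest.read("b", "count"))); // 7
    }
}

// Hypothetical request sent by the Python worker: which key, which state, what to do.
final class StateRequest {
    final String key;
    final String stateName;
    final String value;
    final boolean isWrite;

    private StateRequest(String key, String stateName, String value, boolean isWrite) {
        this.key = key;
        this.stateName = stateName;
        this.value = value;
        this.isWrite = isWrite;
    }

    static StateRequest read(String key, String stateName) {
        return new StateRequest(key, stateName, null, false);
    }

    static StateRequest write(String key, String stateName, String value) {
        return new StateRequest(key, stateName, value, true);
    }
}

// Minimal stand-in for a keyed state backend: every get/put is scoped to the
// key most recently passed to setCurrentKey().
class SimpleKeyedBackend {
    private final Map<String, Map<String, String>> store = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    String get(String stateName) {
        Map<String, String> perKey = store.get(currentKey);
        return perKey == null ? null : perKey.get(stateName);
    }

    void put(String stateName, String value) {
        store.computeIfAbsent(currentKey, k -> new HashMap<>()).put(stateName, value);
    }
}
```

Without the setCurrentKey() call, every request in this sketch would hit the same key slot, and both reads would return 7.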

State backend access synchronization

All state access is synchronized in the Java operator so that the state backend is never accessed concurrently. The following cases may need to access the state backend:

  • The state handler, which processes the state requests sent by the Python worker process while it executes user-defined functions. It runs in the callback thread of the state service.
  • Timer registration, which runs in the callback thread of the data service.
  • Checkpointing.

We need to synchronize the first two cases so that the state backend is never accessed concurrently. Checkpoints are handled differently: currently, all data buffered in the Java operator and the Python worker is flushed in prepareSnapshotPreBarrier, which ensures that the Python worker issues no state access during the checkpoint.
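The synchronization of the state handler and timer registration can be sketched with a single lock shared by both callback threads. This assumes a hypothetical operator class (SynchronizedStateAccessSketch) guarded by a plain Object monitor; the real operator may use a different locking mechanism, and the two maps only stand in for keyed state and keyed timers.

```java
import java.util.HashMap;
import java.util.Map;

class SynchronizedStateAccessSketch {
    // Hypothetical single lock owned by the Java operator. Every code path that
    // touches the state backend acquires it before setting the current key.
    private final Object stateLock = new Object();

    // Stand-in for the keyed state backend. The current key is shared mutable
    // state, so two threads interleaving "set key" and "write" could store
    // data under the wrong key without the lock.
    private String currentKey;
    private final Map<String, Integer> stateWrites = new HashMap<>();
    private final Map<String, Integer> registeredTimers = new HashMap<>();

    // Runs in the callback thread of the state service (the state handler).
    void onStateRequest(String key) {
        synchronized (stateLock) {
            currentKey = key;
            stateWrites.merge(currentKey, 1, Integer::sum);
        }
    }

    // Runs in the callback thread of the data service (timer registration).
    void onTimerRegistration(String key) {
        synchronized (stateLock) {
            currentKey = key;
            registeredTimers.merge(currentKey, 1, Integer::sum);
        }
    }

    int stateWriteCount(String key) {
        synchronized (stateLock) {
            return stateWrites.getOrDefault(key, 0);
        }
    }

    int timerCount(String key) {
        synchronized (stateLock) {
            return registeredTimers.getOrDefault(key, 0);
        }
    }

    // Drives both callbacks concurrently from two threads and waits for them.
    static SynchronizedStateAccessSketch runDemo(int iterations) throws InterruptedException {
        SynchronizedStateAccessSketch op = new SynchronizedStateAccessSketch();
        Thread stateService = new Thread(() -> {
            for (int i = 0; i < iterations; i++) op.onStateRequest("key-a");
        });
        Thread dataService = new Thread(() -> {
            for (int i = 0; i < iterations; i++) op.onTimerRegistration("key-b");
        });
        stateService.start();
        dataService.start();
        stateService.join();
        dataService.join();
        return op;
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedStateAccessSketch op = runDemo(100_000);
        // With the lock, each write lands under the key its own thread set.
        System.out.println(op.stateWriteCount("key-a") + " " + op.timerCount("key-b")); // 100000 100000
    }
}
```

The lock has to cover setting the current key and the access that follows as one unit; locking only the map writes would still let the other thread change the current key in between.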

Checkpoint

Because all state writes are delegated to the underlying state backend of the Java operator, we only need to make sure that every state mutation request has been sent back to the Java operator before the checkpoint is taken; the checkpoint then works as is.
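A minimal sketch of the flush-then-snapshot ordering, assuming a single-threaded executor stands in for the Python worker and a Future per element tracks requests whose mutations have not been applied yet. FlushBeforeSnapshotSketch and its methods are hypothetical; they only mirror the roles of prepareSnapshotPreBarrier and snapshotState and do not implement Flink's operator interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FlushBeforeSnapshotSketch {
    private final Object stateLock = new Object();
    // Stand-in for the Java operator's state backend: the only place state lives.
    private final Map<String, Integer> state = new HashMap<>();
    // Stand-in for the Python worker: it processes buffered elements
    // asynchronously and sends state mutation requests back to the operator.
    private final ExecutorService pythonWorker = Executors.newSingleThreadExecutor();
    // Requests handed to the worker whose mutations may not have arrived yet.
    private final List<Future<?>> inFlight = new ArrayList<>();

    void processElement(String key) {
        inFlight.add(pythonWorker.submit(() -> applyMutation(key)));
    }

    // Handles a state mutation request sent back by the Python worker.
    private void applyMutation(String key) {
        synchronized (stateLock) {
            state.merge(key, 1, Integer::sum);
        }
    }

    // Plays the role of prepareSnapshotPreBarrier: block until every
    // outstanding request has been applied to the Java-side state.
    void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        for (Future<?> pending : inFlight) {
            pending.get(); // waits, and rethrows a failure from the worker
        }
        inFlight.clear();
    }

    // Nothing is pending after the flush, so a copy of the backend's contents
    // is a complete snapshot; no Python-side state has to be captured.
    Map<String, Integer> snapshotState(long checkpointId) {
        synchronized (stateLock) {
            return new HashMap<>(state);
        }
    }

    void close() {
        pythonWorker.shutdown();
    }

    public static void main(String[] args) throws Exception {
        FlushBeforeSnapshotSketch op = new FlushBeforeSnapshotSketch();
        try {
            for (int i = 0; i < 1000; i++) {
                op.processElement(i % 2 == 0 ? "even" : "odd");
            }
            op.prepareSnapshotPreBarrier(1L);
            Map<String, Integer> snapshot = op.snapshotState(1L);
            System.out.println(snapshot.get("even") + " " + snapshot.get("odd")); // 500 500
        } finally {
            op.close();
        }
    }
}
```

If the snapshot were taken without the flush, it could miss writes still queued in the worker, which is exactly the case the flush in prepareSnapshotPreBarrier rules out.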

...