Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. State API between the Runner and the SDK harness which could be used for state access in the Python user-defined function. (Note: The state communication uses a separate channel from the data channel.
  2. It has defined five kinds of states in the proto message in the State API (Refer to StateKey for more details) and three types of operations for state access in the State API: Get / Append / Clear:

    State Type

    Usecase

    Runner

    Remote references (and other Runner specific extensions)

    IterableSideInput

    Side input of iterable

    MultimapSideInput

    Side input of of values of map

    MultimapKeyedSideInput

    Side input of of keys of map

    BagUserState

    User state with primary key (value state, bag state, combining value state)

    Among them, only BagUserStage is BagUserState is dedicated for state access in Beam. The others are used for data access in Beam. 

  3. Building on the proto message of BagUserState, it has supported four kinds of user-facing API in Beam’s Python SDK harness: BagRuntimeState, SetRuntimeState, ReadModifyWriteRuntimeState and CombiningValueRuntimeState.

...

  1. In the proto message layer, it has defined 5 types of proto messages.
  2. Each proto message could only represent a single kind of state in the underlying execution engine and it’s up to the operator to decide which kind of state one kind of proto message mapped to.
  3. In the user-facing API (State defined in Python function) layer, the Python SDK harness could expose different kinds of user-facing API even with the same underlying proto message.

BagUserStage will BagUserState will be mapped to ListState in Flink, to support the other kinds of state, such as MapState which could not be simulated using ListState, we will make use of the other kinds of proto messages even if they are not designed to be used for state access in Beam. This could work as it’s up to the operator to decide which kind of state it’s mapped to and will be described in the following sections.

...

We will introduce ValueState and ValueStateDescriptor to let users use value state in Python DataStream API. The interfaces are as following:

import abc

from pyflink.common.typeinfo import TypeInformation


class StateDescriptor(abc.ABC):

   def __init__(self, name: str, type_info: TypeInformation):

       self.name = name

       self.type_info = type_info


class ValueStateDescriptor(StateDescriptor):

   def __init__(self,
                      name: str,

                       value_type_info: TypeInformation):

       super(ValueStateDescriptor, self).__init__(name, value_type_info)


class State(ABC):

   @abstractmethod

   def clear(self) -> None:

       pass


class ValueState(State, Generic[T]):

   @abstractmethod

   def value(self) -> T:

       pass

   @abstractmethod

   def update(self, value: T) -> None:

       pass

AppendingState & MergingState

class AppendingState(State, Generic[IN, OUT]):

   @abstractmethod

   def get(self) -> OUT:

       pass

   @abstractmethod

   def add(self, value: IN) -> None:

       pass

class MergingState(AppendingState[IN, OUT]):

       pass

ListState

class ListStateDescriptor(StateDescriptor):

   def __init__(self,
                      name,
                      elem_type_info: TypeInformation):

       super(ListStateDescriptor, self).__init__(name, Types.List(elem_type_info))

       


class ListState(MergingState[T, Iterable[T]]):

   @abstractmethod

   def update(self, values: List[T]) -> None:

       pass

   @abstractmethod

   def add_all(self, values: List[T]) -> None:

       pass

   def __iter__(self) -> Iterator[T]:

       return iter(self.get())

MapState

class MapStateDescriptor(StateDescriptor):

   def __init__(self,
                      name: str,

                       key_type_info: TypeInformation,
                      value_type_info: TypeInformation):

       super(MapStateDescriptor, self).__init__(name, Types.Map(key_type_info,

                                                                                                       value_type_info))


class MapState(State, Generic[K, V]):

   @abstractmethod

   def get(self, key: K) -> V:

       pass

   @abstractmethod

   def put(self, key: K, value: V) -> None:

       pass

   @abstractmethod

   def put_all(self, dict_value: Dict[K, V]) -> None:

       pass

   @abstractmethod

   def remove(self, key: K) -> None:

       pass

   @abstractmethod

   def contains(self, key: K) -> bool:

       pass

   @abstractmethod

   def items(self) -> Iterable[Tuple[K, V]]:

       pass

   @abstractmethod

   def keys(self) -> Iterable[K]:

       pass

   @abstractmethod

   def values(self) -> Iterable[V]:

       pass

   @abstractmethod

   def is_empty(self) -> bool:

       pass

   def __getitem__(self, key: K) -> V:

       return self.get(key)

   def __setitem__(self, key: K, value: V) -> None:

       self.put(key, value)

   def __delitem__(self, key: K) -> None:

       self.remove(key)

   def __contains__(self, key: K) -> bool:

       return self.contains(key)

   def __iter__(self) -> Iterator[K]:

       return iter(self.keys())

ReducingState

class ReduceFunction(Function, Generic[T]):

   def reduce(self, first: T, second: T) -> T:

       pass


class ReducingStateDescriptor(StateDescriptor):

  def __init__(self,

               name: str,

               reduce_function: ReduceFunction,

               elem_type_info: TypeInformation):

      super(ReducingStateDescriptor, self).__init__(name)

      self.reduce_function = reduce_function

      self.elem_type_info = elem_type_info

class ReducingState(MergingState, Generic[T]):

       pass

AggregatingState

class AggregateFunction(Function, Generic[ACC, IN, OUT]):

   def create_accumulator(self) -> ACC:

       pass

   def add(self, acc: ACC, value: IN) -> ACC:

       pass

   def get_result(self, acc: ACC) -> OUT:

       pass

   def merge(self, acc1: ACC, acc2: ACC) -> ACC:

       pass


class AggregatingStateDescriptor(StateDescriptor):

  def __init__(self,

               name: str,

               aggregate_function: AggregateFunction,

               acc_type_info: TypeInformation):

      super(AggregatingStateDescriptor, self).__init__(name)

      self.aggregate_function = aggregate_function

      self.acc_type_info = acc_type_info


class AggregatingState(MergingState, Generic[T]):

       pass

RuntimeContext

RuntimeContext contains information about the context in which functions are executed. The following methods will be added in RuntimeContext to allow creating state.

...

2) At the Java operator, upon receiving a StateRequest, the operator will read/write the state backend according to the type of the StateRequest. It will also return StateResposeStateResponse(it holds the value of the state for write read requests) to the Python worker. 

...