Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We will introduce ValueState and ValueStateDescriptor to let users use value state in the Python DataStream API. The interfaces are as follows:

import abc
from abc import ABC, abstractmethod
from typing import Dict, Generic, Iterable, Iterator, List, Tuple, TypeVar

from pyflink.common.typeinfo import TypeInformation, Types
from pyflink.datastream.functions import Function

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
ACC = TypeVar('ACC')
IN = TypeVar('IN')
OUT = TypeVar('OUT')


class StateDescriptor(abc.ABC):
    """Base class for all state descriptors.

    A descriptor records the name of a state together with the type
    information of the data the state holds.

    :param name: The name of the state.
    :param type_info: The type information of the state's contents.
    """

    def __init__(self, name: str, type_info: TypeInformation):
        self.name = name
        self.type_info = type_info


class ValueStateDescriptor(StateDescriptor):
    """Descriptor for a :class:`ValueState`.

    :param name: The name of the state.
    :param value_type_info: The type information of the single value held by
        the state; stored on the descriptor as ``type_info``.
    """

    def __init__(self,
                 name: str,
                 value_type_info: TypeInformation):
        super(ValueStateDescriptor, self).__init__(name, value_type_info)


class State(ABC):
    """Root of the state interface hierarchy.

    The only operation shared by every state kind is :meth:`clear`; each
    concrete state interface adds its own accessors.
    """

    @abstractmethod
    def clear(self) -> None:
        """Clear this state's contents (exact semantics left to implementations)."""


class ValueState(State, Generic[T]):
    """State holding a single value of type ``T``."""

    @abstractmethod
    def value(self) -> T:
        """Return the value currently held by this state."""

    @abstractmethod
    def update(self, value: T) -> None:
        """Set the value held by this state to ``value``."""

AppendingState & MergingState

class AppendingState(State, Generic[T]):
    """State that accepts values via :meth:`add` and exposes them via :meth:`get`."""

    @abstractmethod
    def get(self) -> T:
        """Return the current contents of this state."""

    @abstractmethod
    def add(self, value: T) -> None:
        """Add ``value`` to this state."""

class MergingState(AppendingState):
    """Appending state whose contents can presumably be merged (TODO confirm).

    It currently declares no members beyond those of AppendingState.
    """

ListState

class ListStateDescriptor(StateDescriptor):
    """Descriptor for a :class:`ListState`.

    :param name: The name of the state.
    :param elem_type_info: The type information of a single list element. The
        descriptor's ``type_info`` is the list type built from it.
    """

    def __init__(self,
                 name: str,
                 elem_type_info: TypeInformation):
        # The state itself holds a list, so wrap the element type accordingly.
        super(ListStateDescriptor, self).__init__(name, Types.LIST(elem_type_info))

       


class ListState(MergingState, Generic[T]):
    """State holding a list of elements of type ``T``.

    The elements can be read with :meth:`get` (or by iterating the state
    directly), extended with :meth:`add` / :meth:`add_all`, and overwritten with
    :meth:`update`.
    """

    @abstractmethod
    def get(self) -> Iterable[T]:
        """Return the elements currently held by this state."""

    @abstractmethod
    def add(self, value: T) -> None:
        """Append ``value`` to the list."""

    @abstractmethod
    def update(self, values: List[T]) -> None:
        """Set the list's contents to ``values``."""

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        """Append every element of ``values`` to the list."""

    def __iter__(self) -> Iterator[T]:
        # Supports ``for x in list_state`` by delegating to get().
        return iter(self.get())

MapState

class MapStateDescriptor(StateDescriptor):
    """Descriptor for a :class:`MapState`.

    :param name: The name of the state.
    :param key_type_info: The type information of the map's keys.
    :param value_type_info: The type information of the map's values.

    The descriptor's ``type_info`` is the map type built from both.
    """

    def __init__(self,
                 name: str,
                 key_type_info: TypeInformation,
                 value_type_info: TypeInformation):
        super(MapStateDescriptor, self).__init__(
            name, Types.MAP(key_type_info, value_type_info))


class MapState(State, Generic[K, V]):
    """State holding a mapping from keys of type ``K`` to values of type ``V``.

    Besides the explicit methods, the dict-style operators ``state[key]``,
    ``state[key] = value``, ``del state[key]``, ``key in state`` and
    ``for key in state`` are provided by delegating to those methods.
    """

    @abstractmethod
    def get(self, key: K) -> V:
        """Return the value mapped to ``key``."""

    @abstractmethod
    def put(self, key: K, value: V) -> None:
        """Map ``key`` to ``value``."""

    @abstractmethod
    def put_all(self, dict_value: Dict[K, V]) -> None:
        """Insert every key/value pair of ``dict_value``."""

    @abstractmethod
    def remove(self, key: K) -> None:
        """Remove the mapping for ``key``."""

    @abstractmethod
    def contains(self, key: K) -> bool:
        """Return whether a mapping for ``key`` exists."""

    @abstractmethod
    def items(self) -> Iterable[Tuple[K, V]]:
        """Return the ``(key, value)`` pairs of the map."""

    @abstractmethod
    def keys(self) -> Iterable[K]:
        """Return the keys of the map."""

    @abstractmethod
    def values(self) -> Iterable[V]:
        """Return the values of the map."""

    @abstractmethod
    def is_empty(self) -> bool:
        """Return whether the map holds no mappings."""

    # --- dict-style sugar; each operator forwards to an abstract method ---

    def __getitem__(self, key: K) -> V:
        looked_up = self.get(key)
        return looked_up

    def __setitem__(self, key: K, value: V) -> None:
        self.put(key, value)

    def __delitem__(self, key: K) -> None:
        self.remove(key)

    def __contains__(self, key: K) -> bool:
        found = self.contains(key)
        return found

    def __iter__(self) -> Iterator[K]:
        key_iterable = self.keys()
        return iter(key_iterable)

ReducingState

class ReduceFunction(Function, Generic[T]):
    """Combines two values of type ``T`` into one value of the same type.

    Instances are passed to ReducingStateDescriptor as ``reduce_function``.
    """

    # Abstract so that a subclass that forgets to implement reduce() is
    # rejected at instantiation, instead of silently producing None.
    @abstractmethod
    def reduce(self, first: T, second: T) -> T:
        """Combine ``first`` and ``second`` into a single value."""


class ReducingStateDescriptor(StateDescriptor):
    """Descriptor for a :class:`ReducingState`.

    :param name: The name of the state.
    :param reduce_function: The function stored as ``self.reduce_function``.
    :param elem_type_info: The type information of the reduced value; stored
        both as ``type_info`` and as ``elem_type_info``.
    """

    def __init__(self,
                 name: str,
                 reduce_function: ReduceFunction,
                 elem_type_info: TypeInformation):
        # StateDescriptor.__init__ requires (name, type_info); calling it with
        # the name alone raises TypeError on construction.
        super(ReducingStateDescriptor, self).__init__(name, elem_type_info)
        self.reduce_function = reduce_function
        # Kept so existing readers of ``elem_type_info`` continue to work.
        self.elem_type_info = elem_type_info

class ReducingState(MergingState, Generic[T]):
    """Appending state holding a single value of type ``T``.

    Presumably each added value is combined with the current one using the
    ReduceFunction from the ReducingStateDescriptor (TODO confirm against the
    runtime implementation).
    """

    # Redeclared as abstract: a concrete no-op override here would silently
    # cancel AppendingState's abstract get/add and make get() return None.
    @abstractmethod
    def get(self) -> T:
        """Return the current (reduced) value."""

    @abstractmethod
    def add(self, value: T) -> None:
        """Add ``value`` to the state."""

AggregatingState

class AggregateFunction(Function, Generic[ACC, IN, OUT]):
    """Aggregates inputs of type ``IN`` into an accumulator of type ``ACC``
    and produces a result of type ``OUT``.

    Instances are passed to AggregatingStateDescriptor as
    ``aggregate_function``. All four methods are abstract so that an
    incomplete implementation is rejected at instantiation, instead of
    silently yielding None.
    """

    @abstractmethod
    def create_accumulator(self) -> ACC:
        """Return a new accumulator."""

    @abstractmethod
    def add(self, acc: ACC, value: IN) -> ACC:
        """Fold ``value`` into ``acc`` and return the resulting accumulator."""

    @abstractmethod
    def get_result(self, acc: ACC) -> OUT:
        """Compute the result from accumulator ``acc``."""

    @abstractmethod
    def merge(self, acc1: ACC, acc2: ACC) -> ACC:
        """Combine two accumulators into one."""


class AggregatingStateDescriptor(StateDescriptor):
    """Descriptor for an :class:`AggregatingState`.

    :param name: The name of the state.
    :param aggregate_function: The function stored as
        ``self.aggregate_function``.
    :param acc_type_info: The type information of the accumulator; stored both
        as ``type_info`` and as ``acc_type_info``.
    """

    def __init__(self,
                 name: str,
                 aggregate_function: AggregateFunction,
                 acc_type_info: TypeInformation):
        # StateDescriptor.__init__ requires (name, type_info); calling it with
        # the name alone raises TypeError on construction.
        super(AggregatingStateDescriptor, self).__init__(name, acc_type_info)
        self.aggregate_function = aggregate_function
        # Kept so existing readers of ``acc_type_info`` continue to work.
        self.acc_type_info = acc_type_info


class AggregatingState(MergingState, Generic[T]):
    """Appending state backed by an AggregateFunction.

    The function comes from the AggregatingStateDescriptor.

    NOTE(review): a single type parameter ``T`` is used for both the added
    values and the result, while AggregateFunction distinguishes IN and OUT.
    Consider splitting it into two parameters in a later revision.
    """

    # Redeclared as abstract: a concrete no-op override here would silently
    # cancel AppendingState's abstract get/add and make get() return None.
    @abstractmethod
    def get(self) -> T:
        """Return the current aggregation result."""

    @abstractmethod
    def add(self, value: T) -> None:
        """Add ``value`` to the aggregation."""

RuntimeContext

RuntimeContext contains information about the context in which functions are executed. The following methods will be added to RuntimeContext to allow creating state.

...