...
We will introduce ValueState and ValueStateDescriptor to let users use value state in the Python DataStream API. The interfaces are as follows:
import abc
from abc import abstractmethod
from typing import Generic, TypeVar

from pyflink.common.typeinfo import TypeInformation

T = TypeVar('T')


class StateDescriptor(abc.ABC):
    """Base class for descriptors that name a state and declare its type.

    :param name: The name of the state.
    :param type_info: The type information of the data held by the state.
    """

    def __init__(self, name: str, type_info: TypeInformation):
        self.name = name
        self.type_info = type_info


class ValueStateDescriptor(StateDescriptor):
    """Descriptor for a ValueState.

    :param name: The name of the state.
    :param value_type_info: The type information of the stored value.
    """

    # `name` must be accepted here because it is forwarded to StateDescriptor.
    def __init__(self, name: str, value_type_info: TypeInformation):
        super(ValueStateDescriptor, self).__init__(name, value_type_info)


# NOTE(review): this class header was lost in the page extraction; it is
# reconstructed from `ValueState(State, ...)` below and the orphaned `clear`
# method. Confirm against the original proposal.
class State(abc.ABC):
    """Base interface of all state types."""

    @abstractmethod
    def clear(self) -> None:
        """Clear this state."""
        pass


class ValueState(State, Generic[T]):
    """State holding a single value of type ``T``."""

    @abstractmethod
    def value(self) -> T:
        """Return the current value of the state."""
        pass

    @abstractmethod
    def update(self, value: T) -> None:
        """Replace the current value of the state with ``value``."""
        pass
AppendingState & MergingState
class AppendingState(State, Generic[T]):
    """State that values of type ``T`` can be added to.

    Its current contents are read back through :meth:`get`.
    """

    @abstractmethod
    def get(self) -> T:
        """Return the current contents of the state."""
        pass

    @abstractmethod
    def add(self, value: T) -> None:
        """Add ``value`` to the state."""
        pass


class MergingState(AppendingState):
    """Marker subclass of AppendingState; it declares no additional methods."""
    pass
ListState
class ListStateDescriptor(StateDescriptor):
    """Descriptor for a ListState.

    :param name: The name of the state.
    :param elem_type_info: The type information of the list elements.
    """

    def __init__(self, name: str, elem_type_info: TypeInformation):
        # The state as a whole is typed as a list of `elem_type_info` elements.
        super(ListStateDescriptor, self).__init__(
            name, Types.LIST(elem_type_info))


class ListState(MergingState, Generic[T]):
    """State holding a list of elements of type ``T``."""

    @abstractmethod
    def get(self) -> Iterable[T]:
        """Return the elements currently stored in the state."""
        pass

    @abstractmethod
    def add(self, value: T) -> None:
        """Append a single element to the list."""
        pass

    @abstractmethod
    def update(self, values: List[T]) -> None:
        """Replace the stored elements with ``values``."""
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        """Append every element of ``values`` to the list."""
        pass

    def __iter__(self) -> Iterator[T]:
        """Iterate over the elements returned by :meth:`get`."""
        return iter(self.get())
MapState
class MapStateDescriptor(StateDescriptor):
    """Descriptor for a MapState.

    :param name: The name of the state.
    :param key_type_info: The type information of the map keys.
    :param value_type_info: The type information of the map values.
    """

    def __init__(self, name: str, key_type_info: TypeInformation,
                 value_type_info: TypeInformation):
        # The state as a whole is typed as a map from keys to values.
        super(MapStateDescriptor, self).__init__(
            name, Types.MAP(key_type_info, value_type_info))


# NOTE(review): this class header was lost in the page extraction; it is
# reconstructed from the K/V-typed methods below. Confirm the base classes
# against the original proposal.
class MapState(State, Generic[K, V]):
    """State holding a mapping from keys of type ``K`` to values of type ``V``."""

    @abstractmethod
    def get(self, key: K) -> V:
        """Return the value associated with ``key``."""
        pass

    @abstractmethod
    def put(self, key: K, value: V) -> None:
        """Associate ``value`` with ``key``."""
        pass

    @abstractmethod
    def put_all(self, dict_value: Dict[K, V]) -> None:
        """Copy every mapping in ``dict_value`` into the state."""
        pass

    @abstractmethod
    def remove(self, key: K) -> None:
        """Delete the mapping for ``key``."""
        pass

    @abstractmethod
    def contains(self, key: K) -> bool:
        """Return whether a mapping for ``key`` exists."""
        pass

    @abstractmethod
    def items(self) -> Iterable[Tuple[K, V]]:
        """Return all ``(key, value)`` pairs."""
        pass

    @abstractmethod
    def keys(self) -> Iterable[K]:
        """Return all keys."""
        pass

    @abstractmethod
    def values(self) -> Iterable[V]:
        """Return all values."""
        pass

    @abstractmethod
    def is_empty(self) -> bool:
        """Return whether the state holds no mappings."""
        pass

    # Dict-style protocol methods, each delegating to the named method above.
    def __getitem__(self, key: K) -> V:
        """Equivalent to ``get(key)``."""
        return self.get(key)

    def __setitem__(self, key: K, value: V) -> None:
        """Equivalent to ``put(key, value)``."""
        self.put(key, value)

    def __delitem__(self, key: K) -> None:
        """Equivalent to ``remove(key)``."""
        self.remove(key)

    def __contains__(self, key: K) -> bool:
        """Equivalent to ``contains(key)``."""
        return self.contains(key)

    def __iter__(self) -> Iterator[K]:
        """Iterate over the keys returned by :meth:`keys`."""
        return iter(self.keys())
ReducingState
class ReduceFunction(Function, Generic[T]):
    """User-defined function that combines two values of type ``T`` into one."""

    # Abstract so that an un-overridden reduce cannot silently return None.
    # NOTE(review): this only takes effect if Function is an ABC; confirm.
    @abstractmethod
    def reduce(self, first: T, second: T) -> T:
        """Combine ``first`` and ``second`` and return the result."""
        pass


class ReducingStateDescriptor(StateDescriptor):
    """Descriptor for a ReducingState.

    :param name: The name of the state.
    :param reduce_function: The function used to combine added values.
    :param elem_type_info: The type information of the stored value.
    """

    def __init__(self, name: str, reduce_function: ReduceFunction,
                 elem_type_info: TypeInformation):
        # StateDescriptor.__init__ requires `type_info`; passing only `name`
        # raises TypeError.
        super(ReducingStateDescriptor, self).__init__(name, elem_type_info)
        self.reduce_function = reduce_function
        # Kept as a separate attribute; it is the same object as `type_info`.
        self.elem_type_info = elem_type_info


class ReducingState(MergingState, Generic[T]):
    """Merging state whose values are combined by a ReduceFunction."""
    pass
AggregatingState
class AggregateFunction(Function, Generic[ACC, IN, OUT]):
    """User-defined aggregation over an accumulator.

    The accumulator has type ``ACC``, inputs have type ``IN`` and the result
    has type ``OUT``.
    """

    # All four methods must be provided by users. They are abstract so that a
    # missing override is not silently treated as returning None.
    # NOTE(review): this only takes effect if Function is an ABC; confirm.
    @abstractmethod
    def create_accumulator(self) -> ACC:
        """Create a new accumulator."""
        pass

    @abstractmethod
    def add(self, acc: ACC, value: IN) -> ACC:
        """Fold ``value`` into ``acc`` and return the updated accumulator."""
        pass

    @abstractmethod
    def get_result(self, acc: ACC) -> OUT:
        """Compute the result from ``acc``."""
        pass

    @abstractmethod
    def merge(self, acc1: ACC, acc2: ACC) -> ACC:
        """Merge two accumulators into one and return it."""
        pass


class AggregatingStateDescriptor(StateDescriptor):
    """Descriptor for an AggregatingState.

    :param name: The name of the state.
    :param aggregate_function: The function used to aggregate added values.
    :param acc_type_info: The type information of the accumulator, which is
        what the state stores.
    """

    def __init__(self, name: str, aggregate_function: AggregateFunction,
                 acc_type_info: TypeInformation):
        # StateDescriptor.__init__ requires `type_info`; passing only `name`
        # raises TypeError.
        super(AggregatingStateDescriptor, self).__init__(name, acc_type_info)
        self.aggregate_function = aggregate_function
        # Kept as a separate attribute; it is the same object as `type_info`.
        self.acc_type_info = acc_type_info


# Input and result types differ: add() receives AggregateFunction's IN and
# get() yields its OUT, so a single type parameter T cannot describe both.
class AggregatingState(MergingState, Generic[IN, OUT]):
    """Merging state whose values are aggregated by an AggregateFunction."""

    @abstractmethod
    def get(self) -> OUT:
        """Return the current aggregation result."""
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        """Add ``value`` to the aggregation."""
        pass
RuntimeContext
RuntimeContext contains information about the context in which functions are executed. The following methods will be added to RuntimeContext to allow users to create state:
...