Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
IterativeCondition defines a user-defined condition that decides whether an element should be accepted into the pattern. The condition can iterate over the elements previously accepted in the pattern and decide whether to accept a new element based on some statistic over those elements. In certain accumulation scenarios, for example filtering goods with more than 1,000 orders within 10 minutes, the accumulation has to be performed inside the IterativeCondition. This causes the accumulation state to be computed repeatedly: a state may have multiple transitions that carry the condition, and every invocation of the condition accumulates again. AccumulationStateCondition is proposed to define an IterativeCondition with accumulation and to filter on the accumulation state through an accumulator. The accumulation state is consistent within the lifecycle of a matching NFA; in other words, the user does not need to care about when the accumulation state is initialized and cleaned up.
Public Interfaces
The following public interfaces are introduced to support IterativeCondition with an accumulator.
Adds AccumulationStateCondition
AccumulationStateCondition defines a condition that accumulates event elements and filters on the accumulation state with a user-defined accumulator. The accumulator is read and written through the condition's context.
/**
 * An accumulation state condition which accumulates the event value and filters the accumulation
 * state with accumulator.
 *
 * @param <T> The type of the accumulated value.
 */
public abstract class AccumulationStateCondition<T> extends IterativeCondition<T> {

    /**
     * Accumulates the given value with the accumulator in context. In the case of a <i>sum</i>
     * accumulator, this method adds the given value to the sum.
     *
     * @param value The value to be accumulated.
     * @param ctx The {@link Context} used for the evaluation of the function and provides access to
     *     the already accepted events in the pattern (see {@link Context#getEventsForPattern(String)}).
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract void accumulate(T value, IterativeCondition.Context<T> ctx) throws Exception;
}
Adds getAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer) in IterativeCondition#Context
getAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer) are introduced in IterativeCondition#Context to set a user-defined accumulator for a given accumulation state and to get it back. With this getter and setter, the user accumulates event values in a user-defined accumulator and filters on the accumulation state through the same accumulator.
/**
 * Get the accumulator with a given accumulation state.
 *
 * @param stateName The name of the accumulation state.
 * @param serializer The serializer of the accumulator.
 * @return An accumulator corresponding to a given accumulation state.
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
 *     accumulation to fail.
 */
<ACC> ACC getAccumulator(String stateName, TypeSerializer<ACC> serializer) throws Exception;

/**
 * Set an accumulator with a given accumulation state.
 *
 * @param stateName The name of the accumulation state.
 * @param accumulator The accumulator of the accumulation state.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
 *     accumulation to fail.
 */
<ACC> void setAccumulator(String stateName, ACC accumulator, TypeSerializer<ACC> serializer)
        throws Exception;
Example Usage
This section provides code snippets that show how a user extends AccumulationStateCondition to define accumulation logic in a condition. Consider a user transaction analysis scenario: find the order ids at which the accumulated order amount reaches at least 10 within 5 seconds. In the following example, OrderSumCondition extends AccumulationStateCondition to express that the accumulated order sum is at least 10.
public class OrderSumCondition extends AccumulationStateCondition<Event> {

    @Override
    public void accumulate(Event value, Context<Event> ctx) throws Exception {
        // Add the price of the current event to the running sum (null means "not initialized yet").
        Double sum = ctx.getAccumulator("sum(price)", DoubleSerializer.INSTANCE);
        ctx.setAccumulator(
                "sum(price)",
                sum == null ? value.getPrice() : sum + value.getPrice(),
                DoubleSerializer.INSTANCE);
    }

    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        // Accept the event once the accumulated sum is no longer below the threshold.
        Double acc = ctx.getAccumulator("sum(price)", DoubleSerializer.INSTANCE);
        return acc != null && !(acc < 10.0);
    }
}
The following end-to-end example shows how to use the user-defined OrderSumCondition to construct a CEP pattern, and what output is obtained from a specific DataStream input.
// The pattern has to be assigned to a variable so that CEP.pattern(...) below can use it.
Pattern<Event, ?> pattern =
        Pattern.<Event>begin("start")
                .where(new OrderSumCondition())
                .within(Time.seconds(5L));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(2, "barfoo", 2.0),
                new Event(3, "foobar", 3.0),
                new Event(4, "foobar", 5.0),
                new Event(5, "foobar", 42.0),
                new Event(6, "foobar", 1.0));

DataStream<String> result =
        CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            o.collect(String.valueOf(p.get("start").get(0).getId()));
                        },
                        Types.STRING);

result.print();

// Nothing runs until the job is submitted.
env.execute();
Based on the above example, we expect order ids 4 and 5 to be output: the total order amount for the sequence of order ids 1 to 4 is 11, which exceeds the threshold of 10, and the order amount of order id 5 alone is 42, which also exceeds the threshold. When the result is printed, we do get order ids 4 and 5.
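The expected output can be checked by hand with a small stand-alone model. The sketch below is not Flink code: OrderSumSketch and matchingIds are hypothetical names, and it assumes that the accumulator is cleared once a match completes, which is one reading of why order id 6 is not output.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the example above; it does not use the Flink CEP runtime. */
final class OrderSumSketch {

    /** Returns the ids of the events at which the accumulated price reaches the threshold. */
    static List<Integer> matchingIds(int[] ids, double[] prices, double threshold) {
        List<Integer> matched = new ArrayList<>();
        Double sum = null; // null models "accumulator not initialized yet"
        for (int i = 0; i < ids.length; i++) {
            // accumulate(): add the price of the current event to the running sum
            sum = (sum == null) ? prices[i] : sum + prices[i];
            // filter(): same check as OrderSumCondition
            if (!(sum < threshold)) {
                matched.add(ids[i]);
                sum = null; // assumption: the state is cleared when the match completes
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3, 4, 5, 6};
        double[] prices = {1.0, 2.0, 3.0, 5.0, 42.0, 1.0};
        System.out.println(matchingIds(ids, prices, 10.0)); // prints [4, 5]
    }
}
```

The running sums are 1, 3, 6 and 11 for order ids 1 to 4, then 42 for order id 5 after the reset, then 1 for order id 6, so only ids 4 and 5 pass the filter.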
Proposed Changes
Introduces AccumulatorId
AccumulatorId is introduced to identify an accumulator in SharedBuffer. It is the key under which the accumulator is accessed in SharedBuffer, and it contains the name and version of the accumulator, the properties of the ComputationState, and a built-in serializer.
public class AccumulatorId {

    /** The accumulator version. */
    private final int version;

    /** The {@link ComputationState} corresponding to accumulator. */
    private final ComputationState computationState;

    /** The name of accumulator. */
    private final String accumulatorKey;

    /** The constructor of AccumulatorId. */
    public AccumulatorId(ComputationState computationState, String accumulatorKey) {}

    /** Serializer for {@link AccumulatorId}. */
    public static class AccumulatorIdSerializer extends TypeSerializerSingleton<AccumulatorId> {

        /** Serializer configuration snapshot for compatibility and format evolution. */
        public static class AccumulatorIdSerializerSnapshot
                extends SimpleTypeSerializerSnapshot<AccumulatorId> {

            public AccumulatorIdSerializerSnapshot() {}
        }
    }
}
Introduces the getter/setter of Accumulator in SharedBuffer
The accumulator passed in by the user is eventually stored in SharedBuffer, where the accumulated value is maintained. SharedBuffer therefore needs supporting methods for SharedBufferAccessor to call.
/**
 * Gets the accumulator id that already exists.
 *
 * @return An {@link Iterator} over the accumulator id that already exists.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     get the {@link Iterator} of the {@link AccumulatorId}.
 */
public Iterator<AccumulatorId> getAccumulatorIdIterator() throws Exception {}

/**
 * Gets the accumulator by the specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @return The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     get the accumulator of the {@link AccumulatorId}.
 */
public byte[] getAccumulator(AccumulatorId accumulatorId) throws Exception {}

/**
 * Sets the accumulator by specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @param data The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     set the accumulator of the {@link AccumulatorId}.
 */
public void setAccumulator(AccumulatorId accumulatorId, byte[] data) throws Exception {}
Introduces operations of Accumulator in SharedBufferAccessor
The methods that access the accumulator in SharedBuffer go through SharedBufferAccessor. Both a serializer and a deserializer need to be specified so that the accumulator can be maintained under its AccumulatorId, the wrapper that identifies the accumulator.
/** The deserializer of input value. */
private final DataInputDeserializer dataInputView;

/** The serializer of output value. */
private final DataOutputSerializer dataOutputView;

/** Update the constructor with input deserializer and output serializer. */
SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
    this.sharedBuffer = sharedBuffer;
    this.dataInputView = new DataInputDeserializer();
    this.dataOutputView = new DataOutputSerializer(128);
}

/**
 * Gets the accumulator stored in {@link SharedBuffer}.
 *
 * @param stateName The name of the accumulation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param serializer The serializer of the accumulator.
 * @return An accumulator that accumulated value is stored.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     get the accumulator with the given accumulation state.
 */
public <ACC> ACC getAccumulator(
        String stateName, ComputationState computationState, TypeSerializer<ACC> serializer)
        throws Exception {}

/**
 * Sets the accumulator into {@link SharedBuffer}.
 *
 * @param stateName The name of the accumulation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param accumulator The accumulator that accumulated value is stored.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     set the accumulator with the given accumulation state.
 */
public <ACC> void setAccumulator(
        String stateName,
        ComputationState computationState,
        ACC accumulator,
        TypeSerializer<ACC> serializer)
        throws Exception {}

/**
 * Removes the accumulator stored in {@link SharedBuffer}.
 *
 * @param computationState The {@link ComputationState} of NFA.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     remove the accumulator with the given accumulation state.
 */
public void removeAccumulator(ComputationState computationState) throws Exception {}
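The relationship between the typed accessor methods and the byte[] storage can be illustrated with a stand-alone sketch. Everything here is a simplification made for illustration: AccumulatorStoreSketch and Id are hypothetical names, a plain HashMap stands in for SharedBuffer, a String key stands in for ComputationState, and java.nio.ByteBuffer replaces Flink's TypeSerializer with its DataInputDeserializer and DataOutputSerializer.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of an accessor that stores serialized accumulators under a composite id. */
final class AccumulatorStoreSketch {

    /** Hypothetical stand-in for AccumulatorId: (computation state key, accumulator name). */
    static final class Id {
        final String computationKey;
        final String name;

        Id(String computationKey, String name) {
            this.computationKey = computationKey;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Id)) {
                return false;
            }
            Id other = (Id) o;
            return computationKey.equals(other.computationKey) && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(computationKey, name);
        }
    }

    /** Stand-in for the byte[] storage that SharedBuffer would provide. */
    private final Map<Id, byte[]> store = new HashMap<>();

    /** Deserializes the stored bytes, or returns null if nothing was stored under this id. */
    Double getAccumulator(String stateName, String computationKey) {
        byte[] data = store.get(new Id(computationKey, stateName));
        return data == null ? null : ByteBuffer.wrap(data).getDouble();
    }

    /** Serializes the value and stores the bytes under the composite id. */
    void setAccumulator(String stateName, String computationKey, double value) {
        byte[] data = ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
        store.put(new Id(computationKey, stateName), data);
    }

    /** Removes every accumulator that belongs to the given computation state. */
    void removeAccumulator(String computationKey) {
        store.keySet().removeIf(id -> id.computationKey.equals(computationKey));
    }

    public static void main(String[] args) {
        AccumulatorStoreSketch accessor = new AccumulatorStoreSketch();
        accessor.setAccumulator("sum(price)", "run-1", 11.0);
        System.out.println(accessor.getAccumulator("sum(price)", "run-1")); // prints 11.0
        accessor.removeAccumulator("run-1");
        System.out.println(accessor.getAccumulator("sum(price)", "run-1")); // prints null
    }
}
```

Keying the store by both the computation state and the accumulator name keeps accumulators of concurrent partial matches apart, and lets all accumulators of one partial match be dropped in a single call.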
Adds branch in NFA StateTransitions
When traversing the stateTransitions that carry a condition, we call the accumulate method if the condition is an AccumulationStateCondition instance, and we clear the accumulator state when the SharedBuffer node is released or the NFA stop state is reached. AccumulationStateCondition is a kind of condition that can be obtained from a stateTransition, but in terms of NFA execution it differs from an ordinary condition: the accumulation is performed only once per state, not once per stateTransition, to avoid repeated accumulation. For example, what the user actually wants is to update the accumulation state when the take transition is met, but the current state may have not only a take transition but also an ignore transition or a proceed transition. If every transition of this state triggered the accumulation once, the same event would be counted more than once, which is not desirable.
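The double counting described above can be reproduced with a small stand-alone sketch. It is a toy model, not the NFA implementation: AccumulateOnceSketch, naiveSum and accumulateOnceSum are hypothetical names, and the loops only model how often the accumulation runs for one event.

```java
/** Toy model contrasting per-transition accumulation with once-per-state accumulation. */
final class AccumulateOnceSketch {

    enum Transition { TAKE, IGNORE, PROCEED }

    /** Accumulation inside the condition: it runs once for every outgoing transition. */
    static double naiveSum(double[] prices, Transition[] transitions) {
        double sum = 0.0;
        for (double price : prices) {
            for (Transition ignored : transitions) {
                sum += price; // the same event is added once per transition
            }
        }
        return sum;
    }

    /** Accumulation separated from the filter: it runs once, then each transition only filters. */
    static double accumulateOnceSum(double[] prices, Transition[] transitions) {
        double sum = 0.0;
        for (double price : prices) {
            sum += price; // accumulate() is called once for this event
            for (Transition ignored : transitions) {
                // filter() would read the sum here without modifying it
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        double[] prices = {1.0, 2.0, 3.0};
        Transition[] transitions = {Transition.TAKE, Transition.IGNORE};
        System.out.println(naiveSum(prices, transitions)); // prints 12.0
        System.out.println(accumulateOnceSum(prices, transitions)); // prints 6.0
    }
}
```

With a take and an ignore transition on the same state, the naive variant reports twice the real sum, which is why the proposal separates accumulate from filter.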
Compatibility, Deprecation, and Migration Plan
No.
Test Plan
Unit tests will be added in CEPStateInConditionTest to verify the correctness of IterativeCondition with an accumulator in a Pattern.
Rejected Alternatives
No.