Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
IterativeCondition defines a user-defined condition that decides whether an element should be accepted in the pattern. The condition iterates over the previously accepted elements in the pattern and decides whether to accept a new element based on some statistic over those elements. The call to Context#getEventsForPattern(name) has to find the elements that belong to the pattern among the elements stored by the NFA, and the cost of this operation can vary. In aggregation scenarios, such as filtering goods with more than 1,000 orders within 10 minutes, the condition has to call getEventsForPattern many times. The aggregate is also recomputed repeatedly, because a single element can trigger several transitions that carry a condition, and every condition invocation accumulates once more.
IterativeCondition with Aggregator proposes an aggregation condition that declares an aggregator and filters on the aggregate of the matched elements available in the context. The aggregate stays consistent within the lifecycle of a matching NFA; in other words, users do not need to care about when the aggregation state of the NFA is initialized and cleaned up. In addition, AggregationCondition reduces the cost of aggregation by keeping the number of getEventsForPattern calls to a minimum.
Public Interfaces
The following public interfaces are introduced to support IterativeCondition with Aggregator.
Adds AggregationCondition
AggregationCondition defines an aggregation condition that decides whether an aggregation should be accepted in the pattern. Accepting an element also signals a state aggregation for the corresponding NFA. AggregationCondition provides the aggregation functionality and filters on the aggregation state exposed through the AggregationContext.
/**
 * A user-defined aggregation condition that decides if an aggregation should be accepted in the
 * pattern or not. Accepting an element also signals a state aggregation for the corresponding
 * {@link org.apache.flink.cep.nfa.NFA}.
 *
 * <p>A condition can be an aggregation condition that gets the aggregation over the previously
 * accepted elements in the pattern and decides to accept a new element or not based on some
 * statistic over these elements.
 *
 * <p>An aggregation condition that accepts an element if i) its name is middle, and ii) the sum of
 * the prices of all accepted elements does not exceed {@code 5} would look like:
 *
 * <pre>{@code
 * private class MyCondition extends AggregationCondition<Event, DoubleValue> {
 *
 *     @Override
 *     public boolean filter(Event value, AggregationContext<Event, DoubleValue> ctx) throws Exception {
 *         if (!value.getName().equals("middle")) {
 *             return false;
 *         }
 *
 *         double sum = ctx.getAggregate("middle", "sum").getValue();
 *         sum += value.getPrice();
 *         return Double.compare(sum, 5.0) <= 0;
 *     }
 *
 *     @Override
 *     public Tuple3<String, Aggregator<DoubleValue>, TypeSerializer<DoubleValue>> getAggregator() throws Exception {
 *         return new Tuple3<>("sum", new DoubleSumAggregator(), DoubleValueSerializer.INSTANCE);
 *     }
 * }
 * }</pre>
 *
 * <b>ATTENTION: </b> The call to {@link AggregationContext#getAggregate(String, String)
 * getAggregate(...)} has to get the aggregation of the elements that belong to the pattern among
 * the elements stored by the NFA.
 */
@PublicEvolving
public abstract class AggregationCondition<T, A extends Value> extends IterativeCondition<T> {

    private static final long serialVersionUID = 1L;

    /** Returns the aggregator name, the aggregator, and the serializer of its aggregate. */
    public abstract Tuple3<String, Aggregator<A>, TypeSerializer<A>> getAggregator() throws Exception;

    /** The context used when evaluating the {@link AggregationCondition condition}. */
    public interface AggregationContext<T, A> extends Context<T> {

        /**
         * @param patternName The name of the pattern.
         * @param aggregatorName The name of the aggregator.
         * @return An {@link Aggregator}'s current aggregate over the already accepted elements for
         *     a given pattern.
         */
        A getAggregate(String patternName, String aggregatorName) throws Exception;
    }
}
Example Usage
This section provides code snippets that show how to use AggregationCondition to define the aggregator of a condition and to filter on the resulting aggregate. Consider a user transaction analysis scenario: find the order ids at which the order amount exceeds 10 within 5 seconds. In the following example, OrderSumCondition extends AggregationCondition to express that the cumulative order sum is greater than 10.
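import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.DoubleValue;

public class OrderSumCondition extends AggregationCondition<Event, DoubleValue> {

    @Override
    public boolean filter(Event value, AggregationContext<Event, DoubleValue> ctx) throws Exception {
        // Accept once the accumulated order amount of the "start" pattern exceeds 10.
        return ctx.getAggregate("start", "sum").getValue() > 10.0;
    }

    @Override
    public Tuple3<String, Aggregator<DoubleValue>, TypeSerializer<DoubleValue>> getAggregator() throws Exception {
        // Aggregator name, aggregator instance, and serializer of the aggregate.
        return new Tuple3<>("sum", new DoubleSumAggregator(), DoubleValueSerializer.INSTANCE);
    }
}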
The following end-to-end example shows how to use the user-defined OrderSumCondition to construct a CEP pattern, and which output is obtained for a specific DataStream input.
// Collect one or more events until the accumulated order sum exceeds the threshold.
Pattern<Event, ?> pattern =
        Pattern.<Event>begin("start")
                .oneOrMore()
                .until(new OrderSumCondition());

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(2, "barfoo", 2.0),
                new Event(3, "foobar", 3.0),
                new Event(4, "foobar", 5.0),
                new Event(5, "foobar", 42.0),
                new Event(6, "foobar", 1.0));

DataStream<String> result =
        CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            o.collect(String.valueOf(p.get("start").get(0).getId()));
                        },
                        Types.STRING);

result.print();

// Trigger execution of the job.
env.execute();
Based on the above example, we expect order ids 4 and 5 to be output: the total order amount for the sequence of order ids 1 to 4 is 11, which exceeds the threshold of 10, and the order amount of order id 5 is 42, which also exceeds the threshold of 10. When the result is printed, we do get order ids 4 and 5.
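The arithmetic behind the expected output can be checked with a small standalone sketch. It does not use Flink or the NFA. OrderEvent and RunningSumDemo are hypothetical names, and the sketch assumes that a fresh aggregate starts once the threshold has been crossed, which is a simplification of the matching semantics described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the example's Event type (hypothetical, not a Flink class). */
final class OrderEvent {
    final int id;
    final double amount;

    OrderEvent(int id, double amount) {
        this.id = id;
        this.amount = amount;
    }
}

final class RunningSumDemo {

    /**
     * Accumulates order amounts and records the id of the order at which the
     * running sum exceeds the threshold, then starts a new running sum.
     */
    static List<Integer> idsCrossingThreshold(List<OrderEvent> events, double threshold) {
        List<Integer> ids = new ArrayList<>();
        double sum = 0.0;
        for (OrderEvent e : events) {
            sum += e.amount;
            if (sum > threshold) {
                ids.add(e.id);
                // Assumption: a completed match starts from a fresh aggregate.
                sum = 0.0;
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<OrderEvent> input = Arrays.asList(
                new OrderEvent(1, 1.0),
                new OrderEvent(2, 2.0),
                new OrderEvent(3, 3.0),
                new OrderEvent(4, 5.0),
                new OrderEvent(5, 42.0),
                new OrderEvent(6, 1.0));
        // 1 + 2 + 3 + 5 = 11 crosses the threshold at id 4; 42 crosses it again at id 5.
        System.out.println(idsCrossingThreshold(input, 10.0)); // prints [4, 5]
    }
}
```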
Proposed Changes
Introduces AccumulatorId
AccumulatorId is introduced to identify an accumulator in SharedBuffer. It is a wrapper used to access the accumulator in SharedBuffer and contains the name and version of the accumulator, the ComputationState it belongs to, and a built-in serializer.
public class AccumulatorId {

    /** The accumulator version. */
    private final int version;

    /** The {@link ComputationState} corresponding to the accumulator. */
    private final ComputationState computationState;

    /** The name of the accumulator. */
    private final String accumulatorKey;

    /** The constructor of AccumulatorId. */
    public AccumulatorId(ComputationState computationState, String accumulatorKey) {}

    /** Serializer for {@link AccumulatorId}. */
    public static class AccumulatorIdSerializer extends TypeSerializerSingleton<AccumulatorId> {

        /** Serializer configuration snapshot for compatibility and format evolution. */
        public static class AccumulatorIdSerializerSnapshot
                extends SimpleTypeSerializerSnapshot<AccumulatorId> {

            public AccumulatorIdSerializerSnapshot() {}
        }
    }
}
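For an identifier like this to work as a lookup key, two ids built from the same components must compare equal. The following is a minimal sketch of that idea and not the proposed class: SimpleAccumulatorId is a hypothetical name, a plain String stands in for ComputationState, and value-based equals/hashCode is an assumption about how the real AccumulatorId would behave.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified identifier; a String stands in for ComputationState. */
final class SimpleAccumulatorId {
    private final String computationStateName;
    private final int version;
    private final String accumulatorKey;

    SimpleAccumulatorId(String computationStateName, int version, String accumulatorKey) {
        this.computationStateName = computationStateName;
        this.version = version;
        this.accumulatorKey = accumulatorKey;
    }

    // Value-based equality so that a freshly built id finds the stored entry.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleAccumulatorId)) {
            return false;
        }
        SimpleAccumulatorId other = (SimpleAccumulatorId) o;
        return version == other.version
                && computationStateName.equals(other.computationStateName)
                && accumulatorKey.equals(other.accumulatorKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(computationStateName, version, accumulatorKey);
    }
}

final class AccumulatorIdDemo {
    public static void main(String[] args) {
        Map<SimpleAccumulatorId, Double> accumulators = new HashMap<>();
        accumulators.put(new SimpleAccumulatorId("start", 1, "sum"), 11.0);

        // A second id with the same components resolves to the same entry.
        System.out.println(accumulators.get(new SimpleAccumulatorId("start", 1, "sum")));
        // A different accumulator name has no entry yet.
        System.out.println(accumulators.get(new SimpleAccumulatorId("start", 1, "count")));
    }
}
```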
Introduces the getter/setter of Accumulator in SharedBuffer
The accumulator passed in by the user is eventually stored in the SharedBuffer, which also maintains the accumulated value. The SharedBuffer therefore needs supporting methods that the SharedBufferAccessor can call.
/**
 * Gets the accumulator id that already exists.
 *
 * @return An {@link Iterator} over the accumulator id that already exists.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     get the {@link Iterator} of the {@link AccumulatorId}.
 */
public Iterator<AccumulatorId> getAccumulatorIdIterator() throws Exception {}

/**
 * Gets the accumulator by the specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @return The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     get the accumulator of the {@link AccumulatorId}.
 */
public byte[] getAccumulator(AccumulatorId accumulatorId) throws Exception {}

/**
 * Sets the accumulator by specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @param data The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     set the accumulator of the {@link AccumulatorId}.
 */
public void setAccumulator(AccumulatorId accumulatorId, byte[] data) throws Exception {}
Introduces operations of Accumulator in SharedBufferAccessor
The methods that access the accumulator in SharedBuffer are exposed through SharedBufferAccessor. Both a serializer and a deserializer need to be specified so that the accumulator can be maintained under its AccumulatorId, the wrapper that identifies it.
/** The deserializer of the input value. */
private final DataInputDeserializer dataInputView;

/** The serializer of the output value. */
private final DataOutputSerializer dataOutputView;

/** Updates the constructor with the input deserializer and the output serializer. */
SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
    this.sharedBuffer = sharedBuffer;
    this.dataInputView = new DataInputDeserializer();
    this.dataOutputView = new DataOutputSerializer(128);
}

/**
 * Gets the accumulator stored in {@link SharedBuffer}.
 *
 * @param stateName The name of the aggregation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param serializer The serializer of the accumulator.
 * @return The accumulator that holds the accumulated value.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     get the accumulator with the given aggregation state.
 */
public <ACC> ACC getAccumulator(
        String stateName, ComputationState computationState, TypeSerializer<ACC> serializer)
        throws Exception {}

/**
 * Sets the accumulator into {@link SharedBuffer}.
 *
 * @param stateName The name of the aggregation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param accumulator The accumulator that holds the accumulated value.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     set the accumulator with the given aggregation state.
 */
public <ACC> void setAccumulator(
        String stateName,
        ComputationState computationState,
        ACC accumulator,
        TypeSerializer<ACC> serializer)
        throws Exception {}

/**
 * Removes the accumulator stored in {@link SharedBuffer}.
 *
 * @param computationState The {@link ComputationState} of NFA.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *     remove the accumulator with the given aggregation state.
 */
public void removeAccumulator(ComputationState computationState) throws Exception {}
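The get/set/remove round trip over serialized bytes can be illustrated without Flink. The sketch below is not the SharedBufferAccessor implementation: AccumulatorStoreSketch is a hypothetical name, java.io data streams stand in for DataOutputSerializer and DataInputDeserializer, a String stands in for ComputationState, and only a double accumulator is supported.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory store that keeps accumulators as serialized bytes. */
final class AccumulatorStoreSketch {

    // computation state name -> (aggregation state name -> serialized accumulator)
    private final Map<String, Map<String, byte[]>> store = new HashMap<>();

    /** Serializes the accumulator value and stores it under the given state. */
    void setAccumulator(String stateName, String computationState, double value)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(8);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeDouble(value);
        }
        store.computeIfAbsent(computationState, k -> new HashMap<>())
                .put(stateName, bytes.toByteArray());
    }

    /** Returns the deserialized accumulator, or null if none has been stored. */
    Double getAccumulator(String stateName, String computationState) throws IOException {
        Map<String, byte[]> perState = store.get(computationState);
        byte[] data = perState == null ? null : perState.get(stateName);
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readDouble();
        }
    }

    /** Drops every accumulator that belongs to the given computation state. */
    void removeAccumulator(String computationState) {
        store.remove(computationState);
    }

    public static void main(String[] args) throws IOException {
        AccumulatorStoreSketch sketch = new AccumulatorStoreSketch();
        sketch.setAccumulator("sum", "start", 11.0);
        System.out.println(sketch.getAccumulator("sum", "start")); // prints 11.0
        sketch.removeAccumulator("start");
        System.out.println(sketch.getAccumulator("sum", "start")); // prints null
    }
}
```

Storing bytes rather than objects mirrors the byte[] signatures shown for SharedBuffer; the per-type serializer is what turns those bytes back into a usable accumulator.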
Adds branch in NFA StateTransitions
When traversing the stateTransitions that carry a condition, we check whether the condition is an AggregationCondition instance. If it is, we call the accumulate method to perform its aggregation, and we clear the accumulator state when the SharedBuffer node is released or the NFA stop state is reached. AggregationCondition is a type of condition that can be obtained from a stateTransition, but in terms of NFA execution it differs from an ordinary condition: the aggregation is performed only once per state to avoid repeated aggregation. For example, the user wants the accumulated state to be updated when the take transition is met, but the current state may have an ignore transition or a proceed transition in addition to the take transition. If every transition of this state triggered the aggregation once, the same element would be counted more than once, which is not desirable.
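The double counting described above can be shown with a small standalone sketch. It is not the NFA code: TransitionAction, OncePerElementAccumulator, and NaiveAccumulator are hypothetical names, and folding the value in only on the first take transition is one possible way, assumed here, to guarantee a single accumulation per element.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the transition actions of a state. */
enum TransitionAction { TAKE, IGNORE, PROCEED }

/** Accumulates an element at most once, regardless of how many transitions are evaluated. */
final class OncePerElementAccumulator {
    private double sum;

    void process(double value, List<TransitionAction> transitions) {
        boolean accumulated = false;
        for (TransitionAction action : transitions) {
            // Only the take transition updates the aggregate, and only once.
            if (action == TransitionAction.TAKE && !accumulated) {
                sum += value;
                accumulated = true;
            }
        }
    }

    double getSum() {
        return sum;
    }
}

/** Counter-example: accumulates on every evaluated transition. */
final class NaiveAccumulator {
    private double sum;

    void process(double value, List<TransitionAction> transitions) {
        for (int i = 0; i < transitions.size(); i++) {
            sum += value;
        }
    }

    double getSum() {
        return sum;
    }
}

final class TransitionDemo {
    public static void main(String[] args) {
        List<TransitionAction> transitions =
                Arrays.asList(
                        TransitionAction.TAKE, TransitionAction.IGNORE, TransitionAction.PROCEED);

        OncePerElementAccumulator once = new OncePerElementAccumulator();
        once.process(5.0, transitions);
        System.out.println(once.getSum()); // prints 5.0

        NaiveAccumulator naive = new NaiveAccumulator();
        naive.process(5.0, transitions);
        System.out.println(naive.getSum()); // prints 15.0
    }
}
```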
Compatibility, Deprecation, and Migration Plan
No.
Test Plan
Unit tests will be added in CEPStateInConditionTest to verify the correctness of IterativeCondition with accumulator in Pattern.
Rejected Alternatives
No.