Status

Current state[One of "Under Discussion", "Accepted", "Rejected"]

Discussion threadhere (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRAhere (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Page properties


Discussion thread
Vote thread
JIRA
Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


IterativeCondition defines a user-defined condition that decides whether an element should be accepted into the pattern. The condition can iterate over the previously accepted elements in the pattern and decide whether to accept a new element based on some statistic over those elements. In certain accumulation scenarios, for example filtering goods with more than 1,000 orders within 10 minutes, the accumulation has to be performed inside the IterativeCondition. This causes the accumulation state to be computed repeatedly, because a state may execute multiple transitions with the condition, and each invocation of the condition accumulates once. AccumulationStateCondition is proposed to define an IterativeCondition with accumulation and to filter the accumulation state with an accumulator. The accumulation state is consistent within the lifecycle of a matching NFA; in other words, users do not need to pay attention to when the accumulation state is initialized and cleaned up.

Public Interfaces


The following public interfaces are introduced to support IterativeCondition with an accumulator.

Adds AccumulationStateCondition


AccumulationStateCondition is proposed to define an accumulation state condition that accumulates the event element and filters the accumulation state with a user-defined accumulator. It provides users with the accumulation functionality and allows them to filter the accumulation state with the accumulator in the context.


Code Block
languagejava
titleAccumulationStateCondition
/**
 * An accumulation state condition which accumulates the event value and filters the
 * accumulation state with the accumulator.
 *
 * @param <T> The type of the accumulated value.
 */
public abstract class AccumulationStateCondition<T> extends IterativeCondition<T> {

    /**
     * Accumulates the given value with the accumulator in context. In the case of a <i>sum</i> accumulator,
     * this method adds the given value to the sum.
     *
     * @param value The value to be accumulated.
     * @param ctx The {@link Context} used for the evaluation of the function and provides access to
     *   the already accepted events in the pattern (see {@link Context#getEventsForPattern(String)}).
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *   operation to fail and may trigger recovery.
     */
    public abstract void accumulate(T value, IterativeCondition.Context<T> ctx) throws Exception;
}


Adds getAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer) in IterativeCondition#Context


The getAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer) methods are introduced in IterativeCondition#Context to set a user-defined accumulator for a given accumulation state and to get the accumulator of that accumulation state. With this getter and setter, users accumulate event values via a user-defined accumulator and filter the accumulation state with the corresponding accumulator.


Code Block
languagejava
titlegetAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer)
/**
 * Get the accumulator with a given accumulation state.
 *
 * @param stateName The name of the accumulation state.
 * @param serializer The serializer of the accumulator.
 * @return An accumulator corresponding to a given accumulation state.
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
 *   accumulation to fail.
 */
<ACC> ACC getAccumulator(String stateName, TypeSerializer<ACC> serializer) throws Exception;

/**
 * Set an accumulator with a given accumulation state.
 *
 * @param stateName The name of the accumulation state.
 * @param accumulator The accumulator of the accumulation state.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
 *   accumulation to fail.
 */
<ACC> void setAccumulator(String stateName, ACC accumulator, TypeSerializer<ACC> serializer) throws Exception;
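
To make the getter/setter contract more concrete, the following is a minimal plain-Java sketch, not the proposed implementation. `Codec`, `DOUBLE_CODEC`, and `InMemoryAccumulatorContext` are hypothetical stand-ins for `TypeSerializer`, `DoubleSerializer.INSTANCE`, and `IterativeCondition.Context`. It assumes that accumulators are kept as serialized bytes keyed by the state name and that reading a state that was never set yields null.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorContextSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer, reduced to two methods. */
    interface Codec<T> {
        byte[] encode(T value);

        T decode(byte[] bytes);
    }

    /** Codec for Double values, playing the role of DoubleSerializer.INSTANCE. */
    static final Codec<Double> DOUBLE_CODEC = new Codec<Double>() {
        @Override
        public byte[] encode(Double value) {
            return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
        }

        @Override
        public Double decode(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getDouble();
        }
    };

    /** Hypothetical in-memory context: accumulators are stored as bytes, keyed by state name. */
    static class InMemoryAccumulatorContext {
        private final Map<String, byte[]> store = new HashMap<>();

        <ACC> ACC getAccumulator(String stateName, Codec<ACC> codec) {
            byte[] bytes = store.get(stateName);
            // Assumption: an accumulation state that was never set yields null.
            return bytes == null ? null : codec.decode(bytes);
        }

        <ACC> void setAccumulator(String stateName, ACC accumulator, Codec<ACC> codec) {
            store.put(stateName, codec.encode(accumulator));
        }
    }

    /** Adds a price to the "sum(price)" state and returns the updated sum. */
    static double addPrice(InMemoryAccumulatorContext ctx, double price) {
        Double sum = ctx.getAccumulator("sum(price)", DOUBLE_CODEC);
        double updated = sum == null ? price : sum + price;
        ctx.setAccumulator("sum(price)", updated, DOUBLE_CODEC);
        return updated;
    }

    public static void main(String[] args) {
        InMemoryAccumulatorContext ctx = new InMemoryAccumulatorContext();
        addPrice(ctx, 1.0);
        addPrice(ctx, 2.0);
        System.out.println(ctx.getAccumulator("sum(price)", DOUBLE_CODEC)); // prints 3.0
    }
}
```

Passing the serializer on every call, as the proposed signatures do, lets the store stay untyped: it only ever sees bytes, and the caller decides how to interpret them.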


Example Usage


This section provides code snippets that show how users invoke AccumulationStateCondition to define accumulation logic in a condition. Consider a user transaction analysis scenario: find the order ids whose accumulated order amount is greater than 10 within 5 seconds. In the following example, we define OrderSumCondition, which extends AccumulationStateCondition, to express that the accumulated order sum is greater than 10.

...

Code Block
languagejava
titleOrderSumCondition
public class OrderSumCondition extends AccumulationStateCondition<Event> {

    @Override
    public void accumulate(Event value, Context<Event> ctx) throws Exception {
        Double sum = ctx.getAccumulator("sum(price)", DoubleSerializer.INSTANCE);
        ctx.setAccumulator("sum(price)", sum == null ? value.getPrice() : sum + value.getPrice(), DoubleSerializer.INSTANCE);
    }

    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        Double acc = ctx.getAccumulator("sum(price)", DoubleSerializer.INSTANCE);
        return acc != null && !(acc < 10.0);
    }
}


The following end-to-end example shows how to use the user-defined OrderSumCondition to construct a CEP pattern and what output is obtained from a specific DataStream input.

...

Code Block
languagejava
titlePattern Example
Pattern<Event, ?> pattern =
    Pattern.<Event>begin("start")
        .where(new OrderSumCondition())
        .within(Time.seconds(5L));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input =
    env.fromElements(
        new Event(1, "barfoo", 1.0),
        new Event(2, "barfoo", 2.0),
        new Event(3, "foobar", 3.0),
        new Event(4, "foobar", 5.0),
        new Event(5, "foobar", 42.0),
        new Event(6, "foobar", 1.0));
DataStream<String> result =
        CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            o.collect(String.valueOf(p.get("start").get(0).getId()));
                        },
                        Types.STRING);
result.print();
env.execute();

...

Based on the above example, we expect order ids 4 and 5 to be output: the total order amount for the sequence of order ids 1 to 4 is 11, which exceeds the threshold of 10, and the order amount of order id 5 is 42, which also exceeds the threshold of 10. When the result is printed, we indeed get order ids 4 and 5.
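
The arithmetic behind this expectation can be checked with a small plain-Java simulation. This is only a sketch under simplifying assumptions: it ignores the 5-second window and the NFA's branching, uses a reduced hypothetical `Event` class, and assumes the accumulation state is cleared once a match completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OrderSumSimulation {

    /** Reduced event holding only the fields this sketch needs. */
    static final class Event {
        final int id;
        final double price;

        Event(int id, double price) {
            this.id = id;
            this.price = price;
        }
    }

    /** The ids and prices of the input elements used in the example above. */
    static List<Event> exampleEvents() {
        return Arrays.asList(
                new Event(1, 1.0),
                new Event(2, 2.0),
                new Event(3, 3.0),
                new Event(4, 5.0),
                new Event(5, 42.0),
                new Event(6, 1.0));
    }

    /** Returns the ids of the events at which the running sum reaches the threshold. */
    static List<Integer> matchingOrderIds(List<Event> events, double threshold) {
        List<Integer> matched = new ArrayList<>();
        Double sum = null;
        for (Event event : events) {
            // accumulate: same update rule as OrderSumCondition#accumulate
            sum = sum == null ? event.price : sum + event.price;
            // filter: same check as OrderSumCondition#filter
            if (!(sum < threshold)) {
                matched.add(event.id);
                // Assumption: the accumulation state is cleared when the match completes.
                sum = null;
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(matchingOrderIds(exampleEvents(), 10.0)); // prints [4, 5]
    }
}
```

The running sums are 1, 3, 6, and 11 for order ids 1 to 4, then 42 for order id 5 after the reset, and 1 for order id 6, which stays below the threshold.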


Proposed Changes


Introduces AccumulatorId


AccumulatorId is introduced to identify an accumulator in SharedBuffer. It is a wrapper used to access the accumulator in SharedBuffer and contains the name and version of the accumulator, the properties of the ComputationState, and the built-in serializer.

...

Code Block
languagejava
titleAccumulatorId
public class AccumulatorId {

    /**
     * The accumulator version.
     */
    private final int version;

    /**
     * The {@link ComputationState} corresponding to accumulator.
     */
    private final ComputationState computationState;

    /**
     * The name of accumulator
     */
    private final String accumulatorKey;
    
    /**
     * The constructor of AccumulatorId.
     */
    public AccumulatorId(ComputationState computationState, String accumulatorKey) {}


    /** Serializer for {@link AccumulatorId}. */
    public static class AccumulatorIdSerializer
            extends TypeSerializerSingleton<AccumulatorId> {

        /** Serializer configuration snapshot for compatibility and format evolution. */
        public static class AccumulatorIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<AccumulatorId> {
            public AccumulatorIdSerializerSnapshot() {}
        }
    }
}
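
For an id like this to work as a lookup key, it needs value-based equality. The following plain-Java sketch illustrates that idea only. `SimpleAccumulatorId` is a hypothetical simplification in which a String replaces the `ComputationState` reference, and the way the version distinguishes partial matches is an assumption, not something this proposal specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class AccumulatorIdSketch {

    /** Simplified id; a String replaces the ComputationState reference (assumption). */
    static final class SimpleAccumulatorId {
        private final String computationStateName;
        private final int version;
        private final String accumulatorKey;

        SimpleAccumulatorId(String computationStateName, int version, String accumulatorKey) {
            this.computationStateName = computationStateName;
            this.version = version;
            this.accumulatorKey = accumulatorKey;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SimpleAccumulatorId)) {
                return false;
            }
            SimpleAccumulatorId other = (SimpleAccumulatorId) o;
            return version == other.version
                    && computationStateName.equals(other.computationStateName)
                    && accumulatorKey.equals(other.accumulatorKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(computationStateName, version, accumulatorKey);
        }
    }

    public static void main(String[] args) {
        Map<SimpleAccumulatorId, Double> store = new HashMap<>();
        // Two entries share the accumulator name but differ in version.
        store.put(new SimpleAccumulatorId("start", 1, "sum(price)"), 6.0);
        store.put(new SimpleAccumulatorId("start", 2, "sum(price)"), 42.0);

        // A freshly constructed, equal id finds the matching entry.
        System.out.println(store.get(new SimpleAccumulatorId("start", 1, "sum(price)"))); // prints 6.0
        System.out.println(store.size()); // prints 2
    }
}
```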


Introduces the getter/setter of Accumulator in SharedBuffer


The accumulator passed in by the user is eventually stored in SharedBuffer, where the accumulated value is maintained. SharedBuffer therefore needs some supporting methods for SharedBufferAccessor to call.

...

Code Block
languagejava
titlegetter/setter of Accumulator
/**
 * Gets the ids of the accumulators that already exist.
 *
 * @return An {@link Iterator} over the accumulator ids that already exist.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   get the {@link Iterator} of the {@link AccumulatorId}.
 */
public Iterator<AccumulatorId> getAccumulatorIdIterator() throws Exception {}

/**
 * Gets the accumulator by the specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @return The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   get the accumulator of the {@link AccumulatorId}.
 */
public byte[] getAccumulator(AccumulatorId accumulatorId) throws Exception {}

/**
 * Sets the accumulator by specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @param data The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   set the accumulator of the {@link AccumulatorId}.
 */
public void setAccumulator(AccumulatorId accumulatorId, byte[] data) throws Exception {}


Introduces operations of Accumulator in SharedBufferAccessor


The methods that access the accumulator in SharedBuffer work through SharedBufferAccessor. Both the serializer and the deserializer need to be specified so that the accumulator can be maintained via AccumulatorId, which is a wrapper for the accumulator id.

...

Code Block
languagejava
titleoperations of Accumulator in SharedBufferAccessor
/**
 * The deserializer of input value
 */
private final DataInputDeserializer dataInputView;

/**
 * The Serializer of output value
 */
private final DataOutputSerializer dataOutputView;

/**
 * Update the constructor with input deserializer and output serializer
 */
SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
    this.sharedBuffer = sharedBuffer;
    this.dataInputView = new DataInputDeserializer();
    this.dataOutputView = new DataOutputSerializer(128);
}

/**
 * Gets the accumulator stored in {@link SharedBuffer}.
 *
 * @param stateName The name of the accumulation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param serializer The serializer of the accumulator.
 * @return The accumulator in which the accumulated value is stored.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   get the accumulator with the given accumulation state.
 */
public <ACC> ACC getAccumulator(String stateName, ComputationState computationState, TypeSerializer<ACC> serializer) throws Exception {}


/**
 * Sets the accumulator into {@link SharedBuffer}.
 *
 * @param stateName The name of the accumulation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param accumulator The accumulator in which the accumulated value is stored.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   set the accumulator with the given accumulation state.
 */
public <ACC> void setAccumulator(String stateName, ComputationState computationState, ACC accumulator, TypeSerializer<ACC> serializer) throws Exception {}

/**
 * Removes the accumulator stored in {@link SharedBuffer}.
 *
 * @param computationState The {@link ComputationState} of NFA.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   remove the accumulator with the given accumulation state.
 */
public void removeAccumulator(ComputationState computationState) throws Exception {}
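
The round trip through serialized bytes and the removal by computation state could look roughly like the following plain-Java sketch. It is not the proposed implementation: `java.io` streams stand in for `DataOutputSerializer` and `DataInputDeserializer`, a `HashMap` stands in for `SharedBuffer`, and the String key of the form `computationState/stateName` is a hypothetical replacement for `AccumulatorId`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class AccessorSketch {

    /** Hypothetical byte store standing in for SharedBuffer. */
    private final Map<String, byte[]> buffer = new HashMap<>();

    /** Serializes the value and stores it under the combined key. */
    void setDoubleAccumulator(String stateName, String computationState, double value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeDouble(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        buffer.put(computationState + "/" + stateName, bytes.toByteArray());
    }

    /** Reads and deserializes the value, or returns null if nothing is stored. */
    Double getDoubleAccumulator(String stateName, String computationState) {
        byte[] data = buffer.get(computationState + "/" + stateName);
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readDouble();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Removes every accumulator that belongs to the given computation state. */
    void removeAccumulators(String computationState) {
        Iterator<String> ids = buffer.keySet().iterator();
        while (ids.hasNext()) {
            if (ids.next().startsWith(computationState + "/")) {
                ids.remove();
            }
        }
    }

    public static void main(String[] args) {
        AccessorSketch accessor = new AccessorSketch();
        accessor.setDoubleAccumulator("sum(price)", "start-1", 11.0);
        System.out.println(accessor.getDoubleAccumulator("sum(price)", "start-1")); // prints 11.0
        accessor.removeAccumulators("start-1");
        System.out.println(accessor.getDoubleAccumulator("sum(price)", "start-1")); // prints null
    }
}
```

Removing by computation state rather than by state name mirrors the `removeAccumulator(computationState)` signature above: all accumulators of a partial match are dropped together.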


Adds branch in NFA StateTransitions


When traversing the stateTransitions with a condition, we call the accumulate method to perform the accumulation in AccumulationStateCondition if the condition is an AccumulationStateCondition instance, and we clear the accumulator state when the SharedBuffer node is released or the NFA stop state is reached. AccumulationStateCondition is a type of Condition that can be obtained from the stateTransition. In terms of NFA execution, AccumulationStateCondition differs from an ordinary Condition: the stateTransitions of a state perform the accumulation only once to avoid repeated accumulation. For example, what the user actually wants is to update the accumulation state when the take transition is met, but the current state may have not only a take transition but also an ignore or proceed transition. If every transition of this state triggered the accumulation once, the value would be counted multiple times, which is not desirable.
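
The difference between accumulating per transition and accumulating once per state can be illustrated with a small plain-Java sketch. The `Action` enum and both methods are hypothetical and only model the counting. They are not the NFA code.

```java
import java.util.Arrays;
import java.util.List;

public class AccumulateOnceSketch {

    /** Hypothetical transition kinds of a single NFA state. */
    enum Action { TAKE, IGNORE, PROCEED }

    /** Undesired behavior: every evaluated transition accumulates the event again. */
    static double accumulatePerTransition(List<Action> transitions, double current, double price) {
        double sum = current;
        for (Action ignored : transitions) {
            sum += price;
        }
        return sum;
    }

    /** Proposed behavior: the event is accumulated once, however many transitions exist. */
    static double accumulateOncePerState(List<Action> transitions, double current, double price) {
        double sum = current;
        boolean accumulated = false;
        for (Action ignored : transitions) {
            if (!accumulated) {
                sum += price;
                accumulated = true;
            }
            // The condition of each transition would be filtered against `sum` here.
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Action> transitions = Arrays.asList(Action.TAKE, Action.IGNORE, Action.PROCEED);
        System.out.println(accumulatePerTransition(transitions, 6.0, 5.0)); // prints 21.0
        System.out.println(accumulateOncePerState(transitions, 6.0, 5.0)); // prints 11.0
    }
}
```

With a previous sum of 6 and an incoming price of 5, three transitions would inflate the sum to 21 if each of them accumulated, whereas accumulating once gives the intended 11.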


Compatibility, Deprecation, and Migration Plan


No.


Test Plan


Unit tests of CEPStateInConditionTest will be added to verify the correctness of IterativeCondition with accumulator in Pattern.


Rejected Alternatives


No.