Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

IterativeCondition defines a user-defined condition that decides whether an element should be accepted into the pattern. The condition iterates over the previously accepted elements in the pattern and decides whether to accept a new element based on some statistic over those elements. The call to Context#getEventsForPattern(name) has to find the elements that belong to the pattern among the elements stored by the NFA, and the cost of this operation can vary. In certain accumulation scenarios, for example filtering goods with more than 1,000 orders within 10 minutes, the accumulation operation has to call getEventsForPattern in IterativeCondition multiple times. This behavior causes the accumulation state to be calculated repeatedly, because an accumulation state may go through multiple transitions with a condition, and each invocation of the condition accumulates once.

AccumulationStateCondition is proposed to define an IterativeCondition with accumulation and to filter the accumulation state with an accumulator. The accumulation state is consistent within the lifecycle of a matching NFA; in other words, users do not need to pay attention to when the accumulation state of the NFA is initialized and cleaned up. In addition, the introduction of AccumulationStateCondition reduces the cost of getEventsForPattern for accumulation by keeping the number of calls to this method to a minimum.
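To make the cost argument concrete, here is a minimal sketch in plain Java with no Flink dependency. The class and method names (RescanVsAccumulate, visitsWithRescan, visitsWithAccumulator) are hypothetical and exist only for illustration; the list of prices stands in for the events that getEventsForPattern would return. The sketch counts how many stored elements are touched when the sum is recomputed for every incoming event, compared with keeping a running accumulator.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: hypothetical names, not part of Flink CEP. */
public class RescanVsAccumulate {

    /** Recomputes the sum over all accepted prices for each new event; returns elements visited. */
    public static long visitsWithRescan(List<Double> prices, double threshold) {
        List<Double> accepted = new ArrayList<>();
        long visits = 0;
        for (double price : prices) {
            accepted.add(price);
            double sum = 0.0;
            // Stands in for iterating over getEventsForPattern(name) inside the condition.
            for (double p : accepted) {
                sum += p;
                visits++;
            }
            if (sum > threshold) {
                break;
            }
        }
        return visits;
    }

    /** Keeps a running sum, so each event is visited exactly once; returns elements visited. */
    public static long visitsWithAccumulator(List<Double> prices, double threshold) {
        double sum = 0.0;
        long visits = 0;
        for (double price : prices) {
            sum += price;
            visits++;
            if (sum > threshold) {
                break;
            }
        }
        return visits;
    }
}
```

For the prices 1.0, 2.0, 3.0, 5.0 and a threshold of 10, the rescanning variant touches 1 + 2 + 3 + 4 = 10 elements, while the accumulator variant touches 4. The gap grows quadratically with the number of accepted events.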

Public Interfaces


The following public interfaces are introduced to support the accumulator requirement of IterativeCondition.

Adds

...

AccumulationStateCondition 


AccumulationStateCondition is proposed to define an accumulation state condition which accumulates the event element and filters the accumulation state with a user-defined accumulator. AccumulationStateCondition provides users with the accumulation functionality and allows users to filter the accumulation state with the accumulator in context.


Code Block
languagejava
titleAccumulationStateCondition
/**
 * An accumulation state condition which accumulates the event value and filters the
 * accumulation state with accumulator.
 *
 * @param <T> The type of the accumulated value.
 */
public abstract class AccumulationStateCondition<T> extends IterativeCondition<T> {

    /**
     * Accumulates the given value with the accumulator in context. In the case of a <i>sum</i>
     * accumulator, this method adds the given value to the sum.
     *
     * @param value The value to be accumulated.
     * @param ctx The {@link Context} used for the evaluation of the function and provides access to
     *     the already accepted events in the pattern (see {@link
     *     Context#getEventsForPattern(String)}).
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract void accumulate(T value, IterativeCondition.Context<T> ctx) throws Exception;
}


Adds getAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer) in IterativeCondition#Context


The getAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer) methods are introduced in IterativeCondition#Context to set a user-defined accumulator for a given accumulation state and to get the accumulator of that state. With the accumulator getter and setter, users accumulate event values with a user-defined accumulator and filter the accumulation state with the corresponding accumulator.


Code Block
languagejava
titlegetAccumulator(stateName, serializer) and setAccumulator(stateName, accumulator, serializer)
/**
 * Get the accumulator with a given accumulation state.
 *
 * @param stateName The name of the accumulation state.
 * @param serializer The serializer of the accumulator.
 * @return An accumulator corresponding to a given accumulation state.
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
 *   accumulation to fail.
 */
<ACC> ACC getAccumulator(String stateName, TypeSerializer<ACC> serializer) throws Exception;

/**
 * Set an accumulator with a given accumulation state.
 *
 * @param stateName The name of the accumulation state.
 * @param accumulator The accumulator of the accumulation state.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
 *   accumulation to fail.
 */
<ACC> void setAccumulator(String stateName, ACC accumulator, TypeSerializer<ACC> serializer) throws Exception;
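The getter and setter contract above can be sketched with a map-backed store in plain Java. This is only an illustration under stated assumptions: AccumulatorStoreSketch and SimpleSerializer are hypothetical stand-ins (SimpleSerializer loosely plays the role of Flink's TypeSerializer), and returning null for an unset state is assumed from the null check in the OrderSumCondition example, not from a documented contract. The sketch shows why a serializer is passed on both calls: the store only keeps bytes per state name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Illustration only: a map-backed stand-in for the accumulator part of the context. */
public class AccumulatorStoreSketch {

    /** Hypothetical, simplified stand-in for Flink's TypeSerializer. */
    public interface SimpleSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;

        T deserialize(DataInputStream in) throws IOException;
    }

    /** Serializer for Double accumulators. */
    public static final SimpleSerializer<Double> DOUBLE =
            new SimpleSerializer<Double>() {
                @Override
                public void serialize(Double value, DataOutputStream out) throws IOException {
                    out.writeDouble(value);
                }

                @Override
                public Double deserialize(DataInputStream in) throws IOException {
                    return in.readDouble();
                }
            };

    // Accumulators are kept in serialized form, keyed by the accumulation state name.
    private final Map<String, byte[]> states = new HashMap<>();

    /** Returns the accumulator of the given state, or null if it was never set (assumption). */
    public <ACC> ACC getAccumulator(String stateName, SimpleSerializer<ACC> serializer)
            throws IOException {
        byte[] bytes = states.get(stateName);
        if (bytes == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return serializer.deserialize(in);
        }
    }

    /** Serializes the accumulator and stores it under the given state name. */
    public <ACC> void setAccumulator(
            String stateName, ACC accumulator, SimpleSerializer<ACC> serializer)
            throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            serializer.serialize(accumulator, out);
        }
        states.put(stateName, buffer.toByteArray());
    }

    /** Adds a price to the "sum(price)" state and returns the updated sum. */
    public double addPrice(double price) throws IOException {
        Double sum = getAccumulator("sum(price)", DOUBLE);
        Double updated = sum == null ? price : sum + price;
        setAccumulator("sum(price)", updated, DOUBLE);
        return updated;
    }
}
```

The read-modify-write in addPrice mirrors the accumulate method of a sum condition: read the current accumulator, fold in the new value, and write it back under the same state name.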


Example Usage


This section provides a code snippet that shows how a user extends AccumulationStateCondition to define accumulation logic in a condition. For example, in a user transaction analysis scenario: find order ids with an order amount greater than 10 within 5 seconds. In the following example, we define OrderSumCondition, which extends AccumulationStateCondition, to express that the accumulated order sum is greater than 10.



Code Block
languagejava
titleOrderSumCondition
public class OrderSumCondition extends AccumulationStateCondition<Event> {

    @Override
    public void accumulate(Event value, Context<Event> ctx) throws Exception {
        Double sum = ctx.getAccumulator("sum(price)", DoubleSerializer.INSTANCE);
        ctx.setAccumulator("sum(price)", sum == null ? value.getPrice() : sum + value.getPrice(), DoubleSerializer.INSTANCE);
    }

    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        Double acc = ctx.getAccumulator("sum(price)", DoubleSerializer.INSTANCE);
        return acc != null && !(acc < 10.0);
    }
}


The following end-to-end example shows how to use the user-defined OrderSumCondition to construct a CEP pattern, and what output is obtained from a specific DataStream input.

...

Code Block
languagejava
titleOrder Pattern Example
Pattern<Event, ?> pattern =
        Pattern.<Event>begin("start")
                .oneOrMore()
                .until(new OrderSumCondition())
                .within(Time.seconds(5L));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(2, "barfoo", 2.0),
                new Event(3, "foobar", 3.0),
                new Event(4, "foobar", 5.0),
                new Event(5, "foobar", 42.0),
                new Event(6, "foobar", 1.0));
DataStream<String> result =
        CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            o.collect(String.valueOf(p.get("start").get(0).getId()));
                        },
                        Types.STRING);
result.print();

...

Code Block
languagejava
titleoperations of Accumulator in SharedBufferAccessor
/**
 * The deserializer of input value
 */
private final DataInputDeserializer dataInputView;

/**
 * The Serializer of output value
 */
private final DataOutputSerializer dataOutputView;

/**
 * Update the constructor with input deserializer and output serializer
 */
SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
    this.sharedBuffer = sharedBuffer;
    this.dataInputView = new DataInputDeserializer();
    this.dataOutputView = new DataOutputSerializer(128);
}

/**
 * Gets the accumulator stored in {@link SharedBuffer}.
 *
 * @param stateName The name of the accumulation state, which needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param serializer The serializer of the accumulator.
 * @return The accumulator in which the accumulated value is stored.
 * @throws Exception This method may throw exceptions. Throwing an exception means that getting
 *   the accumulator of the given accumulation state has failed.
 */
public <ACC> ACC getAccumulator(String stateName, ComputationState computationState, TypeSerializer<ACC> serializer) throws Exception {}


/**
 * Sets the accumulator into {@link SharedBuffer}.
 *
 * @param stateName The name of the accumulation state, which needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param accumulator The accumulator in which the accumulated value is stored.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception means that setting
 *   the accumulator of the given accumulation state has failed.
 */
public <ACC> void setAccumulator(String stateName, ComputationState computationState, ACC accumulator, TypeSerializer<ACC> serializer) throws Exception {}

/**
 * Removes the accumulator stored in {@link SharedBuffer}.
 *
 * @param computationState The {@link ComputationState} of NFA.
 * @throws Exception This method may throw exceptions. Throwing an exception means that removing
 *   the accumulator has failed.
 */
public void removeAccumulator(ComputationState computationState) throws Exception {}

...

When traversing the stateTransitions with a condition, we call the accumulate method to perform the accumulation operation if the condition is an AccumulationStateCondition instance, and we clear the state in the accumulator when the SharedBuffer node is released or the NFA stop state is reached. AccumulationStateCondition is a type of Condition that can be obtained from the stateTransition. In terms of NFA execution, AccumulationStateCondition differs from an ordinary Condition: a stateTransition performs the accumulation operation only once, to avoid repeated accumulation. For example, what the user actually wants is to update the accumulation state when the take transition is met, but the current state may have not only a take transition but also an ignore transition or a proceed transition. If every transition of this state called the accumulation operation once, the value would be counted more than once, which is not desirable.
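The "accumulate only once" rule can be sketched as follows in plain Java. All names here (AccumulateOnceSketch, SumCondition, Transition, processEvent) are hypothetical stand-ins rather than Flink's NFA classes, and de-duplicating by condition identity is just one possible way to enforce the rule; the proposal does not specify the mechanism. The sketch processes one event against the take, ignore and proceed transitions of a state that all share the same condition instance.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Illustration only: hypothetical stand-ins, not Flink's NFA classes. */
public class AccumulateOnceSketch {

    public enum Action { TAKE, IGNORE, PROCEED }

    /** Running sum that also counts how often accumulate was called. */
    public static class SumCondition {
        public double sum = 0.0;
        public int accumulateCalls = 0;

        public void accumulate(double price) {
            sum += price;
            accumulateCalls++;
        }

        public boolean filter() {
            return sum >= 10.0;
        }
    }

    /** An outgoing transition of a state, guarded by a condition. */
    public static class Transition {
        public final Action action;
        public final SumCondition condition;

        public Transition(Action action, SumCondition condition) {
            this.action = action;
            this.condition = condition;
        }
    }

    /**
     * Processes one event against all transitions of a state. Each distinct condition
     * accumulates at most once per event; the filter is still evaluated per transition.
     * Returns the number of transitions whose condition passed.
     */
    public static int processEvent(List<Transition> transitions, double price) {
        // Identity-based set: remembers which condition instances already accumulated this event.
        Set<SumCondition> accumulated =
                Collections.newSetFromMap(new IdentityHashMap<SumCondition, Boolean>());
        int passed = 0;
        for (Transition transition : transitions) {
            if (accumulated.add(transition.condition)) {
                transition.condition.accumulate(price);
            }
            if (transition.condition.filter()) {
                passed++;
            }
        }
        return passed;
    }
}
```

With three transitions sharing one condition, the events 4.0 and 6.0 leave the sum at 10.0 after two accumulate calls. Without the de-duplication, each event would be added three times and the sum would reach 30.0.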

...