...

IterativeCondition defines a user-defined condition that decides whether an element should be accepted into the pattern. The condition can iterate over the elements previously accepted in the pattern and accept or reject a new element based on some statistic over them. Each call to Context#getEventsForPattern(name) has to find the elements that belong to the pattern among the elements stored by the NFA, and the cost of this operation can vary. In certain aggregation scenarios, for example filtering goods with more than 1,000 orders within 10 minutes, the IterativeCondition has to call getEventsForPattern multiple times. This causes the aggregate to be recalculated repeatedly, because a single NFA state may evaluate several transitions, each with its own condition, and every condition invocation accumulates the aggregate once more.
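The cost difference described above can be illustrated with a small, self-contained sketch. It does not use any Flink classes: `AcceptedEvents`, `visitsWithRescan`, and `acceptedWithRunningSum` are hypothetical names introduced only for this illustration, and the list of prices merely stands in for the elements a condition would obtain from getEventsForPattern. The sketch counts how many stored elements are read when the sum is rebuilt on every evaluation, and shows that a running sum applies the same acceptance rule without rereading anything.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: rescanning accepted elements versus keeping a running sum. */
final class RescanVersusRunningSum {

    /** Hypothetical stand-in for the elements accepted so far by one pattern. */
    static final class AcceptedEvents {
        private final List<Double> prices = new ArrayList<>();
        long elementsVisited = 0; // number of stored elements read so far

        void accept(double price) {
            prices.add(price);
        }

        /** Rebuilds the sum from scratch, as a condition iterating over all accepted events would. */
        double rescanSum() {
            double sum = 0.0;
            for (double p : prices) {
                sum += p;
                elementsVisited++;
            }
            return sum;
        }
    }

    /** Accepts a price while the total stays within the limit; returns how many stored elements were read. */
    public static long visitsWithRescan(double[] prices, double limit) {
        AcceptedEvents accepted = new AcceptedEvents();
        for (double price : prices) {
            if (accepted.rescanSum() + price <= limit) {
                accepted.accept(price);
            }
        }
        return accepted.elementsVisited;
    }

    /** Same acceptance rule with a running sum; returns how many prices were accepted. */
    public static int acceptedWithRunningSum(double[] prices, double limit) {
        double sum = 0.0;
        int accepted = 0;
        for (double price : prices) {
            if (sum + price <= limit) {
                sum += price; // constant work per element, no rescan
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        double[] prices = {1.0, 1.0, 1.0, 1.0};
        System.out.println(visitsWithRescan(prices, 10.0));       // prints 6 (0 + 1 + 2 + 3 reads)
        System.out.println(acceptedWithRunningSum(prices, 10.0)); // prints 4
    }
}
```

With n accepted elements, the rescanning variant reads on the order of n² stored elements in total, while the running sum does constant work per element.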

...

An IterativeCondition with an Aggregator is proposed: AggregationCondition defines an aggregator and filters on the aggregate of the matched elements exposed through the context. The aggregation state follows the lifecycle of a matching NFA; in other words, users do not need to care about when the aggregation state of the NFA is initialized and cleaned up. In addition, AggregationCondition reduces the cost of Context#getEventsForPattern(name) for aggregations by keeping the number of calls to this method to a minimum.
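One way to picture an aggregation state that lives and dies with a match is sketched below. This is an assumption-laden illustration, not the proposed implementation: `MatchScopedAggregates`, the numeric match id, and the `release` hook are all hypothetical, and the proposal does not specify how the NFA stores or keys its aggregates. The point is only that the runtime, not the user condition, creates the aggregate on first use and drops it when the match ends.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: aggregates scoped to a partial match and released with it. */
final class MatchScopedAggregates {

    // One running sum per (match id, pattern name) pair; the keying scheme is an assumption.
    private final Map<String, Double> sums = new HashMap<>();

    private static String key(long matchId, String patternName) {
        return matchId + "/" + patternName;
    }

    /** Returns the current aggregate, or 0.0 if nothing has been accumulated yet. */
    public double getAggregate(long matchId, String patternName) {
        return sums.getOrDefault(key(matchId, patternName), 0.0);
    }

    /** Folds the value of a newly accepted element into the aggregate. */
    public void accumulate(long matchId, String patternName, double value) {
        sums.merge(key(matchId, patternName), value, Double::sum);
    }

    /** Drops every aggregate of a match, e.g. when it completes, is discarded, or times out. */
    public void release(long matchId) {
        String prefix = matchId + "/";
        sums.keySet().removeIf(k -> k.startsWith(prefix));
    }

    /** Number of aggregates currently held. */
    public int size() {
        return sums.size();
    }
}
```

A condition would then only read the aggregate, for example `aggregates.getAggregate(matchId, "start") > 10.0`, while initialization and cleanup stay inside the runtime.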

...

Code Block (Java): AccumulationStateCondition
/**
 * A user-defined aggregation condition that decides if an aggregation should be accepted in the
 * pattern or not. Accepting an element also signals a state aggregation for the corresponding
 * {@link org.apache.flink.cep.nfa.NFA}.
 *
 * <p>A condition can be an aggregation condition that gets the aggregate over the previously
 * accepted elements in the pattern and decides to accept a new element or not based on some
 * statistic over these elements.
 *
 * <p>An aggregation condition that accepts an element if i) its name is middle, and ii) the sum
 * of the prices of all accepted elements is no greater than {@code 5} would look like:
 *
 * <pre>{@code
 * private class MyCondition extends AggregationCondition<Event, Double> {
 *
 *     @Override
 *     public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
 *         if (!value.getName().equals("middle")) {
 *             return false;
 *         }
 *
 *         // Aggregate over the elements already accepted for "middle", plus the new element.
 *         double sum = ctx.getAggregate("middle");
 *         sum += value.getPrice();
 *         return Double.compare(sum, 5.0) <= 0;
 *     }
 *
 *     @Override
 *     public Tuple2<Aggregator<Double>, TypeSerializer<Double>> getAggregator() throws Exception {
 *         return new Tuple2(new DoubleSumAggregator(), DoubleSerializer.INSTANCE);
 *     }
 * }
 * }</pre>
 *
 * <b>ATTENTION: </b> The call to {@link AggregationContext#getAggregate(String) getAggregate(...)}
 * has to get the aggregation of the elements that belong to the pattern among the elements stored
 * by the NFA.
 */
@PublicEvolving
public abstract class AggregationCondition<T, A extends Value> extends IterativeCondition<T> {

    private static final long serialVersionUID = 1L;

    public abstract Tuple2<Aggregator<A>, TypeSerializer<A>> getAggregator() throws Exception;

    /** The context used when evaluating the {@link AggregationCondition condition}. */
    public interface AggregationContext<T, A> extends Context<T> {

        /**
         * @return An {@link Aggregator}'s current aggregate over the already accepted elements for
         *     a given pattern.
         * @param name The name of the pattern.
         */
        A getAggregate(String name) throws Exception;
    }
}

...

Code Block (Java): OrderSumCondition
public class OrderSumCondition extends AggregationCondition<Event, Double> {

    @Override
    public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
        // Accept the element once the aggregate over the elements accepted for "start" exceeds 10.0.
        return ctx.getAggregate("start") > 10.0;
    }

    @Override
    public Tuple2<Aggregator<Double>, TypeSerializer<Double>> getAggregator() throws Exception {
        return new Tuple2(new DoubleSumAggregator(), DoubleSerializer.INSTANCE);
    }
}

...