Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleAccumulationStateCondition
/**
 * A user-defined aggregation condition that decides if an aggregation should be accepted in the
 * pattern or not. Accepting an element also signals a state aggregation for the corresponding
 * {@link org.apache.flink.cep.nfa.NFA}.
 *
 * <p>A condition can be a aggregation condition that gets the aggregation over the previously
 * accepted elements in the pattern and decides to accept a new element or not based on some
 * statistic over these elements.
 *
 * <p>An iterative condition that accepts an element if i) its name is middle, and ii) the sum of
 * the prices of all accepted elements is less than {@code 5} would look like:
 *
 * <pre>{@code
 * private class MyCondition extends AggregationCondition<Event, Double> {
 *
 * 		@Override
 *     	public boolean filter(Event value, AggregationContext<Event> ctx) throws Exception {
 *     		if (!value.getName().equals("middle")) {
 *     			return false;
 *     		}
 *
 *     		double sum = ctx.getAggregate("middle", "sum");
 *     		sum += value.getPrice();
 *     		return Double.compare(sum, 5.0) <= 0;
 *     	}
 *
 * 		@Override
 *     	public Tuple3<String, Aggregator<Double>Tuple2<Aggregator<Double>, TypeSerializer<Double>> getAggregator() throws Exception {
 *     		return new Tuple3Tuple2("sum", new DoubleSumAggregator(), DoubleSerializer.INSTANCE);
 *     	}
 *
 *    }
 * }</pre>
 *
 * <b>ATTENTION: </b> The call to {@link AggregationContext#getAggregate(String) getAggregate(...)}
 * has to get the aggregation of the elements that belong to the pattern among the elements stored
 * by the NFA.
 */
@PublicEvolving
public abstract class AggregationCondition<T, A extends Value> extends IterativeCondition<T> {

    private static final long serialVersionUID = 1L;

    public abstract Tuple2<Aggregator<A>, TypeSerializer<A>> getAggregator() throws Exception;

    /** The context used when evaluating the {@link AggregationCondition condition}. */
    public interface AggregationContext<T, A> extends Context<T> {

        /**
         * @return An {@link Aggregator}'s current aggregate over the already accepted elements for
         *     a given pattern.
         * @param patternName The name of the pattern.
         * @param aggregatorName The namem of the aggregator.
         */
        A getAggregate(String patternName, String aggregatorName) throws Exception;
    }
}


Example Usage

...

Code Block
languagejava
titleOrderSumCondition
public class OrderSumCondition extends AggregationCondition<Event, Double> {

    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        return ctx.getAggregate("start", "sum") > 10.0;     
    }

     @Override
     public Tuple3<StringTuple2<Aggregator<Double>, Aggregator<Double>, TypeSerializer<Double>> getAggregator() throws Exception {
        return new Tuple2("sum", new DoubleSumAggregator(), DoubleSerializer.INSTANCE);
    }
}

...