...
    /**
     * A user-defined aggregation condition that decides if an element should be accepted in the
     * pattern or not. Accepting an element also signals a state transition for the corresponding
     * {@link org.apache.flink.cep.nfa.NFA}.
     *
     * <p>A condition can be an aggregation condition that reads the aggregate over the previously
     * accepted elements in the pattern and decides whether to accept a new element based on some
     * statistic over these elements.
     *
     * <p>An aggregation condition that accepts an element if i) its name is "middle", and ii) the
     * sum of the prices of all accepted elements does not exceed {@code 5} would look like:
     *
     * <pre>{@code
     * private class MyCondition extends AggregationCondition<Event, Double> {
     *
     *     @Override
     *     public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
     *         if (!value.getName().equals("middle")) {
     *             return false;
     *         }
     *
     *         double sum = ctx.getAggregate("middle", "sum");
     *         sum += value.getPrice();
     *         return Double.compare(sum, 5.0) <= 0;
     *     }
     *
     *     @Override
     *     public Tuple3<String, Aggregator<Double>, TypeSerializer<Double>> getAggregator() throws Exception {
     *         return new Tuple3<>("sum", new DoubleSumAggregator(), DoubleSerializer.INSTANCE);
     *     }
     * }
     * }</pre>
     *
     * <b>ATTENTION: </b> The call to {@link AggregationContext#getAggregate(String, String)
     * getAggregate(...)} has to get the aggregation of the elements that belong to the pattern
     * among the elements stored by the NFA.
     */
    @PublicEvolving
    public abstract class AggregationCondition<T, A extends Value> extends IterativeCondition<T> {

        private static final long serialVersionUID = 1L;

        /** Returns the aggregator's name, the aggregator itself, and the serializer for its aggregate. */
        public abstract Tuple3<String, Aggregator<A>, TypeSerializer<A>> getAggregator() throws Exception;

        /** The context used when evaluating the {@link AggregationCondition condition}. */
        public interface AggregationContext<T, A> extends Context<T> {

            /**
             * @param patternName The name of the pattern.
             * @param aggregatorName The name of the aggregator.
             * @return An {@link Aggregator}'s current aggregate over the already accepted elements
             *     for a given pattern.
             */
            A getAggregate(String patternName, String aggregatorName) throws Exception;
        }
    }
Example Usage
...
    public class OrderSumCondition extends AggregationCondition<Event, Double> {

        @Override
        public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
            // Accept the element only once the "sum" aggregate of the "start" pattern exceeds 10.
            return ctx.getAggregate("start", "sum") > 10.0;
        }

        @Override
        public Tuple3<String, Aggregator<Double>, TypeSerializer<Double>> getAggregator() throws Exception {
            // Aggregator name, aggregator instance, and the serializer for its aggregate.
            return new Tuple3<>("sum", new DoubleSumAggregator(), DoubleSerializer.INSTANCE);
        }
    }
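The running-sum behaviour that the `MyCondition` Javadoc example describes can be illustrated without Flink. The following is only a sketch in plain Java: `RunningSumSketch`, its nested `Event` class, and the `accept` helper are hypothetical names introduced for illustration, not part of the proposed API. It also assumes that a rejected element is simply skipped, which ignores the contiguity and branching rules a real NFA would apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RunningSumSketch {

    /** Hypothetical stand-in for the Event type used in the examples above. */
    public static final class Event {
        public final String name;
        public final double price;

        public Event(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    /**
     * Mimics the MyCondition example: an element is accepted if its name is
     * "middle" and the sum of accepted prices, including its own, stays within the limit.
     * Assumption: rejected elements are skipped and do not end the match.
     */
    public static List<Event> accept(List<Event> input, double limit) {
        List<Event> accepted = new ArrayList<>();
        double sum = 0.0; // aggregate over the already accepted elements
        for (Event e : input) {
            if (!"middle".equals(e.name)) {
                continue;
            }
            double candidate = sum + e.price;
            if (Double.compare(candidate, limit) <= 0) {
                accepted.add(e);
                sum = candidate; // accepting an element updates the aggregate
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event("start", 1.0),
                new Event("middle", 2.0),
                new Event("middle", 2.5),
                new Event("middle", 1.0),
                new Event("middle", 0.5));
        for (Event e : accept(input, 5.0)) {
            System.out.println(e.name + " " + e.price);
        }
    }
}
```

With the input in `main`, the elements priced 2.0, 2.5, and 0.5 are accepted; the one priced 1.0 is rejected because it would raise the sum to 5.5. The point of keeping the aggregate as a single running value is that the condition does not need to iterate over all previously accepted elements on every call.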
...