THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * A user-defined aggregation condition that decides if an aggregation should be accepted in the * pattern or not. Accepting an element also signals a state aggregation for the corresponding * {@link org.apache.flink.cep.nfa.NFA}. * * <p>A condition can be a aggregation condition that gets the aggregation over the previously * accepted elements in the pattern and decides to accept a new element or not based on some * statistic over these elements. * * <p>An iterative condition that accepts an element if i) its name is middle, and ii) the sum of * the prices of all accepted elements is less than {@code 5} would look like: * * <pre>{@code * private class MyCondition extends AggregationCondition<Event> { * * @Override * public boolean filter(Event value, AggregationContext<Event> ctx) throws Exception { * if (!value.getName().equals("middle")) { * return false; * } * * double sum = ctx.getAggregationResult("middle", "sum"); * sum += value.getPrice(); * return Double.compare(sum, 5.0) <= 0; * } * * @Override * public AggregateFunction<Event, Double, Double> getAggregateFunction() throws Exception { * return new Tuple3("sum", new AggregateFunction<Event, Double, Double>() { * private static final long serialVersionUID = 1L; * * @Override * public Double createAccumulator() { * return 0; * } * * @Override * public Double add(Event value, Double accumulator) { * return value.getPrice() + value; * } * * @Override * public Double getResult(Double accumulator) { * return accumulator.toString(); * } * * @Override * public Double merge(Double a, Double b) { * return a + b; * } * }, * DoubleSerializer.INSTANCE); * } * * } * }</pre> * * <b>ATTENTION: </b> The call to {@link AggregationContext#getAggregationResult(String, String) * getAggregationResult(...)} has to get the aggregation of the elements that belong to the pattern * among the elements stored by the NFA. */ @PublicEvolving public abstract class AggregationCondition<T, ACC, OUT> extends IterativeCondition<T> { private static final long serialVersionUID = 1L; @Override public boolean filter(T value, Context<T> ctx) throws Exception { return filter(value, (AggregationContext<T, OUT>) ctx); } /** * The filter function that evaluates the predicate with the aggregation context. * * @param value The value to be tested. * @param ctx The {@link Context} used for the evaluation of the function and provides access to * the detail and aggregation of the already accepted events in the pattern (see {@link * Context#getEventsForPattern(String)} and {@link * AggregationContext#getAggregationResult(String, String)}). * @return {@code true} for values that should be retained, {@code false} for values to be * filtered out. * @throws Exception This method may throw exceptions. Throwing an exception will cause the * operation to fail and may trigger recovery. */ public abstract boolean filter(T value, AggregationContext<T, OUT> ctx) throws Exception; /** * Gets a user-defined {@link AggregateFunction} to aggregate the accepted events in the * pattern. * * @return An {@link AggregateFunction} to aggregate filtered events of the pattern. * @throws Exception Throws exception if creating an {@link AggregateFunction} fails. */ public abstract Tuple3<String, AggregateFunction<T, ACC, OUT>, TypeSerializer<ACC>> getAggregateFunction() throws Exception; /** The context used when evaluating the {@link AggregationCondition condition}. */ public interface AggregationContext<T, OUT> extends Context<T> { /** * @return The result of the aggregation from the accumulator over the already accepted * elements for a given pattern. * @param patternName The name of the pattern. * @param aggregateName The name of the aggregate. */ OUT getAggregationResult(String patternName, String aggregateName) throws Exception; } } |
...