Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleAccumulationStateCondition
/**
 * A user-defined aggregation condition that decides if an aggregation should be accepted in the
 * pattern or not. Accepting an element also signals a state aggregation for the corresponding
 * {@link org.apache.flink.cep.nfa.NFA}.
 *
 * <p>A condition can be a aggregation condition that gets the aggregation over the previously
 * accepted elements in the pattern and decides to accept a new element or not based on some
 * statistic over these elements.
 *
 * <p>An iterative condition that accepts an element if i) its name is middle, and ii) the sum of
 * the prices of all accepted elements is less than {@code 5} would look like:
 *
 * <pre>{@code
 * private class MyCondition extends AggregationCondition<Event> {
 *
 * 		@Override
 *     	public boolean filter(Event value, AggregationContext<Event> ctx) throws Exception {
 *     		if (!value.getName().equals("middle")) {
 *     			return false;
 *     		}
 *
 *     		double sum = ctx.getAggregationResult("middle", "sum");
 *     		sum += value.getPrice();
 *     		return Double.compare(sum, 5.0) <= 0;
 *     	}
 *
 * 		@Override
 *     	public AggregateFunction<Event, Double, Double> getAggregateFunction() throws Exception {
 *     		return new Tuple2Tuple3("sum", new AggregateFunction<Event, Double, Double>() {
 *                 private static final long serialVersionUID = 1L;
 *
 *                 @Override
 *                 public Double createAccumulator() {
 *                     return 0;
 *                 }
 *
 *                 @Override
 *                 public Double add(Event value, Double accumulator) {
 *                     return value.getPrice() + value;
 *                 }
 *
 *                 @Override
 *                 public Double getResult(Double accumulator) {
 *                     return accumulator.toString();
 *                 }
 *
 *                 @Override
 *                 public Double merge(Double a, Double b) {
 *                     return a + b;
 *                 }
 *             },
 *             DoubleSerializer.INSTANCE);
 *     	}
 *
 *    }
 * }</pre>
 *
 * <b>ATTENTION: </b> The call to {@link AggregationContext#getAggregationResult(String, String)
 * getAggregationResult(...)} has to get the aggregation of the elements that belong to the pattern
 * among the elements stored by the NFA.
 */
@PublicEvolving
public abstract class AggregationCondition<T, ACC, OUT> extends IterativeCondition<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean filter(T value, Context<T> ctx) throws Exception {
        return filter(value, (AggregationContext<T, OUT>) ctx);
    }

    /**
     * The filter function that evaluates the predicate with the aggregation context.
     *
     * @param value The value to be tested.
     * @param ctx The {@link Context} used for the evaluation of the function and provides access to
     *     the detail and aggregation of the already accepted events in the pattern (see {@link
     *     Context#getEventsForPattern(String)} and {@link
     *     AggregationContext#getAggregationResult(String, String)}).
     * @return {@code true} for values that should be retained, {@code false} for values to be
     *     filtered out.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract boolean filter(T value, AggregationContext<T, OUT> ctx) throws Exception;

    /**
     * Gets a user-defined {@link AggregateFunction} to aggregate the accepted events in the
     * pattern.
     *
     * @return An {@link AggregateFunction} to aggregate filtered events of the pattern.
     * @throws Exception Throws exception if creating an {@link AggregateFunction} fails.
     */
    public abstract Tuple2<StringTuple3<String, AggregateFunction<T, ACC, OUT>> getAggregateFunction()OUT>, TypeSerializer<ACC>>
            getAggregateFunction() throws Exception;

    /** The context used when evaluating the {@link AggregationCondition condition}. */
    public interface AggregationContext<T, OUT> extends Context<T> {

        /**
         * @return The result of the aggregation from the accumulator over the already accepted
         *     elements for a given pattern.
         * @param patternName The name of the pattern.
         * @param aggregateName The name of the aggregate.
         */
        OUT getAggregationResult(String patternName, String aggregateName) throws Exception;
    }
}

...

Code Block
languagejava
titleOrderSumCondition
public class OrderSumCondition extends AggregationCondition<Event, Double, Double> {

    @Override
    public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
        return ctx.getAggregationResult("start", "sum") > 10.0;
    }

    @Override
    public Tuple2<StringTuple3<String, AggregateFunction<Event, Double, Double>> getAggregateFunction()
Double>, TypeSerializer<Double>>
            getAggregateFunction() throws Exception {
        return new Tuple2<>Tuple3<>(
                "sum",
                new AggregateFunction<Event, Double, Double>() {

                    @Override
                    public Double createAccumulator() {
                        return 0D;
                    }

                    @Override
                    public Double add(Event value, Double accumulator) {
                        return value.getPrice() + accumulator;
                    }

                    @Override
                    public Double getResult(Double accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Double merge(Double a, Double b) {
                        return a + b;
                    }
                },
                DoubleSerializer.INSTANCE);
    }
}


The following end-to-end example shows how to use the user-defined OrderSumCondition to construct a CEP pattern, and what output is produced for a specific DataStream input.

...