Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleAccumulationStateCondition
/**
 * A user-defined aggregation condition that decides if an aggregation should be accepted in the
 * pattern or not. Accepting an element also signals a state aggregation for the corresponding
 * {@link org.apache.flink.cep.nfa.NFA}.
 *
 * <p>A condition can be a aggregation condition that gets the aggregation over the previously
 * accepted elements in the pattern and decides to accept a new element or not based on some
 * statistic over these elements.
 *
 * <p>An iterative condition that accepts an element if i) its name is middle, and ii) the sum of
 * the prices of all accepted elements is less than {@code 5} would look like:
 *
 * <pre>{@code
 * private class MyCondition extends AggregationCondition<Event> {
 *
 * 		@Override
 *     	public boolean filter(Event value, AggregationContext<Event> ctx) throws Exception {
 *     		if (!value.getName().equals("middle")) {
 *     			return false;
 *     		}
 *
 *     		double sum = ctx.getAggregationResult("middle", "sum");
 *     		sum += value.getPrice();
 *     		return Double.compare(sum, 5.0) <= 0;
 *     	}
 *
 * 		@Override
 *     	public AggregateFunction<Event, Double, Double> getAggregateFunction() throws Exception {
 *     		return new Tuple2("sum", new AggregateFunction<Event, Double, Double>() {
 *                 private static final long serialVersionUID = 1L;
 *
 *                 @Override
 *                 public Double createAccumulator() {
 *                     return 0D0;
 *                 }
 *
 *                 @Override
 *                 public Double add(Event value, Double accumulator) {
 *                     return value.getPrice() + value;
 *                 }
 *
 *                 @Override
 *                 public Double getResult(Double accumulator) {
 *                     return accumulator.toString();
 *                 }
 *
 *                 @Override
 *                 public Double merge(Double a, Double b) {
 *                     return a + b;
 *                 }
 *             });
 *     	}
 *
 *    }
 * }</pre>
 *
 * <b>ATTENTION: </b> The call to {@link AggregationContext#getAggregationResult(String, String)
 * getAggregationResult(...)} has to get the aggregation of the elements that belong to the pattern
 * among the elements stored by the NFA.
 */
@PublicEvolving
public abstract class AggregationCondition<T, ACC, OUT> extends IterativeCondition<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean filter(T value, Context<T> ctx) throws Exception {
        return filter(value, (AggregationContext<T, OUT>) ctx);
    }

    public/**
  abstract boolean filter(T value,* AggregationContext<T,The OUT>filter ctx)function throws Exception;

    public abstract AggregateFunction<T, ACC, OUT> getAggregateFunction() throws Exception;

that evaluates the predicate with the aggregation context.
     *
    / ** The@param contextvalue usedThe whenvalue evaluatingto the {@link AggregationCondition condition}. */be tested.
    public interface* AggregationContext<T,@param OUT>ctx extends Context<T>The {

@link Context} used for the evaluation of the /**
function and provides access to
     * @return The result of the detail and aggregation fromof the accumulatoralready overaccepted theevents alreadyin accepted
the pattern (see {@link
      *     elements for a given pattern.Context#getEventsForPattern(String)} and {@link
         * @param name The name of the patternAggregationContext#getAggregationResult(String, String)}).
     * @return {@code true} */
for values that should be retained, {@code  OUT getAggregationResult(String name) throws Exception;
    }
}

Example Usage

Example section provides code snippets that shows how to use AggregationCondition to define the aggregator of condition and filter the aggregation with the aggregator. For example, in the user transaction analysis  scenario: find order id with an order amount greater than 10 within 5 seconds. In the following example, we define OrderSumCondition which extends AggregationCondition to express that the accumulative order sum is greater than 10.

false} for values to be
     *     filtered out.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract boolean filter(T value, AggregationContext<T, OUT> ctx) throws Exception;

    /**
     * Gets a user-defined {@link AggregateFunction} to aggregate the accepted events in the
     * pattern.
     *
     * @return An {@link AggregateFunction} to aggregate filtered events of the pattern.
     * @throws Exception Throws exception if creating an {@link AggregateFunction} fails.
     */
    public abstract Tuple2<String, AggregateFunction<T, ACC, OUT>> getAggregateFunction()
            throws Exception;

    /** The context used when evaluating the {@link AggregationCondition condition}. */
    public interface AggregationContext<T, OUT> extends Context<T> {

        /**
         * @return The result of the aggregation from the accumulator over the already accepted
         *     elements for a given pattern.
         * @param patternName The name of the pattern.
         * @param aggregateName The name of the aggregate.
         */
        OUT getAggregationResult(String patternName, String aggregateName) throws Exception;
    }
}


Example Usage


Example section provides code snippets that shows how to use AggregationCondition to define the aggregator of condition and filter the aggregation with the aggregator. For example, in the user transaction analysis  scenario: find order id with an order amount greater than 10 within 5 seconds. In the following example, we define OrderSumCondition which extends AggregationCondition to express that the accumulative order sum is greater than 10.


Code Block
languagejava
titleOrderSumCondition
public class OrderSumCondition extends AggregationCondition<Event, Double, Double> {

    @Override
    public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
        return ctx.getAggregationResult("start", "sum") > 10.0;
    }

    @Override
    public Tuple2<String, AggregateFunction<Event, Double, Double>> getAggregateFunction()
           
Code Block
languagejava
titleOrderSumCondition
public class OrderSumCondition extends AggregationCondition<Event, Double, Double> {

    @Override
    public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
        return ctx.getAggregationResult("start") > 10.0;
new Tuple2<>(
       }

    @Override
    public AggregateFunction<Event"sum",
 Double,  Double> getAggregateFunction() throws Exception {
        return new AggregateFunction<Event, Double, Double>() {

                    @Override
         @Override
            public Double createAccumulator() {
                        return 0D;
                    }

                    @Override
                    public Double add(Event value, Double accumulator) {
                        return value.getPrice() + accumulator;
) + accumulator;
                    }

                }

    @Override
        @Override
            public Double getResult(Double accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Double merge(Double a, Double b) {
                        return a + b;
                    }
                });
    }
}


The following end-to-end example will show how to use user-defined OrderSumCondition to construct cep pattern and show what output is obtained from a specific datastream input. 

...