Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


IterativeCondition defines a user-defined condition that decides whether an element should be accepted in the pattern. The condition can iterate over the previously accepted elements in the pattern and decide whether to accept a new element based on some statistic over those elements. Each call to Context#getEventsForPattern(name) has to find the elements that belong to the pattern among all elements stored by the NFA, and the cost of this lookup varies. In aggregation scenarios, such as filtering goods with more than 1,000 orders within 10 minutes, the aggregation has to call getEventsForPattern in the IterativeCondition many times. This causes the aggregate to be recomputed repeatedly: one aggregation can execute multiple transitions with a condition, and every invocation of the condition accumulates the elements again.
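To make the cost argument concrete, the following plain-Java sketch (not Flink code; the class and both methods are hypothetical) counts how many stored elements are visited when every condition call re-sums all previously accepted elements, as a condition built on getEventsForPattern effectively does, versus keeping one running accumulator that is updated once per accepted element. Prices are a plain `double[]` for simplicity.

```java
import java.util.Arrays;

public class AggregationCostSketch {

    // Re-sums all previously accepted prices on every condition call and
    // returns the number of earlier elements visited in total.
    static long visitsWhenRecomputing(double[] prices) {
        long visits = 0;
        for (int i = 0; i < prices.length; i++) {
            double sum = prices[i];
            for (int j = 0; j < i; j++) { // full scan of earlier accepted elements
                sum += prices[j];
                visits++;
            }
        }
        return visits;
    }

    // Keeps a single running accumulator; each element is folded in once.
    static long visitsWithAccumulator(double[] prices) {
        long visits = 0;
        double accumulator = 0.0;
        for (double price : prices) {
            accumulator += price; // constant work per accepted element
            visits++;
        }
        return visits;
    }

    public static void main(String[] args) {
        double[] prices = new double[1000];
        Arrays.fill(prices, 1.0);
        System.out.println(visitsWhenRecomputing(prices)); // 499500
        System.out.println(visitsWithAccumulator(prices)); // 1000
    }
}
```

For n accepted elements, recomputation visits n(n - 1)/2 elements in total, while the accumulator does n constant-time updates.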


IterativeCondition with AggregateFunction is proposed to define an aggregating IterativeCondition: it aggregates the matched elements with an AggregateFunction and filters on the aggregation result available in the context. The aggregation state is scoped to the lifecycle of a matching NFA; in other words, users do not need to care about when the NFA's aggregation state is initialized or cleaned up. In addition, introducing AggregationCondition keeps the number of calls to getEventsForPattern needed for an aggregation to a minimum, which reduces the cost of that operation.


Public Interfaces


The following public interfaces are introduced to support IterativeCondition with AggregateFunction.


Adds AggregationCondition 


AggregationCondition is proposed to define an aggregation condition that decides whether an element should be accepted in the pattern based on an aggregation. Accepting an element also triggers an update of the aggregation state of the corresponding NFA. AggregationContext is used when evaluating the AggregationCondition and provides the result of the aggregation. AggregationCondition aggregates elements with an AggregateFunction and filters on the aggregation result through the AggregationContext.


AggregationCondition
/**
 * A user-defined aggregation condition that decides if an element should be accepted in the
 * pattern or not, based on an aggregation over the previously accepted elements. Accepting an
 * element also updates the aggregation state of the corresponding
 * {@link org.apache.flink.cep.nfa.NFA}.
 *
 * <p>An aggregation condition gets the aggregation over the previously accepted elements in the
 * pattern and decides to accept a new element or not based on some statistic over these
 * elements.
 *
 * <p>An aggregation condition that accepts an element if i) its name is middle, and ii) the sum
 * of the prices of all accepted elements is less than {@code 5} would look like:
 *
 * <pre>{@code
 * private class MyCondition extends AggregationCondition<Event, Double, Double> {
 *
 *     @Override
 *     public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
 *         if (!value.getName().equals("middle")) {
 *             return false;
 *         }
 *
 *         // Aggregate of the already accepted elements plus the current candidate.
 *         double sum = ctx.getAggregationResult("middle", "sum");
 *         sum += value.getPrice();
 *         return Double.compare(sum, 5.0) < 0;
 *     }
 *
 *     @Override
 *     public Tuple2<String, AggregateFunction<Event, Double, Double>> getAggregateFunction()
 *             throws Exception {
 *         return new Tuple2<>("sum", new AggregateFunction<Event, Double, Double>() {
 *             private static final long serialVersionUID = 1L;
 *
 *             @Override
 *             public Double createAccumulator() {
 *                 return 0.0;
 *             }
 *
 *             @Override
 *             public Double add(Event value, Double accumulator) {
 *                 return value.getPrice() + accumulator;
 *             }
 *
 *             @Override
 *             public Double getResult(Double accumulator) {
 *                 return accumulator;
 *             }
 *
 *             @Override
 *             public Double merge(Double a, Double b) {
 *                 return a + b;
 *             }
 *         });
 *     }
 * }
 * }</pre>
 *
 * <p><b>ATTENTION:</b> The call to {@link AggregationContext#getAggregationResult(String, String)
 * getAggregationResult(...)} has to look up the aggregation of the elements that belong to the
 * pattern among the elements stored by the NFA.
 */
@PublicEvolving
public abstract class AggregationCondition<T, ACC, OUT> extends IterativeCondition<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean filter(T value, Context<T> ctx) throws Exception {
        return filter(value, (AggregationContext<T, OUT>) ctx);
    }

    /**
     * The filter function that evaluates the predicate with the aggregation context.
     *
     * @param value The value to be tested.
     * @param ctx The {@link AggregationContext} used for the evaluation of the function, which
     *     provides access to the details and the aggregation of the already accepted events in
     *     the pattern (see {@link Context#getEventsForPattern(String)} and {@link
     *     AggregationContext#getAggregationResult(String, String)}).
     * @return {@code true} for values that should be retained, {@code false} for values to be
     *     filtered out.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract boolean filter(T value, AggregationContext<T, OUT> ctx) throws Exception;

    /**
     * Gets the name and the user-defined {@link AggregateFunction} used to aggregate the
     * accepted events in the pattern.
     *
     * @return A {@link Tuple2} of the aggregate name and the {@link AggregateFunction} that
     *     aggregates the accepted events of the pattern.
     * @throws Exception Throws exception if creating an {@link AggregateFunction} fails.
     */
    public abstract Tuple2<String, AggregateFunction<T, ACC, OUT>> getAggregateFunction()
            throws Exception;

    /** The context used when evaluating the {@link AggregationCondition condition}. */
    public interface AggregationContext<T, OUT> extends Context<T> {

        /**
         * Gets the result of the aggregation from the accumulator over the already accepted
         * elements for a given pattern.
         *
         * @param patternName The name of the pattern.
         * @param aggregateName The name of the aggregate.
         * @return The result of the aggregation.
         * @throws Exception Throws exception if the aggregation result cannot be retrieved.
         */
        OUT getAggregationResult(String patternName, String aggregateName) throws Exception;
    }
}
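The contract above can be modeled without Flink as a rough sketch of the intended semantics. Everything below is hypothetical: `Aggregate` is a local mirror of the createAccumulator/add/getResult methods of Flink's AggregateFunction (merge is omitted), and `AggregatingMatcher` stands in for one partial match. It assumes, as the Javadoc example implies, that the filter sees the aggregate of the previously accepted elements and that each accepted element is folded into the accumulator exactly once.

```java
import java.util.function.BiPredicate;

public class AggregationConditionSketch {

    // Hypothetical local mirror of Flink's AggregateFunction methods (merge omitted).
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();

        ACC add(IN value, ACC accumulator);

        OUT getResult(ACC accumulator);
    }

    // Hypothetical driver for one partial match: one accumulator, a filter that
    // reads the current result, and a single add() per accepted element.
    static final class AggregatingMatcher<IN, ACC, OUT> {
        private final Aggregate<IN, ACC, OUT> aggregate;
        private final BiPredicate<IN, OUT> filter;
        private ACC accumulator;

        AggregatingMatcher(Aggregate<IN, ACC, OUT> aggregate, BiPredicate<IN, OUT> filter) {
            this.aggregate = aggregate;
            this.filter = filter;
            this.accumulator = aggregate.createAccumulator();
        }

        boolean offer(IN value) {
            boolean accepted = filter.test(value, aggregate.getResult(accumulator));
            if (accepted) {
                accumulator = aggregate.add(value, accumulator); // aggregate once
            }
            return accepted;
        }

        OUT result() {
            return aggregate.getResult(accumulator);
        }
    }

    // Sum over prices, like the "sum" aggregate in the Javadoc example.
    static Aggregate<Double, Double, Double> sumOfDoubles() {
        return new Aggregate<Double, Double, Double>() {
            @Override
            public Double createAccumulator() {
                return 0.0;
            }

            @Override
            public Double add(Double value, Double accumulator) {
                return accumulator + value;
            }

            @Override
            public Double getResult(Double accumulator) {
                return accumulator;
            }
        };
    }

    public static void main(String[] args) {
        // Accept a price only while the sum including it stays below 5.0.
        AggregatingMatcher<Double, Double, Double> matcher =
                new AggregatingMatcher<>(sumOfDoubles(), (price, sum) -> sum + price < 5.0);
        System.out.println(matcher.offer(1.0)); // true
        System.out.println(matcher.offer(3.0)); // true: 1.0 + 3.0 = 4.0
        System.out.println(matcher.offer(2.0)); // false: 4.0 + 2.0 = 6.0
        System.out.println(matcher.result()); // 4.0
    }
}
```

The rejected element never reaches `add`, so the accumulator only reflects accepted elements.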

Example Usage


This section provides code snippets that show how to use AggregationCondition to define the aggregator of a condition and to filter on its aggregation result. Take a user transaction analysis scenario as an example: find the order IDs at which the total order amount exceeds 10 within 5 seconds. In the following example, OrderSumCondition extends AggregationCondition to express that the accumulated order sum is greater than 10.


OrderSumCondition
public class OrderSumCondition extends AggregationCondition<Event, Double, Double> {

    @Override
    public boolean filter(Event value, AggregationContext<Event, Double> ctx) throws Exception {
        return ctx.getAggregationResult("start", "sum") > 10.0;
    }

    @Override
    public Tuple2<String, AggregateFunction<Event, Double, Double>> getAggregateFunction()
            throws Exception {
        return new Tuple2<>(
                "sum",
                new AggregateFunction<Event, Double, Double>() {

                    @Override
                    public Double createAccumulator() {
                        return 0D;
                    }

                    @Override
                    public Double add(Event value, Double accumulator) {
                        return value.getPrice() + accumulator;
                    }

                    @Override
                    public Double getResult(Double accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Double merge(Double a, Double b) {
                        return a + b;
                    }
                });
    }
}


The following end-to-end example shows how to use the user-defined OrderSumCondition to construct a CEP pattern, and what output is produced for a specific DataStream input.


Order Pattern
Pattern<Event, ?> pattern =
        Pattern.<Event>begin("start")
                .oneOrMore()
                .until(new OrderSumCondition());

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(2, "barfoo", 2.0),
                new Event(3, "foobar", 3.0),
                new Event(4, "foobar", 5.0),
                new Event(5, "foobar", 42.0),
                new Event(6, "foobar", 1.0));

// Emit the id of the first event of each match.
DataStream<String> result =
        CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            o.collect(String.valueOf(p.get("start").get(0).getId()));
                        },
                        Types.STRING);
result.print();

env.execute();


Based on the above example, we expect both order ID 4 and order ID 5 to be output: the total amount of orders 1 to 4 is 1 + 2 + 3 + 5 = 11, which exceeds the threshold of 10, and the amount of order 5 alone is 42, which also exceeds the threshold. When the result is printed, order IDs 4 and 5 are indeed output.
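The arithmetic behind this expectation can be checked with a few lines of plain Java. This is only a sketch of the threshold reasoning, not of CEP semantics: the hypothetical `idsExceeding` helper assumes the running sum restarts after each order that pushes it past the threshold, and it does not model the NFA, which may open a partial match at every event.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderSumArithmetic {

    // Returns the ids at which the running sum first exceeds the threshold,
    // restarting the sum afterwards (a simplification, not NFA semantics).
    static List<Integer> idsExceeding(int[] ids, double[] prices, double threshold) {
        List<Integer> out = new ArrayList<>();
        double sum = 0.0;
        for (int i = 0; i < ids.length; i++) {
            sum += prices[i];
            if (sum > threshold) {
                out.add(ids[i]);
                sum = 0.0; // start a fresh accumulation
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3, 4, 5, 6};
        double[] prices = {1.0, 2.0, 3.0, 5.0, 42.0, 1.0};
        System.out.println(idsExceeding(ids, prices, 10.0)); // [4, 5]
    }
}
```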


Proposed Changes


Introduces AccumulatorId


AccumulatorId is introduced to identify an accumulator in SharedBuffer. It is a wrapper used to access the accumulator in SharedBuffer and contains the name and version of the accumulator, the properties of the ComputationState, and a built-in serializer.


AccumulatorId
public class AccumulatorId {

    /**
     * The accumulator version.
     */
    private final int version;

    /**
     * The {@link ComputationState} corresponding to the accumulator.
     */
    private final ComputationState computationState;

    /**
     * The name of the accumulator.
     */
    private final String accumulatorKey;

    /**
     * The constructor of AccumulatorId.
     */
    public AccumulatorId(ComputationState computationState, String accumulatorKey) {}

    /** Serializer for {@link AccumulatorId}. */
    public static class AccumulatorIdSerializer
            extends TypeSerializerSingleton<AccumulatorId> {

        public static final AccumulatorIdSerializer INSTANCE = new AccumulatorIdSerializer();

        /** Serializer configuration snapshot for compatibility and format evolution. */
        public static final class AccumulatorIdSerializerSnapshot
                extends SimpleTypeSerializerSnapshot<AccumulatorId> {
            public AccumulatorIdSerializerSnapshot() {
                super(() -> INSTANCE);
            }
        }
    }
}
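As a rough illustration of why AccumulatorId needs value semantics and a serializer, the hypothetical sketch below reduces the ComputationState to a string identifier, omits the version field (its role is not specified here), gives the id `equals`/`hashCode` so equal ids address the same stored accumulator, and serializes it field by field with `java.io` streams rather than Flink's serializer classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

public final class AccumulatorIdSketch {

    // Hypothetical: the ComputationState is reduced to a string identifier.
    private final String computationStateId;
    // Name of the accumulator, e.g. "sum".
    private final String accumulatorKey;

    AccumulatorIdSketch(String computationStateId, String accumulatorKey) {
        this.computationStateId = Objects.requireNonNull(computationStateId);
        this.accumulatorKey = Objects.requireNonNull(accumulatorKey);
    }

    // Value semantics: equal ids address the same entry in a keyed store.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AccumulatorIdSketch)) {
            return false;
        }
        AccumulatorIdSketch other = (AccumulatorIdSketch) o;
        return computationStateId.equals(other.computationStateId)
                && accumulatorKey.equals(other.accumulatorKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(computationStateId, accumulatorKey);
    }

    void serialize(DataOutputStream out) throws IOException {
        out.writeUTF(computationStateId);
        out.writeUTF(accumulatorKey);
    }

    static AccumulatorIdSketch deserialize(DataInputStream in) throws IOException {
        String stateId = in.readUTF(); // same order as written
        String key = in.readUTF();
        return new AccumulatorIdSketch(stateId, key);
    }

    // Writes the id to bytes and reads it back, as when it goes through state.
    static AccumulatorIdSketch roundTrip(AccumulatorIdSketch id) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            id.serialize(out);
            out.flush();
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        AccumulatorIdSketch id = new AccumulatorIdSketch("start-0", "sum");
        System.out.println(id.equals(roundTrip(id))); // true
        System.out.println(id.equals(new AccumulatorIdSketch("start-0", "count"))); // false
    }
}
```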


Introduces the getter/setter of Accumulator in SharedBuffer


The accumulator supplied by the user is ultimately stored in the SharedBuffer, where its accumulated value is maintained. Therefore, SharedBuffer needs some supporting methods for SharedBufferAccessor to call.


getter/setter of Accumulator
/**
 * Gets the ids of all existing accumulators.
 *
 * @return An {@link Iterator} over the ids of the existing accumulators.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   get the {@link Iterator} of the {@link AccumulatorId}.
 */
public Iterator<AccumulatorId> getAccumulatorIdIterator() throws Exception {}

/**
 * Gets the accumulator by the specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @return The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   get the accumulator of the {@link AccumulatorId}.
 */
public byte[] getAccumulator(AccumulatorId accumulatorId) throws Exception {}

/**
 * Sets the accumulator by specified {@link AccumulatorId}.
 *
 * @param accumulatorId The {@link SharedBuffer} access wrapper for accumulator id.
 * @param data The serialized data of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   set the accumulator of the {@link AccumulatorId}.
 */
public void setAccumulator(AccumulatorId accumulatorId, byte[] data) throws Exception {}
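A minimal in-memory sketch of these three methods follows, assuming a plain `HashMap` in place of the SharedBuffer's state and string ids in place of AccumulatorId (both hypothetical simplifications). It highlights that the store only holds opaque bytes and never needs to know the accumulator's type, which is why serialization is left to the caller.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class AccumulatorStoreSketch {

    // Hypothetical stand-in for the SharedBuffer's accumulator state:
    // string ids instead of AccumulatorId, opaque serialized bytes as values.
    private final Map<String, byte[]> accumulators = new HashMap<>();

    Iterator<String> getAccumulatorIdIterator() {
        return accumulators.keySet().iterator();
    }

    byte[] getAccumulator(String accumulatorId) {
        return accumulators.get(accumulatorId); // null if nothing was stored yet
    }

    void setAccumulator(String accumulatorId, byte[] data) {
        accumulators.put(accumulatorId, data); // replaces the previous value
    }

    public static void main(String[] args) {
        AccumulatorStoreSketch store = new AccumulatorStoreSketch();
        // UTF-8 text is used as the "serialized" form only for readability.
        store.setAccumulator("start-0/sum", "6.0".getBytes(StandardCharsets.UTF_8));
        store.setAccumulator("start-0/sum", "11.0".getBytes(StandardCharsets.UTF_8));
        byte[] data = store.getAccumulator("start-0/sum");
        System.out.println(new String(data, StandardCharsets.UTF_8)); // 11.0
        store.getAccumulatorIdIterator().forEachRemaining(System.out::println); // start-0/sum
    }
}
```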


Introduces operations of Accumulator in SharedBufferAccessor


The methods that access the accumulator in SharedBuffer are called through SharedBufferAccessor. Both a serializer and a deserializer need to be specified so that the accumulator can be maintained via AccumulatorId, the wrapper for the accumulator id.


operations of Accumulator in SharedBufferAccessor
/**
 * The deserializer for input values.
 */
private final DataInputDeserializer dataInputView;

/**
 * The serializer for output values.
 */
private final DataOutputSerializer dataOutputView;

/**
 * Update the constructor with input deserializer and output serializer
 */
SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
    this.sharedBuffer = sharedBuffer;
    this.dataInputView = new DataInputDeserializer();
    this.dataOutputView = new DataOutputSerializer(128);
}

/**
 * Gets the accumulator stored in {@link SharedBuffer}.
 *
 * @param stateName The name of the aggregation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param serializer The serializer of the accumulator.
 * @return The accumulator that stores the accumulated value.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   get the accumulator with the given aggregation state.
 */
public <ACC> ACC getAccumulator(
        String stateName, ComputationState computationState, TypeSerializer<ACC> serializer)
        throws Exception {}


/**
 * Sets the accumulator into {@link SharedBuffer}.
 *
 * @param stateName The name of the aggregation state that needs to be unique.
 * @param computationState The {@link ComputationState} of NFA.
 * @param accumulator The accumulator that stores the accumulated value.
 * @param serializer The serializer of the accumulator.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   set the accumulator with the given aggregation state.
 */
public <ACC> void setAccumulator(
        String stateName,
        ComputationState computationState,
        ACC accumulator,
        TypeSerializer<ACC> serializer)
        throws Exception {}

/**
 * Removes the accumulator stored in {@link SharedBuffer}.
 *
 * @param computationState The {@link ComputationState} of NFA.
 * @throws Exception This method may throw exceptions. Throwing an exception will fail to
 *   remove the accumulator with the given aggregation state.
 */
public void removeAccumulator(ComputationState computationState) throws Exception {}
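The accessor's job can be sketched as "serialize on set, deserialize on get, drop everything for a computation state on remove". The sketch below is hypothetical throughout: a tiny `Serializer` interface replaces Flink's TypeSerializer, `java.io` streams replace DataInputDeserializer/DataOutputSerializer, the computation state is a string, and keys use an assumed `"<computationState>/<stateName>"` format instead of AccumulatorId.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SharedBufferAccessorSketch {

    // Hypothetical minimal serializer contract; Flink's TypeSerializer is richer.
    interface Serializer<T> {
        void serialize(T value, DataOutput out) throws IOException;

        T deserialize(DataInput in) throws IOException;
    }

    static final Serializer<Double> DOUBLE_SERIALIZER =
            new Serializer<Double>() {
                @Override
                public void serialize(Double value, DataOutput out) throws IOException {
                    out.writeDouble(value);
                }

                @Override
                public Double deserialize(DataInput in) throws IOException {
                    return in.readDouble();
                }
            };

    // Stand-in for the SharedBuffer: opaque bytes under hypothetical string keys.
    private final Map<String, byte[]> store = new HashMap<>();

    private static String key(String stateName, String computationState) {
        return computationState + "/" + stateName;
    }

    <ACC> ACC getAccumulator(String stateName, String computationState, Serializer<ACC> serializer)
            throws IOException {
        byte[] data = store.get(key(stateName, computationState));
        if (data == null) {
            return null; // no accumulator set for this state yet
        }
        return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(data)));
    }

    <ACC> void setAccumulator(
            String stateName, String computationState, ACC accumulator, Serializer<ACC> serializer)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        serializer.serialize(accumulator, out);
        out.flush();
        store.put(key(stateName, computationState), bytes.toByteArray());
    }

    // Drops every accumulator that belongs to the given computation state.
    void removeAccumulator(String computationState) {
        store.keySet().removeIf(k -> k.startsWith(computationState + "/"));
    }

    public static void main(String[] args) throws IOException {
        SharedBufferAccessorSketch accessor = new SharedBufferAccessorSketch();
        accessor.setAccumulator("sum", "start-0", 11.0, DOUBLE_SERIALIZER);
        System.out.println(accessor.getAccumulator("sum", "start-0", DOUBLE_SERIALIZER)); // 11.0
        accessor.removeAccumulator("start-0");
        System.out.println(accessor.getAccumulator("sum", "start-0", DOUBLE_SERIALIZER)); // null
    }
}
```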


Adds branch in NFA StateTransitions


When the NFA traverses the state transitions of a state and evaluates their conditions, it checks whether a condition is an AggregationCondition instance; if so, it calls the accumulate method to perform the aggregation defined in that AggregationCondition. The accumulator state is cleared when the corresponding SharedBuffer node is released or when the NFA reaches a stop state. AggregationCondition is a kind of condition that can be obtained from a stateTransition, but in terms of NFA execution it differs from an ordinary condition: the aggregation is performed only once per traversal of a state's transitions, to avoid repeated aggregation. For example, the user wants to update the accumulator state when the take transition is taken, but the current state may have not only a take transition but also an ignore or proceed transition. If every transition of the state triggered the aggregation, the same element would be aggregated multiple times, which is not desirable.
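The double-counting problem can be illustrated with a small hypothetical model. It assumes a single state whose TAKE, IGNORE and PROCEED transitions are all evaluated for every element, and that TAKE always accepts; it then compares aggregating on every evaluated transition with aggregating at most once, on TAKE, as described above. The enum and both methods are illustrative names, not Flink classes.

```java
import java.util.List;

public class AggregateOncePerStateSketch {

    enum TransitionAction { TAKE, IGNORE, PROCEED }

    // Naive: every evaluated transition folds the element in, so it is over-counted.
    static double naiveSum(List<TransitionAction> transitions, double[] prices) {
        double sum = 0.0;
        for (double price : prices) {
            for (TransitionAction ignored : transitions) {
                sum += price; // once per transition
            }
        }
        return sum;
    }

    // As described: fold the element in at most once, when TAKE accepts it.
    static double onceSum(List<TransitionAction> transitions, double[] prices) {
        double sum = 0.0;
        for (double price : prices) {
            boolean aggregated = false;
            for (TransitionAction action : transitions) {
                if (action == TransitionAction.TAKE && !aggregated) {
                    sum += price;
                    aggregated = true;
                }
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<TransitionAction> transitions =
                List.of(TransitionAction.TAKE, TransitionAction.IGNORE, TransitionAction.PROCEED);
        double[] prices = {1.0, 2.0, 3.0};
        System.out.println(naiveSum(transitions, prices)); // 18.0
        System.out.println(onceSum(transitions, prices)); // 6.0
    }
}
```

With three outgoing transitions, the naive variant triples the sum, which is exactly the repeated aggregation the proposal avoids.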


Compatibility, Deprecation, and Migration Plan


No.


Test Plan


Unit tests will be added to CEPStateInConditionTest to verify the correctness of IterativeCondition with an accumulator in a Pattern.


Rejected Alternatives


No.
