This enhancement proposes an improvement to the current behavior of the Window Evictor, giving users more control over how elements are evicted from the window. The original design document for this proposal can be found here.

Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhance-Window-Evictor-in-Flink-td12406.html

JIRA: https://issues.apache.org/jira/browse/FLINK-4174

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Window Evictor is limited in two ways:

  • The Evictor is called only before the WindowFunction. (There are use cases where elements have to be evicted after the WindowFunction is applied.)
  • Elements are evicted only from the beginning of the window. (There are cases where elements need to be evicted from anywhere within the window, according to the eviction logic the user wishes to implement.)

The current Evictor interface is as follows:

Evictor.java
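import java.io.Serializable;

import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public interface Evictor<T, W extends Window> extends Serializable {
    /**
     * Computes how many elements should be removed from the pane. The result specifies how
     * many elements should be removed from the beginning.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}.
     * @return The number of elements to remove from the beginning of the pane.
     */
    int evict(Iterable<StreamRecord<T>> elements, int size, W window);
}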

The Evictor gets an Iterable for all the window elements and the number of elements in the window. Then it can return a number that specifies how many elements should be evicted, starting from the beginning of the window buffer.
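The following Flink-free sketch illustrates the current contract. `LegacyEvictor`, `KeepLastN` and `LegacyWindow.applyLegacyEviction` are hypothetical names, and elements are simplified to `Long` values instead of `StreamRecord<T>`. The point it shows is that the evictor can only report a count, and the caller can only drop that many elements from the front of the buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the current Evictor contract:
// it can only report how many elements to drop from the front of the buffer.
interface LegacyEvictor {
    int evict(Iterable<Long> elements, int size);
}

// Hypothetical count-based evictor that keeps the last maxCount elements.
class KeepLastN implements LegacyEvictor {
    private final int maxCount;

    KeepLastN(int maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public int evict(Iterable<Long> elements, int size) {
        return Math.max(0, size - maxCount);
    }
}

class LegacyWindow {
    // Mimics the caller: ask the evictor for a count, then skip that many
    // elements from the beginning before the rest reach the window function.
    static List<Long> applyLegacyEviction(List<Long> buffer, LegacyEvictor evictor) {
        int toEvict = Math.min(evictor.evict(buffer, buffer.size()), buffer.size());
        return new ArrayList<>(buffer.subList(toEvict, buffer.size()));
    }
}
```

For a buffer of [1, 2, 3, 4, 5] and KeepLastN(3), the evictor returns 2 and the window function sees [3, 4, 5]. There is no way to express "drop the second element" or "evict after the function has run" with a single integer.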

Public Interfaces

  • Changes to the Evictor interface: addition of two new methods, evictBefore and evictAfter, and removal of the existing evict method.
  • Corresponding changes to CountEvictor and DeltaEvictor.
  • Renaming of the existing TimeEvictor to ProcessingTimeEvictor (since the current behavior of TimeEvictor does not consider the event time of the records), and addition of a new class, EventTimeEvictor.
  • New overloaded method CountEvictor.of() that takes an additional parameter doEvictAfter, which decides whether to do eviction after the WindowFunction.
  • Similar overloaded methods in DeltaEvictor, ProcessingTimeEvictor and EventTimeEvictor.
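The overloaded factory can be pictured with the following Flink-free sketch. `SketchCountEvictor` is a hypothetical stand-in for CountEvictor that works on a plain `Iterable<Long>` instead of `Iterable<StreamRecord<T>>`, with the window and EvictorContext parameters dropped. It assumes, for illustration, that elements are removed through the Iterable's Iterator and that each instance does its work in exactly one of the two hooks, selected by doEvictAfter.

```java
import java.util.Iterator;

// Hypothetical, Flink-free sketch of the proposed CountEvictor factories.
// A real implementation would implement Evictor<T, W>; here elements are Longs.
class SketchCountEvictor {
    private final long maxCount;
    private final boolean doEvictAfter;

    private SketchCountEvictor(long maxCount, boolean doEvictAfter) {
        this.maxCount = maxCount;
        this.doEvictAfter = doEvictAfter;
    }

    // Existing-style factory: evicts before the window function by default.
    static SketchCountEvictor of(long maxCount) {
        return new SketchCountEvictor(maxCount, false);
    }

    // New overload: doEvictAfter selects which hook performs the eviction.
    static SketchCountEvictor of(long maxCount, boolean doEvictAfter) {
        return new SketchCountEvictor(maxCount, doEvictAfter);
    }

    void evictBefore(Iterable<Long> elements, int size) {
        if (!doEvictAfter) {
            evict(elements, size);
        }
    }

    void evictAfter(Iterable<Long> elements, int size) {
        if (doEvictAfter) {
            evict(elements, size);
        }
    }

    // Removes elements from the beginning until only maxCount remain,
    // then stops iterating.
    private void evict(Iterable<Long> elements, int size) {
        if (size <= maxCount) {
            return;
        }
        long toRemove = size - maxCount;
        Iterator<Long> it = elements.iterator();
        while (toRemove > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toRemove--;
        }
    }
}
```

With of(2), a pane of [1, 2, 3, 4] is reduced to [3, 4] in evictBefore; with of(2, true), evictBefore leaves the pane untouched and the same reduction happens in evictAfter.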

Proposed Changes

We propose changing the Evictor interface to the following:

Evictor.java
import java.io.Serializable;

import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Optionally evicts elements. Called before the windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}.
     * @param evictorContext The context for the Evictor.
     */
    void evictBefore(Iterable<StreamRecord<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after the windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}.
     * @param evictorContext The context for the Evictor.
     */
    void evictAfter(Iterable<StreamRecord<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * A context object that is given to {@link Evictor} methods.
     * Similar to TriggerContext; its members are omitted here for brevity.
     */
    interface EvictorContext {}
}

The Evictor has two methods: evictBefore, which is called before the WindowFunction, and evictAfter, which is called after it. Both methods receive an Iterable over all the elements in the pane and the number of elements in the pane. An Evictor can remove elements from the Iterable based on any condition the user wishes to implement. EvictorContext is an interface very similar to TriggerContext, but for evictors; its definition is omitted here for brevity.
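To illustrate eviction from anywhere in the pane, here is a minimal sketch against a simplified, hypothetical version of the proposed interface (`SketchEvictor`, with the window and EvictorContext parameters dropped so it runs without Flink). `PredicateEvictor` is likewise hypothetical, and the sketch assumes elements are removed through the Iterator returned by the Iterable, since the proposal only says elements are removed "from the Iterable".

```java
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical, simplified version of the proposed Evictor interface.
interface SketchEvictor<T> {
    void evictBefore(Iterable<T> elements, int size);

    void evictAfter(Iterable<T> elements, int size);
}

// Hypothetical evictor that drops every element matching a predicate,
// wherever it sits in the pane, before the window function runs.
class PredicateEvictor<T> implements SketchEvictor<T> {
    private final Predicate<T> shouldEvict;

    PredicateEvictor(Predicate<T> shouldEvict) {
        this.shouldEvict = shouldEvict;
    }

    @Override
    public void evictBefore(Iterable<T> elements, int size) {
        // Iterator.remove() deletes from the middle of the buffer, which the
        // old "evict N from the beginning" contract could not express.
        for (Iterator<T> it = elements.iterator(); it.hasNext(); ) {
            if (shouldEvict.test(it.next())) {
                it.remove();
            }
        }
    }

    @Override
    public void evictAfter(Iterable<T> elements, int size) {
        // This example does not evict anything after the window function.
    }
}
```

With a pane of [5, -1, 7, -3, 9] and the predicate x < 0, evictBefore leaves [5, 7, 9] for the window function.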

This model can express everything that was possible with the current model, and in addition it:

  • Allows eviction of elements from the “middle” of the buffer. 

  • Allows eviction to be done before and/or after the WindowFunction.
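The ordering of the two hooks around the WindowFunction can be sketched as follows. `PaneEvictor`, `FiringSketch.fire` and `KeepLastAfterEvictor` are hypothetical names; the window and EvictorContext parameters are dropped and the pane is modeled as a mutable `List`. The example evictor does nothing before the function and afterwards keeps only the most recent element, which is the kind of post-function eviction the current interface cannot express.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplified evictor hooks (window and context parameters omitted).
interface PaneEvictor<T> {
    void evictBefore(Iterable<T> elements, int size);

    void evictAfter(Iterable<T> elements, int size);
}

class FiringSketch {
    // Hypothetical firing sequence: evictBefore, then the window function,
    // then evictAfter. The function sees only what survives evictBefore;
    // evictAfter trims what is retained for later firings.
    static <T, R> R fire(List<T> pane, PaneEvictor<T> evictor, Function<Iterable<T>, R> windowFunction) {
        evictor.evictBefore(pane, pane.size());
        R result = windowFunction.apply(pane);
        evictor.evictAfter(pane, pane.size());
        return result;
    }
}

// Hypothetical evictor that acts only after the window function and keeps
// just the most recent element in the pane.
class KeepLastAfterEvictor<T> implements PaneEvictor<T> {
    @Override
    public void evictBefore(Iterable<T> elements, int size) {
        // Nothing is evicted before the window function.
    }

    @Override
    public void evictAfter(Iterable<T> elements, int size) {
        // Remove all but the last element, starting from the beginning.
        int toRemove = size - 1;
        Iterator<T> it = elements.iterator();
        while (toRemove > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toRemove--;
        }
    }
}
```

Firing with a pane of [1, 2, 3] and a summing window function returns 6, and leaves [3] in the pane for the next firing.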

By default, CountEvictor, DeltaEvictor, ProcessingTimeEvictor and EventTimeEvictor evict elements before the window function.

DeltaEvictor always iterates through all elements and evicts every element whose delta exceeds the threshold. EventTimeEvictor iterates through all elements and removes those with a timestamp <= evictCutoff. ProcessingTimeEvictor evicts elements from the beginning of the window until it finds the first element with a timestamp > evictCutoff (processing-time elements arrive in timestamp order, so there are no out-of-order elements to account for). CountEvictor iterates through the elements only until the number of elements in the window has been reduced to the specified size.
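The difference between full scans (EventTimeEvictor, DeltaEvictor) and early termination (ProcessingTimeEvictor) can be sketched as plain loops. `Rec` and `EvictionStrategies` are hypothetical; evictCutoff is passed in directly rather than derived from a window size, and the delta is assumed, for illustration only, to be the absolute difference between an element's value and that of a reference element (for example, the latest one).

```java
import java.util.Iterator;

// Hypothetical element type: a value plus the timestamp the evictors compare.
final class Rec {
    final long timestamp;
    final double value;

    Rec(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

final class EvictionStrategies {
    // EventTimeEvictor-style: elements may be out of order, so scan all of them.
    static void evictEventTime(Iterable<Rec> elements, long evictCutoff) {
        for (Iterator<Rec> it = elements.iterator(); it.hasNext(); ) {
            if (it.next().timestamp <= evictCutoff) {
                it.remove();
            }
        }
    }

    // ProcessingTimeEvictor-style: timestamps grow with arrival order, so stop
    // at the first element that is newer than the cutoff.
    static void evictProcessingTime(Iterable<Rec> elements, long evictCutoff) {
        for (Iterator<Rec> it = elements.iterator(); it.hasNext(); ) {
            if (it.next().timestamp <= evictCutoff) {
                it.remove();
            } else {
                break;
            }
        }
    }

    // DeltaEvictor-style: scan all elements and drop those whose delta to the
    // reference element exceeds the threshold (absolute difference is an
    // assumption of this sketch).
    static void evictDelta(Iterable<Rec> elements, Rec reference, double threshold) {
        for (Iterator<Rec> it = elements.iterator(); it.hasNext(); ) {
            if (Math.abs(reference.value - it.next().value) > threshold) {
                it.remove();
            }
        }
    }
}
```

CountEvictor follows the same early-stop pattern as evictProcessingTime: it stops iterating as soon as enough elements have been removed.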

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.