This enhancement proposes an improvement to the current behavior of the Window Evictor by providing more control over how elements are evicted from the Window. The original design document for this proposal can be found here

Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhance-Window-Evictor-in-Flink-td12406.html

JIRA: https://issues.apache.org/jira/browse/FLINK-4174


Release: 1.2

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Right now, the ability of Window Evictor is limited

...

  • Changes to the Evictor interface: addition of two new methods, evictBefore and evictAfter, and removal of the existing evict method
  • Corresponding changes to CountEvictor and DeltaEvictor
  • Renaming of the existing TimeEvictor as ProcessingTimeEvictor and addition of a new class EventTimeEvictor
  • New class TimestampedValue to store records with a timestamp. This class is exposed to users in the evictBefore and evictAfter methods.
  • New overloaded method CountEvictor.of() that takes an additional parameter doEvictAfter, which decides whether to do eviction after the WindowFunction.
  • Similar overloaded methods in DeltaEvictor, ProcessingTimeEvictor and EventTimeEvictor.


...


Proposed Changes

We propose changing the Evictor interface to the following:

Evictor.java

import java.io.Serializable;

import org.apache.flink.streaming.api.windowing.windows.Window;

// TimestampedValue is the new class introduced by this proposal.
public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Optionally evicts elements. Called before the windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after the windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * A context object that is given to {@link Evictor} methods.
     */
    interface EvictorContext {}
}

The Evictor has two methods: evictBefore, called before the WindowFunction, and evictAfter, called after the WindowFunction. Both methods receive an Iterable of all the elements in the pane, together with the number of elements in the pane. An Evictor can remove elements from the Iterable based on any condition that a user wishes to implement. The EvictorContext is an interface very similar to TriggerContext, but for Evictors; I'm omitting it for brevity.
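The calling order described above can be sketched as follows. This is only an illustration: SimpleEvictor, DropOldestThenPurge, and fire() are hypothetical simplifications, and the real operator would also pass the size, the window, and the EvictorContext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustrative only: SimpleEvictor and fire() are hypothetical stand-ins,
// not the Flink operator or the proposed Evictor interface.
final class FiringOrderSketch {

    /** Reduced evictor: only the pane is passed to each hook. */
    interface SimpleEvictor<T> {
        void evictBefore(List<T> pane);
        void evictAfter(List<T> pane);
    }

    /** Hypothetical evictor: drops the oldest element before, purges the pane after. */
    static final class DropOldestThenPurge implements SimpleEvictor<Integer> {
        @Override
        public void evictBefore(List<Integer> pane) {
            if (!pane.isEmpty()) {
                pane.remove(0);
            }
        }

        @Override
        public void evictAfter(List<Integer> pane) {
            pane.clear();
        }
    }

    /** Evicts, applies the window function to what is left, then evicts again. */
    static <T, R> R fire(List<T> pane, SimpleEvictor<T> evictor, Function<List<T>, R> windowFunction) {
        evictor.evictBefore(pane);
        R result = windowFunction.apply(pane);
        evictor.evictAfter(pane);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> pane = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        Integer sum = fire(pane, new DropOldestThenPurge(),
                p -> p.stream().mapToInt(Integer::intValue).sum());
        System.out.println(sum + " " + pane); // prints "9 []"
    }
}
```

The window function only sees the elements that survive evictBefore, while evictAfter affects what remains in the pane for later firings.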

This model can express everything that was possible with the current model, and in addition it:

  • Allows eviction of elements from the “middle” of the buffer. 

  • Allows eviction to be done before and/or after the WindowFunction.
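As a rough illustration of eviction from the "middle" of the buffer, the sketch below removes elements through the Iterable's iterator wherever they sit in the pane. The TimestampedValue class here is a simplified stand-in for the proposed class, NegativeValueEvictor is a hypothetical example, and the window and EvictorContext parameters are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative only: TimestampedValue and the evictBefore signature below are
// simplified stand-ins, not the actual Flink classes.
final class MiddleEvictionSketch {

    /** Stand-in for the proposed TimestampedValue<T>: a value plus its timestamp. */
    static final class TimestampedValue<T> {
        private final T value;
        private final long timestamp;

        TimestampedValue(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        T getValue() {
            return value;
        }

        long getTimestamp() {
            return timestamp;
        }
    }

    /** Hypothetical evictor that drops negative readings wherever they sit in the pane. */
    static final class NegativeValueEvictor {
        void evictBefore(Iterable<TimestampedValue<Integer>> elements, int size) {
            for (Iterator<TimestampedValue<Integer>> it = elements.iterator(); it.hasNext();) {
                if (it.next().getValue() < 0) {
                    it.remove(); // removal is not limited to the head of the buffer
                }
            }
        }
    }

    /** Runs the evictor over a small pane and returns the surviving values. */
    static List<Integer> survivors(int... values) {
        List<TimestampedValue<Integer>> pane = new ArrayList<>();
        long ts = 0;
        for (int v : values) {
            pane.add(new TimestampedValue<>(v, ts++));
        }
        new NegativeValueEvictor().evictBefore(pane, pane.size());
        List<Integer> result = new ArrayList<>();
        for (TimestampedValue<Integer> e : pane) {
            result.add(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(survivors(1, -2, 3, -4, 5)); // prints [1, 3, 5]
    }
}
```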

CountEvictor, DeltaEvictor, and TimeEvictor by default will evict elements before the window function. 

DeltaEvictor will iterate through all the elements and evict every element whose delta is higher than the threshold. TimeEvictor will evict all the elements that have a timestamp <= the evictCutoff. CountEvictor will iterate through the elements only until the count of elements in the window is reduced to the specified size.
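The count-based and time-based strategies might look roughly like the following. This is a minimal sketch, not the actual implementation: elements are reduced to plain values or bare timestamps, and evictCutoff is passed in directly because the text above does not say how it is derived.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: simplified element types; evictCutoff is supplied by the
// caller as an assumption, since its derivation is not specified here.
final class CountAndTimeEvictionSketch {

    /** Count-based: removes from the head and stops once only maxCount elements remain. */
    static <T> void evictByCount(Iterable<T> elements, int size, int maxCount) {
        int toEvict = size - maxCount;
        Iterator<T> it = elements.iterator();
        while (toEvict > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toEvict--;
        }
    }

    /** Time-based: scans every element and removes those with timestamp <= evictCutoff. */
    static void evictByTime(Iterable<Long> timestamps, long evictCutoff) {
        for (Iterator<Long> it = timestamps.iterator(); it.hasNext();) {
            if (it.next() <= evictCutoff) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<String> byCount = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e"));
        evictByCount(byCount, byCount.size(), 3);
        System.out.println(byCount); // prints [c, d, e]

        List<Long> byTime = new ArrayList<>(Arrays.asList(5L, 20L, 10L, 30L));
        evictByTime(byTime, 10L);
        System.out.println(byTime); // prints [20, 30]
    }
}
```

Note that the count-based loop can stop early, whereas the time-based scan has to visit every element because timestamps in the pane are not assumed to be ordered.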

Overloaded method in DeltaEvictor to make the Evictor evict after the WindowFunction:

DeltaEvictor.java

    /**
     * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
     *
     * @param threshold The threshold
     * @param deltaFunction The {@code DeltaFunction}
     * @param doEvictAfter Whether eviction should be done after the window function
     * @return The new {@code DeltaEvictor}
     */
    public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
        return new DeltaEvictor<>(threshold, deltaFunction, doEvictAfter);
    }

Similar overloaded methods are added to CountEvictor and TimeEvictor.
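One way the doEvictAfter flag could be wired up is sketched below with a simplified count-based evictor. DoEvictAfterSketch is a hypothetical class, not the Flink CountEvictor, and it assumes the flag selects exactly one of the two hooks so that eviction happens either before or after the WindowFunction, never both.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: a simplified count-based evictor, not the Flink class.
// Assumption: doEvictAfter selects exactly one of the two hooks.
final class DoEvictAfterSketch<T> {
    private final int maxCount;
    private final boolean doEvictAfter;

    private DoEvictAfterSketch(int maxCount, boolean doEvictAfter) {
        this.maxCount = maxCount;
        this.doEvictAfter = doEvictAfter;
    }

    /** Default factory: evicts before the window function. */
    static <T> DoEvictAfterSketch<T> of(int maxCount) {
        return new DoEvictAfterSketch<>(maxCount, false);
    }

    /** Overloaded factory: the caller decides when eviction happens. */
    static <T> DoEvictAfterSketch<T> of(int maxCount, boolean doEvictAfter) {
        return new DoEvictAfterSketch<>(maxCount, doEvictAfter);
    }

    void evictBefore(Iterable<T> elements, int size) {
        if (!doEvictAfter) {
            evict(elements, size);
        }
    }

    void evictAfter(Iterable<T> elements, int size) {
        if (doEvictAfter) {
            evict(elements, size);
        }
    }

    /** Removes from the head until only maxCount elements remain. */
    private void evict(Iterable<T> elements, int size) {
        int toEvict = size - maxCount;
        Iterator<T> it = elements.iterator();
        while (toEvict > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toEvict--;
        }
    }

    public static void main(String[] args) {
        List<Integer> pane = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        DoEvictAfterSketch<Integer> evictor = DoEvictAfterSketch.of(2, true);
        evictor.evictBefore(pane, pane.size());
        System.out.println(pane); // prints [1, 2, 3, 4]: nothing evicted before
        evictor.evictAfter(pane, pane.size());
        System.out.println(pane); // prints [3, 4]
    }
}
```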

Compatibility, Deprecation, and Migration Plan

  • Users currently using DeltaEvictor should be aware that the new implementation applies the delta function to all the elements in the window, rather than stopping at the first element in the pane whose delta is below the threshold.
  • Users who have currently implemented a custom Evictor will have to adapt to the new interface of the Evictor.
  • CountEvictor will behave the same even after these changes.
  • By default, all Evictors will evict before the WindowFunction, which matches the existing behavior.
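To make the DeltaEvictor behavior change more concrete, the sketch below contrasts a head-only scan with a full scan on the same pane. It rests on assumptions that are not spelled out in this proposal: the delta is taken as the absolute difference to the last element in the pane, the old behavior is modeled as stopping at the first element that does not exceed the threshold, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only. Assumptions: delta = |x - last element in the pane|, and the
// old behavior stops at the first element that does not exceed the threshold.
final class DeltaEvictionComparison {

    /** Head-only scan: evicts from the head and stops at the first element kept. */
    static void evictHeadOnly(List<Double> pane, double threshold) {
        if (pane.isEmpty()) {
            return;
        }
        double reference = pane.get(pane.size() - 1);
        Iterator<Double> it = pane.iterator();
        while (it.hasNext() && Math.abs(it.next() - reference) > threshold) {
            it.remove();
        }
    }

    /** Full scan: tests every element and evicts each one above the threshold. */
    static void evictAll(List<Double> pane, double threshold) {
        if (pane.isEmpty()) {
            return;
        }
        double reference = pane.get(pane.size() - 1);
        pane.removeIf(x -> Math.abs(x - reference) > threshold);
    }

    public static void main(String[] args) {
        List<Double> oldStyle = new ArrayList<>(Arrays.asList(10.0, 1.0, 9.0, 2.0));
        evictHeadOnly(oldStyle, 5.0);
        System.out.println(oldStyle); // prints [1.0, 9.0, 2.0]

        List<Double> newStyle = new ArrayList<>(Arrays.asList(10.0, 1.0, 9.0, 2.0));
        evictAll(newStyle, 5.0);
        System.out.println(newStyle); // prints [1.0, 2.0]
    }
}
```

In this example the element 9.0 survives the head-only scan because 1.0 stops the iteration, but it is evicted by the full scan.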