...

  • Join stream with static data: In this use case we have a high-throughput main input and a static set of data that we want to use to filter or enrich elements in the main stream. We would first load the static data into internal structures and then stream the main input past it. Since we are in a streaming environment, we have to buffer arriving elements on the main input until we know that the side input has been read completely. In many cases the main input is keyed, and we also want to key the side input so that we don’t have to ship all side-input elements to all parallel instances of an operation. (This is one reason for not calling the feature broadcast stream/set.)
  • Join stream with slowly evolving data: This is very similar to the above case, but the side input that we use for enriching evolves over time. This can be done by waiting for some initial data to be available before processing the main input and then continuously ingesting new data into the internal side input structure as it arrives.
  • Evolving or static Filter/Enriching: In this case we would learn some global piece of data or model that should be used to filter or otherwise affect processing of all elements. This piece of information would be broadcast to all parallel operator instances. For example, the value that we broadcast could be a continuously changing average word length, and downstream we want to filter out words whose length is below or above this average. In general, this goes in the direction of streaming machine learning and dynamically updating models. In the case of dynamic changes we again need some sort of triggering and continuous ingestion of new data into the side input data structures.
  • Window-Based Joins: In this example we would have two keyed streams that we want to join on possibly different (time-)windows. We need to map each window of the main input to the correct window of the side input and also buffer elements of the main input if the data in the corresponding side input is not yet available. This would build on the changes proposed here (Improving Windowing in Flink) so that the windowed data of the side input can be garbage collected correctly.

Public Interfaces

The API needs two parts:

...

Windowed Side Input (Java):
// we have to specify windows/triggers on both of them...
WindowedStream<> mainStream = …
WindowedStream<T> sideStream = …

WindowedKeyedSingletonInput<> enrichInput =
  new WindowedKeyedSingletonInput<>(sideStream);

mainStream
  .apply( (key, window, values, out) -> {
    // this is scoped to the correct key and window
    T sideValue = getSideInput(enrichInput);
    for (var value : values) {
      out.collect(value + sideValue);
    }
  })
  .withSideInput(enrichInput);

Side Input Semantics

We have to decide some things:

  • What side input does the user function "see" when processing a main input?
  • Do we wait for the side input to be available before processing a main-input element?
  • If yes, when do we consider a side input ready?

There are all sorts of tricky semantics that we could try and figure out but we propose to go with the minimal viable solution:

Side Input Visibility

Here we have to look at two cases which, if you think about it, boil down to one case in the end: non-windowed side input and windowed side input. For windowed main-input, the side input that the user function can access should be the side input for the same window or, for differing window functions, a "matching window". The case of non-windowed streams is covered by considering (theoretically) everything to be in the GlobalWindow.
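
To make the notion of a "matching window" more concrete, here is a minimal sketch in plain Java. It assumes tumbling time windows on both inputs and picks one plausible rule: the main-input window maps to the side-input window that contains the main window's last timestamp. The names TimeWindow and WindowMapping are hypothetical and not part of any existing Flink API, and the proposal itself does not fix this rule.

```java
/** Hypothetical half-open time window [start, end) in milliseconds. */
final class TimeWindow {
    final long start;
    final long end;

    TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** Largest timestamp that still belongs to this window. */
    long maxTimestamp() {
        return end - 1;
    }
}

/** Hypothetical mapping from a main-input window to a side-input window. */
final class WindowMapping {
    private WindowMapping() {}

    /**
     * Assumed rule: choose the tumbling side-input window of the given size
     * that contains the last timestamp of the main-input window.
     */
    static TimeWindow matchingSideWindow(TimeWindow mainWindow, long sideWindowSize) {
        long ts = mainWindow.maxTimestamp();
        // floorMod keeps the alignment correct for negative timestamps as well
        long sideStart = ts - Math.floorMod(ts, sideWindowSize);
        return new TimeWindow(sideStart, sideStart + sideWindowSize);
    }
}
```

With one-minute main windows and one-hour side windows, every main window inside the same hour resolves to the same side-input window under this rule. For non-windowed streams the mapping degenerates to a single global window.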

Side Input Readiness

We propose that processing of a main input (in a given window) waits for the side input to be available for that window. We consider a side input ready as soon as the first data arrives for it. If the side input is updated by more incoming data, subsequent main-input elements see the updated side input.
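
The readiness rule above can be illustrated with a small sketch in plain Java. It models a single window (or the global window) only, and the class BufferingSideInputJoiner is a hypothetical helper for illustration, not part of Flink. Main-input elements are buffered until the first side-input element arrives, after which they are flushed and later elements see the latest side-input value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;

/** Hypothetical helper: applies a user function once side input is ready. */
class BufferingSideInputJoiner<M, S, R> {
    private final Queue<M> buffered = new ArrayDeque<>(); // main elements waiting for side input
    private final List<R> output = new ArrayList<>();
    private final BiFunction<M, S, R> userFunction;
    private S sideValue;   // latest side-input value (singleton semantics, replace on update)
    private boolean ready; // true once the first side-input element has arrived

    BufferingSideInputJoiner(BiFunction<M, S, R> userFunction) {
        this.userFunction = userFunction;
    }

    /** Called for every side-input element; the first call makes the side input ready. */
    void onSideInput(S value) {
        sideValue = value;
        ready = true;
        // Flush everything that arrived before the side input was available.
        while (!buffered.isEmpty()) {
            output.add(userFunction.apply(buffered.poll(), sideValue));
        }
    }

    /** Called for every main-input element. */
    void onMainInput(M element) {
        if (!ready) {
            buffered.add(element); // hold back until side input is ready
            return;
        }
        output.add(userFunction.apply(element, sideValue));
    }

    List<R> output() {
        return output;
    }

    int bufferedCount() {
        return buffered.size();
    }
}
```

In this sketch, elements "a" and "b" arriving before the first side-input value are held back and then processed with that value, while an element arriving after a later update is processed with the updated value. A real implementation would additionally need to keep the buffer in checkpointed state and scope it per key and window.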

Side Input Types

We propose several different types of side inputs (these are heavily influenced by the types of side input that are available in Apache Beam):

  • List/Iterable Side Input: The interface that a user function sees is a List/Bag. New side input data that arrives is appended to the existing elements.
  • Singleton Side Input: The interface that a user function sees is a single value of type T. We can either not allow several elements to arrive on the side input (for a given window) or replace the previous value when new data comes in.
  • Map Side Input: The interface that a user function sees is a Map<K, V> where the user function can iterate over all entries or access by a given key (this is not to be mistaken with keyed state). If new data for a key arrives we either replace the existing value or don't allow updates. This should be in line with singleton side input.
  • Multimap Side Input: The interface that a user function sees is a Map<K, Iterable<V>>. This is the combination of Map and List/Bag side input.
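
The four side input types listed above could be backed by containers along the following lines. This is only a sketch in plain Java with hypothetical class names, not a proposed Flink interface. Where the text leaves a choice open (replace the previous value or disallow updates), the sketch assumes the "replace" option for both the singleton and the map variant.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** List/Iterable side input: new elements are appended. */
class ListSideInput<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) {
        elements.add(value);
    }

    List<T> get() {
        return Collections.unmodifiableList(elements);
    }
}

/** Singleton side input: assumes a new element replaces the previous value. */
class SingletonSideInput<T> {
    private T value;

    void add(T newValue) {
        value = newValue;
    }

    T get() {
        return value;
    }
}

/** Map side input: assumes a new value for a key replaces the existing one. */
class MapSideInput<K, V> {
    private final Map<K, V> entries = new HashMap<>();

    void add(K key, V value) {
        entries.put(key, value);
    }

    Map<K, V> get() {
        return Collections.unmodifiableMap(entries);
    }
}

/** Multimap side input: values for a key are appended, as in the list variant. */
class MultimapSideInput<K, V> {
    private final Map<K, List<V>> entries = new HashMap<>();

    void add(K key, V value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    Iterable<V> get(K key) {
        List<V> values = entries.get(key);
        return values == null ? Collections.<V>emptyList() : Collections.unmodifiableList(values);
    }
}
```

Using the same update rule for the singleton and map variants keeps them in line with each other, as the Map Side Input item requires. Switching to the "disallow updates" option would mean rejecting a second add for the same window or key instead of overwriting.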

Proposed Changes

We will first go into the underlying changes before again describing the proposed API in more detail.

...

This would roughly follow the ideas laid out here: https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit#heading=h.pqg5z6g0mjm7. If we can arbitrarily add inputs to an operator we can also add (keyed or broadcast) inputs to an operator for dealing with side inputs. This would have the advantage that we also provide a general n-ary operator interface that users can use. The disadvantage is that we put more (somewhat complex) logic into the stream operator.

API Changes

TODO

Compatibility, Deprecation, and Migration Plan

There are no backwards compatibility, migration, or deprecation concerns since this only adds new API.

...