
Status

Current state: Under discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Side inputs are an extension of the idea of broadcast sets from the DataSet API to the DataStream API. The name side input (inspired by a similar feature in Apache Beam) is preliminary but we chose to diverge from the name broadcast set because 1) it is not necessarily broadcast, as described below and 2) it is not a set. A side input is an additional input to an operation that itself can result from a streaming computation. We propose adding support for side inputs in two stages. First, we would add support for non-windowed side inputs and then we would add support for windowed side inputs.

Several use cases would be enabled by or can benefit from native support for side inputs. In general, most of these follow the pattern of joining a main stream of high throughput with one or several inputs of slowly changing or static data:

  • Join stream with static data: In this use case we have a high-throughput main input and a static set of data that we want to use to filter or enrich elements in the main stream. We would first load the static data into internal structures and then stream the main input past it. Since we are in a streaming environment, we have to buffer arriving elements on the main input until we know that we have completely read the side input. In many cases the main input is keyed and we also want to key the side input so that we don’t have to ship all side-input elements to all parallel instances of an operation. (This is one reason for not calling the feature broadcast stream/set.)
  • Join stream with slowly evolving data: This is very similar to the above case, but the side input that we use for enriching evolves over time. This can be done by waiting for some initial data to be available before processing the main input and then continuously ingesting new data into the internal side-input structure as it arrives.
  • Evolving or static Filter/Enriching: In this case we would learn some global piece of data or a model that should be used to filter or otherwise affect the processing of all elements. This piece of information would be broadcast to all parallel operator instances. For example, the value that we broadcast could be the continuously changing average length of words, and a downstream operation wants to filter out words that are below or above this average. In general, this goes in the direction of streaming machine learning and dynamically updating models. In the case of dynamic changes we again need some sort of triggering and continuous ingestion of new data into the side-input data structures.
  • Window-Based Joins: In this example we would have two keyed streams that we want to join on possibly different (time-)windows. We need to map the windows of the main input to the correct window of the side input and also buffer elements of the main input if the data in the corresponding side input is not yet available. This would build on the changes proposed here (Improving Windowing in Flink) to correctly be able to garbage collect the windowed data of the side input.

All these cases incorporate some kind of waiting element: we have to buffer main-input data until a side input is available. The main-input itself can be either keyed or un-keyed and a side input can be either keyed (only if the main input is also keyed) or broadcast. In the former case the key must match the key of the main-input stream. If the side input is not keyed it has to be broadcast because otherwise results would depend on which parallel instance of an operation receives a given side input element.
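The waiting behaviour common to these cases can be illustrated with a minimal sketch in plain Java. It is not Flink code: the class WaitingJoin and its methods onMainElement and onSideElement are hypothetical names chosen for illustration, and the sketch assumes a single, non-windowed side input that counts as ready as soon as its first element arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical helper (not part of Flink): buffers main-input elements until the side input is ready. */
class WaitingJoin<M, S, R> {
  private final BiFunction<M, S, R> joinFn;
  private final List<M> buffered = new ArrayList<>();
  private S sideValue;        // latest side-input value
  private boolean sideReady;  // assumption: ready once the first side element arrived

  WaitingJoin(BiFunction<M, S, R> joinFn) {
    this.joinFn = joinFn;
  }

  /** Main input: emit immediately if the side input is ready, otherwise buffer the element. */
  List<R> onMainElement(M element) {
    List<R> out = new ArrayList<>();
    if (sideReady) {
      out.add(joinFn.apply(element, sideValue));
    } else {
      buffered.add(element);
    }
    return out;
  }

  /** Side input: remember the new value and flush everything that was waiting for it. */
  List<R> onSideElement(S value) {
    sideValue = value;
    sideReady = true;
    List<R> out = new ArrayList<>();
    for (M element : buffered) {
      out.add(joinFn.apply(element, value));
    }
    buffered.clear();
    return out;
  }
}
```

In this sketch, later side-input elements simply replace the stored value, which corresponds to the slowly evolving case; a static side input would instead need an explicit end-of-input signal before flushing.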

Public Interfaces

 

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
  • Classes marked with the @Public annotation
  • On-disk binary formats, such as checkpoints/savepoints
  • User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
  • Configuration settings
  • Exposed monitoring information


Proposed Changes

We first describe the underlying changes and then return to the proposed API in more detail.

Buffering Elements

While waiting for a side input to become available we must buffer the elements from the main input. Depending on whether the main-input is keyed or not we need to store the buffered elements in either operator state or in key-group state. We cannot store them in keyed state because we need to iterate over all buffered elements when we determine that a side input is ready. We also think that element buffering should be scoped to windows; when we know that we have a side input ready for a given window we only need to process the buffered elements that were buffered for that window.

We propose to add a new Operator-level service (similar to the timer service and state backends) for buffering records with this interface:

RecordBuffer
import java.util.Iterator;

/**
 * @param <T> type of records in the buffer
 * @param <N> type of the namespace used for scoping
 */
interface RecordBuffer<T, N> {
  /** Adds a record to the buffer for the given namespace. */
  void addRecord(N namespace, T record);

  /** Returns an iterator over the elements stored for the given namespace. */
  Iterator<T> getRecords(N namespace);

  /** Removes all elements for the given namespace from the buffer. */
  void clear(N namespace);
}

Depending on whether the operator is keyed or not, it has to provide a RecordBuffer that stores its data in either key-group state or operator state when checkpointing. The key-group version of the RecordBuffer will have to be provided by the KeyedStateBackend, while the non-keyed version can be provided by the StateBackend.
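To make the intended semantics of the interface concrete, here is a minimal in-memory sketch. The class HeapRecordBuffer is a hypothetical name and not part of the proposal; it keeps records in a plain HashMap and deliberately ignores checkpointing, which the real implementations backed by operator state or key-group state would have to handle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** The interface from this proposal, repeated so that the sketch is self-contained. */
interface RecordBuffer<T, N> {
  void addRecord(N namespace, T record);
  Iterator<T> getRecords(N namespace);
  void clear(N namespace);
}

/** Hypothetical heap-only implementation; no checkpointing, for illustration only. */
class HeapRecordBuffer<T, N> implements RecordBuffer<T, N> {
  private final Map<N, List<T>> recordsByNamespace = new HashMap<>();

  @Override
  public void addRecord(N namespace, T record) {
    recordsByNamespace.computeIfAbsent(namespace, ns -> new ArrayList<>()).add(record);
  }

  @Override
  public Iterator<T> getRecords(N namespace) {
    List<T> records = recordsByNamespace.get(namespace);
    if (records == null) {
      return Collections.<T>emptyIterator();
    }
    // Iterate over a copy so that callers may add or clear records while iterating.
    return new ArrayList<T>(records).iterator();
  }

  @Override
  public void clear(N namespace) {
    recordsByNamespace.remove(namespace);
  }
}
```

With a window as the namespace, an operator would call getRecords(window) once the side input for that window is ready, process the returned elements, and then call clear(window).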

Storing Side-Input Data

For keyed operators with keyed side input we can simply store the side input in the KeyedStateBackend.

For non-keyed side input we need to store the side input in broadcast state. If we only allow broadcast state to be modified in response to incoming (broadcast) side-input data, then this state is guaranteed to be the same on all parallel operator instances when checkpointing. Since side-input data can become large, we should add a BroadcastStateBackend that has exactly the same interface as KeyedStateBackend but does not store data per key. When checkpointing, only one of the parallel instances of the operator needs to checkpoint its broadcast state. When restoring, all operator instances can be restored from that data.
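The checkpointing argument can be sketched in plain Java. This is an illustration only: BroadcastInstance is a hypothetical stand-in for one parallel operator instance, and the sketch assumes that every instance sees the same broadcast elements in the same order before the checkpoint is taken.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for one parallel operator instance holding broadcast state. */
class BroadcastInstance {
  private Map<String, String> state = new HashMap<>();

  /** Broadcast side-input elements are the only thing allowed to modify the state. */
  void onBroadcastElement(String key, String value) {
    state.put(key, value);
  }

  /** Returns a copy of the state, as a single instance would write it to a checkpoint. */
  Map<String, String> snapshot() {
    return new HashMap<>(state);
  }

  /** Restores this instance from the snapshot written by any one instance. */
  void restore(Map<String, String> snapshot) {
    state = new HashMap<>(snapshot);
  }

  String get(String key) {
    return state.get(key);
  }
}
```

Because all instances apply the same updates, the snapshot of any single instance is enough to restore all of them, also after a change of parallelism.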

Getting Side-Input Data into an Operator

There needs to be a way of getting the actual side-input data into an operator. Right now, there are OneInputStreamOperator and TwoInputStreamOperator, which have one and two inputs, respectively. There can be zero or more side inputs. We now propose some ways in which this could be achieved.

Using a tagged union input on the second input of a TwoInputStreamOperator

In this scenario, we would only allow side inputs for one-input operations. When we know that an operation is going to have side inputs, we instantiate a special operator that reads the normal input on the first input and the (multiplexed) side-input streams on the second input. For example, in addition to the normal StreamMap operator we would also have a StreamMapWithSideInputs operator that takes care of side inputs. For this scenario we would need to know what side inputs an operation has before instantiating the operator. This would (with the current API) lead to an API pattern like this:

Side Inputs before operation
DataStream<String> mainStream = …
DataStream<String> sideStream = …

BroadcastSingletonInput<String> filterString =
  new BroadcastSingletonInput<>(sideStream);

mainStream
  .map(new DoSomething())
  .withSideInputs(filterString) // this sets side input for following operation
  .filter(new RichFilterFunction<String>() {
    @Override
    public boolean filter(String in) {
      // getSideInput() is part of the API proposed here, not an existing method
      String sideValue =
        getRuntimeContext().getSideInput(filterString);
      return in.contains(sideValue);
    }
  });


Having to set the side input before the operation can seem somewhat counterintuitive.
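The multiplexing of several side inputs onto the second input could work roughly as in the following plain-Java sketch. The names TaggedSideElement and SideInputDemux are hypothetical and not existing Flink classes; the sketch assumes that each side input is assigned an integer id when the job graph is built and that only the latest value per side input is kept.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tagged-union wrapper: a side-input element plus the id of the side input it belongs to. */
final class TaggedSideElement {
  final int sideInputId;
  final Object value;

  TaggedSideElement(int sideInputId, Object value) {
    this.sideInputId = sideInputId;
    this.value = value;
  }
}

/** Hypothetical demultiplexer for the second input of a two-input operator. */
class SideInputDemux {
  private final Map<Integer, Object> latestBySideInput = new HashMap<>();

  /** Called for every element on the second (multiplexed) input. */
  void processElement2(TaggedSideElement element) {
    latestBySideInput.put(element.sideInputId, element.value);
  }

  /** True once at least one element has arrived for the given side input. */
  boolean isReady(int sideInputId) {
    return latestBySideInput.containsKey(sideInputId);
  }

  /** Returns the latest value of the given side input, or null if none has arrived yet. */
  Object getSideInput(int sideInputId) {
    return latestBySideInput.get(sideInputId);
  }
}
```

An operator such as StreamMapWithSideInputs would consult isReady before processing an element of the first input and buffer that element otherwise.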

API Changes

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

The new feature will be tested with unit tests and integration tests.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
