...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

There was a first side input design document here: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#. A second attempt to solve the problem, based on the first one, was started here: https://docs.google.com/document/d/1hqUmrLY_wPTeS5bqG36Qq9P8LeDjZ_db61ky7OQy1hw/edit#heading=h.z6bw9wg0jj23. The original mailing list discussion is available here: https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E.

Motivation

Side inputs are an extension of the idea of broadcast sets from the DataSet API to the DataStream API. The name side input (inspired by a similar feature in Apache Beam) is preliminary, but we chose to diverge from the name broadcast set because 1) a side input is not necessarily broadcast, as described below, and 2) it is not a set. A side input is an additional input to an operation; the side input can itself be the result of a streaming computation. We propose adding support for side inputs in two stages: first, support for non-windowed side inputs, and then support for windowed side inputs.

...

All these cases incorporate some kind of waiting element: we have to buffer main-input data until a side input is available. The main input itself can be either keyed or un-keyed, and a side input can be either keyed (only if the main input is also keyed) or broadcast. If a side input is keyed, its key must match the key of the main-input stream. If a side input is not keyed, it has to be broadcast, because otherwise results would depend on which parallel instance of an operation receives a given side-input element.
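To make the waiting element concrete, here is a minimal plain-Java sketch of the keyed case. It assumes one operator instance that sees both inputs for a key and keeps its state in in-memory maps; a real operator would keep this in Flink keyed state or per-key-group buffers. The class `KeyedSideInputBuffer` and its methods are hypothetical illustrations, not Flink API. A broadcast side input roughly corresponds to the same logic with a single constant key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch, not Flink API: buffers main-input elements per key
// until a side-input value for the same key has arrived.
final class KeyedSideInputBuffer<K, M, S, O> {
    // Latest side-input value per key (a real operator would use keyed state).
    private final Map<K, S> sideValues = new HashMap<>();
    // Main-input elements that arrived before the side input for their key.
    private final Map<K, List<M>> pending = new HashMap<>();
    // User function combining a main-input element with the side-input value.
    private final BiFunction<M, S, O> fn;

    KeyedSideInputBuffer(BiFunction<M, S, O> fn) {
        this.fn = fn;
    }

    // Returns the outputs produced by this main-input element (empty if it was buffered).
    List<O> onMainElement(K key, M element) {
        List<O> out = new ArrayList<>();
        if (sideValues.containsKey(key)) {
            out.add(fn.apply(element, sideValues.get(key)));
        } else {
            // Side input for this key is not available yet: wait by buffering.
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return out;
    }

    // Stores the side-input value and flushes buffered main-input elements for its key.
    List<O> onSideElement(K key, S value) {
        sideValues.put(key, value);
        List<O> out = new ArrayList<>();
        List<M> buffered = pending.remove(key);
        if (buffered != null) {
            for (M element : buffered) {
                out.add(fn.apply(element, value));
            }
        }
        return out;
    }
}
```

Buffering is tracked per key, so a key whose side input has not arrived keeps waiting even when other keys can already be processed.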

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
  • Classes marked with the @Public annotation
  • On-disk binary formats, such as checkpoints/savepoints
  • User-facing scripts/command-line tools, e.g. bin/flink, Yarn scripts, Mesos scripts
  • Configuration settings
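Proposed Side-Input interface:

```java
// Sketch of the proposed API: SingletonSideInput, getSideInput() and
// withSideInput() are the proposed new interfaces, not existing Flink API.
DataStream<String> mainStream = …
DataStream<String> sideStream = …

// Wrap the side stream; the operator reads a single value from it.
SingletonSideInput<String> filterString =
  new SingletonSideInput<>(sideStream);

mainStream
  .filter(new RichFilterFunction<String>() {
    @Override
    public boolean filter(String in) {
      // Main-input elements are buffered until the side input is available.
      String sideValue =
        getRuntimeContext().getSideInput(filterString);
      return in.contains(sideValue);
    }
  }).withSideInput(filterString);
```
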
...

Proposed Changes

We first describe the underlying changes and then return to the proposed API in more detail.

...

Having to set the side input before the operation can seem somewhat counterintuitive.

Managing side inputs at the level of StreamTask

We could handle both the buffering of elements and the side input data itself at the StreamTask instead of at the operator. The StreamTask would then provide an interface to the operator that allows it to query the side input. In this scenario, the StreamTask would need to implement the record buffering (either per key group or in operator state) described above. The advantage of this solution is that the operator gets a clearly defined interface and does not have to do much else. The disadvantage is that users would not get a more primitive interface for use cases that go beyond the capabilities offered by side inputs. If we instead put the side input functionality at the operator level, users could override it to adapt the behaviour to their needs.
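A minimal sketch of the StreamTask-level variant, assuming a single non-keyed side input that carries one value. The hypothetical `Task` class stands in for the StreamTask: it buffers main-input records itself and hands the operator only a read-only `SideInputReader`. None of these names are Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch, not Flink API: side-input handling owned by the task.
final class TaskSideInputSketch {

    // Read-only view of a side input that the task hands to the operator.
    interface SideInputReader<S> {
        S get();
    }

    // Plays the role of the StreamTask: owns the side input and the record buffer.
    static final class Task<I, S, O> {
        private final Deque<I> buffered = new ArrayDeque<>();
        private final List<O> output = new ArrayList<>();
        // The "operator": only sees an element and a SideInputReader.
        private final BiFunction<I, SideInputReader<S>, O> operator;
        private S sideValue;
        private boolean sideReady = false;

        Task(BiFunction<I, SideInputReader<S>, O> operator) {
            this.operator = operator;
        }

        // Backs the SideInputReader; only called once the side input is ready.
        private S readSide() {
            if (!sideReady) {
                throw new IllegalStateException("side input not ready");
            }
            return sideValue;
        }

        void processMain(I element) {
            if (sideReady) {
                output.add(operator.apply(element, this::readSide));
            } else {
                // Task-level buffering, invisible to the operator.
                buffered.add(element);
            }
        }

        void processSide(S value) {
            sideValue = value;
            sideReady = true;
            // Replay everything that was waiting for the side input.
            while (!buffered.isEmpty()) {
                output.add(operator.apply(buffered.poll(), this::readSide));
            }
        }

        List<O> output() {
            return output;
        }
    }
}
```

The operator is only invoked once the side input is ready, so it never deals with buffering; by the same token, it has no hook to change that behaviour.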

Allowing a stream operator to be extended with arbitrary inputs

This would roughly follow the ideas laid out here: https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit#heading=h.pqg5z6g0mjm7. If we can add arbitrary inputs to an operator, we can also add (keyed or broadcast) inputs to an operator for dealing with side inputs. The advantage is that this would also give users a general n-ary operator interface. The disadvantage is that it puts more (somewhat complex) logic into the stream operator.
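A minimal sketch of the n-ary variant, assuming hypothetical `Input` and `NAryOperator` interfaces (not Flink API) and a runtime that routes each record to an input by index. The side input is simply a second input, and the operator does its own buffering, which illustrates both the flexibility and the extra logic that moves into the operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink API: an operator with an arbitrary number of inputs.
final class NArySketch {

    // One typed input of an operator.
    interface Input<T> {
        void processElement(T element);
    }

    // An operator that exposes any number of inputs, addressed by index.
    interface NAryOperator {
        List<Input<?>> inputs();
    }

    // What the runtime would do: deliver a record to the input with the given index.
    @SuppressWarnings("unchecked")
    static <T> void route(NAryOperator op, int inputIndex, T element) {
        ((Input<T>) op.inputs().get(inputIndex)).processElement(element);
    }

    // Input 0 is the main stream, input 1 is the side input carrying the filter string.
    static final class ContainsFilter implements NAryOperator {
        private final List<String> pending = new ArrayList<>();
        private final List<String> output = new ArrayList<>();
        private String filter; // null until the side input has delivered a value

        private final Input<String> mainInput = element -> {
            if (filter == null) {
                pending.add(element); // the operator itself buffers main input
            } else if (element.contains(filter)) {
                output.add(element);
            }
        };

        private final Input<String> sideInput = value -> {
            filter = value;
            for (String element : pending) {
                if (element.contains(filter)) {
                    output.add(element);
                }
            }
            pending.clear();
        };

        @Override
        public List<Input<?>> inputs() {
            return List.of(mainInput, sideInput);
        }

        List<String> output() {
            return output;
        }
    }
}
```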

API Changes

TODO

Compatibility, Deprecation, and Migration Plan

...

There are no backwards compatibility, migration, or deprecation concerns since this proposal only adds new API.

Test Plan

The new feature will be tested with unit tests and integration tests.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

We list some implementation alternatives above. Once we decide on a final solution, we will move the rejected alternatives here for history purposes.