Status

Current state: Under discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

There was a first side input design document here: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#. A second attempt to solve the problem, based on the first one, was started here: https://docs.google.com/document/d/1hqUmrLY_wPTeS5bqG36Qq9P8LeDjZ_db61ky7OQy1hw/edit#heading=h.z6bw9wg0jj23. The original mailing list discussion is available here: https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E.

Motivation

Side inputs are an extension of the idea of broadcast sets from the DataSet API to the DataStream API. The name side input (inspired by a similar feature in Apache Beam) is preliminary but we chose to diverge from the name broadcast set because 1) it is not necessarily broadcast, as described below and 2) it is not a set. A side input is an additional input to an operation that itself can result from a streaming computation. We propose adding support for side inputs in two stages. First, we would add support for non-windowed side inputs and then we would add support for windowed side inputs.

Several use cases would be enabled by or can benefit from native support for side inputs. In general, most of these follow the pattern of joining a main stream of high throughput with one or several inputs of slowly changing or static data:

  • Join stream with static data: In this use case we have a high-throughput main input and a static set of data that we want to use to filter or enrich elements in the main stream. We would first load the static data into internal structures and then stream the main input past it. Since we are in a streaming environment, we have to buffer elements arriving on the main input until we know that we have completely read the side input. In many cases the main input is keyed and we also want to key the side input so that we don't have to ship all side-input elements to all parallel instances of an operation. (This is one reason for not calling the feature broadcast stream/set.)
  • Join stream with slowly evolving data: This is very similar to the above case, but the side input that we use for enriching evolves over time. This can be done by waiting for some initial data to be available before processing the main input and then continuously ingesting new data into the internal side-input structure as it arrives.
  • Evolving or static Filter/Enriching: In this case we would learn some global piece of data or a model that should be used to filter or otherwise affect the processing of all elements. This piece of information would be broadcast to all parallel operator instances. For example, the broadcast value could be a continuously changing average word length, and downstream we want to filter out words that are below or above this average. In general, this goes in the direction of streaming machine learning and dynamically updating models. For dynamic changes we again need some sort of triggering and continuous ingestion of new data into the side-input data structures.
  • Window-Based Joins: In this example we would have two keyed streams that we want to join on possibly different (time-)windows. We need to map the windows of the main input to the correct window of the side input and also buffer elements of the main input if the data in the corresponding side input is not yet available. This would build on the changes proposed here (Improving Windowing in Flink) to be able to correctly garbage collect the windowed data of the side input.

All these cases incorporate some kind of waiting element: we have to buffer main-input data until a side input is available. The main-input itself can be either keyed or un-keyed and a side input can be either keyed (only if the main input is also keyed) or broadcast. In the former case the key must match the key of the main-input stream. If the side input is not keyed it has to be broadcast because otherwise results would depend on which parallel instance of an operation receives a given side input element.
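
The waiting element common to these cases can be illustrated with a minimal sketch in plain Java, without any Flink dependencies. The class WaitingFilter and its method names are hypothetical and only show the buffering pattern: main-input elements are held back until the first side-input value arrives, then flushed and filtered; later main-input elements are filtered immediately against the latest side value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical helper (not a Flink class): buffers main-input elements until a side-input value exists. */
final class WaitingFilter<T, S> {
    private final BiPredicate<T, S> predicate;
    private final List<T> pending = new ArrayList<>(); // main-input elements waiting for the side input
    private S sideValue;
    private boolean sideReady = false;

    WaitingFilter(BiPredicate<T, S> predicate) {
        this.predicate = predicate;
    }

    /** Handles a main-input element; returns the elements that can be emitted right now. */
    List<T> onMainElement(T element) {
        List<T> out = new ArrayList<>();
        if (!sideReady) {
            pending.add(element); // side input not available yet: buffer
        } else if (predicate.test(element, sideValue)) {
            out.add(element);
        }
        return out;
    }

    /** Handles a side-input element; updates the side value and flushes buffered main-input elements. */
    List<T> onSideElement(S value) {
        sideValue = value;
        sideReady = true;
        List<T> out = new ArrayList<>();
        for (T element : pending) {
            if (predicate.test(element, value)) {
                out.add(element);
            }
        }
        pending.clear();
        return out;
    }
}
```

In a real operator the pending list would have to live in checkpointed state rather than on the heap; the sketch ignores fault tolerance.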

Public Interfaces

 

 

Proposed Side-Input interface
DataStream<String> mainStream = …
DataStream<String> sideStream = …

SingletonSideInput<String> filterString =
  new SingletonSideInput<>(sideStream);

mainStream
  .filter(new RichFilterFunction<String>() {
    @Override
    public boolean filter(String in) {
      String sideValue =
        getRuntimeContext().getSideInput(filterString);
      return in.contains(sideValue);
    }
  }).withSideInput(filterString);


 

Proposed Changes

We will first go into the underlying changes before again describing the proposed API in more detail.

Buffering Elements

While waiting for a side input to become available we must buffer the elements from the main input. Depending on whether the main-input is keyed or not we need to store the buffered elements in either operator state or in key-group state. We cannot store them in keyed state because we need to iterate over all buffered elements when we determine that a side input is ready. We also think that element buffering should be scoped to windows; when we know that we have a side input ready for a given window we only need to process the buffered elements that were buffered for that window.

We propose to add a new Operator-level service (similar to the timer service and state backends) for buffering records with this interface:

RecordBuffer
/**
 * @param <T> type of records in the buffer
 * @param <N> type of the namespace used for scoping
 */
interface RecordBuffer<T, N> {
  /** Adds a record to the buffer for the given namespace. */
  void addRecord(N namespace, T record);

  /** Returns an iterator over the elements stored for the given namespace. */
  Iterator<T> getRecords(N namespace);

  /** Removes all elements for the given namespace from the buffer. */
  void clear(N namespace);
}

Depending on whether the operator is keyed or not, it has to provide a RecordBuffer that stores its data either in key-group state or in operator state when checkpointing. The key-group version of the RecordBuffer will have to be provided by the KeyedStateBackend, while the non-keyed version can be provided by the StateBackend.
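
To make the namespace scoping concrete, here is a minimal heap-only sketch of the proposed interface. InMemoryRecordBuffer is a hypothetical name; it is not backed by operator state or key-group state and would not survive a checkpoint. The interface is restated (with an assumed void return type for addRecord) only so that the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** The proposed interface, restated so that this sketch is self-contained. */
interface RecordBuffer<T, N> {
    void addRecord(N namespace, T record);
    Iterator<T> getRecords(N namespace);
    void clear(N namespace);
}

/** Hypothetical heap-only implementation; real versions would write to checkpointed state. */
final class InMemoryRecordBuffer<T, N> implements RecordBuffer<T, N> {
    // One list of buffered records per namespace (for example, per window).
    private final Map<N, List<T>> recordsByNamespace = new HashMap<>();

    @Override
    public void addRecord(N namespace, T record) {
        recordsByNamespace.computeIfAbsent(namespace, ns -> new ArrayList<>()).add(record);
    }

    @Override
    public Iterator<T> getRecords(N namespace) {
        List<T> records = recordsByNamespace.getOrDefault(namespace, Collections.emptyList());
        // Read-only view, so callers cannot remove records behind the buffer's back.
        return Collections.unmodifiableList(records).iterator();
    }

    @Override
    public void clear(N namespace) {
        recordsByNamespace.remove(namespace);
    }
}
```

With a window (here represented by a Long) as the namespace, an operator would call getRecords(window) once the side input for that window is ready, process the returned elements, and then call clear(window).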

Storing Side-Input Data

For keyed operators with keyed side input we can simply store the side input in the KeyedStateBackend.

For non-keyed side input we need to store the side input in broadcast state. If we only allow broadcast state to be modified depending on the incoming (broadcast) side input data then this is state that is guaranteed to be the same on all parallel operator instances when checkpointing. Since side-input data can become somewhat big we should add a BroadcastStateBackend that has exactly the same interface as KeyedStateBackend but does not store data per key. When checkpointing, only one of the parallel instances of the operator needs to checkpoint its broadcast state. When restoring, all operators can be restored from that data.
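
The checkpointing argument can be sketched as follows. BroadcastInstance is a hypothetical stand-in for one parallel operator instance, and a plain map stands in for the proposed BroadcastStateBackend. Because every instance applies the same broadcast elements, a snapshot taken from a single instance is enough to restore any number of instances.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for one parallel operator instance that holds broadcast state. */
final class BroadcastInstance {
    private final Map<String, String> broadcastState = new HashMap<>();

    /** Only broadcast side-input elements modify the state, so all instances stay identical. */
    void onBroadcastElement(String name, String value) {
        broadcastState.put(name, value);
    }

    /** Returns a copy of the state; only one instance needs to do this per checkpoint. */
    Map<String, String> snapshot() {
        return new HashMap<>(broadcastState);
    }

    /** Replaces the local state with the given snapshot; every instance restores from the same data. */
    void restore(Map<String, String> snapshot) {
        broadcastState.clear();
        broadcastState.putAll(snapshot);
    }
}
```

The same single snapshot also covers a change of parallelism on restore, since each new instance simply receives a full copy.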

Getting Side-Input Data into an Operator

There needs to be a way of getting the actual side-input data into an operator. Right now, there are OneInputStreamOperator and TwoInputStreamOperator, which have one and two inputs, respectively. An operation can have zero or more side inputs. We now propose some ways in which this could be achieved.

Using a tagged union input on the second input of a TwoInputStreamOperator

In this scenario, we would only allow side inputs for one-input operations. When we know that an operation is going to have side inputs, we instantiate a special operator that reads the normal input on the first input and the (multiplexed) side-input streams on the second input. For example, in addition to the normal StreamMap operator we would also have a StreamMapWithSideInputs operator that takes care of side inputs. For this scenario we would need to know what side inputs an operation has before instantiating the operator. This would (with the current API) lead to an API pattern like this:

Side Inputs before operation
DataStream<String> mainStream = …
DataStream<String> sideStream = …

BroadcastSingletonInput<String> filterString =
  new BroadcastSingletonInput<>(sideStream);

mainStream
  .map(new DoSomething())
  .withSideInputs(filterString) // this sets side input for following operation
  .filter(new RichFilterFunction<String>() {
    @Override
    public boolean filter(String in) {
      String sideValue =
        getRuntimeContext().getSideInput(filterString);
      return in.contains(sideValue);
    }
  });


Having to set the side input before the operation can seem somewhat counterintuitive.
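
The multiplexing on the second input could look roughly like the following sketch. TaggedElement and SideInputDemultiplexer are hypothetical names, and the integer tag is an assumed way of identifying a side input; the proposal does not fix any of these details. Each element on the second input carries the id of the side input it belongs to, and the operator routes it to the matching side-input storage.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tagged-union element: a side-input value plus the id of its side input. */
final class TaggedElement {
    final int sideInputId;
    final Object value;

    TaggedElement(int sideInputId, Object value) {
        this.sideInputId = sideInputId;
        this.value = value;
    }
}

/** Hypothetical routing logic for the second input of an operator with side inputs. */
final class SideInputDemultiplexer {
    private final Map<Integer, List<Object>> elementsBySideInput = new HashMap<>();

    /** Called for every element arriving on the multiplexed second input. */
    void processSecondInput(TaggedElement element) {
        elementsBySideInput
            .computeIfAbsent(element.sideInputId, id -> new ArrayList<>())
            .add(element.value);
    }

    /** Returns the elements received so far for one side input, in arrival order. */
    List<Object> elementsOf(int sideInputId) {
        return Collections.unmodifiableList(
            elementsBySideInput.getOrDefault(sideInputId, Collections.emptyList()));
    }
}
```

The tag lets a two-input operator carry any number of side inputs, at the cost of losing static typing on the second input.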

Managing side inputs at the level of StreamTask

We could handle both the buffering of elements and the storage of side-input data at the StreamTask instead of at the operator. The StreamTask would just provide an interface to the operator that would allow it to query the side input. In this scenario, the StreamTask would need to implement the record buffering (either per key-group or in operator state) described above. The advantage of this solution is that the operator gets a clearly defined interface and doesn't have to do much else. The disadvantage is that we would not have a more primitive interface that users could use if they have use cases that go beyond the capabilities offered by side inputs. If we decided to put the side-input functionality at the operator level, users would be able to override it to fit their needs.

Allowing a stream operator to be extended with arbitrary inputs

This would roughly follow the ideas laid out here: https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit#heading=h.pqg5z6g0mjm7. If we can arbitrarily add inputs to an operator we can also add (keyed or broadcast) inputs to an operator for dealing with side inputs. This would have the advantage that we also provide a general n-ary operator interface that users can use. The disadvantage is that we put more (somewhat complex) logic into the stream operator.

API Changes

TODO

Compatibility, Deprecation, and Migration Plan

There are no backwards compatibility, migration, or deprecation concerns since this only adds new API.

Test Plan

The new feature will be tested with unit tests and integration tests.

Rejected Alternatives

We list some implementation alternatives above. Once we decide on a final solution we will move the rejected alternatives here for history purposes.
