...

For non-keyed side inputs we need to store the side input in broadcast state. If broadcast state can only be modified in response to incoming (broadcast) side-input data, then this state is guaranteed to be the same on all parallel operator instances when checkpointing. Since side-input data can become fairly large, we should add a BroadcastStateBackend that has exactly the same interface as KeyedStateBackend but does not store data per key. When checkpointing, only one of the parallel instances of the operator needs to checkpoint its broadcast state. When restoring, all operator instances can be restored from that data.
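The checkpoint and restore behaviour described above can be illustrated with a minimal sketch. It is not the proposed BroadcastStateBackend itself: the class `BroadcastStateSketch` and its methods `onBroadcastElement`, `snapshot`, and `restore` are hypothetical names chosen for illustration, and state is held in a plain in-memory map instead of a real state backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for broadcast (non-keyed) state. */
final class BroadcastStateSketch {
    // Not scoped to a key: one value per state name on each parallel instance.
    private final Map<String, String> state = new HashMap<>();

    /** The only mutation path: called for broadcast side-input elements, which every instance receives. */
    void onBroadcastElement(String stateName, String value) {
        state.put(stateName, value);
    }

    String get(String stateName) {
        return state.get(stateName);
    }

    /** Copy of the state, as a single instance would write it to a checkpoint. */
    Map<String, String> snapshot() {
        return new HashMap<>(state);
    }

    /** Builds a fresh instance from a checkpointed snapshot. */
    static BroadcastStateSketch restore(Map<String, String> snapshot) {
        BroadcastStateSketch restored = new BroadcastStateSketch();
        restored.state.putAll(snapshot);
        return restored;
    }

    public static void main(String[] args) {
        // Three parallel instances all see the same broadcast element.
        List<BroadcastStateSketch> instances = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            instances.add(new BroadcastStateSketch());
        }
        for (BroadcastStateSketch instance : instances) {
            instance.onBroadcastElement("filterString", "flink");
        }

        // Only one instance needs to checkpoint, since all hold identical state.
        Map<String, String> checkpoint = instances.get(0).snapshot();

        // On restore, every instance (here with a different parallelism) reads the same snapshot.
        for (int i = 0; i < 4; i++) {
            BroadcastStateSketch restored = BroadcastStateSketch.restore(checkpoint);
            System.out.println("instance " + i + ": " + restored.get("filterString"));
        }
    }
}
```

Because the state is only ever changed by broadcast elements, the snapshot taken from any one instance is sufficient to restore all of them, regardless of the parallelism after restore.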

Getting Side-Input Data into an Operator

There needs to be a way of getting the actual side-input data into an operator. Right now, there are OneInputStreamOperator and TwoInputStreamOperator, which have one and two inputs, respectively. An operation, however, can have zero or more side inputs. We now propose some ways in which this could be achieved.

Using a tagged union input on the second input of a TwoInputStreamOperator

In this scenario, we would only allow side inputs for one-input operations. When we know that an operation is going to have side inputs, we instantiate a special operator that reads the normal input on its first input and the (multiplexed) side-input streams on its second input. For example, in addition to the normal StreamMap operator we would also have a StreamMapWithSideInputs operator that takes care of side inputs. For this scenario we would need to know which side inputs an operation has before instantiating the operator. With the current API, this would lead to an API pattern like this:

Side Inputs before operation (Java):
DataStream<String> mainStream = …
DataStream<String> sideStream = …

BroadcastSingletonInput<String> filterString =
  new BroadcastSingletonInput<>(sideStream);

mainStream
  .map(new DoSomething())
  .withSideInputs(filterString) // this sets side input for following operation
  .filter(new RichFilterFunction<String>() {
    @Override
    public boolean filter(String in) {
      // look up the current value of the side input registered above
      String sideValue =
        getRuntimeContext().getSideInput(filterString);
      return in.contains(sideValue);
    }
  });


Having to set the side input before the operation can seem somewhat counterintuitive.
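To make the tagged-union idea more concrete, the following is a rough sketch of how such a special operator might demultiplex its second input. It does not use the real Flink operator interfaces: `TaggedSideInputElement` and `FilterWithSideInputsSketch` are hypothetical names, the methods `processElement1` and `processElement2` only mirror the naming of TwoInputStreamOperator, and dropping main-input elements that arrive before the side input is a simplification made for this example (a real operator would more likely buffer them).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One element of the multiplexed second input: a side-input id (the tag) plus its payload. */
final class TaggedSideInputElement {
    final int sideInputId;
    final Object value;

    TaggedSideInputElement(int sideInputId, Object value) {
        this.sideInputId = sideInputId;
        this.value = value;
    }
}

/** Hypothetical filter operator: main input on input 1, tagged union of all side inputs on input 2. */
final class FilterWithSideInputsSketch {
    // Latest value seen for each side input, indexed by its tag.
    private final Map<Integer, Object> latestSideInputs = new HashMap<>();
    private final List<String> output = new ArrayList<>();
    private final int filterStringId;

    FilterWithSideInputsSketch(int filterStringId) {
        this.filterStringId = filterStringId;
    }

    /** Second input: demultiplex by tag and remember the latest value for that side input. */
    void processElement2(TaggedSideInputElement element) {
        latestSideInputs.put(element.sideInputId, element.value);
    }

    /** First input: apply the user logic with access to the side input. */
    void processElement1(String in) {
        Object sideValue = latestSideInputs.get(filterStringId);
        // Simplification: elements arriving before the side input is available are dropped.
        if (sideValue != null && in.contains((String) sideValue)) {
            output.add(in);
        }
    }

    List<String> getOutput() {
        return output;
    }
}
```

A short usage example: after `processElement2(new TaggedSideInputElement(0, "flink"))` on an operator created with `new FilterWithSideInputsSketch(0)`, a call to `processElement1("apache flink")` adds the element to the output, while `processElement1("something else")` does not.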

API Changes

Compatibility, Deprecation, and Migration Plan

...