
...

Side outputs (a.k.a. multi-outputs) are one of the highly requested features in high-fidelity stream processing use cases. With this feature, Flink can:

  • side-output corrupted input data, so that a job does not fall into a “fail -> restart -> fail” cycle;

  • side-output sparsely arriving late events while still issuing aggressive watermarks in window computations.

...

Users may declare multiple output tags and later retrieve the output stream that corresponds to any previously defined OutputTag.

final OutputTag<String> sideOutput1 = new OutputTag<String>() {}; 
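The trailing {} in the declaration above creates an anonymous subclass, which is one way a tag can remember its element type despite Java's type erasure. The sketch below shows that mechanism with plain java.lang.reflect; the OutputTag class and its getElementType() method here are hypothetical stand-ins, not Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for OutputTag: abstract, so callers must subclass it (e.g. with `{}`).
abstract class OutputTag<T> {
    private final Type elementType;

    protected OutputTag() {
        // For `new OutputTag<String>() {}`, the anonymous class's generic superclass
        // is OutputTag<String>, so its first type argument is String.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "create the tag as an anonymous subclass, e.g. new OutputTag<String>() {}");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getElementType() {
        return elementType;
    }
}

public class OutputTagTypeDemo {
    public static void main(String[] args) {
        final OutputTag<String> sideOutput1 = new OutputTag<String>() {};
        System.out.println(sideOutput1.getElementType()); // prints "class java.lang.String"
    }
}
```

Without the {} the class cannot be instantiated at all, so the type argument is never silently lost.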

Update user functions to use a RichCollector (or MultiCollector). The rationale is that Collector is used in many places besides streaming transformations, so adding the tagged collect method directly to the Collector interface would introduce many empty method implementations in those classes.

public interface RichCollector<T> extends Collector<T> {
  <S> void collect(OutputTag<S> tag, S value);
}
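To make the RichCollector contract concrete, here is a minimal, self-contained sketch of one possible implementation that buffers main-output records and keeps a separate buffer per tag. Collector, OutputTag and RichCollector are simplified stand-ins for the types discussed above; BufferingRichCollector, the tag id field and identity-based tag matching are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins (hypothetical), mirroring the shapes discussed above.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Hypothetical tag with an id; tags compare by identity in this sketch.
final class OutputTag<T> {
    final String id;
    OutputTag(String id) { this.id = id; }
}

interface RichCollector<T> extends Collector<T> {
    <S> void collect(OutputTag<S> tag, S value);
}

// Hypothetical implementation: main-output records and each tag's records go to separate lists.
class BufferingRichCollector<T> implements RichCollector<T> {
    final List<T> mainOutput = new ArrayList<>();
    final Map<OutputTag<?>, List<Object>> sideOutputs = new HashMap<>();

    @Override
    public void collect(T record) {
        mainOutput.add(record);
    }

    @Override
    public <S> void collect(OutputTag<S> tag, S value) {
        sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }

    @Override
    public void close() {}

    // Everything stored under `tag` was added as an S, so the cast is safe.
    @SuppressWarnings("unchecked")
    <S> List<S> sideOutput(OutputTag<S> tag) {
        return (List<S>) (List<?>) sideOutputs.getOrDefault(tag, new ArrayList<>());
    }
}

public class RichCollectorDemo {
    public static void main(String[] args) {
        OutputTag<String> errors = new OutputTag<>("errors");
        BufferingRichCollector<Integer> out = new BufferingRichCollector<>();
        out.collect(1);
        out.collect(errors, "unparseable: x1");
        System.out.println(out.mainOutput + " " + out.sideOutput(errors)); // prints [1] [unparseable: x1]
    }
}
```

Keeping the tagged method on a sub-interface leaves every existing Collector implementation untouched.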

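FlatMapFunction, as an example:

public void flatMap(String value, RichCollector<Tuple2<String, Integer>> out) throws Exception {
  // main output: one (token, 1) record per token
  for (String token : value.toLowerCase().split("\\W+")) {
    out.collect(new Tuple2<String, Integer>(token, 1));
  }
  // side output, routed by the tag declared earlier
  out.collect(sideOutput1, "sideout");
}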
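A runnable version of the flatMap example, under the assumption that the side output serves the first motivating case above: lines that contain no word (standing in for corrupted input) are routed to sideOutput1 instead of failing the job. Tuple2, OutputTag, RichCollector, Tokenizer and TokenizerDemo are minimal hypothetical stand-ins so the sketch compiles without Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins (hypothetical) so the sketch compiles without Flink on the classpath.
class OutputTag<T> {}

final class Tuple2<A, B> {
    final A f0;
    final B f1;
    Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
}

interface RichCollector<T> {
    void collect(T record);
    <S> void collect(OutputTag<S> tag, S value);
}

// Word-count tokenizer: tokens go to the main output; a line without any word
// is sent to the side output instead of throwing.
class Tokenizer {
    static final OutputTag<String> sideOutput1 = new OutputTag<String>() {};

    public void flatMap(String value, RichCollector<Tuple2<String, Integer>> out) {
        boolean emitted = false;
        for (String token : value.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.collect(new Tuple2<>(token, 1));
                emitted = true;
            }
        }
        if (!emitted) {
            out.collect(sideOutput1, value);
        }
    }
}

public class TokenizerDemo {
    // Runs the tokenizer over the given lines; returns [main output, side output] as strings.
    static List<List<String>> run(String... lines) {
        List<String> mainOut = new ArrayList<>();
        List<String> sideOut = new ArrayList<>();
        RichCollector<Tuple2<String, Integer>> out = new RichCollector<Tuple2<String, Integer>>() {
            @Override public void collect(Tuple2<String, Integer> record) { mainOut.add(record.toString()); }
            @Override public <S> void collect(OutputTag<S> tag, S value) { sideOut.add(String.valueOf(value)); }
        };
        Tokenizer tokenizer = new Tokenizer();
        for (String line : lines) {
            tokenizer.flatMap(line, out);
        }
        List<List<String>> result = new ArrayList<>();
        result.add(mainOut);
        result.add(sideOut);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run("To be", "???")); // prints [[(to,1), (be,1)], [???]]
    }
}
```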

WindowFunction:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
  void apply(KEY key, W window, Iterable<IN> input, RichCollector<OUT> out) throws Exception;
}
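With RichCollector in the WindowFunction signature, a window function can emit its aggregate on the main output and route individual elements elsewhere. A hypothetical sketch: SumValidReadings sums readings per key and window and side-outputs negative readings, which it treats as invalid; Window, TimeWindow, Function and the other types are simplified stand-ins, not Flink's classes.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal stand-ins (hypothetical) mirroring the shapes discussed above.
interface Function extends Serializable {}

abstract class Window {}

final class TimeWindow extends Window {
    final long start;
    final long end;
    TimeWindow(long start, long end) { this.start = start; this.end = end; }
}

class OutputTag<T> {}

interface RichCollector<T> {
    void collect(T record);
    <S> void collect(OutputTag<S> tag, S value);
}

interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, RichCollector<OUT> out) throws Exception;
}

// Sums non-negative readings per key and window; negative readings go to the INVALID side output.
class SumValidReadings implements WindowFunction<Integer, String, String, TimeWindow> {
    static final OutputTag<Integer> INVALID = new OutputTag<Integer>() {};

    @Override
    public void apply(String key, TimeWindow window, Iterable<Integer> input, RichCollector<String> out) {
        int sum = 0;
        for (Integer reading : input) {
            if (reading < 0) {
                out.collect(INVALID, reading);
            } else {
                sum += reading;
            }
        }
        out.collect(key + "@[" + window.start + "," + window.end + "): " + sum);
    }
}

public class WindowSideOutputDemo {
    // Applies the function to one window; returns [main output, side output] as strings.
    static List<List<String>> run(String key, TimeWindow window, List<Integer> readings) {
        List<String> mainOut = new ArrayList<>();
        List<String> sideOut = new ArrayList<>();
        new SumValidReadings().apply(key, window, readings, new RichCollector<String>() {
            @Override public void collect(String record) { mainOut.add(record); }
            @Override public <S> void collect(OutputTag<S> tag, S value) { sideOut.add(String.valueOf(value)); }
        });
        return Arrays.asList(mainOut, sideOut);
    }

    public static void main(String[] args) {
        System.out.println(run("sensor-1", new TimeWindow(0, 60), Arrays.asList(3, -1, 4)));
        // prints [[sensor-1@[0,60): 7], [-1]]
    }
}
```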

Users pass a previously defined OutputTag to getSideOutput to obtain the corresponding output stream:

flatMap(...).getSideOutput(sideOutput1)

...

This keeps good compatibility with the current “single-typed” output model without requiring drastic changes to it.

public class SideOutputTransformation<T> extends StreamTransformation<T> {
  private final StreamTransformation<?> input;
  private final OutputTag<T> tag;

  public SideOutputTransformation(StreamTransformation<?> input, OutputTag<T> tag) {
    super("SideOutput", tag.getTypeInformation(), input.getParallelism());
    this.input = input;
    this.tag = tag;
  }

  // ... (remaining members not shown)
}

...
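One way to picture getSideOutput and SideOutputTransformation: the upstream operator emits records together with their tag, and each derived stream keeps only the records for one tag, so every resulting stream still has a single element type. The sketch below models streams as lists; TaggedRecord, SideOutputSelector, the null tag for main output and matching tags by id are all assumptions for illustration, not a description of Flink's runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical tag identified by name; the element type lives only in the generic parameter.
final class OutputTag<T> {
    final String id;
    OutputTag(String id) { this.id = id; }
}

// Hypothetical envelope for what an operator with side outputs emits; a null tag marks main output.
final class TaggedRecord {
    final OutputTag<?> tag;
    final Object value;
    TaggedRecord(OutputTag<?> tag, Object value) { this.tag = tag; this.value = value; }
}

final class SideOutputSelector {
    // Main output: records emitted without a tag.
    @SuppressWarnings("unchecked")
    static <T> List<T> mainOutput(List<TaggedRecord> emitted) {
        List<T> result = new ArrayList<>();
        for (TaggedRecord r : emitted) {
            if (r.tag == null) result.add((T) r.value);
        }
        return result;
    }

    // Side output: only records whose tag id matches, so the derived stream stays single-typed.
    @SuppressWarnings("unchecked")
    static <S> List<S> sideOutput(List<TaggedRecord> emitted, OutputTag<S> tag) {
        List<S> result = new ArrayList<>();
        for (TaggedRecord r : emitted) {
            if (r.tag != null && r.tag.id.equals(tag.id)) result.add((S) r.value);
        }
        return result;
    }
}

public class SideOutputSelectionDemo {
    public static void main(String[] args) {
        OutputTag<String> late = new OutputTag<>("late");
        List<TaggedRecord> emitted = Arrays.asList(
            new TaggedRecord(null, 1), new TaggedRecord(late, "event-7"), new TaggedRecord(null, 2));
        List<Integer> mainStream = SideOutputSelector.mainOutput(emitted);
        List<String> sideStream = SideOutputSelector.sideOutput(emitted, late);
        System.out.println(mainStream + " " + sideStream); // prints [1, 2] [event-7]
    }
}
```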

In their current form, many functions assume a single OUT type, for example JoinFunction and the wrapper that forwards its result to a Collector:

@Public
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
  OUT join(IN1 first, IN2 second) throws Exception;
}
...
public void join(IN1 left, IN2 right, Collector<OUT> out) throws Exception {
  out.collect(this.wrappedFunction.join(left, right));
}

To make this work with multiple outputs, such functions must be able to emit more than one output type. Simply changing the join implementation to use a RichCollector is not enough: the user-defined join() only returns a value and never sees the collector, so the flexible collector.collect(tag, value) is still not exposed to the user. One possible approach is to let the user function return a composite type instead of OUT: either an OUT-typed instance or an <OutputTag, value> tuple, or always a unified <OutputTag, value> tuple, with the framework internally building the emitted record from the user's return value. A default OutputTag is allocated for regular OUT records.
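The composite-return approach can be sketched in its unified <OutputTag, value> form: the user function always returns a tagged value, and a framework-side wrapper sends values under the default tag to the main output and everything else to the matching side output. TaggedValue, TaggedJoinFunction, WrappingTaggedJoin and CompositeReturnDemo are hypothetical names, and the example's rule (negative order amounts are corrupted records) is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; tags compare by identity in this sketch.
class OutputTag<T> {
    final String id;
    OutputTag(String id) { this.id = id; }
}

interface RichCollector<T> {
    void collect(T record);
    <S> void collect(OutputTag<S> tag, S value);
}

// Unified <OutputTag, value> return type (hypothetical).
final class TaggedValue<S> {
    final OutputTag<S> tag;
    final S value;
    TaggedValue(OutputTag<S> tag, S value) { this.tag = tag; this.value = value; }
}

// User-facing join returning a TaggedValue instead of a bare OUT (hypothetical).
interface TaggedJoinFunction<IN1, IN2> {
    TaggedValue<?> join(IN1 first, IN2 second) throws Exception;
}

// Framework-side wrapper (hypothetical): default tag -> main output, any other tag -> side output.
final class WrappingTaggedJoin<IN1, IN2, OUT> {
    private final TaggedJoinFunction<IN1, IN2> wrappedFunction;
    private final OutputTag<OUT> defaultTag;

    WrappingTaggedJoin(TaggedJoinFunction<IN1, IN2> wrappedFunction, OutputTag<OUT> defaultTag) {
        this.wrappedFunction = wrappedFunction;
        this.defaultTag = defaultTag;
    }

    @SuppressWarnings("unchecked")
    public void join(IN1 left, IN2 right, RichCollector<OUT> out) throws Exception {
        TaggedValue<?> result = wrappedFunction.join(left, right);
        if (result.tag == defaultTag) {
            out.collect((OUT) result.value); // the default tag is an OutputTag<OUT>, so the value is an OUT
        } else {
            emitSide(result, out);
        }
    }

    // Captures the wildcard so that the tag and value types line up.
    private static <S> void emitSide(TaggedValue<S> result, RichCollector<?> out) {
        out.collect(result.tag, result.value);
    }
}

public class CompositeReturnDemo {
    static final OutputTag<String> DEFAULT = new OutputTag<>("default");
    static final OutputTag<String> CORRUPTED = new OutputTag<>("corrupted");

    // Joins each order amount with a customer name; negative amounts go to the CORRUPTED side output.
    static List<List<String>> run(List<Integer> amounts, String customer) {
        List<String> mainOut = new ArrayList<>();
        List<String> sideOut = new ArrayList<>();
        RichCollector<String> out = new RichCollector<String>() {
            @Override public void collect(String record) { mainOut.add(record); }
            @Override public <S> void collect(OutputTag<S> tag, S value) { sideOut.add(tag.id + ":" + value); }
        };
        TaggedJoinFunction<Integer, String> userJoin = (amount, name) -> {
            if (amount < 0) {
                return new TaggedValue<String>(CORRUPTED, name + "/" + amount);
            }
            return new TaggedValue<String>(DEFAULT, name + "=" + amount);
        };
        WrappingTaggedJoin<Integer, String, String> join = new WrappingTaggedJoin<>(userJoin, DEFAULT);
        try {
            for (Integer amount : amounts) {
                join.join(amount, customer, out);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return Arrays.asList(mainOut, sideOut);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(5, -3), "alice"));
        // prints [[alice=5], [corrupted:alice/-3]]
    }
}
```

The unchecked cast in the wrapper is what the default-tag convention pays for: as long as user code avoids raw types, a value returned under an OutputTag<OUT> must be an OUT.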

...