...

  • Send sparse late-arriving events to a side output when aggressive watermarks are used in window computations.
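A minimal sketch of this use case, assuming a simple watermark that trails the largest timestamp seen so far by a fixed delay: events that fall behind the watermark go to a separate side list instead of being dropped. `Event`, `LateEventSplitter`, and the watermark rule are hypothetical illustrations, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type: a payload plus an event-time timestamp (ms).
final class Event {
    final String payload;
    final long timestamp;

    Event(String payload, long timestamp) {
        this.payload = payload;
        this.timestamp = timestamp;
    }
}

// Hypothetical splitter: an "aggressive" watermark trails the largest
// timestamp seen by a small delay; anything older than it is late.
final class LateEventSplitter {
    private final long delayMs;
    private long watermark = Long.MIN_VALUE;
    final List<Event> mainOutput = new ArrayList<>();
    final List<Event> lateSideOutput = new ArrayList<>();

    LateEventSplitter(long delayMs) {
        this.delayMs = delayMs;
    }

    void process(Event e) {
        if (e.timestamp < watermark) {
            // Late: keep it in the side output instead of dropping it.
            lateSideOutput.add(e);
        } else {
            mainOutput.add(e);
            // Advance the watermark; it never moves backwards.
            watermark = Math.max(watermark, e.timestamp - delayMs);
        }
    }
}
```

With a 10 ms delay, an event stamped 95 that arrives after one stamped 120 lands in `lateSideOutput` rather than being discarded.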

Public Interfaces

  • Add org.apache.flink.util.RichCollector.
  • Expose a RichCollector through the RuntimeContext of a RichFunction.
  • Add a CollectorWrapper in the flink-core util package, so users can wrap a Collector and emit side outputs with predefined output tags.
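The interfaces listed above could look roughly like the following. This is a sketch under assumptions: `Collector` is a local stand-in for `org.apache.flink.util.Collector`, `OutputTag` is reduced to an id compared by identity, and `CollectorWrapper` buffers side outputs in a map instead of handing them to the runtime. `ListCollector` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.flink.util.Collector (collect/close only).
interface Collector<T> {
    void collect(T record);
    void close();
}

// Simplified tag: no equals override, so tags compare by identity here.
final class OutputTag<S> {
    final String id;
    OutputTag(String id) { this.id = id; }
}

// Sketch of RichCollector: a Collector that can also emit values
// of arbitrary type under an OutputTag.
interface RichCollector<T> extends Collector<T> {
    <S> void collect(OutputTag<S> tag, S value);
}

// Sketch of CollectorWrapper: main records go to the wrapped Collector,
// side-output values are buffered per tag.
final class CollectorWrapper<T> implements RichCollector<T> {
    private final Collector<T> delegate;
    final Map<OutputTag<?>, List<Object>> sideOutputs = new HashMap<>();

    CollectorWrapper(Collector<T> delegate) { this.delegate = delegate; }

    @Override public void collect(T record) { delegate.collect(record); }

    @Override public <S> void collect(OutputTag<S> tag, S value) {
        sideOutputs.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
    }

    @Override public void close() { delegate.close(); }
}

// Trivial Collector that stores every record in a list.
final class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();
    @Override public void collect(T record) { records.add(record); }
    @Override public void close() { }
}
```

Keeping the tagged `collect` on a separate interface leaves existing `Collector` implementations untouched, which matches the rationale given for RichCollector.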

Proposed Changes

We want to introduce OutputTag and let operators collect records of arbitrary types under defined output tags. The prototype demonstrates how this works in a raw, hacky form.

...

Users may declare multiple output tags and retrieve the output stream for any previously defined output tag.

final OutputTag<String> sideOutput1 = new OutputTag<String>("value") {};
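One plausible reason for the trailing `{}` is to create an anonymous subclass, so the tag's element type survives erasure (the "super type token" idiom). The sketch below assumes that design; the `id` and `type` fields and the equality rule (same id and same type) are assumptions, not the proposal's definition.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Objects;

// Sketch of an OutputTag whose element type is captured at construction.
// "new OutputTag<String>(id) {}" records String in the anonymous class's
// generic superclass, which is still visible at runtime.
abstract class OutputTag<T> {
    final String id;
    final Type type;

    protected OutputTag(String id) {
        this.id = Objects.requireNonNull(id);
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Create OutputTag as an anonymous subclass: new OutputTag<T>(id) {}");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    // Two tags denote the same side output only if id and type both match.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof OutputTag)) return false;
        OutputTag<?> other = (OutputTag<?>) o;
        return id.equals(other.id) && type.equals(other.type);
    }

    @Override
    public int hashCode() { return Objects.hash(id, type); }
}
```

Declaring the class `abstract` makes forgetting the `{}` a compile error rather than a silent loss of type information.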
 

Update user functions to use RichCollector (or MultiCollector). The rationale is that Collector is used in many places outside streaming transformations; adding the method directly to the Collector interface would force many empty method implementations into those classes.

...

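public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    CollectorWrapper<Tuple2<String, Integer>> wrapper = new CollectorWrapper<>(out);
    // tokenize the input line (illustrative, as in the WordCount example)
    for (String token : value.toLowerCase().split("\\W+")) {
        // previously: out.collect(new Tuple2<String, Integer>(token, 1));
        wrapper.collect(new Tuple2<String, Integer>(token, 1));
    }
    // emit a String record to the side output identified by sideOutput1
    wrapper.collect(sideOutput1, "sideout");
}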

WindowFunction

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
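A minimal implementation of the interface above, using local stand-ins for `Function`, `Collector`, and `Window` (the real types live in Flink). `TimeWindow`, `CountPerWindow`, and `ListCollector` are hypothetical. It also shows the gap the proposal addresses: `apply` only receives a `Collector<OUT>`, so it cannot emit a value of any other type.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the Flink types referenced by the interface above.
interface Function extends Serializable { }
interface Collector<T> { void collect(T record); void close(); }
abstract class Window { }

// Hypothetical window covering [start, end).
final class TimeWindow extends Window {
    final long start, end;
    TimeWindow(long start, long end) { this.start = start; this.end = end; }
}

interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

// Counts the elements per key and window; only OUT (String) can be emitted.
final class CountPerWindow implements WindowFunction<String, String, String, TimeWindow> {
    @Override
    public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) {
        long count = 0;
        for (String ignored : input) count++;
        out.collect(key + "@[" + window.start + "," + window.end + "): " + count);
    }
}

// Trivial Collector that stores every record in a list.
final class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();
    @Override public void collect(T record) { records.add(record); }
    @Override public void close() { }
}
```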

 

In a RichFunction, users can call getRuntimeContext().getCollector(), which returns a RichCollector.

  • Add a <S> void collect(OutputTag<S> tag, S value) method to the Collector interface.

...

  • Add getOutput(OutputTag) to SingleOutputStreamOperator

Users may pass an output tag defined earlier and get the corresponding output stream. More than one output tag may share the same type; however, getSideOutput only returns records collected with exactly the same output tag type and value.
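The matching rule can be sketched as follows, assuming a tag is identified by a `Class` token plus a string value and that the operator keeps every side record together with its tag. `TaggedOutputs` is a hypothetical stand-in for the operator's output, not the proposal's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical tag identified by element type and value.
final class OutputTag<T> {
    final Class<T> type;
    final String value;

    OutputTag(Class<T> type, String value) { this.type = type; this.value = value; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof OutputTag)) return false;
        OutputTag<?> other = (OutputTag<?>) o;
        return type.equals(other.type) && value.equals(other.value);
    }

    @Override
    public int hashCode() { return Objects.hash(type, value); }
}

// Hypothetical operator output: each side record is stored with its tag.
final class TaggedOutputs {
    private static final class Tagged {
        final OutputTag<?> tag;
        final Object value;
        Tagged(OutputTag<?> tag, Object value) { this.tag = tag; this.value = value; }
    }

    private final List<Tagged> records = new ArrayList<>();

    <S> void collect(OutputTag<S> tag, S value) { records.add(new Tagged(tag, value)); }

    // Returns only records whose tag has exactly the same type and value.
    <S> List<S> getSideOutput(OutputTag<S> tag) {
        List<S> result = new ArrayList<>();
        for (Tagged r : records) {
            if (tag.equals(r.tag)) {
                result.add(tag.type.cast(r.value));
            }
        }
        return result;
    }
}
```

Two `String` tags named "errors" and "warnings" share a type but never mix, while a freshly constructed tag with the same type and value retrieves the same records.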

       flatMap(..).getSideOutput(sideOutput1) 

...

The record writer can use this information, comparing it with the outputType (as implemented in the prototype) or, preferably, with the OutputTag of each Output<OUT>, and write only the matching stream records to the channel.
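A sketch of the routing step, assuming tags are simplified to strings (with `null` meaning the main output) and that each channel is bound to a single tag. `TaggedRecord`, `Channel`, and `TagRoutingWriter` are hypothetical names, not Flink's `RecordWriter` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical record: a value plus the tag it was emitted with
// (null means the main output).
final class TaggedRecord {
    final String tag;
    final Object value;
    TaggedRecord(String tag, Object value) { this.tag = tag; this.value = value; }
}

// Hypothetical downstream channel bound to one output tag.
final class Channel {
    final String tag; // null = main output
    final List<Object> written = new ArrayList<>();
    Channel(String tag) { this.tag = tag; }
}

// Tag-based routing: each record is written only to the channels
// whose tag matches the record's tag; unmatched records are dropped.
final class TagRoutingWriter {
    private final List<Channel> channels;
    TagRoutingWriter(List<Channel> channels) { this.channels = channels; }

    void emit(TaggedRecord record) {
        for (Channel c : channels) {
            if (Objects.equals(c.tag, record.tag)) {
                c.written.add(record.value);
            }
        }
    }
}
```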

...