Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status:

Page properties


Discussion thread
Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-4460

Release1.3


Motivation

Side outputs(a.k.a Multi-outputs)  is one of highly requested features in high fidelity stream processing use cases. With this feature, Flink can

...

  • Side output sparsely received late arriving events while issuing aggressive watermarks in window computation.

Public Interfaces

  • Add CollectorWrapper in flink-core util folder, user can wrap Collector and emit side outputs with predefined outputtags

Proposed Changes

We want to introduce outputTag and support operator collect arbitrary types of records with defined output Tags. In this prototype, it demonstrated how things works in raw/hacky form.

User may declare multiple output tags and use get output stream with one previously defined outputtag.

final OutputTag<S> sideOutput1 = new OutputTag<S>(SValue) {}; 

Update userFunctions using RichCollector(Or MultiCollector) the rationale behind is Collector has been used in lot of places other than stream and transformation, adding direct to Collector interface will introduce many empty methods in those classes.

...

        wrapper.collect(sideOutput1, "sideout");
}

User may pass outputtag defined eearlier and get a corresponding outputstream.

...

Record writer can utilize this information and compare with outputType( impl in prototype) or OutputTag (better implementation) or each Output<OUT> and just write matched stream record to channel

It allows good compatibility without drastic change of current “single typed” output model.

...

Instead of assuming output stream record values are same type, it will need to check output type and stream record type, only output with same outputtag typeinfo.


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?  We would like to approach with two phase approach. Step 1 would be backward compatible manner. Other than RichCollector change can affects some existing userFunction binary compatibility, code level upgrade effort is very little other than change collector type class name from Collector -> RichCollector. 
  • Moving forward, code refactors would create bigger impact to framework backwards compatbility. We would like to revisit and discuss once we understand benefit and impact balance better. 

Test Plan

Phase 1 will follow same as adding feature to Flink API, adding unit tests and run local env mock tests