As of October 2020, the area where the Go SDK is most lacking is support for streaming operation. This page is a guide to the streaming-related areas that need contributions. Specific tasks are linked to their related JIRAs, which have been updated with places to start looking for independent, orthogonal work.

These are not recommended as "starter" or "newbie" tasks, as many require an understanding of both the Beam Model and the Go programming language to implement. In particular, understanding the runtime execution model is critical to implementing these features, as is understanding the Go SDK's exec package, which implements it.

As always, the right place to get help is the #beam-go Slack channel or the dev@ mailing list, referring to the JIRA you're interested in working on.

As Apache Beam is a unified model for batch and stream processing, supporting streaming is not simply a matter of adding a single feature; it means adding several. During execution, the Go SDK is generally unaware of whether the bundle it's processing is "streaming" or not. As such, advanced features like Side Inputs and State and Timers will work just as well for batch execution as for streaming execution. However, the nature of streaming makes certain improvements and optimizations more impactful for streaming, which is why they are mentioned here.

Previous development has been focused on efficient execution within a batch global window environment, which is why the following hasn't been worked on yet. 

Overview

To get to streaming, we first need to validate the Go SDK's windowing implementation. While streaming can in principle operate in only the global window, that by necessity means that all PCollections are bounded, which makes them of limited use in streaming contexts. So windowing comes first, as it gives us correct processing of data in a context that's useful for streaming.

Once we're assured that we're correctly processing the data WRT windows, we can work on getting data into the pipeline. The Go SDK can't currently produce streaming data. There are two options: native and cross language. Native requires augmenting Splittable DoFns with the ability to estimate watermarks and self checkpoint. This allows Go SDK users to author scalable streaming source DoFns in Go. Cross language requires completing the cross language support implementation, wrapping the already implemented streaming source DoFns from Java and Python, and making them available in the appropriate expansion service and a sibling package in the Go SDK. Both are valid and appropriate avenues to enable ingestion of streaming data into a Go SDK pipeline.

At this point, we would have a minimum viable product to use the Go SDK in streaming pipelines. However, streaming pipelines are more performance sensitive than batch pipelines. Latency requirements are often stricter, bundles are generally smaller, and the value of caching is higher.

To that end, improving the performance of side inputs via the Beam cache token mechanism and adding random access side inputs as a user API both help improve efficiency. Further, features that really shine in streaming contexts will have more cause to be implemented, such as custom windowing, Triggers, and State and Timers, which enable additional native uses.

Details


The rest of this page divides the work into broad feature-related categories, outlining each feature, its state at time of writing (October 2020), and links to the related JIRAs. Other than validating windowing correctness, the remaining sections are largely independent and can be implemented in any order. Testing and documentation are also required for successful completion of these tasks, but are mentioned only in the JIRAs to reduce repetition here.

It's strongly recommended to develop these features against a fully vetted runner like the Python Portable Runner. See Go Tips for guidance on getting started using that.

Once the tasks marked [MVP] are accomplished, the SDK will support basic streaming.

Validate Windowing [MVP] [Correctness]

Foremost is validating that the existing streaming and windowing code in the SDK is correct. The SDK permits emitting and observing event times and observing windows in DoFns.

  • Validate that users can observe windows in their DoFns explicitly. [MVP]
  • Validate that the existing windowing code is correct for simple GBKs and lifted combiners. [MVP]
    • In particular sliding (multiple overlapping) windows, where a single element could contribute to multiple aggregates.
  • Validate Side Input behavior WRT windows (implicit observation of windows). [MVP]
    • Side Inputs are valid per window, and this should be validated and tested to be true for the Go SDK.
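
To make the sliding window case concrete, here is a minimal sketch of how one element lands in several overlapping windows and therefore contributes to several aggregates. It is a standalone model rather than Go SDK code: the window, elem, assignSliding, and sumPerWindow names are made up for illustration, timestamps are plain non-negative integers, and the window size is assumed to be a multiple of the period.

```go
package main

import "fmt"

// window is a half-open event-time interval [Start, End).
// Hypothetical type for illustration; not the Go SDK's window type.
type window struct{ Start, End int64 }

// assignSliding returns every sliding window of the given size and period
// that contains ts. Assumes ts >= 0 and that size is a multiple of period.
func assignSliding(ts, size, period int64) []window {
	// Start of the latest window that contains ts.
	lastStart := ts - ts%period
	var ws []window
	// Walk backwards while the window [start, start+size) still contains ts.
	for start := lastStart; start > ts-size; start -= period {
		ws = append(ws, window{Start: start, End: start + size})
	}
	return ws
}

// elem is a timestamped value.
type elem struct{ Timestamp, Value int64 }

// sumPerWindow adds each element's value to every window it falls into,
// which is the behavior a windowed GBK or combiner has to reproduce.
func sumPerWindow(elems []elem, size, period int64) map[window]int64 {
	sums := make(map[window]int64)
	for _, e := range elems {
		for _, w := range assignSliding(e.Timestamp, size, period) {
			sums[w] += e.Value
		}
	}
	return sums
}

func main() {
	elems := []elem{{Timestamp: 7, Value: 1}, {Timestamp: 12, Value: 2}}
	// Windows of size 10 that start every 5 time units.
	for w, sum := range sumPerWindow(elems, 10, 5) {
		fmt.Printf("[%d, %d): %d\n", w.Start, w.End, sum)
	}
}
```

A validation test for the SDK would assert the same property against a real runner: with size 10 and period 5, every element appears in exactly two aggregates.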

Testing Streaming

Beam runners can support a "TestStream" primitive to emulate a streaming data source for testing. This stands to make other tasks in this set much easier to implement.

  • Add TestStream support to allow simpler writing of “streaming” based tests against runners.
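
As a rough illustration of what a TestStream-style test describes, the sketch below models a scripted sequence of events (batches of timestamped elements interleaved with watermark advances) and replays it. Everything here is hypothetical: the event and elem types and the replay function are not the Go SDK's API, and "behind the watermark" is a simplified stand-in for how a runner decides that data is late.

```go
package main

import (
	"fmt"
	"math"
)

// elem is a timestamped test element.
type elem struct {
	Value     string
	Timestamp int64
}

// event is one scripted step: either a batch of elements, or (when
// Elements is empty) an advance of the watermark to AdvanceWatermark.
type event struct {
	Elements         []elem
	AdvanceWatermark int64
}

// replay walks the script in order and splits elements into those that
// arrive at or ahead of the current watermark and those behind it.
func replay(events []event) (ahead, behind []string) {
	watermark := int64(math.MinInt64)
	for _, ev := range events {
		if len(ev.Elements) == 0 {
			// The watermark never moves backwards.
			if ev.AdvanceWatermark > watermark {
				watermark = ev.AdvanceWatermark
			}
			continue
		}
		for _, e := range ev.Elements {
			if e.Timestamp < watermark {
				behind = append(behind, e.Value)
			} else {
				ahead = append(ahead, e.Value)
			}
		}
	}
	return ahead, behind
}

func main() {
	script := []event{
		{Elements: []elem{{"a", 1}, {"b", 5}}},
		{AdvanceWatermark: 10},
		{Elements: []elem{{"c", 3}, {"d", 12}}},
	}
	ahead, behind := replay(script)
	fmt.Println("ahead of watermark:", ahead)
	fmt.Println("behind watermark:", behind)
}
```

The value of such a primitive is that the test author, not wall-clock timing, controls exactly when the watermark moves relative to the data.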

Go SDK Native Streaming Source DoFns [MVP Native]

At present it’s not possible to write streaming source DoFns. Ordinary DoFns work just fine for transforms that do not have significant fanout of downstream elements (high outputs per input) or high processing time per fraction of elements (expensive outputs per input). Splittable DoFns help divide the work of a single element, in terms of its output elements, across multiple workers, but can’t serve as more than a single bundle source. It’s necessary to be able to split a single bundle into multiple bundles not just in the work domain, but also across the time domain.

To enable native Go streaming sources we would need to add at least self checkpointing and watermark estimation support, but additional related features can make the DoFns more versatile.

  • DoFn Self Checkpointing [MVP]
    • Allows a DoFn to terminate a bundle and tell the runner how to restart processing elsewhere. Can be considered “splitting in time”, but purposely permits a DoFn to finish a bundle.
  • Watermark Estimation [MVP]
    • Sets the minimum output watermark for a given restriction+element based on element timestamps or an external clock.
  • Truncating SDFs during drain
    • Allows for more aggressive shrinking of restrictions during a drain.
  • Bundle Finalization
    • Allows a callback to be registered and side effects performed after bundle completion. For example, to acknowledge messages received from a message broker (Kafka, Cloud PubSub) after the bundle results are persisted.
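
The first two items can be sketched together. The following standalone model shows a source that processes a bounded budget of records per bundle, hands back a residual restriction to resume from (self checkpointing), and tracks a watermark as the largest timestamp observed so far. None of these names come from the Go SDK; restriction, record, watermarkEstimator, and processBundle are hypothetical, and the estimator assumes record timestamps are non-decreasing.

```go
package main

import "fmt"

// restriction is a half-open range of offsets [Start, End) still to read.
type restriction struct{ Start, End int64 }

// record stands in for a message read from a source.
type record struct {
	Offset    int64
	Timestamp int64 // event time; assumed positive and non-decreasing
}

// watermarkEstimator reports the largest timestamp observed so far.
// This is only a valid watermark under the non-decreasing assumption above.
type watermarkEstimator struct{ wm int64 }

func (e *watermarkEstimator) observe(ts int64) {
	if ts > e.wm {
		e.wm = ts
	}
}

// processBundle reads at most budget records from rest, then stops
// voluntarily. It returns the records it emitted, the residual restriction
// to resume from in a later bundle, and whether the restriction is finished.
func processBundle(src []record, rest restriction, budget int, est *watermarkEstimator) (out []record, residual restriction, done bool) {
	pos := rest.Start
	for n := 0; n < budget && pos < rest.End && pos < int64(len(src)); n++ {
		r := src[pos]
		est.observe(r.Timestamp)
		out = append(out, r)
		pos++
	}
	return out, restriction{Start: pos, End: rest.End}, pos >= rest.End
}

func main() {
	src := []record{{0, 100}, {1, 110}, {2, 130}, {3, 140}, {4, 150}}
	est := &watermarkEstimator{}
	rest := restriction{Start: 0, End: int64(len(src))}
	for bundle := 1; ; bundle++ {
		out, residual, done := processBundle(src, rest, 2, est)
		fmt.Printf("bundle %d: %d records, residual %+v, watermark %d\n",
			bundle, len(out), residual, est.wm)
		if done {
			break
		}
		rest = residual // the runner would reschedule this residual
	}
}
```

In a real implementation the residual would be returned to the runner, which decides when and where to resume it; the loop in main only stands in for that rescheduling.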

Cross Language Streaming Source DoFns [MVP Xlang] 

  • Clean up prototype code
  • Provide example wrappers of existing XLang DoFns,
    • like the Java Kafka transform.
    • Wrap advanced State and Timers transforms from Java and Python to allow their use in Go SDK pipelines, e.g. GroupIntoBatches.

Side Inputs [Performance]

Side inputs are a valuable feature for affecting computation with additional pipeline-computed data. However, they can also affect performance significantly.

At present, the Go SDK supports iterable side inputs, and they’re looked up from the runner every time.

  • Side Input Caching using tokens.
    • Side inputs are valid based on the window. Multiple elements and bundles processed in the same window can reuse the same side input data at the runner’s discretion. This is implemented with a cache token.
    • This is critical for streaming performance, as it reduces data interchange with the runner half of a worker.
  • Map Side Inputs / Random Access Side Inputs.
    • KV PCollections as side inputs currently support an iterable approach; however, the Beam protocol also supports a key lookup approach.
    • This enables significantly larger side inputs, as data can be looked up from the runner as needed, rather than possibly iterating over the entire PCollection for every element.
    • Similarly, an advanced feature would be to support ViewFns for user side processing of side inputs, orchestrated and garbage collected by the framework, rather than requiring users to do so manually.
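
The caching idea can be sketched as a map keyed by the runner-supplied token together with the window, so that repeated lookups in the same window skip the round trip to the runner. This is a standalone model under assumptions: the cacheKey and sideInputCache types are invented for illustration, the token is treated as an opaque string, and the fetch function stands in for a state request to the runner.

```go
package main

import "fmt"

// cacheKey identifies one materialized side input.
type cacheKey struct {
	Token  string // opaque cache token from the runner (assumed shape)
	Window string // encoded window the side input is valid for
}

// sideInputCache memoizes side input data per token and window.
type sideInputCache struct {
	data    map[cacheKey][]string
	fetch   func(window string) []string // stand-in for a runner lookup
	fetches int                          // number of runner lookups made
}

func newCache(fetch func(window string) []string) *sideInputCache {
	return &sideInputCache{data: make(map[cacheKey][]string), fetch: fetch}
}

// get returns cached data when the token and window match a previous
// lookup, and otherwise fetches from the runner and stores the result.
func (c *sideInputCache) get(token, window string) []string {
	k := cacheKey{Token: token, Window: window}
	if v, ok := c.data[k]; ok {
		return v
	}
	c.fetches++
	v := c.fetch(window)
	c.data[k] = v
	return v
}

func main() {
	c := newCache(func(window string) []string {
		return []string{"side input for " + window}
	})
	// Three elements in the same window under the same token: one fetch.
	for i := 0; i < 3; i++ {
		c.get("token-1", "window-A")
	}
	// A new token means the runner considers the old data invalid.
	c.get("token-2", "window-A")
	fmt.Println("runner lookups:", c.fetches)
}
```

A production cache would also need eviction and a bound on memory use, which this sketch leaves out.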


Windowing [Basic]

The Go SDK supports simple fixed interval windowing and global windows. The validation mentioned above needs to be handled before sliding windows can be declared correctly supported.

  • Complete Session Windowing plumbing
  • Implement Custom Windowing
    • Requires special handling and encoding to the windowing space, notably in how to merge windows together SDK side. Likely handled as a special WindowFn struct with specialized method names, similar to DoFns and CombineFns.
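
Session windows are the standard example of why merging matters. As a minimal sketch, assuming each element starts with its own window of [timestamp, timestamp+gap) and overlapping windows are merged into one session, the SDK-side merge logic looks roughly like this. The window type and mergeSessions function are illustrative only, not part of the Go SDK.

```go
package main

import (
	"fmt"
	"sort"
)

// window is a half-open event-time interval [Start, End).
type window struct{ Start, End int64 }

// mergeSessions gives each timestamp a window of [ts, ts+gap) and merges
// windows that overlap. Windows that merely touch are kept separate.
func mergeSessions(timestamps []int64, gap int64) []window {
	if len(timestamps) == 0 {
		return nil
	}
	// Sort a copy so the caller's slice is left untouched.
	ts := append([]int64(nil), timestamps...)
	sort.Slice(ts, func(i, j int) bool { return ts[i] < ts[j] })

	merged := []window{{Start: ts[0], End: ts[0] + gap}}
	for _, t := range ts[1:] {
		last := &merged[len(merged)-1]
		if t < last.End {
			// Overlaps the current session: extend it.
			last.End = t + gap
			continue
		}
		merged = append(merged, window{Start: t, End: t + gap})
	}
	return merged
}

func main() {
	for _, w := range mergeSessions([]int64{1, 3, 20, 4, 22}, 5) {
		fmt.Printf("[%d, %d)\n", w.Start, w.End)
	}
}
```

A custom WindowFn would generalize this by letting users supply both the assignment and the merge step.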


Triggers [Basic]

Triggers are special runner side behavior indicating how to handle data WRT the watermark and window. They are commonly used to configure the processing of “late data” and similar cases.

These are not currently implemented for user use in the Go SDK. Reshuffle configures triggers, but that configuration is not accessible to users. A correct trigger implementation would at least allow Reshuffle to be re-implemented in a user pipeline, rather than being handled specially within the framework.

  • Requires extending the window package to be able to configure the various triggers.
    • Specifically being able to compose triggers as also permitted by the proto.
  • Requires updating the graphx package translate.go to marshal (and unmarshal?) the triggers to and from Beam PipelineProto Windowing strategies.
  • Requires supporting triggers with the beam.WindowInto transform for user pipeline use.
  • Extension: Handle pane propagation and observation in the exec package, and in user DoFns.
    • Panes indicate whether data was on time or not, and similar facets.
    • Might simply extend the existing window interface.


State and Timers [Advanced]

The Go SDK currently doesn’t have any state and timers support. 

State and Timers is an API for manually managing per-key state, allowing for fine-grained control over aggregations. State is scoped per key + window. Event time timers allow manipulation of the output watermark and respond to the input watermark, while processing time timers allow responding to wall time events. Together they enable advanced transforms like Group Into Batches.

  • Design an idiomatic Go approach to Timers and State processing for DoFns
    • Go doesn’t support annotation-like constructs, with the exception of struct field tags, so:
      • Design likely requires new framework side marker types
      • Design likely requires using field tags.
    • State concerns:
      • Needs to allow customization for state types (easier post generics in Go, but a design that doesn’t require that would be viable sooner).
      • Should support deferred batch reads of multiple states
      • Needs to be expandable to handle ValueState, Combining State, and BagState
    • Timer concerns: 
      • Needs to handle Event and Processing Time timers.
      • Dynamic Timer tags (likely the one and only way to handle Go SDK timers)
      • Needs to introduce an “OnTimer” method, and associated validation.
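
To illustrate the marker type plus field tag idea, the sketch below shows how a framework could discover state and timer declarations on a DoFn struct using reflection. This is one possible design, not a proposal that has been adopted: the ValueState and EventTimer marker types, the beam tag key, and the discover function are all hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// ValueState and EventTimer are hypothetical framework marker types.
// A real design would give them methods for reading, writing, and setting.
type ValueState struct{}
type EventTimer struct{}

// batchFn is an example user DoFn that declares its state and timers as
// tagged fields. The tag value is the identifier the framework would use.
type batchFn struct {
	Count ValueState `beam:"count"`
	Flush EventTimer `beam:"flush"`
	Size  int // ordinary configuration field, ignored by discovery
}

// discover inspects a struct (or pointer to struct) and returns the tag
// values of its state fields and timer fields.
func discover(fn interface{}) (states, timers []string) {
	t := reflect.TypeOf(fn)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil, nil
	}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		id, ok := f.Tag.Lookup("beam")
		if !ok {
			continue
		}
		switch f.Type {
		case reflect.TypeOf(ValueState{}):
			states = append(states, id)
		case reflect.TypeOf(EventTimer{}):
			timers = append(timers, id)
		}
	}
	return states, timers
}

func main() {
	states, timers := discover(&batchFn{Size: 10})
	fmt.Println("states:", states)
	fmt.Println("timers:", timers)
}
```

The same discovery pass is a natural place for the validation mentioned above, for example checking that a DoFn with timer fields also defines an “OnTimer” method.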

