...

Abruptly stopping a pipeline can, however, be a problem if the intermediate data schema changes in a backward-incompatible way across the restart. Any in-flight data in intermediate streams will cause failures upon reprocessing because it cannot be deserialized with the incompatible schema. The usual fix for this serde issue with shuffled data is to switch to a fresh intermediate stream. This is achieved by changing the opId of the PartitionBy operator in the Samza high-level API, or the intermediate transform names in the Beam API, since the intermediate topic names are derived from them. Samza also persists window and timer data in local RocksDB state, which can run into the same serde issues. If the pipeline uses timers and windows, any outstanding Beam Samza-managed state needs to be reset along with its backing changelog stream.
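The effect of changing the opId can be illustrated with a minimal sketch. It assumes, purely for illustration, that the intermediate stream id is formed by joining the job name, job id, operator type and opId; the exact format Samza uses is not stated here, and the helper intermediateStreamId and the sample job and operator names are hypothetical.

```java
final class IntermediateStreamNaming {
    // Hypothetical naming scheme (an assumption for illustration only):
    // the stream id is a function of the job identity and the user-supplied opId.
    static String intermediateStreamId(String jobName, String jobId, String opId) {
        return String.join("-", jobName, jobId, "partition_by", opId);
    }

    public static void main(String[] args) {
        // Same job, but the opId is bumped before deploying the incompatible schema.
        String before = intermediateStreamId("page-views", "1", "by-member-v1");
        String after = intermediateStreamId("page-views", "1", "by-member-v2");

        System.out.println(before);
        System.out.println(after);
        // A different id means the restarted job reads a new, empty stream,
        // so records written with the old schema are never deserialized again.
        System.out.println("fresh stream: " + !before.equals(after));
    }
}
```

Under this assumption, any data still sitting in the old stream is simply abandoned, which is the data loss that the drain feature is meant to avoid.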

These fixes only get the pipeline running again; the in-flight intermediate stream data and any Beam Samza-managed state that was reset are lost. Aside from the data loss, this upgrade process is cumbersome. It is therefore important to support draining and stopping a pipeline smoothly before an upgrade that involves an incompatible intermediate schema change. The work here is scoped to draining only intermediate stream (shuffle) data. The onus of clearing user state, if required, is on the user, and this can be done by switching to a fresh state.

...

One approach is to perform drain by default on every pipeline stop, which would obviate the need to write a DrainNotification using the DrainManager. The rest of the operations would be the same. When the Samza engine drains in-flight data, it also has to deal with all the buffered window state and timer data. Given the nature of streaming pipelines, there will always be incomplete windows at any point in time. Always draining would therefore emit incomplete (and thus incorrect) windows and fire all timers on every stop. Hence, draining should not be performed on every stop but should instead be reserved for exceptional cases where the user has to make a backward-incompatible change to intermediate schemas.
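The incomplete-window problem can be shown with a small, dependency-free simulation. This is only a sketch under stated assumptions: TumblingCounter is a hypothetical stand-in for a windowed operator (it is not a Samza or Beam class), events arrive in timestamp order, and drain() models flushing buffered window state on stop.

```java
import java.util.ArrayList;
import java.util.List;

final class DrainOnStopSketch {
    // Hypothetical tumbling-window counter; assumes in-order event timestamps.
    static final class TumblingCounter {
        private final long sizeMs;
        private long windowStart = -1; // -1 means no window is currently open
        private int count = 0;
        final List<String> emitted = new ArrayList<>();

        TumblingCounter(long sizeMs) {
            this.sizeMs = sizeMs;
        }

        void onEvent(long timestampMs) {
            long start = (timestampMs / sizeMs) * sizeMs;
            // An event for a later window closes the open one as complete.
            if (windowStart >= 0 && start != windowStart) {
                emit(true);
            }
            windowStart = start;
            count++;
        }

        // Models a drain: whatever is buffered is flushed, complete or not.
        void drain() {
            if (windowStart >= 0) {
                emit(false);
            }
        }

        private void emit(boolean complete) {
            emitted.add("[" + windowStart + "," + (windowStart + sizeMs) + ") count="
                    + count + (complete ? "" : " (partial)"));
            windowStart = -1;
            count = 0;
        }
    }

    public static void main(String[] args) {
        TumblingCounter counter = new TumblingCounter(1000);
        for (long ts : new long[] {0, 400, 1200, 1500}) {
            counter.onEvent(ts);
        }
        // Stopping here with drain flushes [1000,2000) before it has seen all its events.
        counter.drain();
        counter.emitted.forEach(System.out::println);
    }
}
```

In this run the first window is emitted normally, while the second is emitted only because of the drain and carries a count that would have been higher had the pipeline kept running until the window closed.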

Rollout

The plan is to release this feature with the Samza 1.8 release.