...

  • Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include (a sketch of a possible interface follows this list):
    • Maintaining the mapping between system streams and adjunct data stores
    • Extracting the key and value from an IncomingMessageEnvelope
    • Saving data in stores
  • Introduce container-level storage engines to support broadcast streams as sources
  • TaskInstance will be modified to consult AdjunctDataStoreManager before delivering a message to a task; if a message is stored in an adjunct store, it is not delivered to tasks
  • Introduce new configurations to "associate" a Samza K/V store with a system stream
  • Provide hooks to transform/project an incoming message into the desired type (this is useful for storing a subset of the incoming message or applying a customized transformation)
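To make this concrete, the sketch below outlines one possible shape for AdjunctDataStoreManager. The interface name comes from this proposal, but the method names and signatures are assumptions made for illustration only.

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStream;

    /**
     * Hypothetical sketch of the proposed AdjunctDataStoreManager; method names
     * and signatures are illustrative, not the final API.
     */
    public interface AdjunctDataStoreManager {
      /** Returns true if the given system stream is backed by an adjunct data store. */
      boolean isAdjunctStream(SystemStream systemStream);

      /**
       * Extracts the key and value from the envelope, optionally applies a
       * user-supplied transform/projection, and writes the result to the
       * associated store. Returns true if the message was absorbed by an
       * adjunct store and should therefore not be delivered to the task.
       */
      boolean process(IncomingMessageEnvelope envelope);
    }

Under this sketch, TaskInstance would call process(...) for each envelope and skip delivery whenever it returns true, matching the behavior described above.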

Data partitioning

For small datasets that can easily fit in a local store, the AD stream can be configured as a broadcast stream and delivered to all containers. Within a container, one AD store instance per stream is instantiated and shared among all tasks within that container.
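As an illustration, the configuration below marks an AD stream as a broadcast input and associates it with a local store. task.broadcast.inputs and the stores.*.factory/serde keys are existing Samza configurations; the stores.*.adjunct.input key is a hypothetical placeholder for the new association configuration proposed here, and the stream/store names are made up for the example.

    # Deliver the AD stream to every container (existing configuration)
    task.broadcast.inputs=kafka.member-profiles#0

    # Define a local K/V store and associate it with the AD stream
    # (the "adjunct.input" key is hypothetical)
    stores.member-profiles.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.member-profiles.key.serde=string
    stores.member-profiles.msg.serde=json
    stores.member-profiles.adjunct.input=kafka.member-profiles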

...

When an AD stream is marked as bootstrap, it is guaranteed that an initial snapshot is built before processing of the input streams starts; otherwise, input streams and AD streams are processed at the same time. After bootstrap, for change-capture data, we keep updating the AD store as new updates arrive (a sample configuration follows the list below). The handling of bootstrap differs for streams associated with persistent and non-persistent stores:

  • For persistent stores, the underlying store is checked during startup; a bootstrap is forced if the store is not available or not valid (e.g. too old)
  • For non-persistent stores, bootstrap is always forced
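For reference, marking the AD stream for bootstrap would use Samza's existing stream-level configuration; the snippet below applies it to the stream names from the earlier example.

    # Build a full snapshot of this stream before processing other inputs
    systems.kafka.streams.member-profiles.samza.bootstrap=true
    # Start from the earliest available offset when no checkpoint exists
    systems.kafka.streams.member-profiles.samza.offset.default=oldest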

Key and Value

By default, we use the key and value in the IncomingMessageEnvelope to store an AD data item in a store, and the user provides serdes as done today. Configuration options will be provided to allow the user to

...

Broadcast streams are shared by all tasks within a container; likewise, when a broadcast stream is associated with an adjunct store, its content is visible to all tasks as well. To support this, we will introduce storage engines at the container level. When a stream is marked as broadcast using the configuration task.broadcast.inputs and is also associated with an adjunct data store, the storage engine will be instantiated at the container level. In parallel to TaskStorageManager, we will introduce the class ContainerStorageManager, which manages the lifecycle of the underlying stores. The population of such a store is delegated to one of the tasks; the rest of the tasks only read from the store.
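A minimal sketch of what ContainerStorageManager might look like is shown below. The class name comes from this proposal, while the fields, method names, and the writing-task election are illustrative assumptions.

    import java.util.Map;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.storage.StorageEngine;

    /**
     * Hypothetical sketch of the proposed ContainerStorageManager. It owns one
     * store instance per broadcast AD stream and shares it across all tasks in
     * the container; only one designated task populates each store.
     */
    public class ContainerStorageManager {
      // One container-level store per adjunct store name
      private final Map<String, StorageEngine> stores;
      // The task designated to write to the container-level stores
      private final TaskName writingTask;

      public ContainerStorageManager(Map<String, StorageEngine> stores, TaskName writingTask) {
        this.stores = stores;
        this.writingTask = writingTask;
      }

      /** Returns the shared store instance visible to every task in the container. */
      public StorageEngine getStore(String storeName) {
        return stores.get(storeName);
      }

      /** Only the designated task writes; all other tasks read only. */
      public boolean isWritingTask(TaskName taskName) {
        return writingTask.equals(taskName);
      }

      /** Called during container shutdown to stop the underlying stores. */
      public void stop() {
        stores.values().forEach(StorageEngine::stop);
      }
    }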

Recovery

To recover from previous states, there are two broad scenarios to consider: job restart and node failure. In either case, a container goes through startup, and the handling of bootstrap differs for streams associated with persistent and non-persistent stores:

  • For non-persistent stores, bootstrap is always forced if a stream is marked for bootstrap; otherwise, processing continues from the last offset
  • For persistent stores, the table below summarizes the decision taken to recover a system stream partition (a sketch of the corresponding logic follows the table)
|                                 | No store found                              | Store found but not valid                   | Store found and valid     |
| Stream marked for bootstrap     | Create new store; force bootstrap           | Re-create store; force bootstrap            | Continue from last offset |
| Stream not marked for bootstrap | Create new store; continue from last offset | Re-create store; continue from last offset  | Continue from last offset |
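The same decision can also be expressed as a small piece of startup logic. The sketch below is purely illustrative and uses hypothetical names (StoreState, RecoveryAction).

    /** Hypothetical encoding of the recovery table above. */
    public class AdjunctStoreRecovery {

      enum StoreState { NOT_FOUND, FOUND_INVALID, FOUND_VALID }

      enum RecoveryAction { CREATE_AND_BOOTSTRAP, RECREATE_AND_BOOTSTRAP,
                            CREATE_AND_CONTINUE, RECREATE_AND_CONTINUE, CONTINUE }

      static RecoveryAction decide(boolean markedForBootstrap, StoreState state) {
        switch (state) {
          case NOT_FOUND:
            return markedForBootstrap ? RecoveryAction.CREATE_AND_BOOTSTRAP
                                      : RecoveryAction.CREATE_AND_CONTINUE;
          case FOUND_INVALID:
            return markedForBootstrap ? RecoveryAction.RECREATE_AND_BOOTSTRAP
                                      : RecoveryAction.RECREATE_AND_CONTINUE;
          default: // FOUND_VALID
            return RecoveryAction.CONTINUE;
        }
      }
    }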


There are multiple ways to detect whether a store is valid. The initial implementation will compare the latest file timestamp in the store with the current time; if the store is older than a predefined threshold, it is deemed invalid. In the future, we may record the latest offset consumed before shutdown and compare it with the current offset available.
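A minimal sketch of the timestamp-based check is shown below, assuming the store is a directory of files on local disk and the maximum age is configurable; the class name, method signature, and threshold are illustrative only.

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    /** Hypothetical timestamp-based validity check for a local adjunct store. */
    public final class StoreValidityCheck {

      /**
       * A store is considered valid if at least one of its files was modified
       * within maxAgeMs of the current time; a missing or empty directory is
       * treated as invalid.
       */
      public static boolean isValid(File storeDir, long maxAgeMs) {
        File[] files = storeDir.listFiles();
        if (files == null || files.length == 0) {
          return false;
        }
        long newest = 0L;
        for (File f : files) {
          newest = Math.max(newest, f.lastModified());
        }
        return System.currentTimeMillis() - newest <= maxAgeMs;
      }

      public static void main(String[] args) {
        // Example: treat a store untouched for more than a day as invalid
        boolean valid = isValid(new File("/tmp/ad-store"), TimeUnit.DAYS.toMillis(1));
        System.out.println("Store valid: " + valid);
      }
    }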

Initialization

During initialization of a Samza container, the following steps will be added to the sequence

...