...
- Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include:
  - Maintaining the mapping between system streams and adjunct data stores
  - Extracting the key and value from an IncomingMessageEnvelope
  - Saving data in stores
- Introduce container-level storage engines to support broadcast streams as sources
- Modify TaskInstance to consult AdjunctDataStoreManager before delivering a message to a task; if a message is stored in an adjunct store, it is not delivered to tasks
- Introduce new configurations to "associate" a Samza K/V store with a system stream
- Provide hooks to transform/project an incoming message into the desired types (this is useful for storing a subset of the incoming message or a customized transformation)
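The responsibilities above can be sketched roughly as follows. This is a minimal illustration, not the final API: the store interface, the envelope fields, and the method names are stand-ins for the real Samza types.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the AdjunctDataStoreManager described above. All names
// other than AdjunctDataStoreManager itself are stand-ins, not the final API.
public class AdjunctDataStoreManager {

  // Stand-in for a Samza K/V store.
  public interface KeyValueStore {
    void put(Object key, Object value);
    Object get(Object key);
  }

  // Stand-in for IncomingMessageEnvelope: only the fields used here.
  public static final class Envelope {
    final String systemStream;
    final Object key;
    final Object message;
    public Envelope(String systemStream, Object key, Object message) {
      this.systemStream = systemStream;
      this.key = key;
      this.message = message;
    }
  }

  // Mapping between system streams and their adjunct data stores.
  private final Map<String, KeyValueStore> streamToStore = new HashMap<>();

  public void associate(String systemStream, KeyValueStore store) {
    streamToStore.put(systemStream, store);
  }

  // Returns true if the message was absorbed by an adjunct store and
  // must NOT be delivered to tasks; false means "deliver as usual".
  public boolean process(Envelope envelope) {
    KeyValueStore store = streamToStore.get(envelope.systemStream);
    if (store == null) {
      return false;
    }
    store.put(envelope.key, envelope.message);
    return true;
  }
}
```

The boolean return value captures the TaskInstance interaction described above: the task delivery path only proceeds when `process` returns false.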
Data partitioning
For small datasets that easily fit in a local store, the AD stream can be configured as a broadcast stream and delivered to all containers. Within a container, one AD store instance per stream is instantiated and shared among all tasks in that container.
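As an illustration, the configuration might look like the fragment below. `task.broadcast.inputs` is existing Samza configuration; the store-association key is purely hypothetical and only shows the intent of the "associate" configuration proposed above.

```
# Existing Samza config: mark partition 0 of kafka.member-profiles as a
# broadcast stream so every container receives it.
task.broadcast.inputs=kafka.member-profiles#0

# Hypothetical new config (key name illustrative only): associate the
# stream with the local K/V store "member-profiles-store".
stores.member-profiles-store.adjunct.stream=kafka.member-profiles
```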
...
When an AD stream is marked as bootstrap, an initial snapshot is guaranteed to be built before processing of input streams starts; otherwise, input streams and AD streams are processed at the same time. After bootstrap, for change capture data, the AD store keeps being updated as new updates arrive. The handling of bootstrap differs for streams associated with persistent and non-persistent stores:
- for persistent stores, the underlying store is checked during startup. A bootstrap is forced if the store is not available or not valid (e.g. too old)
- for non-persistent stores, bootstrap is always forced
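The bootstrap decision above is a small predicate; a sketch with illustrative names:

```java
// Sketch of the bootstrap decision described above; names are illustrative.
public class BootstrapDecision {
  /**
   * @param persistent     whether the adjunct store is backed by a persistent store
   * @param storeAvailable whether an on-disk store was found at startup
   * @param storeValid     whether the store passed the validity check (e.g. not too old)
   * @return true if a bootstrap must be forced before input processing starts
   */
  public static boolean forceBootstrap(boolean persistent, boolean storeAvailable,
                                       boolean storeValid) {
    if (!persistent) {
      return true; // non-persistent stores always bootstrap
    }
    return !storeAvailable || !storeValid; // persistent: only when missing or invalid
  }
}
```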
Key and Value
By default, we use the key and value in IncomingMessageEnvelope to store an AD data item in a store, and the user provides serdes as done today. Configuration options will be provided to allow the user to
...
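The transform/projection hook mentioned earlier might take a shape like the following. The interface name and signature are assumptions for illustration, not the proposed API.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical shape of the transform/projection hook: lets a user store
// only a subset of the incoming message, or a customized transformation.
public class TransformerExample {

  public interface AdjunctDataTransformer<M, K, V> {
    // Project an incoming message into the key/value pair actually stored.
    Map.Entry<K, V> transform(M message);
  }

  // Example: for a comma-separated record "id,name,address", store only
  // the name field, keyed by id.
  public static final AdjunctDataTransformer<String, String, String> NAME_ONLY =
      message -> {
        String[] fields = message.split(",");
        return new AbstractMap.SimpleEntry<>(fields[0], fields[1]);
      };
}
```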
Broadcast streams are shared by all tasks within a container; in like fashion, when a broadcast stream is associated with an adjunct store, its content is visible to all tasks as well. To support this, we will introduce storage engines at the container level. When a stream is marked as broadcast via the configuration task.broadcast.inputs and is also associated with an adjunct data store, the storage engine will be instantiated at the container level. In parallel to TaskStorageManager, we will introduce a new class, ContainerStorageManager, which manages the lifecycle of the underlying stores. The population of such a store is delegated to one of the tasks; the rest of the tasks only read from the store.
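The single-writer, many-reader ownership described above can be sketched as follows; the class shares the name ContainerStorageManager with the proposal, but the methods and the in-memory map are illustrative stand-ins for the real storage engine lifecycle.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of container-level store ownership: one store instance per
// broadcast stream, shared by all tasks; exactly one task is the
// designated writer. Method names and the backing map are illustrative.
public class ContainerStorageManager {
  private final Map<String, Map<Object, Object>> stores = new HashMap<>();
  private final Map<String, String> writerTask = new HashMap<>();

  // Instantiate the store once at container level; the first task to
  // register becomes the designated writer.
  public synchronized void register(String storeName, String taskName) {
    stores.computeIfAbsent(storeName, s -> new HashMap<>());
    writerTask.putIfAbsent(storeName, taskName);
  }

  // Only the designated writer populates the store.
  public void put(String storeName, String taskName, Object key, Object value) {
    if (!isWriter(storeName, taskName)) {
      throw new IllegalStateException(taskName + " is read-only for " + storeName);
    }
    stores.get(storeName).put(key, value);
  }

  // All tasks in the container may read.
  public Object get(String storeName, Object key) {
    return stores.get(storeName).get(key);
  }

  public boolean isWriter(String storeName, String taskName) {
    return taskName.equals(writerTask.get(storeName));
  }
}
```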
Recovery
To recover from previous state, there are two broad scenarios to consider: job restart and node failure. In either case, a container goes through startup, and the handling of bootstrap differs for streams associated with persistent and non-persistent stores:
- For non-persistent stores, bootstrap is always forced if a stream is marked for bootstrap; otherwise continue from last offset
- The table below summarizes the decision to be taken to recover a system stream partition for persistent stores
| | No store found | Store found but not valid | Store found and valid |
|---|---|---|---|
| Stream marked for bootstrap | Create new store; force bootstrap | Re-create store; force bootstrap | Continue from last offset |
| Stream not marked for bootstrap | Create new store; continue from last offset | Re-create store; continue from last offset | Continue from last offset |
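The table can be encoded as a small decision function; the enum values and method name below are illustrative only.

```java
// Encodes the recovery table above for persistent stores.
public class RecoveryDecision {
  public enum Action {
    CREATE_AND_BOOTSTRAP, RECREATE_AND_BOOTSTRAP,
    CREATE_AND_CONTINUE, RECREATE_AND_CONTINUE, CONTINUE
  }

  public static Action decide(boolean markedForBootstrap, boolean storeFound,
                              boolean storeValid) {
    if (storeFound && storeValid) {
      return Action.CONTINUE; // valid store: always continue from last offset
    }
    if (markedForBootstrap) {
      return storeFound ? Action.RECREATE_AND_BOOTSTRAP : Action.CREATE_AND_BOOTSTRAP;
    }
    return storeFound ? Action.RECREATE_AND_CONTINUE : Action.CREATE_AND_CONTINUE;
  }
}
```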
There are multiple ways to detect whether a store is valid. The initial implementation will compare the latest timestamp of the store's files with the current time; if the difference is larger than a predefined value, the store is deemed invalid. In the future, we may record the latest offset consumed before shutdown and compare it with the current offset available.
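The initial timestamp-based check reduces to the predicate below; the method name is illustrative, and the file modification times are passed in directly to keep the sketch self-contained.

```java
// Sketch of the timestamp-based validity check: given the modification
// times of the store's files, the store is valid if the newest file is
// no older than maxAgeMillis relative to now. Names are illustrative.
public class StoreValidityCheck {
  public static boolean isValid(long[] fileModifiedTimes, long maxAgeMillis,
                                long nowMillis) {
    if (fileModifiedTimes.length == 0) {
      return false; // no store content found: treat as invalid
    }
    long newest = Long.MIN_VALUE;
    for (long t : fileModifiedTimes) {
      newest = Math.max(newest, t);
    }
    return nowMillis - newest <= maxAgeMillis;
  }
}
```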
Initialization
During initialization of a Samza container, the following steps will be added to the sequence
...