...
Broadcast streams are shared by all tasks within a container. Likewise, when a broadcast stream is associated with an adjunct store, the store's content is visible to all tasks as well. To support this, we will introduce storage engines at the container level. When a stream is marked as broadcast via the task.broadcast.inputs configuration and is also associated with an adjunct data store, its storage engine will be instantiated at the container level. Alongside TaskStorageManager, we will introduce a ContainerStorageManager class, which manages the lifecycle of the underlying stores. Populating such a store is delegated to one of the tasks; the remaining tasks only read from it.
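To illustrate the "one writer, many readers" arrangement, the sketch below models a container-level manager that holds one shared store per adjunct store name and lets only a designated task populate it. Only the ContainerStorageManager name comes from the text; SharedStore, the methods isWriter, onMessage and getStore, and the rule that the writer is the lexicographically smallest task name are all hypothetical simplifications, not Samza's actual API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified key-value store shared by all tasks in a container.
// A concurrent map is used because several task threads may read it at once.
final class SharedStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();

    String get(String key) { return data.get(key); }

    void put(String key, String value) { data.put(key, value); }
}

// Hypothetical sketch of a container-level storage manager: one store instance
// per adjunct store name, populated by a single designated task.
final class ContainerStorageManager {
    private final Map<String, SharedStore> stores = new HashMap<>();
    private final Map<String, String> writerTask = new HashMap<>();

    ContainerStorageManager(Collection<String> storeNames, Collection<String> taskNames) {
        // Assumption: the writer is chosen deterministically as the
        // lexicographically smallest task name (taskNames must be non-empty).
        String writer = Collections.min(taskNames);
        for (String name : storeNames) {
            stores.put(name, new SharedStore());
            writerTask.put(name, writer);
        }
    }

    // True if the given task is the one responsible for populating the store.
    boolean isWriter(String storeName, String taskName) {
        return taskName.equals(writerTask.get(storeName));
    }

    // Invoked for every broadcast message delivered to a task. Every task sees
    // the message, but only the designated writer applies it to the shared store.
    void onMessage(String storeName, String taskName, String key, String value) {
        if (isWriter(storeName, taskName)) {
            stores.get(storeName).put(key, value);
        }
    }

    // All tasks, writer or not, read through the same container-level instance.
    SharedStore getStore(String storeName) { return stores.get(storeName); }
}
```

Because every task receives the broadcast stream anyway, restricting writes to one task avoids redundant updates to the shared instance while still giving every task a consistent read view.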
Recovery
To recover previous state, there are two broad scenarios to consider: job restart and node failure. In either case, a container goes through startup, and bootstrap is handled differently for streams associated with persistent and non-persistent stores.
...
A job or container can be brought down intentionally or unintentionally; in either case, its state must be recovered to what it was before shutdown. The recovery process is carried out during startup. For adjunct data stores specifically, the goal is to restore each store as closely as possible to its state prior to shutdown. Two aspects must be considered: whether the stream is marked for bootstrap, and whether the store is persistent.
Persistent stores
The table below summarizes the recovery decisions for persistent stores.
| | No store found | Store found but not valid | Store found and valid |
|---|---|---|---|
| Stream marked for bootstrap | Create new store; force bootstrap | Re-create store; force bootstrap | Continue from last offset |
| Stream not marked for bootstrap | Create new store; continue from last offset | Re-create store; continue from last offset | Continue from last offset |
Non-persistent stores
For non-persistent stores, bootstrap is always forced if the stream is marked for bootstrap; otherwise, processing continues from the last offset.
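The recovery rules for persistent and non-persistent stores can be condensed into a single decision function, which makes it easy to check that every combination is covered. The names StoreState, RecoveryPlan and RecoveryPlanner are hypothetical, as is the assumption that a non-persistent store is always instantiated empty at startup; this is a sketch of the decision logic, not an actual Samza class.

```java
// Hypothetical: what was found on local disk for a store at startup.
enum StoreState { NOT_FOUND, FOUND_INVALID, FOUND_VALID }

// Hypothetical result of the recovery decision.
final class RecoveryPlan {
    final boolean createStore;    // create a new store, or re-create an invalid one
    final boolean forceBootstrap; // re-consume the stream instead of resuming

    RecoveryPlan(boolean createStore, boolean forceBootstrap) {
        this.createStore = createStore;
        this.forceBootstrap = forceBootstrap;
    }

    @Override
    public String toString() {
        return "RecoveryPlan{createStore=" + createStore + ", forceBootstrap=" + forceBootstrap + "}";
    }
}

final class RecoveryPlanner {
    static RecoveryPlan plan(boolean persistent, boolean markedForBootstrap, StoreState state) {
        if (!persistent) {
            // Assumption: a non-persistent store starts empty on every startup,
            // so local state is irrelevant; bootstrap is forced only if the
            // stream is marked for bootstrap, otherwise resume from last offset.
            return new RecoveryPlan(true, markedForBootstrap);
        }
        // Persistent store: a missing or invalid store must be (re-)created, and
        // in that case bootstrap is forced if the stream is marked for it.
        // A valid store always continues from the last offset.
        boolean needsStore = state != StoreState.FOUND_VALID;
        return new RecoveryPlan(needsStore, needsStore && markedForBootstrap);
    }
}
```

For example, `RecoveryPlanner.plan(true, true, StoreState.FOUND_VALID)` yields a plan with both flags false: the existing store is reused and consumption resumes from the last offset, matching the right-hand column of the table.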
Store validity detection
There are multiple ways to detect whether a store is valid. The initial implementation will compare the latest modification time of the store's files with the current time; if the difference exceeds a predefined threshold, the store is deemed invalid. In the future, we may record the latest offset consumed before shutdown and compare it with the current offset available.
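The timestamp-based check can be sketched with the standard java.nio.file API: walk the store directory, take the most recent file modification time, and compare its age with a threshold. The class name StoreValidator, the maxAge parameter, and the choice to treat an empty directory as invalid are assumptions made for illustration; the actual threshold value is not specified in this design.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

final class StoreValidator {
    // Returns the most recent modification time among regular files under storeDir,
    // or empty if the directory contains no files.
    static Optional<Instant> latestFileTimestamp(Path storeDir) throws IOException {
        try (Stream<Path> files = Files.walk(storeDir)) {
            return files.filter(Files::isRegularFile)
                    .map(p -> {
                        try {
                            return Files.getLastModifiedTime(p);
                        } catch (IOException e) {
                            // Lambdas cannot throw checked exceptions; rethrow unchecked.
                            throw new UncheckedIOException(e);
                        }
                    })
                    .max(Comparator.<FileTime>naturalOrder())
                    .map(FileTime::toInstant);
        }
    }

    // A store is deemed valid only if its newest file is no older than maxAge
    // relative to `now`. A missing directory is reported as invalid here; callers
    // are expected to distinguish "no store found" before calling this.
    static boolean isValid(Path storeDir, Duration maxAge, Instant now) throws IOException {
        if (!Files.isDirectory(storeDir)) {
            return false;
        }
        Optional<Instant> latest = latestFileTimestamp(storeDir);
        return latest.isPresent()
                && Duration.between(latest.get(), now).compareTo(maxAge) <= 0;
    }
}
```

Passing `now` explicitly rather than calling `Instant.now()` inside the check keeps the function deterministic and easy to test. Note that this heuristic only approximates staleness; the offset-based comparison mentioned above would measure it directly.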
...