...
For an unbounded dataset, the version of the dataset is naturally consistent across containers (as there is only one version). If a stream is unpartitioned, we provide a consistent snapshot within a container. If a stream is partitioned, we provide a consistent snapshot of a version within a task. No consistency is offered across containers.
...
The latter is needed when a memory store is used.
Implementation notes
In addition to TaskStorageManager, two new classes will be introduced: AdjunctDataStoreManager and ContainerStorageManager. AdjunctDataStoreManager is introduced mainly to encapsulate adjunct-data-related functionality; see the next section for its responsibilities. ContainerStorageManager provides a subset of the functionality of TaskStorageManager, but at the container level. It may help to extract a base class from these two classes to avoid duplication. Its main responsibilities include:
- Maintain directories for the underlying stores (creation, deletion)
- Detect whether the stores under management are still valid during startup
- Delegate operations such as flush, commit to underlying stores
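The three responsibilities above could be factored into a shared base class along the following lines. This is only a sketch under stated assumptions: `ManagedStore`, `BaseStorageManager`, and `ContainerStorageManagerSketch` are hypothetical names, the store abstraction is reduced to a single `flush()` method, and the validity check is a placeholder that only looks for the store directory.

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, minimal store abstraction; not Samza's actual storage engine API.
interface ManagedStore {
  void flush();
}

// Hypothetical base class holding the logic that a task-level and a
// container-level storage manager would otherwise duplicate.
abstract class BaseStorageManager {
  private final File baseDir;
  private final Map<String, ManagedStore> stores = new LinkedHashMap<>();

  BaseStorageManager(File baseDir) {
    this.baseDir = baseDir;
  }

  File storeDir(String storeName) {
    return new File(baseDir, storeName);
  }

  // Responsibility 1: maintain directories (creation).
  File createStoreDir(String storeName) {
    File dir = storeDir(storeName);
    if (!dir.isDirectory() && !dir.mkdirs()) {
      throw new IllegalStateException("Cannot create " + dir);
    }
    return dir;
  }

  // Responsibility 1: maintain directories (deletion).
  void deleteStoreDir(String storeName) {
    deleteRecursively(storeDir(storeName));
  }

  // Responsibility 2: validity detection is left to subclasses, since the
  // criteria may differ between task level and container level.
  abstract boolean isStoreValid(String storeName);

  void register(String storeName, ManagedStore store) {
    stores.put(storeName, store);
  }

  // Responsibility 3: delegate operations to every underlying store.
  void flushAll() {
    for (ManagedStore store : stores.values()) {
      store.flush();
    }
  }

  private static void deleteRecursively(File file) {
    File[] children = file.listFiles(); // null for plain files and missing paths
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    file.delete();
  }
}

// Container-level variant; the validity rule here is an assumption for illustration.
class ContainerStorageManagerSketch extends BaseStorageManager {
  ContainerStorageManagerSketch(File baseDir) {
    super(baseDir);
  }

  @Override
  boolean isStoreValid(String storeName) {
    return storeDir(storeName).isDirectory();
  }
}
```

A task-level manager would extend the same base class and supply its own validity rule, so directory handling and delegation are written only once.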
Key classes
AdjunctDataStoreManagerFactory
...
Code Block |
---|
/**
 * An AdjunctDataStoreManager instance is responsible for
 * 1. maintaining the mapping between system streams and adjunct data stores
 * 2. extracting the key and value from an IncomingMessageEnvelope
 * 3. populating adjunct data stores
 * 4. determining the system stream partitions whose offsets need to be reset
 *    during startup
 */
public interface AdjunctDataStoreManager {
  /**
   * Invoked before a message is passed to a task
   * @return true if the message is saved in a store, false otherwise
   */
  boolean process(IncomingMessageEnvelope message);
} |
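To make the contract of `process` concrete, here is a minimal in-memory sketch covering responsibilities 1 to 3 only (offset reset is omitted). It rests on several assumptions: `IncomingMessageEnvelope` is a simplified stand-in for the Samza class, a plain `Map` stands in for a real adjunct data store, and `InMemoryAdjunctDataStoreManager` and `addAdjunctStream` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Samza's IncomingMessageEnvelope, reduced to the
// fields this sketch needs (assumption; the real class carries more).
final class IncomingMessageEnvelope {
  final String stream;
  final Object key;
  final Object message;

  IncomingMessageEnvelope(String stream, Object key, Object message) {
    this.stream = stream;
    this.key = key;
    this.message = message;
  }
}

interface KeyValueExtractor {
  Object getKey(IncomingMessageEnvelope input);
  Object getValue(IncomingMessageEnvelope input);
}

interface AdjunctDataStoreManager {
  boolean process(IncomingMessageEnvelope message);
}

// Hypothetical implementation: one Map per adjunct stream plays the role of a store.
class InMemoryAdjunctDataStoreManager implements AdjunctDataStoreManager {
  // Responsibility 1: mapping between system streams and adjunct data stores.
  private final Map<String, Map<Object, Object>> storesByStream = new HashMap<>();
  private final KeyValueExtractor extractor;

  InMemoryAdjunctDataStoreManager(KeyValueExtractor extractor) {
    this.extractor = extractor;
  }

  void addAdjunctStream(String stream, Map<Object, Object> store) {
    storesByStream.put(stream, store);
  }

  @Override
  public boolean process(IncomingMessageEnvelope message) {
    Map<Object, Object> store = storesByStream.get(message.stream);
    if (store == null) {
      return false; // not an adjunct stream; the message is not saved here
    }
    // Responsibilities 2 and 3: extract key and value, then populate the store.
    store.put(extractor.getKey(message), extractor.getValue(message));
    return true;
  }
}
```

The boolean result lets the caller tell whether a message was absorbed into an adjunct store; what the caller does with that information is outside the scope of this sketch.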
KeyValueExtractorFactory
Code Block |
---|
/**
 * A factory to instantiate key/value extractors
 */
public interface KeyValueExtractorFactory {
  KeyValueExtractor getKeyValueExtractor(Config config);
} |
KeyValueExtractor
Code Block |
---|
/**
 * A key/value extractor that extracts the key and value from an incoming
 * message envelope
 */
public interface KeyValueExtractor {
  Object getKey(IncomingMessageEnvelope input);
  Object getValue(IncomingMessageEnvelope input);
} |
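As an illustration of how the factory and extractor interfaces might be implemented together, the sketch below builds an extractor that takes the store key from a named field of a map-valued message. Everything beyond the two interface shapes is an assumption: `IncomingMessageEnvelope` is a simplified stand-in, a `Map<String, String>` replaces the `Config` type, and the class names and the `adjunct.key.field` setting are hypothetical.

```java
import java.util.Map;

// Simplified stand-in for Samza's IncomingMessageEnvelope (assumption).
final class IncomingMessageEnvelope {
  final Object key;
  final Object message;

  IncomingMessageEnvelope(Object key, Object message) {
    this.key = key;
    this.message = message;
  }
}

interface KeyValueExtractor {
  Object getKey(IncomingMessageEnvelope input);
  Object getValue(IncomingMessageEnvelope input);
}

// Same shape as the interface above, with a plain Map standing in for Config.
interface KeyValueExtractorFactory {
  KeyValueExtractor getKeyValueExtractor(Map<String, String> config);
}

// Uses the envelope's own key and message unchanged.
class PassThroughExtractor implements KeyValueExtractor {
  @Override
  public Object getKey(IncomingMessageEnvelope input) {
    return input.key;
  }

  @Override
  public Object getValue(IncomingMessageEnvelope input) {
    return input.message;
  }
}

// Takes the store key from one field of a map-valued message.
// Throws ClassCastException if the message is not a Map.
class FieldKeyExtractor implements KeyValueExtractor {
  private final String keyField;

  FieldKeyExtractor(String keyField) {
    this.keyField = keyField;
  }

  @Override
  public Object getKey(IncomingMessageEnvelope input) {
    return ((Map<?, ?>) input.message).get(keyField);
  }

  @Override
  public Object getValue(IncomingMessageEnvelope input) {
    return input.message;
  }
}

class FieldKeyExtractorFactory implements KeyValueExtractorFactory {
  // Hypothetical config key, used only for this illustration.
  static final String KEY_FIELD = "adjunct.key.field";

  @Override
  public KeyValueExtractor getKeyValueExtractor(Map<String, String> config) {
    String field = config.get(KEY_FIELD);
    // Fall back to the envelope's own key when no field is configured.
    return field == null ? new PassThroughExtractor() : new FieldKeyExtractor(field);
  }
}
```

Keeping extraction behind a factory means the choice of key can be driven by configuration rather than hard-coded in the store manager.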
Public Interfaces
No changes to public interfaces.
...