...

For an unbounded dataset, the version of the dataset is naturally consistent across containers, as there is only one version. If a stream is unpartitioned, we provide a consistent snapshot within a container. If a stream is partitioned, we provide a consistent snapshot of a version within a task. No consistency is offered across containers.
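To make the two consistency scopes concrete, the sketch below shows one way a container might hand out adjunct store instances: all tasks in a container share one store for an unpartitioned stream, while each task gets its own store for a partitioned stream. The class and method names (`AdjunctStoreRegistry`, `storeFor`) are hypothetical, streams and tasks are identified by plain strings, and a `HashMap` stands in for a real key/value store.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry that scopes adjunct stores according to stream partitioning. */
class AdjunctStoreRegistry {
    // Unpartitioned streams: one store per stream, shared by every task in the container,
    // so all tasks observe the same snapshot.
    private final Map<String, Map<Object, Object>> containerStores = new HashMap<>();
    // Partitioned streams: one store per (task, stream) pair, so the snapshot is per task.
    private final Map<String, Map<Object, Object>> taskStores = new HashMap<>();

    Map<Object, Object> storeFor(String taskName, String stream, boolean partitioned) {
        if (partitioned) {
            return taskStores.computeIfAbsent(taskName + "/" + stream, k -> new HashMap<>());
        }
        return containerStores.computeIfAbsent(stream, k -> new HashMap<>());
    }
}
```

Nothing in this sketch spans containers, which matches the statement that no consistency is offered across containers.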

...

The latter is needed when a memory store is used.

Implementation notes

In addition to TaskStorageManager, two new classes will be introduced: AdjunctDataStoreManager and ContainerStorageManager. AdjunctDataStoreManager mainly encapsulates adjunct-data-related functionality; see the next section for its responsibilities. ContainerStorageManager provides a subset of the functionality of TaskStorageManager, but at the container level. It may be helpful to extract a base class from these two classes to avoid duplication. ContainerStorageManager's main responsibilities include:

  • Maintain directories for the underlying stores (creation, deletion)
  • Detect during startup whether the stores under management are still valid
  • Delegate operations such as flush and commit to the underlying stores
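The suggested base class could look roughly like the following sketch, which covers directory maintenance, a startup validity check, and flush delegation. `BaseStorageManager`, `StoreHandle` and the marker-file validity check are all assumptions for illustration; the design does not specify how validity is detected, and commit delegation is omitted for brevity.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical minimal view of a store that the manager delegates to. */
interface StoreHandle {
    void flush();
}

/** Hypothetical base class shared by task-level and container-level storage managers. */
abstract class BaseStorageManager {
    // Hypothetical marker file; its presence is taken to mean the store directory is valid.
    static final String VALID_MARKER = "VALID";

    protected final Path baseDir;
    protected final List<StoreHandle> stores = new ArrayList<>();

    protected BaseStorageManager(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Creates the directory for a store if it does not exist yet. */
    Path createStoreDir(String storeName) throws IOException {
        return Files.createDirectories(baseDir.resolve(storeName));
    }

    /** Recursively deletes a store directory, e.g. one found invalid at startup. */
    void deleteStoreDir(String storeName) throws IOException {
        Path dir = baseDir.resolve(storeName);
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    /** Startup check: a store counts as valid only if its marker file exists. */
    boolean isStoreValid(String storeName) {
        return Files.exists(baseDir.resolve(storeName).resolve(VALID_MARKER));
    }

    /** Delegates flush to every managed store. */
    void flush() {
        for (StoreHandle store : stores) {
            store.flush();
        }
    }
}
```

TaskStorageManager and ContainerStorageManager would then differ mainly in which base directory and which set of stores they manage.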

Key classes

AdjunctDataStoreManagerFactory

...

import org.apache.samza.system.IncomingMessageEnvelope;

/**
 * An AdjunctDataStoreManager instance is responsible for
 * 1. maintaining the mapping between system streams and adjunct data stores
 * 2. extracting the key and value from an IncomingMessageEnvelope
 * 3. populating adjunct data stores
 * 4. determining the system stream partitions whose offsets need to be reset
 *    during startup
 */
public interface AdjunctDataStoreManager {
    /**
     * Invoked before a message is passed to a task.
     * @return true if the message is saved in a store, false otherwise
     */
    boolean process(IncomingMessageEnvelope message);
}
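A minimal sketch of how `process` might cover responsibilities 1 to 3: messages from a stream that is mapped to an adjunct store are written into that store and reported as saved; messages from any other stream are not stored. `StreamEnvelope`, `StreamExtractor` and the `HashMap`-backed store are simplified stand-ins for `IncomingMessageEnvelope`, `KeyValueExtractor` and a Samza key/value store, `SimpleAdjunctDataStoreManager` and `register` are hypothetical names, and offset reset (responsibility 4) is not shown.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for IncomingMessageEnvelope. */
final class StreamEnvelope {
    final String systemStream;
    final Object key;
    final Object message;

    StreamEnvelope(String systemStream, Object key, Object message) {
        this.systemStream = systemStream;
        this.key = key;
        this.message = message;
    }
}

/** Simplified stand-in for KeyValueExtractor. */
interface StreamExtractor {
    Object getKey(StreamEnvelope input);
    Object getValue(StreamEnvelope input);
}

/** Hypothetical implementation of the process() contract. */
class SimpleAdjunctDataStoreManager {
    // Responsibility 1: mapping from system stream to its adjunct data store and extractor.
    private final Map<String, Map<Object, Object>> storesByStream = new HashMap<>();
    private final Map<String, StreamExtractor> extractorsByStream = new HashMap<>();

    void register(String systemStream, Map<Object, Object> store, StreamExtractor extractor) {
        storesByStream.put(systemStream, store);
        extractorsByStream.put(systemStream, extractor);
    }

    /** Returns true if the message was saved in a store, false otherwise. */
    boolean process(StreamEnvelope message) {
        Map<Object, Object> store = storesByStream.get(message.systemStream);
        if (store == null) {
            return false; // not an adjunct data stream
        }
        StreamExtractor extractor = extractorsByStream.get(message.systemStream);
        // Responsibilities 2 and 3: extract the key/value pair and populate the store.
        store.put(extractor.getKey(message), extractor.getValue(message));
        return true;
    }
}
```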

KeyValueExtractorFactory

import org.apache.samza.config.Config;

/**
 * A factory to instantiate key/value extractors
 */
public interface KeyValueExtractorFactory {
    KeyValueExtractor getKeyValueExtractor(Config config);
}
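Because the factory receives the job configuration, an implementation can parameterize the extractor it builds. In the sketch below, a `Map<String, String>` stands in for Samza's `Config`, messages are modeled as `Map<String, Object>` records, and both the configuration key `adjunct.key.field` and its default `id` are hypothetical: the factory returns an extractor that uses the configured field as the key and the whole record as the value.

```java
import java.util.Map;

/** Simplified stand-in for KeyValueExtractor operating on record-shaped messages. */
interface RecordExtractor {
    Object getKey(Map<String, Object> record);
    Object getValue(Map<String, Object> record);
}

/** Hypothetical factory: the key field name comes from configuration. */
class FieldKeyExtractorFactory {
    // Hypothetical configuration key; not an existing Samza setting.
    static final String KEY_FIELD_CONFIG = "adjunct.key.field";

    RecordExtractor getKeyValueExtractor(Map<String, String> config) {
        // Falls back to a hypothetical default field name when the key is not configured.
        String keyField = config.getOrDefault(KEY_FIELD_CONFIG, "id");
        return new RecordExtractor() {
            @Override
            public Object getKey(Map<String, Object> record) {
                return record.get(keyField);
            }

            @Override
            public Object getValue(Map<String, Object> record) {
                return record;
            }
        };
    }
}
```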

KeyValueExtractor

import org.apache.samza.system.IncomingMessageEnvelope;

/**
 * A key/value extractor that extracts the key and value from an incoming
 * message envelope
 */
public interface KeyValueExtractor {
    Object getKey(IncomingMessageEnvelope input);
    Object getValue(IncomingMessageEnvelope input);
}
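The simplest extractor, assuming the upstream producer already keys each message by the lookup key, returns the envelope's key and message unchanged. `Envelope` is a simplified stand-in for `IncomingMessageEnvelope`, mirroring its `getKey()` and `getMessage()` accessors, the local `KeyValueExtractor` copy is typed against that stand-in, and `PassThroughExtractor` is a hypothetical name.

```java
/** Simplified stand-in for IncomingMessageEnvelope, exposing the same two accessors. */
final class Envelope {
    private final Object key;
    private final Object message;

    Envelope(Object key, Object message) {
        this.key = key;
        this.message = message;
    }

    Object getKey() { return key; }
    Object getMessage() { return message; }
}

/** Same shape as the KeyValueExtractor interface above, typed against the stand-in. */
interface KeyValueExtractor {
    Object getKey(Envelope input);
    Object getValue(Envelope input);
}

/** Hypothetical extractor that stores each message under the envelope's own key. */
class PassThroughExtractor implements KeyValueExtractor {
    @Override
    public Object getKey(Envelope input) {
        return input.getKey();
    }

    @Override
    public Object getValue(Envelope input) {
        return input.getMessage();
    }
}
```

Extractors that derive the key from the message payload instead would be plugged in through KeyValueExtractorFactory.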
 

Public Interfaces

No changes to public interfaces.

...