...

  • Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include:
    • Maintaining the mapping between system streams and adjunct data stores
    • Intercepting the message delivery flow in TaskInstance and populating the relevant adjunct data stores
  • TaskInstance will be modified to consult AdjunctDataStoreManager before delivering a message to a task; a message stored in an adjunct store is not delivered to the task (see the sketch below)
  • Introduce new configurations to "bind" a Samza store to one or more system streams
  • Provide hooks to transform an incoming message into the desired object (this is useful as not everything needs to be cached)
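
The following is a minimal sketch, not the actual TaskInstance implementation, of where the proposed interception would sit in the message delivery path; the field names and the boolean return convention of process() are assumptions made for illustration.

Code Block
// Sketch only: illustrates the proposed interception point in TaskInstance.
// Field names and the boolean return convention are assumptions.
import org.apache.samza.container.TaskName;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class TaskInstanceSketch {
    private final TaskName taskName;
    private final StreamTask task;
    private final AdjunctDataStoreManager adjunctDataStoreManager;

    public TaskInstanceSketch(TaskName taskName, StreamTask task,
                              AdjunctDataStoreManager adjunctDataStoreManager) {
        this.taskName = taskName;
        this.task = task;
        this.adjunctDataStoreManager = adjunctDataStoreManager;
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) throws Exception {
        // Consult the adjunct data store manager first; if the message belongs to a
        // bound AD stream it is written to the adjunct store and not delivered to the task.
        if (adjunctDataStoreManager.process(taskName.getTaskName(), envelope)) {
            return;
        }
        task.process(envelope, collector, coordinator);
    }
}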


Data partitioning

For small datasets that easily fit in a local store, the AD stream can be configured as a broadcast stream and delivered to all containers. Within a container, one AD store instance per stream is instantiated and shared among all tasks in that container.

For datasets too large to fit in a local store, the AD stream has to be partitioned; in that case there is one AD store instance per stream and task.
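
As a hedged illustration of the broadcast case (the store and stream names are hypothetical; task.broadcast.inputs is the existing Samza broadcast-stream configuration):

Code Block
# Sketch only: store and stream names are hypothetical.
# Deliver the AD stream to all containers as a broadcast stream.
task.broadcast.inputs=kafka.member-profiles#0
# Bind the adjunct data store to that stream using the configuration proposed below.
stores.profile-store.adstore.input=kafka.member-profiles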

Consistency

For change capture data, we only serve one snapshot, and the data is naturally consistent across containers. File-based data is out of scope for this proposal; it would result in bounded streams and is more complicated due to versioning. If a stream is unpartitioned, we guarantee serving a consistent snapshot within a container. If a stream is partitioned, we guarantee consistency within a task. No consistency is offered across containers.

Bootstrap

When an AD stream is marked as bootstrap, it is guaranteed that an initial snapshot is built before processing of the input streams starts; otherwise, the input streams and AD streams are processed at the same time. After bootstrap, for change capture data, we keep updating the AD store as new updates arrive.
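
As a hedged example (system and stream names are hypothetical), an AD stream can be marked as bootstrap using the existing Samza bootstrap-stream configuration:

Code Block
# Sketch only: system and stream names are hypothetical.
# The AD stream is fully consumed up to its current head before the
# other input streams are processed.
systems.kafka.streams.member-profiles.samza.bootstrap=true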

Key and Value

By default, we use the key and value in IncomingMessageEnvelope to store an AD data item in a store. However, this is not flexible enough in many cases.

We will provide hooks (sketched below) that allow the user to

  • construct a user-defined key
  • convert the value to a user-defined object
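
The exact shape of these hooks is not specified in this proposal; the following is a sketch of what they could look like (the interface names and signatures are assumptions):

Code Block
// Sketch only: interface names and signatures are assumptions, not part of
// this proposal's defined API.
import org.apache.samza.system.IncomingMessageEnvelope;

/** Builds a user-defined store key from an incoming AD message. */
interface KeyExtractor {
    Object extractKey(IncomingMessageEnvelope envelope);
}

/** Converts an incoming AD message into the object that is actually stored. */
interface ValueConverter {
    Object convert(IncomingMessageEnvelope envelope);
}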

Configuration

Name | Mandatory | Default | Description
stores.adstore.manager.factory | No | N/A | The factory class used to create an adjunct data store manager; allows the user to override the default implementation.
stores.<store>.adstore.input | No | N/A | The name of the system stream to be bound to this store; a store can only be bound to one stream.
stores.<store>.adstore.key.extractor | No | N/A | A user-defined class that extracts a key from an IncomingMessageEnvelope. If not provided, the key in the IncomingMessageEnvelope is used.
stores.<store>.adstore.value.converter | No | N/A | A user-defined class that converts an IncomingMessageEnvelope to an object. If not provided, the value in the IncomingMessageEnvelope is used.
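
A hedged example of how these configurations could be combined (store, stream, and class names are hypothetical; stores.<store>.factory is the existing Samza store configuration):

Code Block
# Sketch only: store, stream, and class names are hypothetical.
stores.profile-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
# Bind the store to the AD stream.
stores.profile-store.adstore.input=kafka.member-profiles
# Optional hooks to derive the key and value stored in the AD store.
stores.profile-store.adstore.key.extractor=com.example.MemberIdExtractor
stores.profile-store.adstore.value.converter=com.example.MemberProfileConverter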

Key classes

AdjunctDataStoreManagerFactory

Code Block
/**
 * Factory class for AdjunctDataStoreManager
 */
public interface AdjunctDataStoreManagerFactory {
    AdjunctDataStoreManager getAdjunctDataStoreManager(
        Config config, 
        Set<SystemStreamPartition> ssps, 
        Map<String, StorageEngine> storageEngines);
}

AdjunctDataStoreManager

Code Block
/**
 * An AdjunctDataStoreManager instance manages the adjunct data stores for a task
 */
public interface AdjunctDataStoreManager {
    /**
     * Invoked by TaskInstance before a message is delivered to the task.
     * @return true if the message was stored in an adjunct data store and
     *         should not be delivered to the task, false otherwise
     */
    boolean process(String taskName, IncomingMessageEnvelope message);
}
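
For illustration, a simplified, hypothetical implementation is sketched below; it is not the proposed default implementation, and the KeyExtractor/ValueConverter hooks it uses are the assumed interfaces sketched earlier.

Code Block
// Sketch only: a simplified, hypothetical implementation of the interface above.
import java.util.Map;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;

class SimpleAdjunctDataStoreManager implements AdjunctDataStoreManager {
    // Mapping between bound system streams and their adjunct data stores.
    private final Map<SystemStream, KeyValueStore<Object, Object>> stores;
    private final KeyExtractor keyExtractor;     // assumed hook, may be null
    private final ValueConverter valueConverter; // assumed hook, may be null

    SimpleAdjunctDataStoreManager(Map<SystemStream, KeyValueStore<Object, Object>> stores,
                                  KeyExtractor keyExtractor, ValueConverter valueConverter) {
        this.stores = stores;
        this.keyExtractor = keyExtractor;
        this.valueConverter = valueConverter;
    }

    @Override
    public boolean process(String taskName, IncomingMessageEnvelope message) {
        KeyValueStore<Object, Object> store =
            stores.get(message.getSystemStreamPartition().getSystemStream());
        if (store == null) {
            return false; // not an AD stream; deliver the message to the task
        }
        Object key = keyExtractor != null ? keyExtractor.extractKey(message) : message.getKey();
        Object value = valueConverter != null ? valueConverter.convert(message) : message.getMessage();
        store.put(key, value);
        return true; // stored in the AD store; do not deliver to the task
    }
}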


Public Interfaces

No changes to public interfaces

...

Compatibility, Deprecation, and Migration Plan

 

Rejected Alternatives

Scope of an AD store

Within a container, we have multiple choices:
  1. One AD store instance per stream - this option guarantees a consistent view within a container, but with side effects. Since data in AD streams may arrive at different times, maintaining a consistent snapshot becomes difficult. As multiple versions of a dataset evolve, we may have to maintain a large number of intermediate snapshot versions.

  2. One AD store instance per stream partition - this option guarantees a consistent view at the stream partition level. Querying the store becomes a problem: when multiple partitions are assigned to a task, we have no way of knowing which store instance holds a given value and end up querying all instances. The implementation might be a bit simpler, but the price is very high.

  3. One AD store instance per stream and task - this is a compromise between #1 and #2: we guarantee a consistent view per task. If multiple partitions are assigned to a task, we potentially still have to maintain multiple intermediate snapshot versions of an AD store instance, but the number of versions is lower than in #1. Since no one within LI is using the custom system partition grouper feature, only the open source portion will be impacted.

Option #3 was chosen.