
Status

Current state: [ UNDER DISCUSSION | ACCEPTED | REJECTED ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-TBD

Released: 

Problem

Samza today supports RocksDB and MemDB as local data stores, which enable users to cache data for later use during stream processing. However, populating a data store is the end user's responsibility. This introduces additional effort for end users, who must develop for and maintain their data stores, and also deal with corner cases such as reloading after a consumer falls off the stream. To avoid these issues, some users have employed alternative solutions such as Voldemort, CouchBase, etc. In addition, the table-oriented operations of the fluent API would require working data to be made available by the system. As we look at the issue more closely, it appears generic enough to be addressed by the data infrastructure.

A dataset delivered in a stream can be either bounded (such as files) or unbounded; this proposal focuses on unbounded datasets.

Motivation

We want to have an adjunct data (AD) store that serves as a read-only cache: it automatically stores streaming data for later use. Adjunct data can be accessed the same way as a key-value store in Samza; in addition, we guarantee a consistent view of the data from a Samza task's perspective. Data can be either partitioned or unpartitioned. If the dataset is small enough to fit in a single RocksDB instance, the same copy is populated in every container via a broadcast stream; if it is too large to fit in one database instance, it is partitioned across the containers of a Samza job.

Theoretically an AD store could be either local (RocksDB, MemDB) or centralized (CouchBase); however, in most cases the use of a centralized data store is more a side effect of the lack of a local adjunct data store. For now we defer support for a centralized adjunct data store until we see clear evidence of the need.

Having an adjunct data store would enable a number of use cases:
  • Automatic maintenance of local cache
  • Table oriented operations

Proposed Changes

The proposed changes include:

  • Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include
    • Maintaining the mapping between system streams and adjunct data stores
    • Extracting the key and value from an IncomingMessageEnvelope
    • Updating the relevant adjunct data stores
  • Modify TaskInstance to consult the AdjunctDataStoreManager before delivering a message to a task; a message stored in an adjunct store is not delivered to the task (see the sketch after this list)
  • Introduce new configurations to "bind" a Samza store to one or more system streams
  • Provide hooks to transform an incoming message into the desired type (this is useful as not everything needs to be cached)
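To illustrate the intended control flow, here is a minimal sketch of how TaskInstance could consult the manager before delivering a message. This is not the actual TaskInstance code; the deliver() helper and the field name are hypothetical stand-ins for the existing message-delivery path.

// Hypothetical sketch of the TaskInstance message loop
void process(IncomingMessageEnvelope envelope) {
    // If the manager stores the message in an adjunct store,
    // it is consumed here and not delivered to the user task
    if (adjunctDataStoreManager.process(envelope)) {
        return;
    }
    deliver(envelope); // hand the message to the user's StreamTask
}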

Data partitioning

For small datasets that easily fit in a local store, the AD stream can be configured as a broadcast stream and delivered to all containers. Within a container, one AD store instance per stream is instantiated and shared among all tasks in that container.

For datasets too large to fit in a local store, the AD stream has to be partitioned; there will be one AD store instance per stream per task.
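For the broadcast case, the AD stream would be declared using Samza's existing broadcast stream configuration; the system and stream names below are made up for illustration:

# Hypothetical example: deliver a single-partition AD stream to every task
task.broadcast.inputs=kafka.member-profiles#0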

Consistency

For change capture data, we only serve one snapshot, and the data version is naturally consistent across containers. File-based data is out of scope for this proposal; it would result in bounded streams and is more complicated due to versioning. If a stream is unpartitioned, we guarantee serving a consistent snapshot within a container. If a stream is partitioned, we guarantee one consistent version within a task. No consistency is offered across containers.

Bootstrap

When an AD stream is marked as bootstrap, we guarantee that an initial snapshot is built before processing of input streams starts; otherwise, input streams and AD streams are processed at the same time. After bootstrap, for change capture data, we keep updating the AD store as new updates arrive.
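Marking the AD stream as bootstrap would rely on Samza's existing stream-level bootstrap configuration; the system and stream names are again illustrative:

# Hypothetical example: build the initial snapshot before processing input
systems.kafka.streams.member-profiles.samza.bootstrap=true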

Key and Value

By default, we use the key and value in the IncomingMessageEnvelope to store an AD data item in a store. However, this isn't flexible enough in many cases.

We will provide hooks to allow users to

  • construct user-defined keys
  • construct user-defined values

Configuration

stores.adstore.manager.factory.class (optional, no default)
    Factory class to create an adjunct data store manager, allowing users to override the default implementation.

stores.<store>.adstore.input (optional, no default)
    The name of the system stream to be associated with this store. A store can only be associated with one stream.

stores.<store>.adstore.key.extractor.factory.class (optional, no default)
    Factory class to extract keys from an IncomingMessageEnvelope. If not provided, the key in the IncomingMessageEnvelope is used.

stores.<store>.adstore.value.converter.factory.class (optional, no default)
    Factory class to convert an IncomingMessageEnvelope to a value object. If not provided, the value in the IncomingMessageEnvelope is used.
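Putting these together, a job could bind a RocksDB store to an AD stream as follows. The store, system, and stream names, as well as the factory classes under com.example, are hypothetical; only the config keys come from the table above (plus Samza's standard store factory key).

# Hypothetical adjunct data store configuration
stores.profiles.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.profiles.adstore.input=kafka.member-profiles
stores.profiles.adstore.key.extractor.factory.class=com.example.MemberIdKeyExtractorFactory
stores.profiles.adstore.value.converter.factory.class=com.example.ProfileValueConverterFactory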

Key classes

AdjunctDataStoreManagerFactory

import java.util.Map;
import java.util.Set;

import org.apache.samza.config.Config;
import org.apache.samza.storage.StorageEngine;
import org.apache.samza.system.SystemStreamPartition;

/**
 * Factory class for AdjunctDataStoreManager
 */
public interface AdjunctDataStoreManagerFactory {
    AdjunctDataStoreManager getAdjunctDataStoreManager(
        Config config,
        Set<SystemStreamPartition> ssps,
        Map<String, StorageEngine> storageEngines);
}

AdjunctDataStoreManager

import org.apache.samza.system.IncomingMessageEnvelope;

/**
 * An AdjunctDataStoreManager instance is responsible for
 * 1. maintaining the mapping between system streams and adjunct data stores
 * 2. populating adjunct data stores
 */
public interface AdjunctDataStoreManager {
    /**
     * Invoked before a message is passed to a task
     *
     * @param message the incoming message
     * @return true if the message is saved in a store, false otherwise
     */
    boolean process(IncomingMessageEnvelope message);
}

Converter

/**
 * A converter converts an input object to another type
 */
public interface Converter {
    /**
     * Converts an input object to another type
     *
     * @param input the object to be converted
     * @return the converted object
     */
    Object convert(Object input);
}
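As an illustration of the value hook, a converter could cache only the fields of interest instead of the whole record. This is a sketch assuming the incoming value has already been deserialized into a Map (e.g. from JSON); the field name is hypothetical:

import java.util.Map;

/**
 * Hypothetical value converter that trims an incoming record
 * down to the single field we want to cache
 */
public class ProfileValueConverter implements Converter {
    @Override
    @SuppressWarnings("unchecked")
    public Object convert(Object input) {
        Map<String, Object> record = (Map<String, Object>) input;
        // Store only the display name in the adjunct data store
        return record.get("displayName");
    }
}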

 

Public Interfaces

No changes to existing public interfaces.

Implementation and Test Plan


Compatibility, Deprecation, and Migration Plan

 

Rejected Alternatives

Scope of an AD store

Within a container, we have multiple choices:
  1. One AD store instance per stream - this option guarantees a consistent view within a container, but with side effects. Since data in AD streams may arrive at different times, maintaining a consistent snapshot becomes difficult. As multiple versions of a dataset evolve, we may have to maintain a large number of intermediate snapshot versions.

  2. One AD store instance per stream partition - this option guarantees a consistent view at the stream partition level. Querying the store becomes a problem: when multiple partitions are assigned to a task, we have no way of knowing which store instance holds a given value and end up querying all instances. The implementation might be a bit simpler, but the price is very high.

  3. One AD store instance per stream per task - this is a compromise between 1 and 2: we guarantee a consistent view per task. If multiple partitions are assigned to a task, we may still have to maintain multiple intermediate snapshot versions of an AD store instance, but the number of versions is lower than in option 1. Since no one within LinkedIn is using the custom system stream partition grouper feature, only the open source portion will be impacted.

Option 3 was chosen.

AD store abstraction

For bounded datasets (such as files) delivered as streams, the adjunct data store may need to maintain multiple versions of the underlying stores in order to guarantee serving a consistent version. This would require an adjunct data store abstraction that internally manages the versions of its underlying stores.


 
