...

Samza today supports RocksDB and MemDB as local data stores, which enable users to cache data for later use during stream processing. However, populating a data store is the end user's responsibility. This requires additional effort from end users to develop and maintain data stores, and to deal with corner cases such as reloading after consumers fall off. To avoid these issues, some users have adopted alternative solutions such as Voldemort and CouchBase. In addition, the table-oriented operations of the fluent API would require working data to be made available by the system. On closer inspection, the problem appears generic enough to be addressed by the data infrastructure.

...

In theory, an AD store could be either local (RocksDB or MemDB) or centralized (CouchBase). However, we believe that in most cases the use of a centralized data store is largely a side effect of the lack of a local adjunct data store. For now, we defer support for a centralized adjunct data store until we see clear evidence that it is needed.

...

Having an adjunct data store would enable a number of use cases:
  • Automatic maintenance of local caches
  • Table-oriented operations for the fluent API

Proposed Changes

The proposed changes include:

  • Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include:
    • Maintaining the mapping between system streams and adjunct data stores
    • Populating and updating the relevant adjunct data stores
  • TaskInstance will be modified to consult AdjunctDataStoreManager before delivering a message to a task; a message that is stored in an adjunct data store is not delivered to the task
  • Introduce new configurations to "bind" a Samza store to a system stream
  • Provide hooks to transform an incoming message into the desired object types (useful because not everything needs to be cached)
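The interaction between TaskInstance and AdjunctDataStoreManager described above can be sketched as follows. This is a minimal, hypothetical illustration, not the proposed implementation: `Envelope` is a simplified stand-in for `IncomingMessageEnvelope`, a plain `Map` stands in for a RocksDB or in-memory store, and the `bind`, `process` and `deliver` names are invented for this example. Treating a null value as a delete is also an assumption; the proposal only says that stores are populated and updated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch: none of these types or methods are Samza APIs.
public class AdjunctDataStoreSketch {

  /** Simplified stand-in for Samza's IncomingMessageEnvelope. */
  record Envelope(String stream, Object key, Object value) {}

  /** Keeps the stream -> store mapping and populates bound stores. */
  static class AdjunctDataStoreManager {
    // A plain Map stands in for a RocksDB/MemDB key-value store.
    private final Map<String, Map<Object, Object>> storesByStream = new HashMap<>();
    // Per-stream hook converting the incoming value into the cached object.
    private final Map<String, UnaryOperator<Object>> convertersByStream = new HashMap<>();

    void bind(String stream, Map<Object, Object> store, UnaryOperator<Object> converter) {
      storesByStream.put(stream, store);
      convertersByStream.put(stream, converter);
    }

    /** Returns true if the message was absorbed into an adjunct store. */
    boolean process(Envelope envelope) {
      Map<Object, Object> store = storesByStream.get(envelope.stream());
      if (store == null) {
        return false; // unbound stream: deliver to the task as usual
      }
      Object value = convertersByStream.get(envelope.stream()).apply(envelope.value());
      if (value == null) {
        store.remove(envelope.key()); // assumption: null value means delete
      } else {
        store.put(envelope.key(), value);
      }
      return true;
    }
  }

  /** Stand-in for the TaskInstance delivery step. */
  static void deliver(AdjunctDataStoreManager manager, Envelope envelope, List<Envelope> toTask) {
    if (!manager.process(envelope)) {
      toTask.add(envelope); // only messages not stored in an adjunct store reach the task
    }
  }

  public static void main(String[] args) {
    AdjunctDataStoreManager manager = new AdjunctDataStoreManager();
    Map<Object, Object> profiles = new HashMap<>();
    manager.bind("profile-changes", profiles, UnaryOperator.identity());

    List<Envelope> toTask = new ArrayList<>();
    deliver(manager, new Envelope("profile-changes", "member-1", "Alice"), toTask);
    deliver(manager, new Envelope("page-views", "member-1", "/home"), toTask);

    System.out.println(profiles);      // {member-1=Alice}
    System.out.println(toTask.size()); // 1
  }
}
```

Returning a boolean from `process` keeps the TaskInstance change small in this sketch: the delivery loop needs one extra check, and messages from unbound streams reach the task unchanged.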

...

For change capture data, we serve only one snapshot, so the data version is naturally consistent across containers. File-based data is out of scope for this proposal; it would result in bounded streams and is more complicated due to versioning. If a stream is unpartitioned, we guarantee that a consistent snapshot is served within a container. If a stream is partitioned, we guarantee one consistent version within a task. No consistency guarantee is offered across containers.

...

We will provide hooks that allow users to

  • construct a user-defined key
  • convert the value to a user-defined value object
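A sketch of what the two hooks might look like, assuming they are plain single-method interfaces. `KeyExtractor`, `ValueConverter`, the `MemberChange` and `DisplayName` records and their fields are all hypothetical; in the proposal these hooks would be created by the factory classes named in the configuration, whose interfaces are not defined here.

```java
// Hypothetical hook interfaces; names are illustrative, not Samza APIs.
public class AdjunctStoreHooksSketch {

  /** Simplified stand-in for IncomingMessageEnvelope (key + message). */
  record Envelope(Object key, Object message) {}

  /** Hook 1: derive the store key from an incoming message. */
  interface KeyExtractor {
    Object getKey(Envelope envelope);
  }

  /** Hook 2: convert the incoming value into the object actually cached. */
  interface ValueConverter {
    Object convert(Object value);
  }

  /** Example record as it might arrive on a change-capture stream (hypothetical). */
  record MemberChange(long memberId, String firstName, String lastName, String bio) {}

  /** Only the display name is needed downstream, so only that is cached. */
  record DisplayName(String first, String last) {}

  // Key the store by member id instead of the envelope key.
  static final KeyExtractor BY_MEMBER_ID =
      envelope -> ((MemberChange) envelope.message()).memberId();

  // Drop fields the job never reads (here: bio) before caching.
  static final ValueConverter TO_DISPLAY_NAME = value -> {
    MemberChange change = (MemberChange) value;
    return new DisplayName(change.firstName(), change.lastName());
  };

  public static void main(String[] args) {
    Envelope env = new Envelope(null, new MemberChange(42L, "Ada", "Lovelace", "long bio ..."));
    System.out.println(BY_MEMBER_ID.getKey(env));              // 42
    System.out.println(TO_DISPLAY_NAME.convert(env.message())); // DisplayName[first=Ada, last=Lovelace]
  }
}
```

Because the converter keeps only the name fields, the large `bio` field is never written to the store, which is the "not everything needs to be cached" case.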

Configuration

Name | Mandatory | Default | Description
stores.adstore.manager.factory.class | No | N/A | Factory class used to create an adjunct data store manager, allowing users to override the default implementation.
stores.<store>.adstore.input | No | N/A | Name of the system stream associated with this store. A store can be associated with only one stream.
stores.<store>.adstore.key.extractor.factory.class | No | N/A | Factory class used to extract keys from an IncomingMessageEnvelope. If not provided, the key in the IncomingMessageEnvelope is used.
stores.<store>.adstore.value.converter.factory.class | No | N/A | Factory class used to convert the value in an IncomingMessageEnvelope to a value object. If not provided, the value in the IncomingMessageEnvelope is used.
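As an illustration of how the `stores.<store>.adstore.input` entries could be turned into the stream-to-store mapping maintained by AdjunctDataStoreManager, here is a minimal sketch over a flat `Map<String, String>` config. The `streamToStores` helper, the `com.example.ProfileConverterFactory` class and the `kafka.profile-changes` style stream names are hypothetical, and the sketch assumes several stores may share one input stream, which the table neither allows nor forbids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of parsing the adjunct store config; not Samza code.
public class AdjunctStoreConfigSketch {

  private static final String PREFIX = "stores.";
  private static final String INPUT_SUFFIX = ".adstore.input";

  /** Maps each bound system stream to the (sorted) stores populated from it. */
  static Map<String, List<String>> streamToStores(Map<String, String> config) {
    Map<String, List<String>> result = new TreeMap<>();
    // Iterate in sorted key order so the output is deterministic.
    for (Map.Entry<String, String> e : new TreeMap<>(config).entrySet()) {
      String key = e.getKey();
      boolean isInputKey = key.startsWith(PREFIX)
          && key.endsWith(INPUT_SUFFIX)
          && key.length() > PREFIX.length() + INPUT_SUFFIX.length(); // non-empty store name
      if (isInputKey) {
        String store = key.substring(PREFIX.length(), key.length() - INPUT_SUFFIX.length());
        // One config key per store, so each store is bound to at most one stream.
        result.computeIfAbsent(e.getValue(), s -> new ArrayList<>()).add(store);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> config = Map.of(
        "stores.profiles.adstore.input", "kafka.profile-changes",
        "stores.profiles.adstore.value.converter.factory.class", "com.example.ProfileConverterFactory",
        "stores.companies.adstore.input", "kafka.company-changes",
        "stores.titles.adstore.input", "kafka.profile-changes");
    System.out.println(streamToStores(config));
    // {kafka.company-changes=[companies], kafka.profile-changes=[profiles, titles]}
  }
}
```

Keys that are not `.adstore.input` entries, such as the converter factory above, are ignored here; a fuller version would look them up per store and fall back to the envelope key or value when they are absent, as the table describes.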

...