
...

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-1278

Released: 

Problem

Samza today supports RocksDB and MemDB as local data stores, which enable users to cache adjunct data for later use during stream processing. However, populating a data store is the end user's responsibility. This requires additional effort from end users to develop and maintain data stores, and to handle corner cases such as reloading data after a consumer falls off. To avoid these issues, some users have adopted alternative solutions such as Voldemort and Couchbase. In addition, table-oriented operations in the fluent API would require working data to be made available by the system. On closer inspection, the problem appears generic enough to be addressed by data infrastructure.

A dataset delivered in a stream can be either bounded or unbounded. An example of an unbounded dataset is a database change stream; an example of a bounded dataset is the content of a file. When Samza runs in 24x7 mode, the stream for a bounded dataset may deliver multiple versions of that dataset.

This proposal focuses on unbounded datasets.

...

We want an adjunct data (AD) store that acts as a read-only cache and automatically stores streaming data for later use. Adjunct data can be accessed the same way as a key-value store in Samza; in addition, we provide a consistent view of the data from a Samza task's or container's perspective. Data can be either partitioned or unpartitioned. If the dataset is small enough to fit in a RocksDB instance, the same copy is populated in every container via a broadcast stream; if it is too large to fit in one database instance, it is partitioned across the containers of a Samza job.
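The read-only contract can be illustrated with a small wrapper. This is a minimal sketch, assuming a hypothetical `AdjunctStoreView` class backed by an in-memory map; it is not Samza's `KeyValueStore` interface. Tasks only read through `get`, while populating the backing map is left to the store manager.

```java
import java.util.Map;

// Hypothetical read-only view over an adjunct data store; an illustration,
// not Samza's KeyValueStore interface.
final class AdjunctStoreView<K, V> {
  // Populated only by the store manager, never by tasks.
  private final Map<K, V> backing;

  AdjunctStoreView(Map<K, V> backing) {
    this.backing = backing;
  }

  // Tasks look up adjunct data by key, as they would with a K/V store.
  V get(K key) {
    return backing.get(key);
  }

  // Writes from a task are rejected because the cache is read-only.
  void put(K key, V value) {
    throw new UnsupportedOperationException("adjunct data stores are read-only");
  }
}
```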

...

  • Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include:
    • Maintaining the mapping between system streams and adjunct data stores
    • Extracting the key and value from an IncomingMessageEnvelope
    • Saving data in the relevant adjunct data stores
  • Modify TaskInstance to consult AdjunctDataStoreManager before delivering a message to a task; if a message is stored in an adjunct data store, it is not delivered to tasks
  • Introduce new configurations to associate a Samza K/V store with a system stream
  • Provide hooks to transform an incoming message into the desired types (this is useful for storing only a subset of the incoming message)
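The responsibilities listed above can be sketched roughly as follows. This is a minimal sketch under stated assumptions: `Envelope` is a hypothetical stand-in for IncomingMessageEnvelope, stores are plain in-memory maps rather than RocksDB, and `AdjunctDataStoreManagerSketch`, `associate` and `process` are illustrative names. `process` returns `true` when a message was absorbed into an adjunct data store, which is the signal TaskInstance would use to skip delivering it to the task.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Samza's IncomingMessageEnvelope (stream, key, value only).
final class Envelope {
  final String stream;
  final Object key;
  final Object value;

  Envelope(String stream, Object key, Object value) {
    this.stream = stream;
    this.key = key;
    this.value = value;
  }
}

// Hypothetical sketch of AdjunctDataStoreManager: maps system streams to
// adjunct data stores and saves matching messages into the associated store.
final class AdjunctDataStoreManagerSketch {
  // stream name -> backing store populated from that stream
  private final Map<String, Map<Object, Object>> storesByStream = new HashMap<>();

  void associate(String stream, Map<Object, Object> store) {
    storesByStream.put(stream, store);
  }

  // Returns true if the message was saved into an adjunct data store, in which
  // case the caller (TaskInstance) should not deliver it to the task.
  boolean process(Envelope envelope) {
    Map<Object, Object> store = storesByStream.get(envelope.stream);
    if (store == null) {
      return false; // not an adjunct data stream; deliver to the task as usual
    }
    // Default behaviour: store the envelope's own key and value.
    store.put(envelope.key, envelope.value);
    return true;
  }
}
```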

Data partitioning

For small datasets that easily fit in a local store, the AD stream can be configured as a broadcast stream and delivered to all containers. Within a container, one AD store instance per stream is instantiated and shared among all tasks in that container.

For datasets too large to fit in a local store, the AD stream has to be partitioned, and there is one AD store instance per stream per task.
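The two placement rules can be sketched as a per-container registry. The `AdStoreRegistry` class and its `storeFor` method are hypothetical; one registry is assumed to exist per container, and in-memory maps stand in for real store instances. Broadcast streams resolve to a single shared instance, while partitioned streams resolve to one instance per (stream, task) pair.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-container registry deciding which AD store instance a task uses.
final class AdStoreRegistry {
  // Broadcast streams: one instance per stream, shared by all tasks in the container.
  private final Map<String, Map<Object, Object>> sharedByStream = new HashMap<>();
  // Partitioned streams: one instance per stream and task.
  private final Map<String, Map<String, Map<Object, Object>>> perTaskByStream = new HashMap<>();

  Map<Object, Object> storeFor(String stream, String taskName, boolean broadcast) {
    if (broadcast) {
      return sharedByStream.computeIfAbsent(stream, s -> new HashMap<>());
    }
    return perTaskByStream
        .computeIfAbsent(stream, s -> new HashMap<>())
        .computeIfAbsent(taskName, t -> new HashMap<>());
  }
}
```

Keeping the two cases in separate maps avoids building composite string keys, so stream and task names cannot collide.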

Consistency

For an unbounded dataset such as change capture data, there is only one version of the dataset, so its version is naturally consistent across containers. Bounded datasets, such as file-based data, are out of scope for this proposal; they are more complicated because their streams may deliver multiple versions. If a stream is unpartitioned, we provide a consistent snapshot within a container. If a stream is partitioned, we provide a consistent snapshot within a task. No consistency is offered across containers.

...

By default, the key and value in IncomingMessageEnvelope are used to store an AD data item in a store. However, this is not flexible enough in many cases. Configuration options will be provided to:

  • construct user-defined keys
  • construct user-defined values
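One way the key and value hooks might look is sketched below. `KeyExtractor`, `ValueConverter` and `AdStoreWriter` are hypothetical names, `Envelope` again stands in for IncomingMessageEnvelope, and the factory classes that would instantiate the hooks are omitted. When a hook is not configured (`null` here), the envelope's own key or value is used, matching the default described above.

```java
import java.util.Map;

// Hypothetical stand-in for Samza's IncomingMessageEnvelope (stream, key, value only).
final class Envelope {
  final String stream;
  final Object key;
  final Object value;

  Envelope(String stream, Object key, Object value) {
    this.stream = stream;
    this.key = key;
    this.value = value;
  }
}

// Hypothetical hook that builds a user-defined key from a message.
interface KeyExtractor {
  Object getKey(Envelope envelope);
}

// Hypothetical hook that builds a user-defined value from a message.
interface ValueConverter {
  Object convert(Envelope envelope);
}

// Applies the configured hooks, falling back to the envelope's own key/value.
final class AdStoreWriter {
  private final KeyExtractor keyExtractor;     // null means "use envelope key"
  private final ValueConverter valueConverter; // null means "use envelope value"

  AdStoreWriter(KeyExtractor keyExtractor, ValueConverter valueConverter) {
    this.keyExtractor = keyExtractor;
    this.valueConverter = valueConverter;
  }

  void write(Envelope envelope, Map<Object, Object> store) {
    Object key = keyExtractor != null ? keyExtractor.getKey(envelope) : envelope.key;
    Object value = valueConverter != null ? valueConverter.convert(envelope) : envelope.value;
    store.put(key, value);
  }
}
```

For example, a key extractor could return a member ID field from a profile record while the value converter keeps only the name field, so only that subset of the incoming message is cached.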

Configuration

Name | Mandatory | Default | Description
stores.adstore.manager.factory.class | No | N/A | Factory class to instantiate an adjunct data store manager, allowing users to override the default implementation. If not configured, a built-in implementation is used.
stores.<store>.adstore.input | No | N/A | The name of the system stream to be associated with this store. A store can only be associated with one stream.
stores.<store>.adstore.key.extractor.factory.class | No | N/A | Factory class to instantiate a converter object that extracts keys from an IncomingMessageEnvelope. If not provided, the key in IncomingMessageEnvelope is used.
stores.<store>.adstore.value.converter.factory.class | No | N/A | Factory class to instantiate a converter object that converts an IncomingMessageEnvelope to a value object. If not provided, the value in IncomingMessageEnvelope is used.
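As an illustration of how the `stores.<store>.adstore.input` entries could be read, the sketch below collects them from a flat config map. `AdStoreConfigSketch` and `inputStreamByStore` are hypothetical names, and the config is assumed to be a plain `Map<String, String>` rather than Samza's own Config class. Because each store appears under exactly one key, the result naturally maps every store to a single input stream.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: collect store -> input stream associations from
// `stores.<store>.adstore.input` entries in a flat config map.
final class AdStoreConfigSketch {
  private static final String PREFIX = "stores.";
  private static final String SUFFIX = ".adstore.input";

  static Map<String, String> inputStreamByStore(Map<String, String> config) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String name = entry.getKey();
      // Require a non-empty <store> segment between the prefix and the suffix.
      if (name.startsWith(PREFIX) && name.endsWith(SUFFIX)
          && name.length() > PREFIX.length() + SUFFIX.length()) {
        String store = name.substring(PREFIX.length(), name.length() - SUFFIX.length());
        result.put(store, entry.getValue());
      }
    }
    return result;
  }
}
```

For example, a hypothetical entry `stores.profile-store.adstore.input=kafka.member-profiles` would yield the mapping `profile-store -> kafka.member-profiles`; a manager could invert this map to route incoming messages by stream.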

...