...

  • Introduce a new construct, AdjunctDataStoreManager, whose responsibilities include
    • Maintaining the mapping between system streams and adjunct data stores
    • Extracting the key and value from an IncomingMessageEnvelope
    • Saving data in stores
  • Introduce container-level storage engines to support broadcast streams as sources
  • TaskInstance will be modified to consult AdjunctDataStoreManager before delivering a message to a task; if a message is stored in an adjunct store, it is not delivered to tasks
  • Introduce new configurations to "associate" a Samza K/V store with a system stream
  • Provide hooks to transform an incoming message to the desired types (this is useful for storing a subset of the incoming message)
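
The responsibilities listed above can be illustrated with a minimal sketch. It is not the proposed implementation: Envelope is a simplified stand-in for IncomingMessageEnvelope, plain maps stand in for Samza K/V stores, and KeyValueExtractor, associate() and process() are hypothetical names chosen for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

final class AdjunctStoreSketch {

    /** Hypothetical, simplified stand-in for Samza's IncomingMessageEnvelope. */
    static final class Envelope {
        final String systemStream;
        final Object key;
        final Object message;

        Envelope(String systemStream, Object key, Object message) {
            this.systemStream = systemStream;
            this.key = key;
            this.message = message;
        }
    }

    /** Hypothetical hook: turns an envelope into the key/value pair to store. */
    interface KeyValueExtractor {
        Map.Entry<Object, Object> extract(Envelope envelope);
    }

    /** Default behavior: store the envelope's own key and message unchanged. */
    static final KeyValueExtractor DEFAULT_EXTRACTOR =
        e -> new SimpleImmutableEntry<Object, Object>(e.key, e.message);

    /** Hypothetical manager; a plain Map stands in for a Samza K/V store. */
    static final class AdjunctDataStoreManager {
        private final Map<String, Map<Object, Object>> storesByStream = new HashMap<>();
        private final Map<String, KeyValueExtractor> extractorsByStream = new HashMap<>();

        /** Associates a system stream with a store; a null extractor selects the default. */
        void associate(String systemStream, Map<Object, Object> store, KeyValueExtractor extractor) {
            storesByStream.put(systemStream, store);
            extractorsByStream.put(systemStream, extractor == null ? DEFAULT_EXTRACTOR : extractor);
        }

        /** Returns true if the message was stored, in which case it is not delivered to the task. */
        boolean process(Envelope envelope) {
            Map<Object, Object> store = storesByStream.get(envelope.systemStream);
            if (store == null) {
                return false; // not an adjunct stream: deliver to the task as usual
            }
            Map.Entry<Object, Object> kv = extractorsByStream.get(envelope.systemStream).extract(envelope);
            store.put(kv.getKey(), kv.getValue());
            return true;
        }
    }

    public static void main(String[] args) {
        AdjunctDataStoreManager manager = new AdjunctDataStoreManager();
        Map<Object, Object> profiles = new HashMap<>();
        manager.associate("kafka.user-profiles", profiles, null);

        boolean stored = manager.process(new Envelope("kafka.user-profiles", "u1", "Alice"));
        boolean passedThrough = !manager.process(new Envelope("kafka.page-views", "u1", "/home"));
        System.out.println(stored + " " + passedThrough + " " + profiles);
    }
}
```

In this sketch the boolean returned by process() plays the role of the check TaskInstance would perform before delivering a message to a task.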

...

User-defined objects will be treated as POJOs, and we may provide a default serde for POJOs.

Broadcast Streams

Broadcast streams are shared by all tasks within a container. In the same fashion, when a broadcast stream is associated with an adjunct store, its content is visible to all tasks as well. To support this, we will introduce storage engines at the container level. When a stream is marked as broadcast using the configuration task.broadcast.inputs and is also associated with an adjunct data store, the storage engine will be instantiated at the container level. In parallel to TaskStorageManager, we will introduce a class ContainerStorageManager, which manages the lifecycle of the underlying stores. Populating such a store is assigned to one of the tasks; the remaining tasks only read from it.
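
The single-writer, many-readers arrangement described above could look roughly like the following sketch. Everything in it is an assumption made for illustration: the storeFor() method, the use of in-memory maps as stores, and in particular the rule that the first task to request a store becomes its writer (the proposal does not say how the populating task is chosen).

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ContainerStoreSketch {

    /** Hypothetical container-level manager; maps stand in for real storage engines. */
    static final class ContainerStorageManager {
        private final Map<String, Map<String, String>> stores = new ConcurrentHashMap<>();
        private final Map<String, String> writerTaskByStore = new ConcurrentHashMap<>();

        /**
         * Returns the shared store for the given task. Assumption: the first task
         * to ask for a store is designated as its writer; every other task gets a
         * read-only view backed by the same data.
         */
        Map<String, String> storeFor(String storeName, String taskName) {
            Map<String, String> store =
                stores.computeIfAbsent(storeName, name -> new ConcurrentHashMap<>());
            String existingWriter = writerTaskByStore.putIfAbsent(storeName, taskName);
            boolean isWriter = existingWriter == null || existingWriter.equals(taskName);
            return isWriter ? store : Collections.unmodifiableMap(store);
        }
    }

    public static void main(String[] args) {
        ContainerStorageManager manager = new ContainerStorageManager();
        Map<String, String> writerView = manager.storeFor("profile-store", "task-0");
        Map<String, String> readerView = manager.storeFor("profile-store", "task-1");

        writerView.put("u1", "Alice");
        // The reader sees data written by the designated task.
        System.out.println(readerView.get("u1"));
    }
}
```

A read-only view keeps non-designated tasks from writing by accident while still exposing the same underlying data to every task in the container.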

Initialization

During initialization of a Samza container, the following steps will be added to the sequence:

  • During container initialization
    • Retrieve from configuration all adjunct stores associated with broadcast streams
    • Instantiate each store, using the result of the previous step as input
    • Instantiate ContainerStorageManager
  • During task instance initialization
    • Instantiate storage engines and TaskStorageManager (current flow)
    • Instantiate AdjunctDataStoreManager, which takes the configuration, ContainerStorageManager and TaskStorageManager as input
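
As an illustration of the first container-level step, the sketch below finds the stores whose associated stream is also a broadcast input. It assumes the configuration is available as a flat string map, that task.broadcast.inputs entries have the form system.stream#partition, and that the value of stores.<store>.adstore.input has the form system.stream; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class BroadcastStoreConfigSketch {

    private static final Pattern ADSTORE_INPUT = Pattern.compile("stores\\.(.+)\\.adstore\\.input");

    /** Returns the names of adjunct stores whose input stream is also a broadcast stream. */
    static Set<String> broadcastAdjunctStores(Map<String, String> config) {
        // Collect broadcast streams, dropping the "#partition" suffix from each entry.
        Set<String> broadcastStreams = new HashSet<>();
        for (String entry : config.getOrDefault("task.broadcast.inputs", "").split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int hash = trimmed.indexOf('#');
            broadcastStreams.add(hash < 0 ? trimmed : trimmed.substring(0, hash));
        }

        // Keep only the stores whose associated stream is in the broadcast set.
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            Matcher m = ADSTORE_INPUT.matcher(e.getKey());
            if (m.matches() && broadcastStreams.contains(e.getValue().trim())) {
                result.add(m.group(1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("task.broadcast.inputs", "kafka.user-profiles#0, kafka.rates#[0-2]");
        config.put("stores.profile-store.adstore.input", "kafka.user-profiles");
        config.put("stores.orders-store.adstore.input", "kafka.orders");
        System.out.println(broadcastAdjunctStores(config));
    }
}
```

The resulting store names would be the ones instantiated at container level and handed to ContainerStorageManager; all other stores would follow the existing per-task flow.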

Configuration

Name | Default | Description
stores.adstore.manager.factory | | Factory class to instantiate an adjunct data store manager. If not configured, a built-in implementation will be used.
stores.<store>.adstore.input | | The name of the system stream to be associated with this store. A store can only be associated with one stream.
stores.<store>.adstore.key.value.extractor.factory | | Factory class to instantiate a key/value extractor object that extracts the key and value from an IncomingMessageEnvelope. If not provided, the key and value in the IncomingMessageEnvelope are used.

In addition to the configuration above, the following configuration items may be necessary:

  • systems.<system>.streams.<stream>.samza.bootstrap: true
  • systems.<system>.streams.<stream>.samza.reset.offset: true

The latter is needed when an in-memory store is used, since its contents do not survive a restart and must be rebuilt from the stream.
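
Put together, the settings for one broadcast adjunct store might look like the fragment below. The stream, store and factory class names are made up for illustration, and the system.stream form of the adstore.input value is an assumption. Only the adjunct-related keys are shown.

```properties
# Hypothetical example: names and value formats are illustrative only.
task.broadcast.inputs=kafka.user-profiles#0

# Associate the store with the stream (assumed value format: system.stream).
stores.profile-store.adstore.input=kafka.user-profiles
# Optional hook; com.example.ProfileExtractorFactory is a made-up class name.
stores.profile-store.adstore.key.value.extractor.factory=com.example.ProfileExtractorFactory

# Read the whole stream before processing other inputs.
systems.kafka.streams.user-profiles.samza.bootstrap=true
# Ignore checkpointed offsets on restart and start from the configured default offset.
systems.kafka.streams.user-profiles.samza.reset.offset=true
systems.kafka.streams.user-profiles.samza.offset.default=oldest
```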

Key classes

AdjunctDataStoreManagerFactory

...