...

We extend our existing stores (RocksDB, InMemory) to implement the corresponding new interfaces. Note, we will keep the old stores and extend them to give PAPI users the choice to use stores with or without the ability to store timestamps.
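For illustration, a minimal sketch of what such a timestamp-aware store interface could look like; the holder class and the extra `put()` signature are assumptions for this example, not the final API:

```java
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch of a value-plus-timestamp holder; the class proposed by the KIP may look different.
class ValueAndTimestamp<V> {
    final V value;
    final long timestamp;

    ValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Sketch of the new store interface: a regular KeyValueStore whose values carry a timestamp,
// plus a convenience put() that takes the record timestamp as a separate argument.
interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
}
```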

The usage/implementation of upgrade stores is described in the next section.

...

For a clean upgrade path for RocksDB itself, we need to introduce the above configs and new interfaces to implement the actual upgrade code. We will provide implementations to upgrade from an existing KeyValueStore to a KeyValueWithTimestampStore. The defined interfaces are generic though, allowing to implement "upgrade stores" from any store type A to any other store type B.
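As a placeholder sketch only, such a generic contract could express the idea of copying data from a source store format A into a target store format B; the interfaces defined by the KIP may be named and shaped differently:

```java
// Placeholder sketch of a generic "store upgrade" contract: read data in an old format A
// and write it into a new format B. Not the actual interface defined by the KIP.
interface StoreUpgrader<A, B> {

    /** Handed the old and the new store before the data copy starts. */
    void init(A oldStore, B newStore);

    /** Reads every entry from the old store, converts it, and writes it to the new store. */
    void upgradeAll();
}
```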

To make the upgrade possible, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs – this allows us to customize the translation during a store upgrade. To upgrade a store, a user updates her topology to use a `StoreUpgradeBuilder` that can return a proxy store, an upgrade store, and a regular store. If upgrade mode is enabled, the runtime will create a proxy store for StreamTasks that implements the new store API but internally maps back to the old store (ie, the old on-disk format). Using the proxy store allows running the new code with the already existing old store to avoid any downtime. At the same time, the runtime will create StoreUpgradeTasks that use a `RecordConverterStore`. For those StoreUpgradeTasks, the changelog topic will be consumed, the record format will be updated to the new format using the `RecordConverter` interface, and afterwards the new format is written to the (new) RocksDB store (and back to the changelog topic if required). Because the changelog topic may contain data in different formats, we encode the format in the record header and enhance all changelog readers to check the header for the correct version.

If upgrade mode is disabled, `StoreUpgradeBuilder` is used as a regular store builder returning the new store – the difference to using a builder for the new store directly is twofold: (1) using `StoreUpgradeBuilder` we can perform a final check whether the upgrade finished. If it did not, we first create no active tasks, but only StoreUpgradeTasks using an upgrade store. Only after the upgrade is finished are the StoreUpgradeTasks destroyed and regular tasks using the new stores created. (2) After a successful upgrade, users don't need to rewrite their code (of course, they can still rewrite the code after a successful upgrade and replace `StoreUpgradeBuilder` with a store builder for the new store only).
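The three roles of the builder can be summarized in a small sketch; this is illustrative only, and the actual `StoreUpgradeBuilder` of the KIP may have a different shape:

```java
// Illustrative only; the actual StoreUpgradeBuilder of the KIP may differ. It reflects the
// three roles described above: a proxy store (new API over the old on-disk format), an
// upgrade store (rebuilds the data in the new format), and the regular new store.
interface StoreUpgradeBuilder<S> {

    /** Returned to StreamTasks while upgrade mode is enabled: new API, old on-disk format. */
    S proxyStore();

    /** Returned to StoreUpgradeTasks that rewrite the changelog into the new format. */
    S upgradeStore();

    /** Returned once upgrade mode is disabled (or the upgrade has finished). */
    S newStore();
}
```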

As mentioned, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs. If a store (like KeyValueWithTimestampStore) requires a non-default mapping, the corresponding `StateRestoreCallback` must implement `RecordConverter`, too.
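A hedged sketch of how such a combined callback could look; the `RecordConverter` shape shown here (a ConsumerRecord-to-KeyValue translation) and the value layout are assumptions for illustration:

```java
import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateRestoreCallback;

// Sketch of the converter idea only; the exact RecordConverter interface of the KIP may differ.
interface RecordConverter {
    KeyValue<byte[], byte[]> convert(ConsumerRecord<byte[], byte[]> record);
}

// A restore callback for a timestamp-aware store implements both interfaces: during restore,
// the record timestamp is folded into the stored value (here: prepended as 8 bytes).
class TimestampAwareRestoreCallback implements StateRestoreCallback, RecordConverter {

    @Override
    public KeyValue<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
        final byte[] plainValue = record.value();
        if (plainValue == null) {                       // tombstone: nothing to convert
            return KeyValue.pair(record.key(), null);
        }
        final ByteBuffer buffer = ByteBuffer.allocate(8 + plainValue.length);
        buffer.putLong(record.timestamp());             // keep the timestamp next to the value
        buffer.put(plainValue);
        return KeyValue.pair(record.key(), buffer.array());
    }

    @Override
    public void restore(final byte[] key, final byte[] value) {
        // write the already converted key/value into the underlying store (store access omitted)
    }
}
```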

...

For the actual upgrade, which might be "in place", we need to make sure to use different directories. Thus, StoreUpgradeTasks create store directories with suffix `_prepare`, ie, for each active task with task directory `X_Y`, a StoreUpgradeTask will use task directory `X_Y_prepare`. The directory isolation at task level ensures that we can reuse the same internal store directory structure for active and store-upgrade tasks. After the stores are restored, in a second rebalance, the old task directory will be renamed, the "prepare" directory will be renamed to act as the new active task directory, and finally we delete the renamed original task directory to free up the disk space.
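For illustration, the directory swap could proceed roughly as follows; the `_old` temporary name and the helper are made up for this example and are not the actual Streams code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustration of the directory swap described above; not the actual Streams implementation.
public class TaskDirSwapExample {

    public static void promotePreparedStore(final Path stateDir, final String taskId) throws IOException {
        final Path active = stateDir.resolve(taskId);                  // e.g. <state.dir>/<app.id>/1_0
        final Path prepared = stateDir.resolve(taskId + "_prepare");   // e.g. <state.dir>/<app.id>/1_0_prepare
        final Path renamedOld = stateDir.resolve(taskId + "_old");     // temporary name for the old dir

        Files.move(active, renamedOld);     // 1. move the current active directory out of the way
        Files.move(prepared, active);       // 2. promote the prepared directory to be the active one
        try (Stream<Path> paths = Files.walk(renamedOld)) {            // 3. delete the old directory
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }
}
```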

Compatibility, Deprecation, and Migration Plan

...

  • In-place upgrade: this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: need 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from the old version; Kafka Streams needs to be configured with upgrade.mode="in_place" for startup (see the config sketch after this list)
      2. do a rolling bounce to get the new jar and config in place for each instance
      3. because upgrade mode is "in_place", each instance will create a "store upgrade" task for each assigned active and standby task, which starts to rebuild the RocksDB stores in the new format in parallel to the existing RocksDB stores
        1. the existing RocksDB stores will be used for processing so there is no downtime
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. during restore, users need to observe the restore progress (as actual processing continues, the restore will never be "finished", and it's up to the user to decide when to trigger the second rolling bounce)
      4. after all stores are prepared, the user prepares a second rolling bounce; this time, the configuration parameter upgrade.mode must be removed for the new startup
      5. do a second rolling bounce for each instance to get the new config
        1. if prepare task directories are detected, we check if the upgrade is finished and, if not, finish the upgrade (ie, read the latest delta from the changelog); the corresponding active directories are deleted and the renamed prepare task directories replace them
        2. afterwards, the regular stores might not be fully up-to-date yet and will be restored with the new format before processing begins
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative to enable upgrading if not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (they all form one consumer group); the new instances rebuild the RocksDB state with the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from the old version for all existing (old) instances; Kafka Streams needs to be configured with upgrade.mode="roll_over" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. all old instances will just resume processing as usual
        2. because upgrade mode is "roll_over", no store upgrade tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.mode="roll_over_new"
        1. The `roll_over_new` config will be encoded in the subscription metadata
        2. the leader can distinguish between old and new instances based on the information encoded in the Subscription and assigns tasks (active and standby) for restore to the "roll over" instances (ie, the assignment is "mirrored" to all "roll over" instances)
        3. "roll over" instances only create StoreUpgradeTasks and perform the restore
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down one after another, ideally a rebalance will only trigger after all are down already (note, Kafka Streams does not send a "leave group request" and thus a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives version 3 Subscriptions and computes a version 3 Assignment based on the prepared stores, and the upgrade is finished
    • upgrading from 0.10.0.x to 1.2 uses the same upgrade pattern; the configs upgrade.from="0.10.0.x" and upgrade.mode="roll_over" must be used
      • instead of the old Subscription and Assignment metadata version 2, metadata version 1 is used
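As a sketch of the two-bounce configuration referenced in the flows above, assuming `upgrade.mode` and `upgrade.from` are set as plain string configs (the application id and bootstrap servers are placeholders); for a roll-over upgrade the same pattern applies with the values "roll_over" (old instances), "roll_over_new" (new instances), and upgrade.from="0.10.0.x" when coming from 0.10.0.x:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

// Illustration of the rolling-bounce configuration described above; "upgrade.mode" is the
// config name proposed by this KIP, used here as a plain string.
public class UpgradeConfigExample {

    // first rolling bounce of an in-place upgrade: new jar plus upgrade.mode="in_place"
    public static Properties firstBounceInPlace() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put("upgrade.mode", "in_place");
        return props;
    }

    // second rolling bounce: the upgrade.mode parameter is simply removed again
    public static Properties secondBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        return props;
    }
}
```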

...