...

We will add a new configuration parameter upgrade.mode that will be null by default and can take two values, "in_place" and "roll_over", with the following semantics:

  • null: no upgrade needed, run with latest formats
  • "in_place": prepare an in-place "standby" RocksDB with new format
  • "roll_over": prepare for a roll-over upgrade, building "standby" RocksDB stores in the new on-disk format on newly started instances
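As a sketch of how the two-phase configuration could look in practice (the "upgrade.mode" key is taken from this proposal; the other keys are standard Kafka Streams configs, and values here are illustrative):

```java
import java.util.Properties;

// Sketch only: "upgrade.mode" is the new parameter proposed above; it is set
// for the first rolling bounce and removed (back to the default null) for the
// second one. Application id and bootstrap servers are placeholder values.
class UpgradeConfigExample {
    static Properties firstBounceConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-app");       // unchanged application id
        props.put("bootstrap.servers", "broker:9092");
        props.put("upgrade.mode", "in_place");       // trigger store-upgrade preparation
        return props;
    }

    static Properties secondBounceConfig() {
        Properties props = firstBounceConfig();
        props.remove("upgrade.mode");                // default (null): run with latest formats
        return props;
    }
}
```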

We add a new store type KeyValueWithTimestampStore that extends the existing KeyValueStore.
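A minimal sketch of what such a store type could look like. Only the name KeyValueWithTimestampStore comes from this proposal; the simplified KeyValueStore stand-in, the ValueAndTimestamp helper, and the in-memory implementation below are illustrative assumptions, not the real Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative value wrapper that carries a record timestamp with the value.
class ValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    ValueAndTimestamp(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    V value() { return value; }
    long timestamp() { return timestamp; }
}

// Minimal stand-in for the existing KeyValueStore interface (the real one
// in Kafka Streams has many more methods).
interface KeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// The proposed store type: a KeyValueStore whose values embed a timestamp.
interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    // Convenience overload to store a plain value together with its timestamp.
    default void put(K key, V value, long timestamp) {
        put(key, new ValueAndTimestamp<>(value, timestamp));
    }
}

// Toy in-memory implementation, for illustration only.
class InMemoryTimestampStore<K, V> implements KeyValueWithTimestampStore<K, V> {
    private final Map<K, ValueAndTimestamp<V>> map = new HashMap<>();
    public void put(K key, ValueAndTimestamp<V> value) { map.put(key, value); }
    public ValueAndTimestamp<V> get(K key) { return map.get(key); }
}
```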

...

  • In-place upgrade: this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: need 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from the old version; Kafka Streams needs to be configured with upgrade.mode="in_place" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
      3. because upgrade mode is "in_place", each instance will create a "store upgrade" task for each assigned active and standby task; these tasks start to rebuild the RocksDB stores in the new format in parallel to the existing RocksDB stores
        1. the existing RocksDB stores are still used for processing, so there is no downtime
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. during restore, users need to observe the restore progress (as actual processing resumes, restore will never be "finished", and it's up to the user when to trigger the second rolling bounce)
      4. after all stores are prepared, the user prepares a second rolling bounce; this time, the configuration parameter upgrade.mode must be removed for the new startup
      5. do a second rolling bounce for each instance to get new config
        1. if prepared task directories are detected, we check if the upgrade is finished and, if not, finish it (i.e., read the latest delta from the changelog); the corresponding active task directories are then replaced by the prepared task directories
        2. afterwards regular processing begins
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative that enables upgrading when not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (all old and new instances form one consumer group); the new instances rebuild the RocksDB state in the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from the old version for all existing (old) instances; old Kafka Streams instances need to be configured with upgrade.mode="roll_over" for startup
      2. do a rolling bounce to get the new jar and config in place for each old instance
        1. all old instances will just resume processing as usual
        2. because upgrade mode is "roll_over" and not "in_place", no store upgrade tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.mode="roll_over_new"
        1. The `roll_over_new` config will be encoded in the subscription metadata
        2. the leader can distinguish between old and new instances based on the information encoded in the Subscription, and assigns tasks (active and standby) for restore to the "roll over" instances (i.e., the assignment is "mirrored" to all "roll over" instances)
        3. "roll over" instances only create StoreUpgradeTasks and perform the restore
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down one after another, ideally a rebalance will only trigger after all of them are down (note: Kafka Streams does not send a "leave group" request, so a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives version 3 Subscriptions and computes a version 3 Assignment based on the prepared stores, and the upgrade is finished
      Upgrading from 0.10.0.x to 1.2 uses the same upgrade pattern, but configs upgrade.from="0.10.0.x" and upgrade.mode="roll_over" must be used; instead of the old Subscription and Assignment metadata version 2, metadata version 1 is used
        1. new instances send Subscriptions that encode "roll over", and thus the leader knows when the upgrade is completed
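The leader-side rule described above (actives stay on old instances while any are alive; "roll over" instances get mirrored restore tasks) could be sketched as follows. All names here are illustrative assumptions; the real assignor logic in Kafka Streams is considerably more involved:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the leader's assignment rule during a roll-over
// upgrade. Instances are classified by a flag read from their Subscription
// metadata (here, an empty string marks an old instance).
class RollOverAssignor {
    static final String ROLL_OVER_FLAG = "roll_over_new"; // encoded in Subscription metadata

    // subscriptions: instanceId -> flag from that instance's Subscription
    // returns: instanceId -> list of assigned task ids
    static Map<String, List<String>> assign(Map<String, String> subscriptions,
                                            List<String> activeTasks) {
        List<String> oldInstances = new ArrayList<>();
        List<String> newInstances = new ArrayList<>();
        for (Map.Entry<String, String> e : subscriptions.entrySet()) {
            if (ROLL_OVER_FLAG.equals(e.getValue())) newInstances.add(e.getKey());
            else oldInstances.add(e.getKey());
        }

        Map<String, List<String>> assignment = new HashMap<>();
        for (String instance : subscriptions.keySet()) assignment.put(instance, new ArrayList<>());

        // While any old instance is alive, it owns the active tasks; once all
        // old instances are gone, actives move to the "roll over" instances.
        List<String> activeOwners = oldInstances.isEmpty() ? newInstances : oldInstances;
        for (int i = 0; i < activeTasks.size(); i++) {
            assignment.get(activeOwners.get(i % activeOwners.size())).add(activeTasks.get(i));
        }

        // During the upgrade, "roll over" instances restore every store in the
        // new format ("mirrored" StoreUpgradeTasks).
        if (!oldInstances.isEmpty()) {
            for (String instance : newInstances) {
                assignment.get(instance).addAll(activeTasks); // restore-only tasks
            }
        }
        return assignment;
    }
}
```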

Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for StreamsPartitionAssignor to react correctly to configs and received subscriptions
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenarios during the upgrade
    • this should include "simulated upgrades" to metadata version 4, to ensure that the implementation works correctly for future changes

...