
Status

Current state: "Under Discussion"

Discussion thread: TODO

JIRA: KAFKA-6521 (related: KAFKA-3522)

Released: 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

To improve the stream processing semantics of Kafka Streams and to add new features, we want to be able to store record timestamps in KTables. This allows us to address multiple issues, such as:

  • handle out-of-order data for source KTables (related to KAFKA-5533)
  • add TTL for KTables (KAFKA-4212 and KAFKA-4273)
  • return the timestamp of the latest update in Interactive Queries (KAFKA-4304)
  • improve timestamp propagation for DSL operators (KAFKA-6455)

Public Interfaces

We only need to add a new configuration parameter enable.upgrade.mode that will be null by default and can take two values: "prepare_in_place_upgrade" and "prepare_roll_over_upgrade" with the following semantics:

  • null: no upgrade needed, run with latest formats
  • "prepare_in_place_upgrade": prepare yourself for an upgrade of the rebalance user-metadata format and an in-place upgrade of RocksDB on-disk format
  • "prepare_roll_over_upgrade": prepare yourself for an upgrade of the rebalance user-metadata format and a roll-over upgrade of RocksDB on-disk format
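
As an illustration of the three settings above, the following is a minimal sketch of how an application might build its Streams configuration during the upgrade. It is not part of the proposal: the class and method names are hypothetical, the application id and broker address are placeholders, and enable.upgrade.mode is passed as a plain string key because the parameter is only proposed here. The null default is modeled by leaving the key unset, since java.util.Properties cannot hold null values.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper class; not part of Kafka Streams.
public class UpgradeModeConfigSketch {

    // Config key and values as proposed on this page.
    static final String ENABLE_UPGRADE_MODE = "enable.upgrade.mode";
    static final Set<String> ALLOWED_MODES = new HashSet<>(
            Arrays.asList("prepare_in_place_upgrade", "prepare_roll_over_upgrade"));

    // Builds the configuration; upgradeMode == null means "no upgrade needed".
    static Properties streamsProps(String upgradeMode) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // placeholder value
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        if (upgradeMode != null) {
            if (!ALLOWED_MODES.contains(upgradeMode)) {
                throw new IllegalArgumentException("Unknown upgrade mode: " + upgradeMode);
            }
            props.put(ENABLE_UPGRADE_MODE, upgradeMode);
        }
        // When upgradeMode is null the key stays unset, which models the null default.
        return props;
    }

    public static void main(String[] args) {
        Properties inPlace = streamsProps("prepare_in_place_upgrade");
        System.out.println(inPlace.getProperty(ENABLE_UPGRADE_MODE));

        Properties latest = streamsProps(null);
        System.out.println(latest.containsKey(ENABLE_UPGRADE_MODE));
    }
}
```

The resulting Properties object would then be handed to the Streams application in the usual way. How many rolling bounces each mode requires is left to the Proposed Changes section.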

Proposed Changes

TODO

Compatibility, Deprecation, and Migration Plan

TODO

Test Plan

We need to write regular unit and integration tests for the new embedded timestamp feature, i.e., tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly. This includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization. Furthermore, KTable recovery/restore must be tested.
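
The kind of round-trip check described above could look roughly like the sketch below. The actual storage layout is not specified on this page, so the layout used here (an 8-byte big-endian timestamp followed by the serialized value) is purely an assumption for illustration, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical round-trip check; the byte layout is an assumption, not the proposed format.
public class TimestampedValueRoundTrip {

    // Assumed layout: [8-byte timestamp][serialized value bytes].
    static byte[] encode(long timestamp, byte[] value) {
        return ByteBuffer.allocate(Long.BYTES + value.length)
                .putLong(timestamp)
                .put(value)
                .array();
    }

    // Reads the timestamp back from the first 8 bytes.
    static long timestampOf(byte[] stored) {
        return ByteBuffer.wrap(stored).getLong();
    }

    // Returns the value bytes that follow the timestamp prefix.
    static byte[] valueOf(byte[] stored) {
        return Arrays.copyOfRange(stored, Long.BYTES, stored.length);
    }

    public static void main(String[] args) {
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] stored = encode(1_500_000_000_000L, value);

        // Both the timestamp and the value must survive the round trip unchanged.
        if (timestampOf(stored) != 1_500_000_000_000L) {
            throw new AssertionError("timestamp was not preserved");
        }
        if (!Arrays.equals(valueOf(stored), value)) {
            throw new AssertionError("value was not preserved");
        }
        System.out.println("round trip ok");
    }
}
```

Real tests would exercise the actual stores and serdes rather than a hand-rolled encoding, and would cover restore from the changelog topic as well.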

Additionally, we need to write system tests that ensure that rolling bounce upgrades work as described, for in-place upgrades as well as roll-over upgrades.

Rejected Alternatives

  • Change the RocksDB on-disk format and encode the used serialization version per record (this would simplify future upgrades). However, this has several disadvantages:
    • storage amplification for local stores and changelog topics
    • record version could get stored in record headers in changelog topics -> changelog topic might never overwrite record with older format
    • code needs to check all versions all the time in future releases: increases code complexity and runtime overhead
  • use a simpler upgrade path without any configs or complex rolling bounce scenarios
    • requires application down-time for upgrading to new format
  • only support in-place upgrade path instead of two to simplify the process for users (don't need to pick)
    • might be prohibitive if not enough disk space is available
  • allow users to stay with old format: upgrade would be simpler as it's only one rolling bounce
    • unclear default behavior: should we stay on 1.1 format by default or should we use 1.2 format by default?
      • if 1.1 is default, upgrade is simple, but users who write a new application must turn on 1.2 format explicitly
      • if 1.2 is default, simple upgrade requires a config that tells Streams to stay with 1.1 format
      • conclusion: neither upgrading nor staying on the old format is straightforward, thus, just force the upgrade
    • if no upgrade happens, new features (as listed above) would be useless

 
