Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Records are explicitly deleted once they have been fully consumedKafka Streams treats repartition topics differently to regular topics. Instead of setting arbitrary retention criteria and having the broker cleanup old records, Kafka Streams sets infinite retention on repartition topics and explicitly deletes records once they've been committed to the next topic in their Topology. Currently, this is done every time the Task is committed, resulting in explicit "delete records" requests being sent every commit.interval.ms milliseconds.

When commit.interval.ms is set very low, for example when processing.guarantee is set to exactly_once_v2, this causes delete records requests to be sent extremely frequently, potentially reducing throughput and causing a high volume of log messages to be logged by the brokers.

Public Interfaces

...

New configuration options

NameTypeImportanceDefaultDescription
repartition.purge.interval.msLongLOW30000The frequency in milliseconds with which to delete fully consumed records from repartition topics.  The default value is the same as the default for commit.interval.ms (30000).  Records are only deleted after a successful commit, so setting this to a value equal to or larger than commit.interval.ms will bound it the commit interval. (Note, unlike commit.interval.ms, the default for this value remains unchanged when processing.guarantee is set to exactly_once_v2).

Proposed Changes

Adding a new configuration option, deleterepartition.purge.interval.ms, that configures the frequency these explicit record deletions are sent will resolve the issue, by enabling users to tune the commit.interval.ms anddelete repartition.purge.interval.ms separately.

We will still wait for a commit before explicitly deleting repartition records, but we will only do so if the time since the last record deletion is at least delete.interval.ms. This means the lower-bound for deleterepartition.purge.interval.ms  is effectively capped by the value of commit.interval.ms.

Compatibility, Deprecation, and Migration Plan

  • The interval between explicit delete requests for repartition records will no longer be coupled to commit.interval.ms. Default behaviour is unchanged, however:
    • When commit
    Default value for delete
    • .interval.ms  is explicitly modified by the user, old repartition records will no longer be
    set to 30 seconds , the (current) default value of
    • deleted on every commit.
    • When processing.guarantee is set to exactly_once_v2, since the default commit.interval.ms
    . This ensures that users who do not modify either setting will retain the existing behaviour.
    • is changed internally to 100 ms, old repartition records will no longer be deleted on every commit.
    • Users can regain this coupling by explicitly configuring both
    • For users that use EOS, the default commit.interval.ms is automatically reduced to 100ms. The default value of delete and repartition.purge.interval.ms will not be reduced when EOS is enabled, to ensure that these users benefit from the improved performance of these changesthe same value.

Rejected Alternatives

Modifying the explicit deletion of records to be completely independent of commits such that delete.interval.ms is strictly adhered to, irrespective of the value of commit.interval.ms was not explored, as the increased complexity of the changes may introduce bugs, with little additional benefit.