
Status

Current state: Under Discussion


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

State restoration is a key procedure in Kafka Streams, both when processing tasks are migrated in a rebalance and when standby tasks are maintained for failure recovery.

This proposal aims to give users more visibility into this procedure. It has two components: 1) augmenting the metrics related to restoration, and 2) adding a new method to StateRestoreListener.

Public Interfaces

The following summarizes the public API changes in this KIP.

Restoration metrics

All the metrics below are thread-level. Note that restoration will be handled by a separate thread (the state updater), hence its thread-id differs from those of the stream threads.

Metric tags are:

  • type=stream-state-updater-metrics
  • client-id=[clientId]
  • thread-id=[threadId]

Recording level is: INFO


The POC implementation of the proposed metrics can be found here: https://github.com/apache/kafka/pull/12391


| Metric Name | Type | Description | Notes |
|---|---|---|---|
| active-restoring-tasks | count | The number of active tasks currently undergoing restoration | |
| standby-updating-tasks | count | The number of standby tasks currently undergoing updating | |
| active-paused-tasks | count | The number of active tasks whose restoration is paused | |
| standby-paused-tasks | count | The number of standby tasks whose updating is paused | |
| idle-ratio | gauge (percentage) | The fraction of time the thread spent idle | idle-ratio + restore-ratio + checkpoint-ratio should be 1 |
| restore-ratio | gauge (percentage) | The fraction of time the thread spent restoring active tasks or updating standby tasks | idle-ratio + restore-ratio + checkpoint-ratio should be 1 |
| checkpoint-ratio | gauge (percentage) | The fraction of time the thread spent checkpointing restoration progress | idle-ratio + restore-ratio + checkpoint-ratio should be 1 |
| active-records-restored-total | count | The total number of records restored for active tasks | Cumulative over the lifetime of the Streams application, hence ever-increasing |
| standby-records-updated-total | count | The total number of records updated for standby tasks | Cumulative over the lifetime of the Streams application, hence ever-increasing |
| active-records-remaining | count | The number of records remaining to be restored for active tasks | Should usually decline; may jump up or down during a rebalance |
| standby-records-remaining | count | The number of records remaining to be updated for standby tasks | May either increase or decline in normal operation; may jump up or down during a rebalance |
| records-restored-rate | rate | The average per-second number of records restored for active tasks or updated for standby tasks | Counts both active and standby tasks |
| restore-call-rate | rate | The average per-second number of restore calls triggered | |
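The note that idle-ratio, restore-ratio and checkpoint-ratio should sum to 1 follows from splitting the state updater thread's wall-clock time into three disjoint phases. Below is a minimal Java sketch of that bookkeeping, assuming the thread accumulates elapsed nanoseconds per phase over a measurement window; the class `PhaseTimeRatios` and its methods are hypothetical and not taken from the POC pull request.

```java
// Hypothetical accumulator of wall-clock time spent by the state updater thread
// in each of its three phases during one measurement window.
final class PhaseTimeRatios {
    private long idleNanos;
    private long restoreNanos;
    private long checkpointNanos;

    void recordIdle(long nanos) { idleNanos += nanos; }
    void recordRestore(long nanos) { restoreNanos += nanos; }
    void recordCheckpoint(long nanos) { checkpointNanos += nanos; }

    double idleRatio() { return ratio(idleNanos); }
    double restoreRatio() { return ratio(restoreNanos); }
    double checkpointRatio() { return ratio(checkpointNanos); }

    // Each phase's share of the total time recorded in the window.
    // Before any time is recorded the ratios are undefined; this sketch reports 0.
    private double ratio(long partNanos) {
        long total = idleNanos + restoreNanos + checkpointNanos;
        return total == 0 ? 0.0 : (double) partNanos / total;
    }
}
```

For example, a window with 200 ns idle, 700 ns restoring and 100 ns checkpointing yields ratios of 0.2, 0.7 and 0.1. Because every recorded interval is attributed to exactly one phase, the three ratios sum to 1 whenever any time has been recorded.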


Along with these new metrics, we would also deprecate the metric below:

| Metric Name | Type | Description | Notes |
|---|---|---|---|
| standby-process-ratio | gauge | Task-level; the fraction of time the processing thread spent processing this standby task | Deprecated, since standby tasks are no longer processed by the stream thread |


New Method in StateRestoreListener

When an active task starts restoration, StateRestoreListener#onRestoreStart is triggered. Restoration can then end in one of two ways:

1) Restoration completes and the task can be processed normally with incoming stream records. In this case StateRestoreListener#onRestoreEnd is triggered.

2) Restoration is paused before it completes, e.g. because another rebalance is triggered and the task is suspended and possibly migrated to another host later. Currently, no callback is triggered in this case.

We propose to cover the second case with a new callback, onRestoreSuspended, so that each onRestoreStart call is paired with either an onRestoreEnd or an onRestoreSuspended call. Note that if the suspended task is reassigned back to the current host, onRestoreStart is triggered again.

package org.apache.kafka.streams.processor;

import org.apache.kafka.common.TopicPartition;

public interface StateRestoreListener {

    void onRestoreStart(final TopicPartition topicPartition,
                        final String storeName,
                        final long startingOffset,
                        final long endingOffset);

    void onRestoreEnd(final TopicPartition topicPartition,
                      final String storeName,
                      final long totalRestored);

    // ... other existing methods omitted

    /**
     * NEW METHOD. Called when restoring the {@link StateStore} is suspended because the task
     * was suspended on this host, e.g. due to a rebalance.
     * If the task is later resumed on this host and restoration continues,
     * {@link #onRestoreStart} will be called again.
     */
    default void onRestoreSuspended(final TopicPartition topicPartition,
                                    final String storeName,
                                    final long totalRestored) {
        // no-op by default, for backward compatibility
    }
}
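To illustrate why pairing every onRestoreStart with onRestoreEnd or onRestoreSuspended matters, the sketch below tracks which stores are currently restoring on this host. It is written against a hypothetical stand-in interface, `RestoreListenerStandIn`, that mirrors only the three callbacks above and uses a `String` in place of `TopicPartition` so that it compiles without Kafka; a real listener would implement `StateRestoreListener` directly, including the methods elided above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the proposed StateRestoreListener, reduced to the
// three callbacks shown above; String replaces TopicPartition.
interface RestoreListenerStandIn {
    void onRestoreStart(String topicPartition, String storeName, long startingOffset, long endingOffset);

    void onRestoreEnd(String topicPartition, String storeName, long totalRestored);

    default void onRestoreSuspended(String topicPartition, String storeName, long totalRestored) {
        // no-op by default, as proposed
    }
}

// Keeps the set of (store, partition) pairs whose restoration is in flight on this host.
class InFlightRestoreTracker implements RestoreListenerStandIn {
    private final Set<String> inFlight = new HashSet<>();

    private static String key(String topicPartition, String storeName) {
        return storeName + "@" + topicPartition;
    }

    @Override
    public void onRestoreStart(String topicPartition, String storeName, long startingOffset, long endingOffset) {
        inFlight.add(key(topicPartition, storeName));
    }

    @Override
    public void onRestoreEnd(String topicPartition, String storeName, long totalRestored) {
        inFlight.remove(key(topicPartition, storeName));
    }

    @Override
    public void onRestoreSuspended(String topicPartition, String storeName, long totalRestored) {
        // Without this callback, the entry of a suspended task would never be removed.
        inFlight.remove(key(topicPartition, storeName));
    }

    int inFlightCount() {
        return inFlight.size();
    }

    boolean isRestoring(String topicPartition, String storeName) {
        return inFlight.contains(key(topicPartition, storeName));
    }
}
```

Without the new callback, the entry for a suspended store would remain in the set indefinitely, so an application could not tell a stalled restoration apart from one that was suspended and possibly moved away. If the task is later reassigned to this host, the second onRestoreStart simply re-adds the entry.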


Compatibility, Deprecation, and Migration Plan

  • The default implementation of the new onRestoreSuspended method is a no-op, to maintain backward compatibility.
  • The deprecated metric will still be exposed, and will only be removed in the next major release.
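Backward compatibility relies on Java default methods: a listener written against the old interface does not override the new method and simply inherits the no-op. A minimal sketch, again using a hypothetical stand-in interface (`CompatRestoreListener`) rather than the real `StateRestoreListener`, with `String` in place of `TopicPartition`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StateRestoreListener after this KIP: only the
// callbacks relevant here, with String in place of TopicPartition.
interface CompatRestoreListener {
    void onRestoreStart(String topicPartition, String storeName, long startingOffset, long endingOffset);

    void onRestoreEnd(String topicPartition, String storeName, long totalRestored);

    // New callback with a no-op default, so existing implementations need no change.
    default void onRestoreSuspended(String topicPartition, String storeName, long totalRestored) {
    }
}

// A listener written before the new callback existed: it overrides only the original methods.
class LegacyLoggingListener implements CompatRestoreListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onRestoreStart(String topicPartition, String storeName, long startingOffset, long endingOffset) {
        events.add("start " + storeName + " " + topicPartition);
    }

    @Override
    public void onRestoreEnd(String topicPartition, String storeName, long totalRestored) {
        events.add("end " + storeName + " " + topicPartition);
    }
}
```

Invoking onRestoreSuspended on such a listener falls through to the interface default, so it compiles and behaves exactly as before the change.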

Rejected Alternatives

None.
