...

Changes to org.apache.kafka.streams.KafkaStreams to expose lag for a given state store, or across all stores hosted by the Streams instance (whether active or standby)


Code Block
<WIP>...
} // end of KafkaStreams
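
The code block above is still a WIP placeholder. Purely as an illustration of the direction described here, the lag-exposing additions to KafkaStreams might take a shape along the following lines; every name in this sketch (LagInfo, offsetLag(), timeLagMs(), allLocalStorePartitionLags(), lagInfoForStore()) is an assumption for illustration, not the final API.

Code Block
import java.util.Map;

// Illustrative sketch only: the KIP's code block is still WIP, so every name
// below (LagInfo, offsetLag(), timeLagMs(), allLocalStorePartitionLags(),
// lagInfoForStore()) is an assumption, not the final API.
public class KafkaStreams {

    /** Hypothetical value type carrying the record (offset) lag and time lag of a store partition. */
    public static class LagInfo {
        private final long offsetLag;
        private final long timeLagMs;

        LagInfo(final long offsetLag, final long timeLagMs) {
            this.offsetLag = offsetLag;
            this.timeLagMs = timeLagMs;
        }

        /** Number of records the local state is behind the changelog end offset. */
        public long offsetLag() {
            return offsetLag;
        }

        /** Approximate age, in milliseconds, of the last record applied to the local state. */
        public long timeLagMs() {
            return timeLagMs;
        }
    }

    /**
     * Hypothetical: per-partition lag for every state store hosted by this instance,
     * whether the hosting task is active (possibly restoring) or standby.
     * Keyed by store name, then by store partition.
     */
    public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {
        throw new UnsupportedOperationException("sketch only");
    }

    /** Hypothetical: lag for a single named store across its locally hosted partitions. */
    public Map<Integer, LagInfo> lagInfoForStore(final String storeName) {
        throw new UnsupportedOperationException("sketch only");
    }
} // end of KafkaStreams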


Proposed Changes

In the current code, cases t0 and t1 serve queries from the Active (Running) partition. For case t2, we plan to return a List<StreamsMetadata> containing <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the standby on B can serve the data once serving from replicas is enabled. This still does not solve cases t3 and t4: B has been promoted to active but is in the Restoring state, catching up to A's last committed position (we do not serve from a Restoring active today), and the new replica on R is building itself from scratch. Both cases can be solved if we also start serving from the Restoring state of an active, since it is almost equivalent to the previous active.

The new plan is to extend the query serving capabilities to include standbys as well, with minimal changes in the core Streams code. When a query comes in, the user first requests the StreamsMetadata holding the active task (where the key resides) and queries for the key on that host, which returns the response along with the record lag (0 in the case of Active/Running tasks) and the time lag for the specific store being queried. If such a query fails due to rebalancing or the node being unavailable, the user can query for the StreamsMetadata holding standby tasks for the partition where the key resides. This returns a list of all available standbys in the system; the user can then issue the IQ query against any of those nodes, which returns the response together with its record lag and time lag. Based on the lag, the user can decide whether to return the response or call another standby.
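
As a concrete illustration of this flow, here is a rough client-side sketch. KafkaStreams#metadataForKey() is the existing active-task lookup; standbyMetadataForKey(), queryRemoteStore(), QueryResult and the lag threshold are hypothetical application-side helpers standing in for the new standby lookup and the application's own IQ/RPC layer.

Code Block
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

// Sketch of the query flow described above. metadataForKey() is the existing
// active-task lookup; standbyMetadataForKey(), queryRemoteStore() and
// QueryResult are hypothetical application-side helpers.
public class LagAwareQueryExample {

    /** Hypothetical remote-query response: the value plus the store's record lag. */
    interface QueryResult {
        String value();
        long offsetLag();
    }

    /** Maximum record lag the application is willing to serve; entirely the user's choice. */
    static final long ACCEPTABLE_OFFSET_LAG = 1_000L;

    static String lookup(final KafkaStreams streams, final String store, final String key) {
        // 1. Locate the instance hosting the active task for the key and query it.
        final StreamsMetadata active =
                streams.metadataForKey(store, key, Serdes.String().serializer());
        try {
            return queryRemoteStore(active, store, key).value();   // active/running => lag 0
        } catch (final RuntimeException activeUnavailable) {
            // 2. Active is rebalancing or down: ask for all standbys hosting the key.
            for (final StreamsMetadata standby : standbyMetadataForKey(streams, store, key)) {
                final QueryResult result = queryRemoteStore(standby, store, key);
                // 3. The response carries the lag, so the caller decides whether it is fresh enough.
                if (result.offsetLag() <= ACCEPTABLE_OFFSET_LAG) {
                    return result.value();
                }
            }
            throw new IllegalStateException("no replica within the acceptable lag");
        }
    }

    // Hypothetical helper: RPC to the remote instance's IQ endpoint.
    static QueryResult queryRemoteStore(final StreamsMetadata host, final String store, final String key) {
        throw new UnsupportedOperationException("application-specific RPC, sketch only");
    }

    // Hypothetical helper: the new standby metadata lookup proposed below.
    static List<StreamsMetadata> standbyMetadataForKey(final KafkaStreams streams, final String store, final String key) {
        throw new UnsupportedOperationException("sketch of the new standby metadata lookup");
    }
}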

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost, so that each machine knows which machines host which standby partitions, in addition to the active partitions (which they already know). Consequently, the AssignmentInfo version will be bumped up to VERSION 6.

  • Changing the signature of setPartitionsByHostState() to setPartitionsByHostState(partitionsByHostState, standbyPartitionsByHost), and likewise of onChange() and rebuildMetadata(), to add Set<TopicPartition> standbyTopicPartitions in StreamsMetadata. This will add the standby partitions to the metadata.

  • Addition of StreamsMetadataState::getStandbyMetadataListForKey() to return a list of StreamsMetadata containing all the standbys available in the system for the partition (see the sketch after this list). With the above changes, allMetadata in StreamsMetadataState.java would contain the active partitions as well as the standby partitions.

  • Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState.java and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor.java.
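
Based only on the bullet points above, the following is a rough, hypothetical sketch of how the metadata-side pieces could fit together. Field and method names follow the bullets; the bodies and the surrounding wrapper class are placeholders, not the actual implementation.

Code Block
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.HostInfo;

// Rough shape of the metadata-side changes listed above; placeholders only.
public class StreamsMetadataSketch {

    /** StreamsMetadata gains the set of standby (replica) partitions hosted on the instance. */
    public static class StreamsMetadata {
        private final HostInfo hostInfo;
        private final Set<TopicPartition> activeTopicPartitions;
        private final Set<TopicPartition> standbyTopicPartitions;   // new in this KIP

        public StreamsMetadata(final HostInfo hostInfo,
                               final Set<TopicPartition> activeTopicPartitions,
                               final Set<TopicPartition> standbyTopicPartitions) {
            this.hostInfo = hostInfo;
            this.activeTopicPartitions = activeTopicPartitions;
            this.standbyTopicPartitions = standbyTopicPartitions;
        }

        public Set<TopicPartition> standbyTopicPartitions() {
            return Collections.unmodifiableSet(standbyTopicPartitions);
        }
    }

    /** StreamsMetadataState keeps both host maps and can answer standby lookups per key. */
    public static class StreamsMetadataState {
        private Map<HostInfo, Set<TopicPartition>> activePartitionsByHost;   // renamed from partitionsByHostState
        private Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;  // new, carried via AssignmentInfo v6

        /** All instances hosting a standby of the partition that holds the given key. */
        public <K> List<StreamsMetadata> getStandbyMetadataListForKey(final String storeName,
                                                                      final K key,
                                                                      final Serializer<K> keySerializer) {
            // Resolve the key's partition, then collect every host whose
            // standbyPartitionsByHost entry contains that partition.
            throw new UnsupportedOperationException("sketch only");
        }
    }
}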

With this KIP, the onus is left on the user to decide how much lag they are willing to serve queries with. Since, after this KIP, there is the capability of serving from a restoring active as well as a running standby task, and each response returns the lag along with the actual value, the user can decide either to discard the result or to return it to the client.

Compatibility, Deprecation, and Migration Plan

...

  • Adding a StreamsConfig with a universal enableStandbyServing flag for the application. This would restrict the functionality, since there could be multiple stores in an application and we need the flexibility to offer different consistency guarantees in such cases, which a single StreamsConfig would preclude.
  • Making the Streams APIs lag aware, e.g. only returning standbys within a certain time/offset lag. It did not add a lot of value to push this into Streams. Instead, the KIP keeps Streams agnostic of what acceptable lag values are and gives the application/user the flexibility to choose.
  • Propagating lag information using the rebalance protocol. Although it seemed like the logical thing to do, with KIP-441 in mind we decided against it for various reasons. Foremost, the lag information is quickly outdated