Current Architecture

In 0.8.1.1, with in-built offset management, the server-side architecture and the life cycle of a produce/fetch request are summarized below (caller --> callee):

ProduceRequest --> 1) KafkaApis.appendToLocalLog()         --> ReplicaManager.getPartition()        --> Partition.appendMessagesToLeader()
                   
                   2) KafkaApis.maybeUnblockDelayedFetchRequests()

                   3) RequestChannel.sendResponse()   OR   ProducerRequestPurgatory.watch()  

                   4) ProducerRequestPurgatory.update()

------------------------------------------------------------------------------------------------------

FetchRequest -->   1) KafkaApis.maybeUpdatePartitionHW()   --> ReplicaManager.recordFollowerPosition()   -->   Partition.updateLeaderHWAndMaybeExpandIsr() / maybeIncrementLeaderHW()

                   2) ProducerRequestPurgatory.update()    --> DelayedProduce.isSatisfied()              --> KafkaApis.maybeUnblockDelayedFetchRequests()

                   3) KafkaApis.readMessageSet()           --> ReplicaManager.getReplica()               -->   Log.read()

                   4) RequestChannel.sendResponse()   OR   FetchRequestPurgatory.watch()  

------------------------------------------------------------------------------------------------------

ProducerRequestPurgatory:

    // called as step 4) of handling produce request, or step 2) of handling fetch request
    ProducerRequestPurgatory.update()                       --> DelayedProduce.respond()           --> RequestChannel.sendResponse()  

    // any time
    ProducerRequestPurgatory.expire()                       --> DelayedProduce.respond()           --> RequestChannel.sendResponse()

FetchRequestPurgatory:

    // called as step 2) of handling produce request, or inside DelayedProduce.isSatisfied() 
    KafkaApis.maybeUnblockDelayedFetchRequests()            --> FetchRequestPurgatory.update()     --> DelayedFetch.respond()             --> RequestChannel.sendResponse() 

    // any time
    FetchRequestPurgatory.expire()                          --> DelayedFetch.respond()             --> RequestChannel.sendResponse()

 

The Problem

As we can see from the above, delayed produce needs to access KafkaApis.maybeUnblockDelayedFetchRequests(), etc., and delayed fetch needs to fetch the data to form the response. As a result, we ended up keeping the appending and fetching logic inside KafkaApis, and also keeping the purgatories/delayed requests inside KafkaApis so that they can access these functions/variables. The problems with this are:

1) The message append / read logic of the Replica Manager leaks into KafkaApis, and KafkaApis itself becomes very large, containing all the purgatory / delayed request classes.

2) Moreover, the logic for satisfying delayed fetch requests is not correct: it needs to be tied to HW (high watermark) modifications, and hence also needs access to Partition, which would leak even more logic if we followed the current architecture.

3) With in-built offset management, we have to hack KafkaApis and its corresponding delayed requests as follows:

CommitOffsetRequest --> 1) OffsetManager.producerRequestFromOffsetCommit()    // returns a new ProducerRequest from the OffsetCommitRequest

                        2) KafkaApis.appendToLocalLog()
 
                        3) OffsetManager.putOffsets()   // put the offset into cache

                        4) OffsetManager.offsetCommitRequestOpt().get()    // transform back to an OffsetCommitResponse

                        5) RequestChannel.sendResponse()   OR   ProducerRequestPurgatory.watch()  
                    
------------------------------------------------------------------------------------------------------

DelayedProduce.respond()  --> 1) if(not timed out) OffsetManager.putOffsets()
                          
                              2) OffsetManager.offsetCommitRequestOpt().get()                                                   

                              3) RequestChannel.sendResponse()

 

The architecture diagram is shown below:

The Idea

The idea is to refactor KafkaApis along with the Replica Manager and the Offset Manager (Coordinator) so that the produce/fetch purgatories are moved into the Replica Manager and are isolated from requests (i.e. they become purgatories for plain append and fetch operations). By doing so:

1) KafkaApis becomes thinner, only handling request-level application logic and talking to the Request Channel.

2) The message read and append logic moves into the Replica Manager, which handles the logic of "committed" appends and fetches of "committed" data.

3) The Offset Manager (Coordinator) only needs to talk to the Replica Manager for handling offset commits; there is no need to hack KafkaApis and the delayed produce requests.

 

This refactoring will also benefit the upcoming new consumer / coordinator development.

 

Purgatories and Delayed Operations

We refactor the purgatories and delayed requests (which should now be called operations) as follows. All module names remain the same here, with renaming suggestions given in brackets for clarity.

Delayed Request (Kafka Operation)

The base kafka operation just contains:

keys: Seq[Any] // currently it is called DelayedRequestKey, but we can rename it as we like

timeout: long // timeout value in milliseconds

callback: Callback // this is the callback function triggered upon complete, either due to timeout or operation finished

One note here is that a new callback instance needs to be created for each operation, since it has to remember the request object that it will respond to. The base callback class can be extended, and provides this basic method:

// trigger the onComplete function given that whether the operation has succeeded or failed (e.g. timed out).
onComplete(Boolean)

 

Besides these fields, a Kafka operation also has the following interface:

// check if myself is satisfied
isSatisfied(): Boolean

// operations upon expiring myself
expire() = this.callback.onComplete(false) // can be overridden for recording metrics, etc
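
To make this concrete, a minimal Scala sketch of the base operation could look as follows; KafkaOperation and OperationCallback are placeholder names chosen only for illustration, not final class names:

trait OperationCallback {
  // trigger the completion logic; succeeded = false means the operation expired
  def onComplete(succeeded: Boolean): Unit
}

abstract class KafkaOperation(val keys: Seq[Any],
                              val timeoutMs: Long,
                              val callback: OperationCallback) {

  // check whether this operation can be completed now
  def isSatisfied: Boolean

  // invoked by the purgatory's expiry reaper; can be overridden to record metrics, etc.
  def expire(): Unit = callback.onComplete(false)
}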

 

Delayed Produce Request (Message Append Operation)

Maintains the append metadata, including the replicate condition:

appendStatus: Map[TopicAndPartition, AppendStatus] // AppendStatus include starting offset, required offset, error code, etc

replicateCondition: ReplicateCondition // ReplicateCondition can be just an acks integer, but may be extended in the future

 

In addition, it implements the interface as:

isSatisfied(replicaManager): Boolean = // access the replicaManager to check if each partition's append status has satisfied the replicate condition
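A sketch under these assumptions is given below: the replica manager is captured in the constructor so that isSatisfied can be a no-argument override of the base class above, and the AppendStatus fields and the Partition.checkEnoughReplicasReachOffset() helper are assumed rather than final:

case class ReplicateCondition(requiredAcks: Int) // just acks for now, extensible later

case class AppendStatus(startOffset: Long, requiredOffset: Long, var errorCode: Short)

class DelayedAppend(keys: Seq[Any],
                    timeoutMs: Long,
                    callback: OperationCallback,
                    replicaManager: ReplicaManager, // captured so isSatisfied needs no argument
                    val appendStatus: Map[TopicAndPartition, AppendStatus],
                    val replicateCondition: ReplicateCondition)
  extends KafkaOperation(keys, timeoutMs, callback) {

  // satisfied once every partition's required offset meets the replicate condition
  override def isSatisfied: Boolean =
    appendStatus.forall { case (topicAndPartition, status) =>
      replicaManager.getPartition(topicAndPartition) match {
        case Some(partition) =>
          partition.checkEnoughReplicasReachOffset(status.requiredOffset, replicateCondition)
        case None => true // partition no longer hosted here; completed with an error code
      }
    }
}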

 

Delayed Fetch Request (Message Fetch Operation)

Maintains the fetch metadata, including the fetch min bytes:

fetchInfo: Map[TopicAndPartition, LogOffsetMetadata] // LogOffsetMetadata includes the message offset, the segment starting offset, and the relative position within the segment

fetchMinBytes: int

 

And implements the interface as:

isSatisfied(replicaManager): Boolean = // access the replicaManager to check if the accumulated bytes exceed the minimum bytes, with special handling for some corner cases.
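
A corresponding sketch, assuming LogOffsetMetadata offers a positionDiff() helper for computing the byte distance between two log positions (the corner cases are only noted in comments):

class DelayedFetch(keys: Seq[Any],
                   timeoutMs: Long,
                   callback: OperationCallback,
                   replicaManager: ReplicaManager,
                   val fetchInfo: Map[TopicAndPartition, LogOffsetMetadata],
                   val fetchMinBytes: Int)
  extends KafkaOperation(keys, timeoutMs, callback) {

  // satisfied once enough bytes have accumulated beyond the fetch positions; the real
  // implementation must also treat some corner cases as immediately satisfied, e.g.
  // when the fetch offset falls on an older log segment
  override def isSatisfied: Boolean = {
    val accumulatedBytes = fetchInfo.map { case (topicAndPartition, fetchOffset) =>
      replicaManager.getReplica(topicAndPartition) match {
        case Some(replica) => replica.logEndOffset.positionDiff(fetchOffset)
        case None          => 0
      }
    }.sum
    accumulatedBytes >= fetchMinBytes
  }
}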

 

Request Purgatory (Kafka Purgatory)

The base purgatory provides the following APIs:

// check if an operation is satisfied already; if not, watch it.
maybeWatch(operation: KafkaOperation): Boolean

// return a list of operations that are satisfied given the key
update(Any): List[KafkaOperation]

 

Its expiry reaper purges the watch lists, triggering operation.expire() for each expired operation. This purgatory is generic and can be reused for different kinds of operations.
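
A condensed sketch of such a generic purgatory is given below; the watcher bookkeeping is simplified and the expiry reaper thread is elided:

class KafkaPurgatory[T <: KafkaOperation] {

  private val watchersForKey =
    collection.mutable.Map.empty[Any, collection.mutable.ListBuffer[T]]

  // check if the operation is satisfied already; if not, watch it under each of its keys
  def maybeWatch(operation: T): Boolean = synchronized {
    if (operation.isSatisfied) {
      operation.callback.onComplete(true)
      false // not watched: it completed right away
    } else {
      for (key <- operation.keys)
        watchersForKey.getOrElseUpdate(key, collection.mutable.ListBuffer.empty[T]) += operation
      true
    }
  }

  // return (and stop watching) the operations satisfied by a state change on the key;
  // the caller then triggers callback.onComplete(true) on each of them
  def update(key: Any): List[T] = synchronized {
    watchersForKey.get(key) match {
      case Some(watchers) =>
        val satisfied = watchers.filter(_.isSatisfied).toList
        watchers --= satisfied
        satisfied
      case None => Nil
    }
  }
}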

Kafka Server Modules

With these purgatories and operations, server side modules can be refactored as follows:

Replica Manager

The Replica Manager maintains the metadata of partitions, including their local and remote replicas, and talks to the Log Manager for log operations such as log truncation. Here is the proposed API:

 

// append messages to leader replicas of the partition, and wait for replicated to other replicas,
// the callback function will be triggered either when timeout or the replicate condition is satisfied
appendMessages(Map[TopicAndPartition, ByteBufferMessageSet], ReplicateCondition /* acks, etc */,  long /* timeout */, Callback) {

  // 1. Partition.appendToLocalLog()
  // 2. If can respond now, call Callback.onComplete(true)
  // 3. Otherwise create new DelayedAppend(..., Callback)
  // 4. AppendPurgatory.maybeWatch(append)
}

// fetch only committed messages from the leader replica,
// the callback function will be triggered either when timeout or required fetch info is satisfied
fetchMessages(Map[TopicAndPartition, FetchInfo], int /* min bytes*/, long /* timeout */, RespondCallback) {

  // 1. Log.read()
  // 2. If can respond now, call Callback.onComplete(true)
  // 3. Otherwise create new DelayedFetch(..., new FetchCallback() { onComplete(): { fetchMessages; RespondCallback.onComplete(true); } } )
  // 4. FetchPurgatory.maybeWatch(fetch)
}

// stop a local replica
stopReplica(TopicAndPartition, Boolean)

// make local replica leader of the partitions
leadPartition(Map[TopicAndPartition, PartitionState])

// make local replica follower of the partitions
followPartition(Map[TopicAndPartition, PartitionState])

// get (or create) partition, get (or create) replica, etc..
getPartition(TopicAndPartition)
getReplica(TopicAndPartition, int)
..
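
Fleshing out the append path a little, here is a sketch of appendMessages() under the assumptions above; appendPurgatory is an assumed KafkaPurgatory[DelayedAppend] field of the replica manager, and appendMessagesToLeader() is assumed to return the first/last appended offsets:

// inside the refactored ReplicaManager
def appendMessages(messages: Map[TopicAndPartition, ByteBufferMessageSet],
                   replicateCondition: ReplicateCondition,
                   timeoutMs: Long,
                   callback: OperationCallback) {

  // 1. append to the local log of each partition's leader replica
  val appendStatus = messages.map { case (topicAndPartition, messageSet) =>
    val info = getPartition(topicAndPartition).get.appendMessagesToLeader(messageSet)
    (topicAndPartition, AppendStatus(info.firstOffset, info.lastOffset + 1, errorCode = 0))
  }

  // 2.-4. complete immediately if possible, otherwise park a DelayedAppend in the purgatory
  val delayedAppend = new DelayedAppend(messages.keys.toSeq, timeoutMs, callback,
                                        this, appendStatus, replicateCondition)
  appendPurgatory.maybeWatch(delayedAppend)
}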

 

Coordinator / Offset Manager

Coordinator's offset manager will talk to the replica manager for appending messages.

 

// trigger the callback only when the offset is committed to replicated logs
putOffsets(Map[TopicAndPartition, OffsetInfo], RespondCallback) {

  // 1. replicaManager.appendMessages(... , new OffsetCommitCallback{ onComplete (): { putToCache; RespondCallback.onComplete(true); } })
}

// access the cache to get the offsets
getOffsets(Set[TopicAndPartition]) : Map[TopicAndPartition, OffsetInfo]
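
A sketch of putOffsets() under these assumptions; offsetsToMessageSet(), putToCache(), offsetCommitReplicateCondition and offsetCommitTimeoutMs are hypothetical helpers/config of the offset manager:

def putOffsets(offsets: Map[TopicAndPartition, OffsetInfo],
               respondCallback: OperationCallback) {

  // serialize the offsets into messages for the offsets topic partition(s)
  val offsetMessages = offsetsToMessageSet(offsets)

  replicaManager.appendMessages(offsetMessages, offsetCommitReplicateCondition,
    offsetCommitTimeoutMs,
    new OperationCallback {
      def onComplete(succeeded: Boolean) {
        if (succeeded)
          putToCache(offsets) // only cache offsets once committed to the replicated log
        respondCallback.onComplete(succeeded)
      }
    })
}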

 

Kafka Apis

Now KafkaApis becomes:

 

handleProduce(ProduceRequest) = // call replica-manager's appendMessages with callback sending produce response

handleFetch(FetchRequest) = // call replica-manager's fetchMessages with callback sending fetch response

handleCommitOffset(CommitOffsetRequest) = // call coordinator's offset manager with callback sending commit offset response

handleFetchOffset(FetchOffsetRequest) = // call coordinator's offset manager to get the offset, and then send back the response.
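
For example, handleProduce() would reduce to roughly the following; buildProduceResponse() is a hypothetical helper that assembles the per-partition append statuses into a ProducerResponse:

def handleProduce(request: RequestChannel.Request) {
  val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]

  replicaManager.appendMessages(
    produceRequest.data.toMap,
    ReplicateCondition(produceRequest.requiredAcks),
    produceRequest.ackTimeoutMs,
    new OperationCallback {
      // the callback only builds and sends the response; no append/fetch logic remains here
      def onComplete(succeeded: Boolean) {
        val response = buildProduceResponse(produceRequest, succeeded)
        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
      }
    })
}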

 

Request Handling Workflow

With the above refactoring, the request life cycle becomes:

ProduceRequest --> KafkaApis.handleProduce()         --> ReplicaManager.appendMessages()               --> Partition.appendMessagesToLeader() 

                                                                                                       --> AppendPurgatory.maybeWatch()

                                                     --> Callback(): RequestChannel.sendResponse()
                 
------------------------------------------------------------------------------------------------------

FetchRequest -->   KafkaApis.handleFetch()           --> ReplicaManager.fetchMessages()                --> ReplicaManager.readMessageSet()        --> Log.read()

                                                                                                       --> FetchPurgatory.maybeWatch(new FetchCallback)

                                                                                                       --> FetchCallback(): ReplicaManager.readMessageSet()

                                                                                                                                   RespondCallback.onComplete()


                                                     --> RespondCallback: RequestChannel.sendResponse()

                                                     

------------------------------------------------------------------------------------------------------

CommitOffsetRequest --> KafkaApis.handleCommitOffset() --> Coordinator.commitOffsets()                 --> ReplicaManager.appendMessages(new OffsetCommitCallback)        --> Partition.appendMessagesToLeader() 

                                                                                                                                                                          --> AppendPurgatory.maybeWatch()

                                                                                                       --> OffsetCommitCallback(): OffsetManager.putOffsets()

                                                                                                                                   RespondCallback.onComplete()

                                                         RespondCallback(): RequestChannel.sendResponse()

------------------------------------------------------------------------------------------------------

Partition.maybeIncrementLeaderHW()                      --> ReplicaManager.unblockDelayedFetchRequests() / unblockDelayedProduceRequests()

Partition.appendMessagesToLeader()                      --> ReplicaManager.unblockDelayedFetchRequests()

Partition.recordFollowerLOE()                           --> ReplicaManager.unblockDelayedProduceRequests()

 

The Caveat

As we can see, with the fetch request and the offset commit request, nested callbacks are used (the RespondCallback from KafkaApis for sending the response through the channel, and the FetchCallback / OffsetCommitCallback for reading the data for the response / putting the offset into the cache). This nesting is not ideal, but it is necessary if we want to keep a strictly layered architecture.

 

The new architectural diagram will be:

 

 
