This would be a two-phase change:

  1. First, add a new API that moves the details of offset storage to the broker.
  2. Then, improve the way we store offsets so that we can move off Zookeeper.

Phase I: API

The JIRA for this API is here.

OffsetCommit

This API saves the consumer's position in the stream for one or more partitions. In the Scala API this happens when the consumer calls commit(), or in the background if "autocommit" is enabled. This is the position the consumer will resume from if it crashes before its next commit().

Request:

OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
  Offset => int64

Response:

OffsetCommitResponse => [TopicName [Partition ErrorCode]]
  ErrorCode => int16

OffsetFetch

This API reads back a consumer position previously written using the OffsetCommit API.

Request:

OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]

Response:

OffsetFetchResponse => [TopicName [Partition Offset ErrorCode]]
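
As a rough illustration of the request grammars above, the sketch below serializes the bodies of an OffsetCommitRequest and an OffsetFetchRequest. It assumes, as is conventional for the Kafka wire protocol but not stated on this page, that strings are prefixed with an int16 length, arrays with an int32 element count, and that all integers are big-endian. The request header is omitted, and the function names are hypothetical.

```python
import struct


def encode_string(value: str) -> bytes:
    # Assumption: strings are int16 length-prefixed UTF-8.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_offset_commit(group: str, offsets: dict) -> bytes:
    """offsets maps topic -> {partition: offset}."""
    out = encode_string(group) + struct.pack(">i", len(offsets))
    for topic, partitions in offsets.items():
        out += encode_string(topic) + struct.pack(">i", len(partitions))
        for partition, offset in partitions.items():
            # Partition => int32, Offset => int64
            out += struct.pack(">iq", partition, offset)
    return out


def encode_offset_fetch(group: str, partitions: dict) -> bytes:
    """partitions maps topic -> list of partition ids."""
    out = encode_string(group) + struct.pack(">i", len(partitions))
    for topic, ids in partitions.items():
        out += encode_string(topic) + struct.pack(">i", len(ids))
        for partition in ids:
            out += struct.pack(">i", partition)
    return out
```

For example, encode_offset_commit("g", {"t": {0: 42}}) yields 26 bytes: 3 for the group, 4 for the topic count, 3 for the topic name, 4 for the partition count, and 12 for the partition/offset pair.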

For phase I the implementation would keep the existing zookeeper structure:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value

If we started requiring zookeeper 3.4.x we could potentially optimize the implementation to use zk multi support to bundle together the updates (and reads?).
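
A minimal sketch of the phase I layout, with a plain dict standing in for zookeeper (no real zookeeper client is used, and the function names are hypothetical). It shows that a commit covering several partitions turns into one write per path, which is what multi support could bundle.

```python
def offset_path(group_id, topic, broker_id, partition_id):
    # Mirrors /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
    return f"/consumers/{group_id}/offsets/{topic}/{broker_id}-{partition_id}"


def commit_offsets(znodes, group_id, offsets):
    """offsets maps (topic, broker_id, partition_id) -> offset.

    Returns the number of individual writes issued.
    """
    writes = 0
    for (topic, broker_id, partition_id), offset in offsets.items():
        path = offset_path(group_id, topic, broker_id, partition_id)
        znodes[path] = str(offset)  # one separate write per partition
        writes += 1
    return writes
```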

Integration With Clients

These APIs are optional; clients can store offsets another way if they like.

In the scala client we should not try to support "pluggable storage" but only implement support for using this API. To support folks who want to store offsets another way, we already give back offsets in the message stream so they can store them in whatever way makes sense. This will make more sense than some kind of SPI. What is lacking to complete this picture is a way for the consumer to initialize to a particular known offset, but that can be added as a separate issue. If we had this, a consumer would just need to turn off autocommit and implement the storage mechanism of its choice, without needing to implement a particular interface.
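
To make the "store them yourself" idea concrete, here is a small sketch in which autocommit is assumed to be off and the consumer records offsets in its own store. The stream is modeled as an iterable of (topic, partition, offset, payload) tuples and the store as a dict; both are illustrative stand-ins, not the client's actual types.

```python
def consume(stream, store, handle):
    """Process each message, then record its offset in the caller's own store."""
    for topic, partition, offset, payload in stream:
        handle(payload)
        # Record the offset only after the message has been handled.
        store[(topic, partition)] = offset


def last_processed(store, topic, partition):
    """Return the last recorded offset, or None if nothing was stored."""
    return store.get((topic, partition))
```

On restart, such a consumer would look up last_processed(...) and initialize to that position, which is the missing piece noted above.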

Open Questions

Which servers can handle offset storage for which partitions?

With zookeeper there is no reason that we can't make any server handle all partitions. This might or might not be true for another storage implementation.

Do we need some kind of optimistic locking?

E.g. the request could include the current offset and have the semantics "update the offset to x, iff the current offset is y". In the scala implementation the mutual exclusion for consumption is already handled by zookeeper (and we have done some work to port this over to the broker), so this would not be needed for correctness; it would just be a nice sanity check on a consumer implementation.
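
The "update to x iff the current offset is y" semantics could look roughly like the following compare-and-set sketch. The class and method names are hypothetical, and a real broker would also need to make the check and the update atomic under concurrent requests.

```python
class OffsetStore:
    def __init__(self):
        self._offsets = {}  # (group, topic, partition) -> offset

    def commit_if(self, key, expected, new):
        """Set the offset to `new` only if the stored value equals `expected`.

        `expected` is None for a key that has never been committed.
        Returns True if the update was applied.
        """
        if self._offsets.get(key) != expected:
            return False  # some other writer moved the offset
        self._offsets[key] = new
        return True

    def fetch(self, key):
        return self._offsets.get(key)
```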

What are the error conditions?

It would be good to enumerate these.

What should the behavior be if you attempt to store offsets for topics or partitions that don't exist?

We have two use cases where it might be nice to abuse this api a bit. The first is in the mirroring process which copies from one cluster to another. In this case it might be nice for the copy process to use the cluster it is local to (assuming the clusters are in separate data centers). This might or might not be the cluster you are consuming from. The question is, what should happen if you try to store offsets for partitions or topics not in the cluster? Should we try to prevent this? Why? How would the server do this check?

Phase II: Backend storage

Zookeeper is not a good way to service a high-write load such as offset updates because zookeeper routes each write through every node and hence has no ability to partition or otherwise scale writes. We have always known this, but chose this implementation as a kind of "marriage of convenience" since we already depended on zk. The problems with this have become more apparent in our usage at LinkedIn with thousands of partitions and hundreds of consumers--even with pretty high commit intervals it is still...exciting.

The general requirements for offset storage are the following:

  1. High write load: it should be okay if consumers want to commit after every message if they need to do that (it will be slower, but the system shouldn't collapse). We need to be able to horizontally scale offset writes.
  2. Durability: once an offset is written it can't be lost even if a server crashes or disappears.
  3. Consistent reads: all reads must return the last written value--no "eventual consistency" can be apparent.
  4. Small data size--potentially in memory. A consumer-group/topic/partition/offset tuple should be no more than 64 bytes in memory, even with all the inefficiencies of JVM object overhead, assuming the strings are interned and a compact lookup structure is used. So each 1GB of memory could potentially support ~16 million entries.
  5. Transactionality across updates for multiple topic/partitions. This is perhaps not a hard requirement but would be nice to have. What I mean by this is that if a consumer subscribes to two partitions and issues a commit request for its offset in both partitions that commit should either fail for both or succeed for both, since handling a partial success could be fairly complex. This is not true in our zk usage today fwiw.
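
The memory estimate in item 4 is simple arithmetic; the sketch below spells it out, taking 1GB to mean 2^30 bytes and using the 64-byte-per-entry figure from the text.

```python
def entries_per_gib(bytes_per_entry: int = 64) -> int:
    """How many offset entries fit in 1 GiB at a given per-entry cost."""
    return (1 << 30) // bytes_per_entry


# 2**30 / 64 = 2**24 entries, i.e. roughly 16 million.
ENTRIES = entries_per_gib()
```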

So here is a strawman proposal. We make use of the keyed topic support we are adding and store the offset commits in Kafka as a topic. The implementation of OffsetCommit would just publish a message to the "offset-commit" topic. The cleaner process described in the keyed topic support wiki would periodically deduplicate the log, removing obsolete messages. This would support very high write throughput with a partitioning model to scale load, and Kafka replication would provide the durability and fault tolerance guarantees. On its own this would not support fast reads, though. To support fast reads the brokers would create an "index" of this topic in memory by also populating a hash map or other in-memory store. On restart the broker would restore this hash map from the offset-commit topic's contents.

There are a number of ways to handle partitioning of this "offset-commit" topic, each with different trade-offs.
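
The strawman can be sketched in a few lines: an append-only log, an in-memory hash map serving reads, a cleaner that keeps only the latest record per key, and a restore step that replays the log. This is a toy model with hypothetical names; a Python list stands in for the "offset-commit" topic, and replication is not modeled.

```python
class OffsetLog:
    def __init__(self):
        self.log = []    # append-only (key, offset) records
        self.index = {}  # key -> latest offset, serves fast reads

    def commit(self, key, offset):
        self.log.append((key, offset))
        self.index[key] = offset

    def fetch(self, key):
        return self.index.get(key)

    def compact(self):
        """Drop records that a later record with the same key has made obsolete."""
        last_pos = {key: pos for pos, (key, _) in enumerate(self.log)}
        self.log = [rec for pos, rec in enumerate(self.log)
                    if last_pos[rec[0]] == pos]

    @classmethod
    def restore(cls, records):
        """Rebuild the in-memory index by replaying the log, as on broker restart."""
        store = cls()
        for key, offset in records:
            store.commit(key, offset)
        return store
```

Compaction never changes what fetch returns, since only records shadowed by a later record for the same key are removed.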

My proposal would be that we partition by consumer group, and that each message in the topic contain the complete set of updates sent in one request. This would give us the transactionality since our guarantees today are always at the message level. The broker would explode each of these multi-partition updates into multiple hash map entries. The partitioning would be handled by the broker. That is, the client could issue its commit request to any broker which would in turn issue a send() call to the broker responsible for the partition containing that consumer group. Likewise the get offset call would be proxied to the correct broker (unless by chance it landed on the correct broker).
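
A sketch of this arrangement follows: the partition is chosen from the consumer group alone, each commit request is stored as a single message, and the broker explodes that message into one hash map entry per topic/partition. The use of CRC32 as the hash is an assumption made for illustration (the page does not name a hash function), and the class and function names are hypothetical.

```python
import zlib


def partition_for(group: str, num_partitions: int) -> int:
    # Assumption: any stable hash of the group name would do; CRC32 is
    # used here only because it is deterministic across processes.
    return zlib.crc32(group.encode("utf-8")) % num_partitions


class OffsetPartition:
    def __init__(self):
        self.log = []    # one message per commit request
        self.index = {}  # (group, topic, partition) -> offset

    def append(self, group, updates):
        """updates maps (topic, partition) -> offset for a single request."""
        # The whole request is one message, so it is written all-or-nothing.
        self.log.append((group, dict(updates)))
        # Explode the message into individual hash map entries.
        for (topic, partition), offset in updates.items():
            self.index[(group, topic, partition)] = offset
```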

The advantage of this approach is that updates are transactional and also that the client can be stupid and just direct its requests to whatever broker it happens to be connected to.

The downside of this particular partitioning arrangement is the "two hops" necessary. A savvy client could optimize this away by using the same hash the broker does on the consumer group to choose the broker to make its request to.
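
The hop count can be illustrated with a toy model that assumes one offset-commit partition per broker and a CRC32 hash of the group name. Both are simplifying assumptions, and the function names are hypothetical; in practice the mapping would depend on which broker leads each partition.

```python
import zlib


def owner_of(group: str, brokers: list) -> str:
    """Broker responsible for the partition holding this consumer group."""
    return brokers[zlib.crc32(group.encode("utf-8")) % len(brokers)]


def hops(group: str, contacted: str, brokers: list) -> int:
    """1 if the client hit the owning broker directly, 2 if the request is proxied."""
    return 1 if contacted == owner_of(group, brokers) else 2
```

A client that computes owner_of itself always gets a single hop; a client that contacts an arbitrary broker pays the second hop whenever it guesses wrong.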
