

OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset Metadata]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string

These fields should be mostly self-explanatory, except for Metadata. It is meant as a way to attach arbitrary metadata that should be committed along with the offset: it could be the name of a file that contains state information for the processor, or a small piece of state. Basically it is just a generic string field that will be passed back to the client when the offset is fetched. It will likely have a tight size limit to avoid server impact.
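To make the request layout concrete, here is a minimal encoding sketch. It assumes Kafka-style wire conventions that this page does not spell out: big-endian integers, strings as an int16 byte length followed by UTF-8 bytes, and arrays as an int32 element count followed by the elements. MAX_METADATA_BYTES is a hypothetical stand-in for the "tight size limit" mentioned above; no actual value has been decided.

```python
import struct

# Hypothetical limit; the real cap on Metadata size is not specified here.
MAX_METADATA_BYTES = 1024


def encode_string(s: str) -> bytes:
    """Assumed string encoding: int16 byte length, then UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_offset_commit_request(group: str, offsets: dict) -> bytes:
    """Encode ConsumerGroup [TopicName [Partition Offset Metadata]].

    `offsets` maps topic -> {partition: (offset, metadata)}.
    Arrays are assumed to be prefixed with an int32 element count.
    """
    out = [encode_string(group), struct.pack(">i", len(offsets))]
    for topic, partitions in offsets.items():
        out.append(encode_string(topic))
        out.append(struct.pack(">i", len(partitions)))
        for partition, (offset, metadata) in partitions.items():
            # Reject oversized metadata on the client before sending.
            if len(metadata.encode("utf-8")) > MAX_METADATA_BYTES:
                raise ValueError(
                    f"metadata for {topic}-{partition} exceeds {MAX_METADATA_BYTES} bytes"
                )
            out.append(struct.pack(">iq", partition, offset))  # int32 + int64
            out.append(encode_string(metadata))
    return b"".join(out)
```

For example, `encode_offset_commit_request("my-group", {"events": {0: (42, "state-file-0017")}})` produces one group, one topic and one partition entry carrying both the offset and its metadata string.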


OffsetCommitResponse => [TopicName [Partition ErrorCode]]
  ErrorCode => int16


OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
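On the client side, the fetch response can be decoded back into a nested map. This sketch assumes the same (unspecified here) wire conventions as a Kafka-style request: big-endian integers, int32 array counts, int16 length-prefixed UTF-8 strings, with ErrorCode as an int16. It treats 0 as "no error", which is also an assumption.

```python
import struct


def decode_offset_fetch_response(buf: bytes) -> dict:
    """Decode [TopicName [Partition Offset Metadata ErrorCode]].

    Returns topic -> {partition: (offset, metadata, error_code)}.
    """
    pos = 0

    def take(fmt: str):
        # Read fixed-size big-endian fields and advance the cursor.
        nonlocal pos
        values = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return values

    def take_string() -> str:
        # int16 length prefix followed by UTF-8 bytes.
        nonlocal pos
        (length,) = take(">h")
        s = buf[pos:pos + length].decode("utf-8")
        pos += length
        return s

    result = {}
    (topic_count,) = take(">i")
    for _ in range(topic_count):
        topic = take_string()
        (partition_count,) = take(">i")
        partitions = {}
        for _ in range(partition_count):
            partition, offset = take(">iq")
            metadata = take_string()
            (error_code,) = take(">h")
            partitions[partition] = (offset, metadata, error_code)
        result[topic] = partitions
    return result
```

A client would check `error_code` for each partition before trusting the returned offset and metadata.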

For phase 1 the implementation would keep the existing zookeeper structure:


In the scala client we should not try to support "pluggable storage" but only implement support for using this API. To support folks who want to store offsets another way, we already give back offsets in the message stream so they can store them in whatever way makes sense. This makes more sense than some kind of SPI. What is lacking to complete this picture is a way for the consumer to initialize to a particular known offset, but that can be added as a separate issue. With that in place, a consumer would just need to turn off autocommit and implement the storage mechanism of its choice, without needing to implement a particular interface.
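As an illustration of the "turn off autocommit and store offsets yourself" approach, here is a hypothetical sketch. Messages are modeled as plain `(topic, partition, offset, payload)` tuples rather than any real consumer API, and a local JSON file is just one arbitrary storage choice. Recording `offset + 1` as the next position to read is an assumed convention. The `load()` result is what a consumer would pass to the (not yet available) "start at a known offset" feature.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Hypothetical consumer-managed offset storage (autocommit off)."""

    def __init__(self, path: str):
        self.path = path

    def save(self, offsets: dict) -> None:
        # Write to a temp file, then atomically replace the old one, so a
        # crash never leaves a half-written offsets file behind.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as f:
            json.dump({f"{t}:{p}": o for (t, p), o in offsets.items()}, f)
        os.replace(tmp, self.path)

    def load(self) -> dict:
        # Returns {(topic, partition): next_offset}; empty on first start.
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            raw = json.load(f)
        result = {}
        for key, offset in raw.items():
            topic, partition = key.rsplit(":", 1)
            result[(topic, int(partition))] = offset
        return result


def process(messages, store: FileOffsetStore) -> dict:
    """Consume (topic, partition, offset, payload) tuples, then persist progress."""
    positions = store.load()
    for topic, partition, offset, payload in messages:
        # ... application logic on `payload` would go here ...
        positions[(topic, partition)] = offset + 1  # next offset to read
    store.save(positions)  # saved once per batch in this sketch
    return positions
```

Because the offsets come straight from the message stream, the same pattern works with a database or any other store, with no storage SPI involved.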

Open Questions

Which servers can handle offset storage for which partitions?

With zookeeper there is no reason that we can't make any server handle all partitions. This might or might not be true for another storage implementation.

Do we need some kind of optimistic locking?

For example, the request could include the expected current offset, with the semantics "update the offset to x iff the current offset is y". This would be useful for sanity checking a consumer implementation, but in the scala implementation mutual exclusion for consumption is already handled by zookeeper (and we had done some work to port this over to the broker), so this would only be an extra safety check.
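The optimistic-locking semantics can be sketched as a compare-and-set on the server's offset table. Everything here is hypothetical: the class and exception names are illustrative, the key is assumed to be `(group, topic, partition)`, and `expected=None` stands for today's unconditional commit (so this sketch cannot express "commit only if nothing is stored yet").

```python
import threading


class OffsetConflict(Exception):
    """Raised when the expected current offset does not match the stored one."""


class ConditionalOffsetTable:
    """Hypothetical offset table supporting "set to x iff current is y"."""

    def __init__(self):
        self._lock = threading.Lock()
        self._offsets = {}  # (group, topic, partition) -> offset

    def commit(self, key, new_offset, expected=None):
        # The check and the write happen under one lock, so two racing
        # commits with the same `expected` cannot both succeed.
        with self._lock:
            current = self._offsets.get(key)
            if expected is not None and current != expected:
                raise OffsetConflict(f"{key}: expected {expected}, found {current}")
            self._offsets[key] = new_offset

    def fetch(self, key):
        with self._lock:
            return self._offsets.get(key)
```

A consumer that loses ownership of a partition without noticing would hit `OffsetConflict` on its next conditional commit instead of silently overwriting the new owner's progress.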

What are the error conditions?

Would be good to enumerate these.

What should the behavior be if you attempt to store offsets for topics or partitions that don't exist?

We have two use cases where it might be nice to abuse this API a bit. The first is in the mirroring process which copies from one cluster to another. In this case it might be nice for the copy process to use the cluster it is local to (assuming the clusters are in separate data centers). This might or might not be the cluster you are consuming from. The question is, what should happen if you try to store offsets for partitions or topics not in the cluster? Should we try to prevent this? Why? How would the server do this check?

Phase 2: Backend storage

Zookeeper is not a good way to service a high-write load such as offset updates, because zookeeper routes each write through every node and hence has no ability to partition or otherwise scale writes. We have always known this, but chose this implementation as a kind of "marriage of convenience" since we already depended on zk. The problems with this have become more apparent in our usage at LinkedIn, with thousands of partitions and hundreds of consumers; even with pretty high commit intervals it is still...exciting.


I would propose that any broker can handle an offset request, to make life easy for the client. If the broker happens to be the leader for the log that holds the offsets of the consumer group doing the commit, it can just apply the change locally; if not, it would forward the commit request to the server that is currently the leader for that partition. A simple client can just direct its requests anywhere; a client that optimizes a bit can try to hit the right server and save itself the extra hop.
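The "any broker accepts, leader applies" routing can be sketched as follows. The page does not say how a consumer group maps to a partition of the offset log; hashing the group name (CRC32 here, for a deterministic result) modulo a hypothetical NUM_OFFSET_PARTITIONS is purely an assumption for illustration, as are the `Broker` class and its fields.

```python
import zlib
from dataclasses import dataclass, field

NUM_OFFSET_PARTITIONS = 8  # hypothetical number of offset-log partitions


def offset_partition_for(group: str) -> int:
    """Assumed scheme: a consumer group's offsets live in one partition."""
    return zlib.crc32(group.encode("utf-8")) % NUM_OFFSET_PARTITIONS


@dataclass
class Broker:
    broker_id: int
    leaders: dict  # offset-log partition -> leader broker id (cluster metadata)
    applied: list = field(default_factory=list)

    def handle_commit(self, group, commit, cluster):
        """Accept a commit from any client; apply locally or forward one hop.

        Returns the id of the broker that applied the commit.
        """
        leader_id = self.leaders[offset_partition_for(group)]
        if leader_id == self.broker_id:
            # Stand-in for appending the commit to the local offset log.
            self.applied.append((group, commit))
            return self.broker_id
        # Not the leader: forward to the broker that is.
        return cluster[leader_id].handle_commit(group, commit, cluster)
```

A "smart" client would compute `offset_partition_for(group)` itself and send directly to the leader, skipping the forwarding hop; a simple client can send to any broker and get the same result.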

Some Nuances

All replicas will keep the in-memory lookup structure for offsets. This could just be a simple hashmap. This will contain only committed offsets to ensure we never need to undo updates.
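A minimal sketch of "only committed offsets go into the hashmap", assuming a replicated log with a high watermark marking how many entries are committed. The names (`OffsetCache`, `advance_high_watermark`, `truncate`) are hypothetical. Entries are appended as they arrive, but the lookup map is updated only when the high watermark passes them, so discarding an uncommitted tail of the log never requires undoing anything in the map.

```python
class OffsetCache:
    """Hypothetical replica-side in-memory view of the offset log."""

    def __init__(self):
        self.log = []            # (key, offset) entries in log order
        self.high_watermark = 0  # number of leading log entries that are committed
        self.offsets = {}        # key -> latest committed offset

    def append(self, key, offset):
        # Uncommitted: recorded in the log only, not visible in the map.
        self.log.append((key, offset))

    def advance_high_watermark(self, new_hw):
        # Apply newly committed entries to the map, in log order.
        new_hw = min(new_hw, len(self.log))
        for key, offset in self.log[self.high_watermark:new_hw]:
            self.offsets[key] = offset
        self.high_watermark = max(self.high_watermark, new_hw)

    def truncate(self, length):
        # Drop an uncommitted tail; the map never saw it, so nothing to undo.
        if length < self.high_watermark:
            raise ValueError("cannot truncate committed entries")
        del self.log[length:]
```

Offset fetches would be served from `offsets`, which is why every replica can answer lookups consistently without replaying or rolling back log entries.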
