Over several years of usage we have run into a number of complaints about the Scala client. This is a proposal for addressing those complaints.
This would likely be done in the timeframe of a 0.9 release.
Here is a dump of all the problems we have seen and some solutions:
The idea would be to roll out the new api as a separate jar, leaving the existing client intact but deprecated for one or two releases before removing it. This should allow a gradual migration.
SendResponse send(KafkaMessage... message);

Usage:

Producer producer = new Producer(new ProducerConfig(props));
SendResponse r = producer.send(new KafkaMessage(topic, key, message));
r.onCompletion(new Runnable() {
    public void run() {
        System.out.println("All done");
    }
});
r.getOffset();
r.getError();
As today we will have a single background thread that does message collation, serialization, and compression. However this thread will now run an event loop to simultaneously send requests and receive responses from all brokers.
In 0.8, producer throughput is limited by the synchronous interaction between the producer and the broker: only one request can be in flight per broker at a time, which severely hurts throughput. This approach also fails to take advantage of parallelism on the broker, where requests for different partitions can be handled by different request handler threads, thereby increasing the throughput seen by the producer.
Request pipelining allows multiple requests to the same broker to be in flight at once. The idea is to do I/O multiplexing (via select()/epoll()) and wait for at least one readable or writable socket instead of doing blocking reads and writes. So if a producer sends data to 3 partitions on 2 brokers, the difference between blocking sends/receives and pipelined sends/receives can be understood by looking at the following picture.
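For concreteness, here is a minimal sketch of what that multiplexed loop could look like with java.nio; isRunning(), readResponse(), and writeNextRequest() are hypothetical helpers, not part of the proposal.

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch: one non-blocking channel per broker registered with a
// single Selector, so requests to all brokers make progress concurrently.
void runSendLoop(List<SocketChannel> brokerChannels, long selectTimeoutMs) throws IOException {
    Selector selector = Selector.open();
    for (SocketChannel channel : brokerChannels) {
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }
    while (isRunning()) {                        // hypothetical shutdown flag
        selector.select(selectTimeoutMs);        // block until >= 1 socket is ready
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isReadable())
                readResponse((SocketChannel) key.channel());      // hypothetical
            if (key.isValid() && key.isWritable())
                writeNextRequest((SocketChannel) key.channel());  // hypothetical
        }
    }
}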
There are 2 threads (the client thread and the background send thread), though the exact arrangement depends on how metadata requests are handled.
There are 2 choices here: one queue per topic-partition, or a single queue for all events.

There will be one queue with partition "-1" per topic; events with a null key will enter this queue. This "undesignated" partition queue per topic exists to prevent fetching metadata on the client thread when the producer sends the first message for a new topic.

One queue per topic-partition
Pros:
Cons:

Single queue for all events
Pros:
- Less memory overhead since there is only one queue data structure for all events
Cons:
Partitioning can happen before the event enters the queue. The advantage is that the event does not require re-queuing if the one-queue-per-partition approach is used. Events with a null key will be queued to the <topic>-"-1" queue.
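A small sketch of that queue-selection step, assuming a map of BlockingQueues keyed by "<topic>-<partition>"; the event, partitioner, and queues names are illustrative only.

// Illustrative: partitioning runs on the client thread before enqueue, so the
// event never needs re-queuing; null-keyed events land in the "-1" queue.
int partition = (event.key() == null)
        ? -1  // undesignated/"-1" partition queue for this topic
        : partitioner.partition(event.key(), numPartitions(event.topic()));
BlockingQueue<Event> queue = queues.get(event.topic() + "-" + partition);
queue.put(event);  // blocks if the queue is bounded and full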
When a producer sends the first message for a new topic, it enters the <topic>-undesignated queue. The metadata fetch happens on the event thread. There are 2 choices for how the metadata fetch request will work: synchronously or asynchronously with respect to other outstanding requests.

Synchronous fetch
Pros:
- Simplicity of code
Cons:
- If the leaders for only a subset of partitions have changed, a synchronous metadata request can potentially hurt the throughput for other topics/partitions.

Asynchronous fetch
Pros:
- More isolation, better overall throughput
One option is to do this before the event enters the queue and after partitioning; the downside is potentially slowing down the client thread if compression or serialization takes long. Another option is to do this in the send thread, which seems like the better choice.
The producer maintains a map from each broker to the list of partitions that broker leads. Batch size is per partition. For each broker whose selection key is in a writable state, the producer's send thread will poll the queues for the partitions that the broker leads. Once a batch is full, it will create a ProducerRequest with the partitions and data, compress the data, and write the request to the socket. This happens in the event thread while handling new requests. The collation logic gets a little complicated if there is only one queue for all topics/partitions.

When the producer send thread polls each partition's queue, it compresses the batch of messages that it dequeues.
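A rough sketch of that collation step under the one-queue-per-partition layout; the Broker, Partition, and Event types, and the isWritable(), compress(), and write() helpers, are assumptions for illustration.

// Illustrative collation: for each broker whose socket is writable, drain up
// to batchSize events from each partition it leads, compress each batch on
// the send thread, and frame one ProducerRequest per broker.
for (Map.Entry<Broker, List<Partition>> entry : brokerToLedPartitions.entrySet()) {
    Broker broker = entry.getKey();
    if (!isWritable(broker))  // selection key not in write state; skip for now
        continue;
    Map<Partition, byte[]> payloads = new HashMap<Partition, byte[]>();
    for (Partition p : entry.getValue()) {
        List<Event> batch = new ArrayList<Event>(batchSize);
        queues.get(p).drainTo(batch, batchSize);  // batch size is per partition
        if (!batch.isEmpty())
            payloads.put(p, compress(batch));     // compression on the send thread
    }
    if (!payloads.isEmpty())
        write(broker, new ProducerRequest(payloads));  // written via the event loop
}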
while(isRunning) {
    // configure any new broker connections
    // select readable keys
    // handle responses
    // select writable keys
    // handle topic metadata requests, if there are non-zero partitions in error
    // handle incomplete requests
    // handle retries, if any
    // handle new requests
}
Here is an example of the proposed consumer API:
Consumer consumer = new Consumer(props);
consumer.addTopic("my-topic");          // consume dynamically assigned partitions
consumer.addTopic("my-other-topic", 3); // consume statically assigned partition 3

long timeout = 1000;
while(true) {
    List<MessageAndOffset> messages = consumer.poll(timeout);
    process(messages);
    consumer.commit(); // alternately consumer.commit(topic) or consumer.commit(topic, partition)
}
As before, the consumer group is set in the properties, and for simplicity each consumer can belong to only one group.
addTopic is used to change the set of topics the consumer consumes. If the user gives only the topic name, the consumer will automatically be assigned a set of partitions based on the group membership protocol below. If the user specifies a partition, they will be statically assigned that partition.
The actual partition assignment can be made pluggable (see the proposal on group membership below) by setting an assignment strategy in the properties.
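One way the pluggable strategy could look; the interface, its method signature, the TopicPartition type, and the property key are assumptions, not a settled API.

import java.util.List;

// Hypothetical plug-in point: given the group's members and the partitions of
// the subscribed topics, decide which partitions this consumer should own.
public interface AssignmentStrategy {
    List<TopicPartition> assign(String consumerId,
                                List<String> groupMembers,
                                List<TopicPartition> allPartitions);
}

// Selected via properties, e.g.:
// props.put("assignment.strategy", "com.example.RangeAssignmentStrategy");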
This API allows alternative methods for managing offsets, as today, by simply disabling autocommit and not calling commit().
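For example (the auto.commit.enable key, the saveOffset() helper, and the MessageAndOffset accessors are assumptions here):

// Manage offsets externally: disable autocommit and never call commit().
props.put("auto.commit.enable", "false");
Consumer consumer = new Consumer(props);
consumer.addTopic("my-topic");
while(true) {
    List<MessageAndOffset> messages = consumer.poll(timeout);
    for (MessageAndOffset m : messages) {
        process(m);
        // user-provided durable store, e.g. a database keyed by topic/partition
        saveOffset(m.topic(), m.partition(), m.offset());
    }
}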
The client would be entirely single threaded.
The poll() method executes the network event loop; this includes sending and receiving fetch requests as well as driving group-management traffic such as registration and heartbeats (see the RPCs below).
The timeout the user specifies is purely to ensure we have a mechanism to give control back to the user even when no messages are delivered. It is up to the user to ensure poll() is called again within the heartbeat frequency set for the consumer group. Internally, our select() may use a shorter timeout to ensure the heartbeat frequency is met even when no messages are delivered.
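Illustratively, the internal select() timeout might be capped like this (the field names are assumptions):

// Cap the user's poll timeout so a heartbeat can always be sent in time.
long timeUntilHeartbeat = heartbeatIntervalMs - (now() - lastHeartbeatMs);
long selectTimeout = Math.max(0, Math.min(userTimeoutMs, timeUntilHeartbeat));
selector.select(selectTimeout);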
We will introduce a set of RPC apis for managing partition assignment on the consumer side. This will be based on the prototype here. This set of APIs is orthogonal to the APIs for producing and consuming data; it is responsible for group membership.
| api | sender | description | issue to |
| --- | --- | --- | --- |
| ListGroups | client | Returns metadata for one or more groups. If issued with a list of GroupName it returns metadata for just those groups. If no GroupName is given it returns metadata for all active groups. This request can be issued to any server. | any server |
| CreateGroup | client | Create a new group with the given name and the specified minimum heartbeat frequency. Return the id/host/port of the server acting as the controller for that group. | any server |
| DeleteGroup | client | Deletes a non-ephemeral group (but only if it has no members). | controller |
| RegisterConsumer | client | Ask to join the given group. This happens as part of the event loop that gets invoked when the consumer uses the poll() API. | controller |
| Heartbeat | client | This heartbeat message must be sent by the consumer to the controller within the SLA specified in the CreateGroup command. The client can and should issue these more frequently to avoid accidentally timing out. | controller |
HeartbeatRequest =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  SessionTimeout => int64

HeartbeatResponse =>
  Version => int16
  CorrelationId => int64
  ControllerGeneration => int64
  ErrorCode => int16 // error code is non-zero if the group change is to be initiated

CreateGroupRequest =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  ConsumerGroup => string
  SessionTimeout => int64
  Topics => [string]

CreateGroupResponse =>
  Version => int16
  CorrelationId => int64
  ConsumerGroup => string
  ErrorCode => int16

DeleteGroupRequest =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  ConsumerGroup => [string]

DeleteGroupResponse =>
  Version => int16
  CorrelationId => int64
  DeletedConsumerGroups => [string]
  ErrorCode => int16

RegisterConsumerRequest =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  ConsumerGroup => string
  ConsumerId => string

RegisterConsumerResponse =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  ConsumerGroup => string
  PartitionsToOwn => [{Topic Partition Offset}]
  Topic => string
  Partition => int16
  Offset => int64
  ErrorCode => int16

ListGroupsRequest =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  ConsumerGroups => [string]

ListGroupsResponse =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  GroupsInfo => [{GroupName, GroupMembers, Topics, ControllerBroker, SessionTimeout}]
  GroupName => string
  GroupMembers => [string]
  Topics => [string]
  ControllerBroker => Broker
  Broker => BrokerId Host Port
  BrokerId => int32
  Host => string
  Port => int16
  ErrorCode => int16

OffsetCommitRequest =>
  Version => int16
  CorrelationId => int64
  ClientId => string
  ConsumerGroup => string
  NewOffsets => [{Topic Partition Offset}]
  Topic => string
  Partition => int16
  Offset => int64

OffsetCommitResponse =>
  Version => int16
  CorrelationId => int64
  ConsumerGroup => string
  ActualOffsets => [{Topic Partition Offset}]
  Topic => string
  Partition => int16
  Offset => int64
  ErrorCode => int16
The use of this protocol would be as follows:
On startup or when co-ordinator heartbeat fails -
When new partitions are added to existing topics or new topics are created -
while(true) {
    List<MessageAndMetadata> messages = consumer.poll(timeout);
    process(messages);
    if(rewind_required) {
        List<PartitionOffset> partitionOffsets = new ArrayList<PartitionOffset>();
        partitionOffsets.add(new PartitionOffset(topic, partition, offset));
        rewind_offsets(partitionOffsets);
    }
}
TBD: we need to design an underlying client network abstraction that can be used to send requests. This should ideally handle metadata and addressing by broker id, multiplex over multiple connections, and support both blocking and non-blocking operations.
Something like
KafkaConnection connection = new KafkaConnection(bootstrapUrl, socketProps);
// signature sketch:
List<Response> responses = connection.poll(timeout, requests);
This is an async API, so the responses returned have no relationship to the requests sent. It is meant to be called from within an event loop; i.e., each call to poll() corresponds to one iteration of the select() loop. poll() with no requests simply checks for any new responses to previous requests or non-client-initiated communication.
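A hypothetical usage sketch under that contract; isRunning(), nextRequests(), and dispatch() are placeholders, not part of the proposed API.

// Each poll() is one iteration of the select() loop; returned responses may
// correspond to requests sent in earlier iterations (matched by correlation id).
KafkaConnection connection = new KafkaConnection(bootstrapUrl, socketProps);
while (isRunning()) {
    List<Response> responses = connection.poll(timeout, nextRequests());
    for (Response r : responses)
        dispatch(r);  // route by correlation id
}
// poll() with no requests just drains any newly arrived responses:
List<Response> pending = connection.poll(timeout);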
Currently we do a lot of logging and metrics recording in the client. Instead of handling metrics reporting and logging ourselves, we should incorporate this feedback into the client APIs and allow users to log or monitor it in the manner they choose. This avoids a dependence on any particular logging or metrics library and seems like a good policy.
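For instance, the client could expose a listener hook instead of logging internally (the interface name and event types here are assumptions):

import java.util.Map;

// Users plug in their own logging/metrics; the client itself stays quiet.
public interface ClientEventListener {
    // eventType examples: "request.sent", "response.received", "retry"
    void onEvent(String eventType, Map<String, Object> details);
}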
It is not clear how to support changing the offset, either manually or programmatically, while consumption is happening. Simply using the API to set the offset will likely collide with commit calls from the active consumer. Perhaps this is not needed?
How many jars should we have? I think we could do either of two options.
I prefer Option A as it is simpler if we add an AdminApi: we won't need to introduce a whole new jar for it.