Here is a proposal for refactoring the producer API. There are several goals:

  1. Make it more natural to store keys in the messages. Currently the ProducerData maps a key to a list of values, which is odd and doesn't map well to the data model in Message.
  2. Return the offset of the message that was written.
  3. Enable asynchronous I/O even in the sync case.
  4. Improve the partitioner API.

There are also two major internal improvements that would be good to make:

  1. Clean up the producer logic, which is currently quite convoluted.
  2. Move the producer to use NIO and a Selector instead of blocking sockets, and support multiplexing requests.

We don't need to do these things all at once, and we may not need to do any of them in 0.8, but it seems good to lay out all the things we want to do here. Some discussion of possible phasing is given below.

Here are some details on each of the items above.

Proposed API

Producer:

trait Producer[K, V] {

  /* Send one or more messages and get back a list of corresponding responses, one per message.
   * This API is meant to be asynchronous, so it never directly throws exceptions.
   * Instead it returns a list of lazy Future-like response values which contain the result or error.
   */
  def send(messages: ProducerMessage[K, V]*): Seq[ProduceResponse]
}

A ProducerMessage is analogous to the ProducerData class we currently have (i.e. just a placeholder for the preserialized data):

 case class ProducerMessage[K, V](topic: String, key: K, value: V)
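
To make the first goal concrete, here is a minimal sketch of how a key-to-many-values record flattens into one message per value. `LegacyData` is a hypothetical stand-in for the current key-to-list shape, not the real ProducerData definition, and `flatten` is an illustrative helper that is not part of the proposal.

```scala
// Hypothetical stand-in for the current shape: one key mapped to many values.
final case class LegacyData[K, V](topic: String, key: K, values: Seq[V])

// Proposed shape: exactly one key and one value per message.
final case class ProducerMessage[K, V](topic: String, key: K, value: V)

// Illustrative helper: expand one legacy record into one message per value,
// repeating the topic and key on each message.
def flatten[K, V](data: LegacyData[K, V]): Seq[ProducerMessage[K, V]] =
  data.values.map(v => ProducerMessage(data.topic, data.key, v))

val messages = flatten(LegacyData("clicks", "user-42", Seq("a", "b")))
```

Each resulting message carries its own key, which lines up with a Message model that stores one key per message.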

The ProduceResponse is a lazy object that contains the offset and any error that was thrown:

 class ProduceResponse {

  /* Wait for the response to complete and return the result. This method will
   * either return the partition and offset of the corresponding
   * message or else throw an exception (if there was an error)
   */
  def await(): PartitionAndOffset

  /* Check if the request is complete without blocking
   */
  def isComplete(): Boolean

  /* Execute the given callback when the response has arrived. Multiple callbacks may be specified.
   * Although we guarantee that the callback will execute and that r.await will not block,
   * we don't guarantee which thread will execute the callback.
   */
  def whenComplete(callback: ProduceResponse => Unit): Unit

}
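
One possible way to back such a response object is a latch plus a list of pending callbacks. The sketch below is only an illustration under assumptions: the class name `SketchProduceResponse`, the `complete` hook that the network layer would call, and the fields of `PartitionAndOffset` are all hypothetical and not part of the proposal.

```scala
import java.util.concurrent.CountDownLatch

// Assumed shape of the result; the proposal does not define its fields.
final case class PartitionAndOffset(partition: Int, offset: Long)

final class SketchProduceResponse {
  private val done = new CountDownLatch(1)
  private var result: Option[Either[Throwable, PartitionAndOffset]] = None
  private var callbacks: List[SketchProduceResponse => Unit] = Nil

  /* Block until the outcome is known, then return it or rethrow the error. */
  def await(): PartitionAndOffset = {
    done.await()
    result.get match {
      case Right(value) => value
      case Left(error)  => throw error
    }
  }

  /* Non-blocking completion check. */
  def isComplete(): Boolean = done.getCount == 0

  /* Run the callback once the outcome is known; run it immediately if it already is. */
  def whenComplete(callback: SketchProduceResponse => Unit): Unit = {
    val runNow = synchronized {
      if (result.isEmpty) { callbacks = callback :: callbacks; false } else true
    }
    if (runNow) callback(this)
  }

  /* Hypothetical hook for the network layer. Only the first call has any effect. */
  def complete(outcome: Either[Throwable, PartitionAndOffset]): Unit = {
    val toRun = synchronized {
      if (result.isDefined) Nil
      else {
        result = Some(outcome)
        done.countDown() // released before callbacks run, so r.await() inside them cannot block
        val pending = callbacks.reverse
        callbacks = Nil
        pending
      }
    }
    toRun.foreach(cb => cb(this))
  }
}

val response = new SketchProduceResponse
var seen = List.empty[Long]
response.whenComplete(r => seen = r.await().offset :: seen)
val completeBefore = response.isComplete()
response.complete(Right(PartitionAndOffset(0, 17L)))
```

In this sketch the callbacks run on whichever thread calls `complete`, which is consistent with not guaranteeing which thread executes them.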

Note that currently send() is a blocking call, so returning a lazy object might seem odd. However, after the move to NIO send() would no longer need to block.

Some Limitations

The primary limitation of the proposed API is that it has no way to specify per-request parameters. Our produce request allows the caller to specify the number of acks and the timeout, but the API has no place to put them. The right way to fix this would be to have send() take a ProducerRequest object that contains the ProducerMessages but also has fields for the various per-request configs. Having a request-level object seems like a good future-proof way to have a place for any optional parameters we later decide to add to the request.

In general I think it is good design for this kind of API to always have a single request object and a single response object to allow this.

The problem with this approach is that the async producer blurs the user's control over what goes into which request. If the user set different parameters on a few of these request objects and we then had to batch them into a single API request, which parameters would we choose? As a result, I don't think we have an easy way to model the request-level parameters.
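
The conflict can be seen in a small sketch. The `ProducerRequest` fields (`requiredAcks`, `timeoutMs`) and the `mergeForBatch` helper below are assumptions made for illustration; the proposal does not define them.

```scala
final case class ProducerMessage[K, V](topic: String, key: K, value: V)

// Hypothetical request-level object: field names and types are assumptions.
final case class ProducerRequest[K, V](
    messages: Seq[ProducerMessage[K, V]],
    requiredAcks: Int,
    timeoutMs: Int)

// What an async producer would face when batching: the messages merge cleanly,
// but the per-request settings may disagree. Returns the merged messages and
// the distinct (acks, timeout) pairs that would have to be reconciled.
def mergeForBatch[K, V](
    requests: Seq[ProducerRequest[K, V]]): (Seq[ProducerMessage[K, V]], Set[(Int, Int)]) = {
  val merged = requests.flatMap(_.messages)
  val settings = requests.map(r => (r.requiredAcks, r.timeoutMs)).toSet
  (merged, settings)
}

val fast = ProducerRequest(Seq(ProducerMessage("t", "k1", "v1")), requiredAcks = 0, timeoutMs = 100)
val safe = ProducerRequest(Seq(ProducerMessage("t", "k2", "v2")), requiredAcks = 2, timeoutMs = 5000)
val (batched, settings) = mergeForBatch(Seq(fast, safe))
```

Here the two messages end up in one batch, yet two different (acks, timeout) pairs remain, and there is no obviously correct rule for picking one.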

Rationale for NIO

A few people have questioned this, so it seems worthwhile to lay out what this buys us.

Currently we have phenomenal throughput when messages are batched together, but our throughput when sending a single message at a time is not great (it's not terrible either, but it could be a lot better).

Previously our send had no acknowledgement, which meant that the send was often effectively asynchronous. That is no longer the case: we now always block on responses. This means that if you use the async producer and send a large batch of messages, you will effectively do a set of serial requests to each server.

Blocking I/O is simpler to use in the case where there is only a single connection to a single server (as we originally had), but I don't think it is simpler in the case where there are multiple connections to multiple servers (the current case). Now all our network I/O for the consumer, the replicas, and the producer connects to multiple servers. NIO has been pretty good on the server, and I think we should move to it for the clients too.

Here is a sketch of how this would look in code. Instead of having a single connection that took Send and Receive objects, we would have a multi-connection:

class KafkaSelector {

  /* Add a new connection associated with the given node id */
  def connect(node: Int, host: String, port: Int): Unit

  /* Remove a connection */
  def disconnect(node: Int): Unit

  /* Add some Send objects to be sent to the given node ids. Return any completed Receives.
   * If the given timeout is reached, return even if no new receives are ready.
   */
  def poll(sends: Seq[(Int, Send)], timeout: Long): Seq[Receive]
}

This would allow the producer to simultaneously send on all sockets with only a single thread. It would also allow a single consumer thread to effectively poll on many sockets. It also allows the producer to multiplex requests over a single socket (i.e. have many requests in flight before a response is received).
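
Multiplexing means that several requests can be outstanding on one connection, so each arriving response has to be matched to the request that caused it. A minimal sketch of one way to do that bookkeeping follows. It assumes each connection returns responses in the order the requests were sent, which the proposal does not state, and the `InFlightRequests` class and its method names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical bookkeeping for requests in flight, keyed by node id.
final class InFlightRequests[R] {
  private val pending = mutable.Map.empty[Int, mutable.Queue[R]]

  /* Record that a request was written to the given node's socket. */
  def sent(node: Int, request: R): Unit =
    pending.getOrElseUpdate(node, mutable.Queue.empty[R]).enqueue(request)

  /* A response arrived from the node: pair it with the oldest outstanding request, if any. */
  def received(node: Int): Option[R] =
    pending.get(node).filter(_.nonEmpty).map(_.dequeue())

  /* Number of requests still awaiting a response from the node. */
  def outstanding(node: Int): Int =
    pending.get(node).map(_.size).getOrElse(0)
}

val inFlight = new InFlightRequests[String]
inFlight.sent(1, "produce-a")
inFlight.sent(1, "produce-b") // second request written before the first response arrives
inFlight.sent(2, "produce-c")
val first = inFlight.received(1)
```

If responses could arrive out of order, a per-request correlation id looked up in a map would be needed instead of a FIFO queue.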

Usage of this code would look something like this pseudo code:

 class NioSyncProducer(selector: KafkaSelector) {
   def send(produces: Seq[ProduceRequest]): Seq[ProduceResponse] = {
     // pseudo code: poll() also expects each send to be paired with its destination node id
     val receives = selector.poll(produces.map(new ByteBufferSend(_)), socketTimeout)
     receives.map(r => new ProduceResponse(r.buffer))
   }
 }

 class NioSyncConsumer(selector: KafkaSelector) {
   def poll(): Seq[ByteBufferMessageSet] = {

   }
 }