...

Code Block
 trait ProduceResponse {

  /* Wait for the response to complete and return the result. This method
   * either returns the partition and offset of the corresponding message
   * or throws an exception if the request failed.
   * If the timeout expires before a response is available, it throws a TimeoutException.
   */
  def await(timeout: Long = 0): PartitionAndOffset

  /* Check if the request is complete without blocking
   */
  def isComplete(): Boolean

  /* Execute the given callback when the response has arrived. Multiple callbacks may be registered.
   * We guarantee that the callback will execute and that r.await will not block inside it,
   * but we do not guarantee which thread will execute the callback.
   */
  def whenComplete(callback: ProduceResponse => Unit): Unit

}
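
One possible way to back this interface is a latch plus a list of pending callbacks. The sketch below is only an illustration under stated assumptions: the class name SketchProduceResponse, the complete and fail hooks (which the network layer would presumably call), the fields of PartitionAndOffset, and the reading of timeout = 0 as "wait indefinitely" are all hypothetical and not part of the proposal.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}
import scala.collection.mutable.ListBuffer

// Hypothetical result type; the proposal names it but does not define its fields.
final case class PartitionAndOffset(partition: Int, offset: Long)

// Sketch of a completable response. `complete` and `fail` are assumed hooks
// for the network layer and are not part of the proposed public API.
class SketchProduceResponse {
  private val done = new CountDownLatch(1)
  private val lock = new Object
  @volatile private var result: Option[Either[Throwable, PartitionAndOffset]] = None
  private val callbacks = ListBuffer.empty[SketchProduceResponse => Unit]

  def await(timeoutMs: Long = 0): PartitionAndOffset = {
    // Assumption: a timeout of 0 (or less) means "wait indefinitely".
    if (timeoutMs <= 0) done.await()
    else if (!done.await(timeoutMs, TimeUnit.MILLISECONDS))
      throw new TimeoutException(s"no response after $timeoutMs ms")
    result.get match {
      case Right(value) => value
      case Left(error)  => throw error
    }
  }

  def isComplete(): Boolean = done.getCount == 0

  def whenComplete(callback: SketchProduceResponse => Unit): Unit = {
    // Either queue the callback or, if the response already arrived, run it now.
    val runNow = lock.synchronized {
      if (result.isDefined) true
      else { callbacks += callback; false }
    }
    if (runNow) callback(this)
  }

  def complete(value: PartitionAndOffset): Unit = finish(Right(value))

  def fail(error: Throwable): Unit = finish(Left(error))

  private def finish(outcome: Either[Throwable, PartitionAndOffset]): Unit = {
    val toRun = lock.synchronized {
      if (result.isDefined) Nil // ignore a second completion
      else {
        result = Some(outcome)
        done.countDown() // released before callbacks run, so r.await cannot block in them
        val pending = callbacks.toList
        callbacks.clear()
        pending
      }
    }
    // Callbacks run on whichever thread completes the response.
    toRun.foreach(cb => cb(this))
  }
}
```

In this sketch a callback registered before completion runs on the completing thread, and one registered afterwards runs on the registering thread, which matches the "no guarantee about which thread" wording above.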

...

Code Block
 val request = new ProducerRequest()
                 .addMessage("topic_1", "my_key1", "my_value1")
                 .addMessage("topic_2", "my_key2", "my_value2")
                 .timeout(1000)     // timeout after 1 second
                 .awaitCommitted()  // i.e. ISR = -1, would also support await(n) to give an explicit number of brokers
                 .compression(NoCompressionCodec)
 val responses = client.send(request)
 for(response <- responses)
   println(response.await())

In general, I think it is good design to always have this kind of API, because you can evolve it easily by adding more optional parameters. With our current API, adding a new parameter means adding a new method with a longer parameter list.
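
To make the evolution argument concrete, here is a minimal sketch of how such a request could be built as an immutable case class whose setters return modified copies. Everything in it is an assumption for illustration: the name SketchProducerRequest, the Message type, the field names, the default values, and the mapping of awaitCommitted() to a required-acks value of -1 are hypothetical and not taken from the actual client.

```scala
// Hypothetical types for illustration only; not the actual client code.
sealed trait CompressionCodec
case object NoCompressionCodec extends CompressionCodec
case object GZIPCompressionCodec extends CompressionCodec

final case class Message(topic: String, key: String, value: String)

// Every option has a default (the defaults here are assumed), so callers
// only mention the options they care about.
final case class SketchProducerRequest(
    messages: Vector[Message] = Vector.empty,
    timeoutMs: Int = 0,
    requiredAcks: Int = 1,
    codec: CompressionCodec = NoCompressionCodec) {

  def addMessage(topic: String, key: String, value: String): SketchProducerRequest =
    copy(messages = messages :+ Message(topic, key, value))

  def timeout(ms: Int): SketchProducerRequest = copy(timeoutMs = ms)

  // Wait for an explicit number of brokers to acknowledge.
  def await(n: Int): SketchProducerRequest = copy(requiredAcks = n)

  // Assumed encoding: -1 stands for "wait until the message is committed".
  def awaitCommitted(): SketchProducerRequest = copy(requiredAcks = -1)

  def compression(c: CompressionCodec): SketchProducerRequest = copy(codec = c)
}
```

Adding a new option in this style means adding one field with a default and one setter. Existing call sites compile unchanged because they never spell out the full parameter list.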

...