...
    class ProduceResponse {

      /* Wait for the response to complete and return the result. This method will
       * either return the partition and offset of the corresponding
       * message or else throw an exception (if there was an error).
       * If the timeout is hit with no response available, it will throw a TimeoutException. */
      def await(timeout: Long = 0): PartitionAndOffset

      /* Check if the request is complete without blocking */
      def isComplete(): Boolean

      /* Execute the given callback when the response has arrived. Multiple callbacks may be specified.
       * Although we guarantee that the callback will execute and that r.await will not block,
       * we don't guarantee what thread will execute the callback. */
      def whenComplete(callback: (ProduceResponse) => Unit): Unit
    }
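To make the contract above concrete, here is a minimal sketch of one way such a response object could be built on a `CountDownLatch`. It is not the proposed implementation. The fields of `PartitionAndOffset`, the `SketchProduceResponse` name, and the `complete` hook are hypothetical, and the sketch assumes that the timeout is in milliseconds and that a timeout of 0 means "wait indefinitely", which the proposal does not state.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}
import scala.collection.mutable.ListBuffer

// Hypothetical result type; the proposal names it but does not define its fields.
case class PartitionAndOffset(partition: Int, offset: Long)

class SketchProduceResponse {
  private val done = new CountDownLatch(1)
  private val lock = new Object
  private var result: Either[Throwable, PartitionAndOffset] = null
  private val callbacks = ListBuffer.empty[SketchProduceResponse => Unit]

  // Assumption: timeout is in milliseconds and 0 means "wait indefinitely".
  def await(timeout: Long = 0): PartitionAndOffset = {
    if (timeout <= 0) done.await()
    else if (!done.await(timeout, TimeUnit.MILLISECONDS))
      throw new TimeoutException(s"no response after $timeout ms")
    result match {
      case Right(value) => value
      case Left(error)  => throw error
    }
  }

  // Non-blocking completion check.
  def isComplete(): Boolean = done.getCount == 0

  // Runs the callback immediately if the response has already arrived,
  // otherwise queues it. The executing thread is whichever one gets there first.
  def whenComplete(callback: SketchProduceResponse => Unit): Unit = {
    val runNow = lock.synchronized {
      if (result == null) { callbacks += callback; false } else true
    }
    if (runNow) callback(this)
  }

  // Hypothetical hook the network layer would call; only the first call takes effect.
  def complete(outcome: Either[Throwable, PartitionAndOffset]): Unit = {
    val pending = lock.synchronized {
      if (result != null) None
      else {
        result = outcome
        done.countDown() // released before any callback runs, so r.await cannot block
        val snapshot = callbacks.toList
        callbacks.clear()
        Some(snapshot)
      }
    }
    pending.foreach(_.foreach(cb => cb(this)))
  }
}
```

In this sketch a callback registered before completion runs on the thread that calls `complete`, and one registered afterwards runs on the registering thread, which matches the "no guarantee about the thread" wording above.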
...
    val request = new ProducerRequest()
      .addMessage("topic_1", "my_key1", "my_value1")
      .addMessage("topic_2", "my_key2", "my_value2")
      .timeout(1000)      // timeout after 1 second
      .awaitCommitted()   // i.e. ISR = -1, would also support await(n) to give an explicit number of brokers
      .compression(NoCompressionCodec)

    val responses = client.send(request)
    for (response <- responses)
      println(response.await())
In general, I think it is good design to always have this kind of API, because you can easily evolve it by adding more optional parameters. With our current API, adding a new parameter would require a new method with more parameters.
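The difference can be sketched roughly as follows. This is an illustration only: `SketchProducerRequest`, `Message`, `PositionalApi`, the field names, and the default values (including `requiredAcks = 1`) are all hypothetical and are not taken from the actual client.

```scala
// Hypothetical message type; field names are illustrative only.
case class Message(topic: String, key: String, value: String)

// Request object with defaulted options. Adding a new option later means adding
// one defaulted field and one setter; existing call sites keep compiling.
case class SketchProducerRequest(
    messages: Vector[Message] = Vector.empty,
    timeoutMs: Long = 0,
    requiredAcks: Int = 1,   // assumed default, not from the proposal
    codec: String = "none"   // stand-in for a real compression codec type
) {
  def addMessage(topic: String, key: String, value: String): SketchProducerRequest =
    copy(messages = messages :+ Message(topic, key, value))
  def timeout(ms: Long): SketchProducerRequest = copy(timeoutMs = ms)
  def awaitCommitted(): SketchProducerRequest = copy(requiredAcks = -1)
  def compression(name: String): SketchProducerRequest = copy(codec = name)
}

// Contrast: a positional API needs a new overload for every added parameter.
object PositionalApi {
  def send(topic: String, key: String, value: String): String =
    send(topic, key, value, 0L)
  def send(topic: String, key: String, value: String, timeoutMs: Long): String =
    s"$topic:$key:$value:$timeoutMs"
}
```

A caller that only sets a timeout, for example `SketchProducerRequest().addMessage("topic_1", "my_key1", "my_value1").timeout(1000)`, would be unaffected if another option were added to the request later, whereas `PositionalApi` would need yet another `send` overload.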
...