...

The records returned by the poll() function will include metadata such as offset, partition, key, and partition-key. This would also help remove the decompression/recompression step in mirror maker (details can be found in KAFKA-1011).
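To make the metadata concrete, a record returned by poll() could be modeled roughly as follows. This is only a minimal Java sketch, not the proposed API: the class name ConsumedRecord, the field names, and the byte[] types are assumptions made for illustration. Only the metadata items themselves (offset, partition, key, partition-key) and the topic field used in the examples come from this page.

```java
// Hypothetical record type; all names and types here are illustrative assumptions.
final class ConsumedRecord {
    final String topic;        // topic the record was fetched from
    final int partition;       // partition within the topic
    final long offset;         // position of the record in the partition log
    final byte[] key;          // message key, may be null
    final byte[] partitionKey; // key used to pick the partition, may be null
    final byte[] value;        // message payload

    ConsumedRecord(String topic, int partition, long offset,
                   byte[] key, byte[] partitionKey, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.partitionKey = partitionKey;
        this.value = value;
    }

    // Renders the record's coordinates as topic-partition@offset.
    @Override
    public String toString() {
        return topic + "-" + partition + "@" + offset;
    }
}
```

Carrying the offset and partition on each record lets the application track its own progress per partition, which is what the manual offset management callbacks later on this page rely on.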

 

Here is an example of using the consumer API:

...

Code Block
void onPartitionDeassigned(partitions) {
  // commit offsets to Kafka servers
  consumer.commit(true);
}

void onPartitionAssigned(partitions) {
  // get offsets of the assigned partitions from Kafka servers,
  // and set the starting fetch offset accordingly
  offsets = getOffsets();
  position([partitions, offsets]);
}

Another example, with manually managed partition assignment:

Code Block
Consumer consumer = new Consumer(props);
consumer.subscribe("topic-1", 0);
consumer.subscribe("topic-2", 0);
long timeout = 1000;
while(true) {
  List<Record> messages = consumer.poll(timeout);
  for (message in messages) {
    if (message.topic.equals("topic-1")) processOneMessageOfTopicOne(message);
    else processOneMessageOfTopicTwo(message);
  }
  consumer.commit(true);
}

 

An example customized callback implementation for manual offset management:

Code Block
void onPartitionDeassigned(partitions) {
  offsets = getOffsets();
  // store offsets in a remote DB
}

void onPartitionAssigned(partitions) {
  // read offsets from the remote DB
  position([partitions, offsets]);
}
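
The store-then-restore flow of these callbacks can be exercised without a broker. The following is a minimal, self-contained Java sketch under stated assumptions: OffsetStore is a hypothetical stand-in for the remote DB (here just an in-memory map), RebalanceCallbacks and its method signatures are invented for illustration, and the positions map stands in for the consumer state that getOffsets() and position() would read and set. Defaulting to offset 0 for a partition with no stored offset is also an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the remote DB that persists offsets.
final class OffsetStore {
    private final Map<String, Long> saved = new HashMap<>();

    void save(String partition, long offset) {
        saved.put(partition, offset);
    }

    // Returns the stored offset, or 0 if none was stored (an assumed default).
    long load(String partition) {
        return saved.getOrDefault(partition, 0L);
    }
}

// Hypothetical callbacks mirroring onPartitionDeassigned / onPartitionAssigned.
final class RebalanceCallbacks {
    private final OffsetStore store;
    // Stand-in for the consumer's current fetch position per partition.
    final Map<String, Long> positions = new HashMap<>();

    RebalanceCallbacks(OffsetStore store) {
        this.store = store;
    }

    // Persist the current position of every partition being given up.
    void onPartitionDeassigned(List<String> partitions) {
        for (String p : partitions) {
            Long current = positions.remove(p);
            if (current != null) {
                store.save(p, current);
            }
        }
    }

    // Resume each newly assigned partition from its stored offset.
    void onPartitionAssigned(List<String> partitions) {
        for (String p : partitions) {
            positions.put(p, store.load(p));
        }
    }
}
```

With two RebalanceCallbacks instances sharing one OffsetStore, a partition released by the first at some position is picked up by the second at the same position, which is the property the callbacks are meant to provide across a rebalance.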

...

Consumer Coordination Protocol Overview

...