...
The Records returned by the poll() function will include metadata such as offset, partition, key and partition-key. This would also help remove the decompression/recompression in mirror maker (details can be found in KAFKA-1011).
Here is an example of using the consumer API:
...
Code Block |
---|
void onPartitionDeassigned(partitions) {
  // commit offsets to Kafka servers
  consumer.commit(true);
}
void onPartitionAssigned(partitions) {
  // get offsets of the assigned partitions from Kafka servers,
  // and set the start fetching offset accordingly
  offsets = getOffsets();
  position([partitions, offsets]);
} |
Another example, with manually managed partition assignment:
Code Block |
---|
Consumer consumer = new Consumer(props);
// subscribe to partition 0 of each topic explicitly
consumer.subscribe("topic-1", 0);
consumer.subscribe("topic-2", 0);
long timeout = 1000;
while (true) {
  List<Record> messages = consumer.poll(timeout);
  for (Record message : messages) {
    // route each message to the handler for its topic
    if (message.topic.equals("topic-1")) processOneMessageOfTopicOne(message);
    else processOneMessageOfTopicTwo(message);
  }
  // commit offsets after the whole batch has been processed
  consumer.commit(true);
} |
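The poll, dispatch, commit cycle above can be tried out without a broker. The following is a minimal sketch under stated assumptions: `PolledRecord`, `StubConsumer` and `PollLoopSketch` are hypothetical names invented for illustration and are not part of the proposed API, and the stub commits "last delivered offset + 1" purely as a convention of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical record type carrying some of the metadata mentioned in the text.
final class PolledRecord {
    final String topic;
    final int partition;
    final long offset;
    final String value;

    PolledRecord(String topic, int partition, long offset, String value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical in-memory stand-in for the proposed consumer; not a Kafka class.
final class StubConsumer {
    private final Queue<PolledRecord> pending = new ArrayDeque<>();
    private final Map<String, Long> lastDelivered = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    void enqueue(PolledRecord r) { pending.add(r); }

    // Returns everything queued so far; the timeout is ignored in this stub.
    List<PolledRecord> poll(long timeoutMs) {
        List<PolledRecord> batch = new ArrayList<>(pending);
        pending.clear();
        for (PolledRecord r : batch) {
            lastDelivered.put(r.topic + "-" + r.partition, r.offset);
        }
        return batch;
    }

    // Assumption of this sketch: commit the next offset to read per partition.
    void commit() {
        for (Map.Entry<String, Long> e : lastDelivered.entrySet()) {
            committed.put(e.getKey(), e.getValue() + 1);
        }
    }

    long committedOffset(String topic, int partition) {
        return committed.getOrDefault(topic + "-" + partition, 0L);
    }
}

final class PollLoopSketch {
    // One iteration of the loop: poll, route values by topic, then commit.
    static Map<String, List<String>> pollOnce(StubConsumer consumer) {
        Map<String, List<String>> byTopic = new HashMap<>();
        for (PolledRecord r : consumer.poll(1000)) {
            byTopic.computeIfAbsent(r.topic, t -> new ArrayList<>()).add(r.value);
        }
        consumer.commit();
        return byTopic;
    }
}
```

Committing only after the whole batch has been routed means a crash mid-batch would redeliver that batch, which matches the at-least-once behavior of the loop in the example.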
An example customized callback implementation for manual offset management:
Code Block |
---|
void onPartitionDeassigned(partitions) {
  offsets = getOffsets();
  // store offsets in a remote DB
}
void onPartitionAssigned(partitions) {
  // read offsets from the remote DB
  position([partitions, offsets]);
} |
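The callback pair above can be sketched as self-contained Java. Everything named here (`TopicPartition`, `OffsetStore`, `InMemoryOffsetStore`, `ManualOffsetCallbacks`, `advance`) is hypothetical and not taken from the proposal; an in-memory map stands in for the remote DB so the sketch runs on its own, and a partition with no stored offset is assumed to start at 0.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical identifier for one partition of a topic.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }
}

// Hypothetical storage abstraction; a real one might wrap a remote DB client.
interface OffsetStore {
    void save(TopicPartition tp, long offset);
    long load(TopicPartition tp, long defaultOffset);
}

// In-memory stand-in so the sketch runs without external services.
final class InMemoryOffsetStore implements OffsetStore {
    private final Map<TopicPartition, Long> offsets = new HashMap<>();

    @Override
    public void save(TopicPartition tp, long offset) { offsets.put(tp, offset); }

    @Override
    public long load(TopicPartition tp, long defaultOffset) {
        return offsets.getOrDefault(tp, defaultOffset);
    }
}

// Mirrors the onPartitionDeassigned / onPartitionAssigned pair from the text.
final class ManualOffsetCallbacks {
    private final OffsetStore store;
    // Current fetch positions of the partitions this consumer owns.
    private final Map<TopicPartition, Long> positions = new HashMap<>();

    ManualOffsetCallbacks(OffsetStore store) { this.store = store; }

    // Called as records are processed to advance the local position.
    void advance(TopicPartition tp, long nextOffset) { positions.put(tp, nextOffset); }

    // Before giving partitions up: persist their positions, then forget them.
    void onPartitionDeassigned(List<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Long pos = positions.remove(tp);
            if (pos != null) store.save(tp, pos);
        }
    }

    // After receiving partitions: resume from the stored offset, or 0 if none.
    void onPartitionAssigned(List<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            positions.put(tp, store.load(tp, 0L));
        }
    }

    long position(TopicPartition tp) {
        Long pos = positions.get(tp);
        if (pos == null) throw new IllegalStateException("partition not assigned");
        return pos;
    }
}
```

Because the store is shared, a second consumer that is assigned the same partition after a rebalance picks up where the first one left off, which is the point of keeping the offsets outside the consumer process.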
...
Consumer Coordination Protocol Overview
...