Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
 while(true) {  
  List<MessageAndMetadata> messages = consumer.poll(timeout);
  process(messages);
  if(rewind_required) {
     List<PartitionOffset> partitionOffsets = new ArrayList<PartitionOffset>();
     partitionOffsets.add(new PartitionOffset(topic, partition, offset));
     rewind_offsets(partitionOffsets);     
  }  
}
  • the consumer stops fetching data, sends a RewindConsumer request to the controller and awaits a

...

  • RewindConsumerResponse
  • on receiving a RewindConsumer request, the controller sends an error code in the HeartbeatResponse to the current owners of the rewound

...

  • partitions
  • on receiving an error code in the HeartbeatResponse, the affected consumers stop fetching, commit offsets and send the RegisterConsumer request to the

...

  • controller
  • on receiving a RegisterConsumer request from the affected members of the group, the controller records the new offsets for the specified partitions and sends the new offsets to the respective consumers

...

  • on receiving a RegisterConsumerResponse, the consumers start fetching data from the specified offsets

Low-level RPC API

TBD we need to design an underlying client network abstraction that can be used to send requests. This should ideally handle metadata and addressing by broker id, multiplex over multiple connections, and support both blocking and non-blocking operations.

...