Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
Consumer (Properties) {

  // Incrementally subscribe to a list of topics.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String...) throws SubscriptionNotValidException;

  // Incrementally subscribe to a specified topic partition.
  // No group management will be invoked since assigned partitions have been specified.
  // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions.
  void subscribe(String, Int);

  // Incrementally subscribeTry to afetch specifiedmessages topicfrom partitionthe startingtopic/partitions atit thesubscribed givento. offset.
  // NoReturn groupwhatever managementmessages willare beavailable invokedto sincebe assignedfetched partitionsor haveempty beenafter specified timeout.
  // @throws@thorws SubscriptionNotValidExceptionInitOffsetNotKnownException if the newstarting subscriptionoffset isfor notthe validassigned withpartitions existingis subscriptionsunknown.
  void List<Record> subscribepoll(String, Int, Long)Long) throws InitOffsetNotKnownException;

  // TryCommit tolatest fetchoffsets messagesfor fromall the topic/partitions itcurrently subscribedconsuming tofrom. 
  // If Returnthe whateversync messagesflag areis availableset to be fetched or empty after specified timeouttrue, commit call will be sync and blocking.
  // @thorwsOtherwise InitOffsetNotKnownException if the startingcommit offsetcall forwill thebe assignedasync partitions is unknownand best-effort.
  List<Record> void pollcommit(LongBoolean); throws InitOffsetNotKnownException;

  // Commit the latestspecified offsetsoffset for allthe specified partitions currently consuming from.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  void commit(Boolean); 

  // Commit the specified offset for the specified partitions.
  // If the sync flag is set to true, commit call will be sync and blocking.
  // Otherwise the commit call will be async and best-effort.
  //  // @thorws InvalidPartitionsToCommitOffsetException if the group management has invoked, sync flag is true and the specified partitions do not belong to assigned ones.
  void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException;

  // Specify the fetch starting offset for the specified topic/partition.
  void position(List[(String, Int, Long)])

  // Get the currently consuming partitions.
  // Block wait if the current partitions are not known yet.
  List.
  void position(List[(String, Int, Long)] getPartitions();

  // Get the last committed offsets of the partitions currently consuming.
  // Block wait if the current partitions are not known yet.
  Map[(String, Int), Long] getOffsets() OffsetUnknownException;

  // Get the last committed offsets of the specified topic/partition.
  // @throws OffsetUnknownException if not offsets of the specified partitions have ever been committed.
  Long getOffset(String, Int) throws OffsetUnknownException;
  
  // --------- Call-back Below, not part of API ------------ //

  // Call-back function upon partition de-assignment.
  // Default implementation will commit offset depending on auto.commit config.
  void onPartitionDeassigned(List[(String, Int)]);

  // Call-back function upon partition re-assignment.
  // Default implementation will fetch starting offset depending on auto.commit config.
  void onPartitionAssigned(List[(String, Int)])
}

...

Code Block
Consumer consumer = new Consumer(props);
consumer.subscribe("topic");
long timeout = 1000;
while(true) {
  List<Record> messages = consumer.poll(timeout);
  process(messages);
  consumer.commit(true);
}

 

And the default callback functions can will be implemented as:

Code Block
void onPartitionDeassigned(partitions) {
  // ifcommit (kafka.manage.offsets)
 to Kafka  commit(true);
  else
    offsets = getOffsets();
  servers
}

void onPartitionAssigned(partitions) {
  // get offsets of the assigned partitions from Kafka servers,
  // store offsets
}

void  set the start fetching offset accordingly
}

 

Example customized callback functions for manually managed offsets could be:

Code Block
void onPartitionDeassignedonPartitionAssigned(partitions) {
  offsets = new Map[(String, Int), Long]getOffsets();
  if (kafka.manage.offsets)
   // store offsets = getOffsets()
    assertEqual(offsets.keyset, partitions)
  else
  locally
}

void onPartitionAssigned(partitions) {
  // read offsets from local disk, FS, database, etc..

  position([partitions, offsets) // ready to start fetching])
}

 

Consumer Coordination Protocol Overview

...