THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
Consumer (Properties) { // Incrementally subscribe to a list of topics. // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions. void subscribe(String...) throws SubscriptionNotValidException; // Incrementally subscribe to a specified topic partition. // No group management will be invoked since assigned partitions have been specified. // @throws SubscriptionNotValidException if the new subscription is not valid with existing subscriptions. void subscribe(String, Int); // Incrementally subscribeTry to afetch specifiedmessages topicfrom partitionthe startingtopic/partitions atit thesubscribed givento. offset. // NoReturn groupwhatever managementmessages willare beavailable invokedto sincebe assignedfetched partitionsor haveempty beenafter specified timeout. // @throws@thorws SubscriptionNotValidExceptionInitOffsetNotKnownException if the newstarting subscriptionoffset isfor notthe validassigned withpartitions existingis subscriptionsunknown. void List<Record> subscribepoll(String, Int, Long)Long) throws InitOffsetNotKnownException; // TryCommit tolatest fetchoffsets messagesfor fromall the topic/partitions itcurrently subscribedconsuming tofrom. // If Returnthe whateversync messagesflag areis availableset to be fetched or empty after specified timeouttrue, commit call will be sync and blocking. // @thorwsOtherwise InitOffsetNotKnownException if the startingcommit offsetcall forwill thebe assignedasync partitions is unknownand best-effort. List<Record> void pollcommit(LongBoolean); throws InitOffsetNotKnownException; // Commit the latestspecified offsetsoffset for allthe specified partitions currently consuming from. // If the sync flag is set to true, commit call will be sync and blocking. // Otherwise the commit call will be async and best-effort. void commit(Boolean); // Commit the specified offset for the specified partitions. // If the sync flag is set to true, commit call will be sync and blocking. // Otherwise the commit call will be async and best-effort. // // @thorws InvalidPartitionsToCommitOffsetException if the group management has invoked, sync flag is true and the specified partitions do not belong to assigned ones. void commit(List[(String, Int, Long)], Boolean) throws InvalidPartitionsToCommitOffsetException; // Specify the fetch starting offset for the specified topic/partition. void position(List[(String, Int, Long)]) // Get the currently consuming partitions. // Block wait if the current partitions are not known yet. List. void position(List[(String, Int, Long)] getPartitions(); // Get the last committed offsets of the partitions currently consuming. // Block wait if the current partitions are not known yet. Map[(String, Int), Long] getOffsets() OffsetUnknownException; // Get the last committed offsets of the specified topic/partition. // @throws OffsetUnknownException if not offsets of the specified partitions have ever been committed. Long getOffset(String, Int) throws OffsetUnknownException; // --------- Call-back Below, not part of API ------------ // // Call-back function upon partition de-assignment. // Default implementation will commit offset depending on auto.commit config. void onPartitionDeassigned(List[(String, Int)]); // Call-back function upon partition re-assignment. // Default implementation will fetch starting offset depending on auto.commit config. void onPartitionAssigned(List[(String, Int)]) } |
...
Code Block |
---|
Consumer consumer = new Consumer(props); consumer.subscribe("topic"); long timeout = 1000; while(true) { List<Record> messages = consumer.poll(timeout); process(messages); consumer.commit(true); } |
And the default callback functions can will be implemented as:
Code Block |
---|
void onPartitionDeassigned(partitions) { // ifcommit (kafka.manage.offsets) to Kafka commit(true); else offsets = getOffsets(); servers } void onPartitionAssigned(partitions) { // get offsets of the assigned partitions from Kafka servers, // store offsets } void set the start fetching offset accordingly } |
Example customized callback functions for manually managed offsets could be:
Code Block |
---|
void onPartitionDeassignedonPartitionAssigned(partitions) { offsets = new Map[(String, Int), Long]getOffsets(); if (kafka.manage.offsets) // store offsets = getOffsets() assertEqual(offsets.keyset, partitions) else locally } void onPartitionAssigned(partitions) { // read offsets from local disk, FS, database, etc.. position([partitions, offsets) // ready to start fetching]) } |
Consumer Coordination Protocol Overview
...