...
In practice, the ESO is used as the cut-off point for cleaning up these control records.
Administration
Several components work together to create and manage share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The following table summarises the administration operations and how they work.
Operation | Supported by | Notes |
---|---|---|
Create share group | Group coordinator | This occurs as a side-effect of a ShareGroupHeartbeat. The group coordinator writes a record to the consumer offsets topic to persist the group's existence. |
List share groups | Group coordinator | |
List share group offsets | Group coordinator and share-partition leaders | |
Describe share group | Group coordinator | |
Alter share group offsets | Share-partition leaders | The share-partition leader makes a durable share-partition state update for each share-partition affected. |
Delete share group offsets | Share-partition leaders | The share-partition leader makes a durable share-partition state update for each share-partition affected. |
Delete share group | Group coordinator working with share-partition leaders | Only empty share groups can be deleted. The share-partition leaders must first delete the share group offsets, and only then is the share group itself deleted. This is not an atomic operation. The share-partition leader makes a durable share-partition state update for each share-partition affected. The group coordinator writes a tombstone record to the consumer offsets topic to persist the group deletion. |
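The split of responsibilities in the table can be illustrated with a small in-memory model. This is a hypothetical sketch, not Kafka code: ShareGroupAdminSketch and its methods (heartbeat, leave, recordOffset, deleteGroup) are invented names, the maps merely stand in for the consumer offsets topic and the durable share-partition state, and the log only records the order of the writes described above.

```java
import java.util.*;

// Hypothetical, simplified model of the share group administration flow; not Kafka classes or RPCs.
class ShareGroupAdminSketch {
    // Stands in for the group records the coordinator persists in the consumer offsets topic.
    private final Map<String, Set<String>> members = new HashMap<>();
    // Stands in for durable share-partition state: group -> (share-partition -> offset).
    private final Map<String, Map<String, Long>> offsets = new HashMap<>();
    // Ordered trace of the durable writes this model performs.
    final List<String> log = new ArrayList<>();

    // "Create share group": a side-effect of a heartbeat for a group that does not exist yet.
    void heartbeat(String groupId, String memberId) {
        if (!members.containsKey(groupId)) {
            members.put(groupId, new HashSet<>());
            log.add("coordinator: persist group " + groupId);
        }
        members.get(groupId).add(memberId);
    }

    void leave(String groupId, String memberId) {
        Set<String> current = members.get(groupId);
        if (current != null) {
            current.remove(memberId);
        }
    }

    // Stands in for a durable share-partition state update made by a share-partition leader.
    void recordOffset(String groupId, String sharePartition, long offset) {
        offsets.computeIfAbsent(groupId, g -> new HashMap<>()).put(sharePartition, offset);
    }

    // "Delete share group": only empty groups; offsets are deleted first (one durable update
    // per share-partition), then the coordinator writes a tombstone. These are separate writes,
    // so the operation as a whole is not atomic.
    void deleteGroup(String groupId) {
        Set<String> current = members.get(groupId);
        if (current == null) {
            throw new IllegalArgumentException("unknown group " + groupId);
        }
        if (!current.isEmpty()) {
            throw new IllegalStateException("group " + groupId + " is not empty");
        }
        Map<String, Long> groupOffsets = offsets.getOrDefault(groupId, Map.of());
        for (String sharePartition : new TreeSet<>(groupOffsets.keySet())) {
            log.add("share-partition leader: delete state for " + sharePartition);
        }
        offsets.remove(groupId);
        members.remove(groupId);
        log.add("coordinator: write tombstone for " + groupId);
    }

    public static void main(String[] args) {
        ShareGroupAdminSketch admin = new ShareGroupAdminSketch();
        admin.heartbeat("sg1", "m1");               // creates the group as a side-effect
        admin.recordOffset("sg1", "orders-0", 42L);
        try {
            admin.deleteGroup("sg1");                // rejected: m1 is still a member
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        admin.leave("sg1", "m1");
        admin.deleteGroup("sg1");
        admin.log.forEach(System.out::println);
    }
}
```

The model deliberately keeps the offset deletion and the tombstone as two distinct steps, mirroring the table's note that deleting a share group is not atomic.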
Public Interfaces
This KIP introduces extensive additions to the public interfaces.
Client API changes
KafkaShareConsumer
This KIP introduces a new interface for consuming records from a share group, called org.apache.kafka.clients.consumer.ShareConsumer, with an implementation called org.apache.kafka.clients.consumer.KafkaShareConsumer. The interface stability is Evolving.
@InterfaceStability.Evolving
public interface ShareConsumer<K, V> {
/**
* Get the current subscription. Will return the same topics used in the most recent call to
* {@link #subscribe(Collection)}, or an empty set if no such call has been made.
*
* @return The set of topics currently subscribed to
*/
Set<String> subscription();
/**
* Subscribe to the given list of topics to get dynamically assigned partitions.
* <b>Topic subscriptions are not incremental. This list will replace the current
* assignment, if there is one.</b> If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
*
* <p>
* As part of group management, the coordinator will keep track of the list of consumers that belong to a particular
* group and will trigger a rebalance operation if any one of the following events occurs:
* <ul>
* <li>A member joins or leaves the share group
* <li>An existing member of the share group is shut down or fails
* <li>The number of partitions changes for any of the subscribed topics
* <li>A subscribed topic is created or deleted
* </ul>
*
* @param topics The list of topics to subscribe to
*
* @throws IllegalArgumentException If topics is null or contains null or empty elements
* @throws KafkaException for any other unrecoverable errors
*/
void subscribe(Collection<String> topics);
/**
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}.
*
* @throws KafkaException for any other unrecoverable errors
*/
void unsubscribe();
/**
* Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
* subscribed to any topics before polling for data.
*
* <p>
* This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
* If the timeout expires, an empty record set will be returned.
*
* @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
*
* @return map of topic to records since the last fetch for the subscribed list of topics
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException if caller lacks Read access to any of the subscribed
* topics or to the configured groupId. See the exception for more details
* @throws InterruptException if the calling thread is interrupted before or while this method is called
* @throws InvalidTopicException if the current subscription contains any invalid
* topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
* @throws WakeupException if {@link #wakeup()} is called before or while this method is called
* @throws KafkaException for any other unrecoverable errors (e.g. invalid groupId or
* session timeout, errors deserializing key/value pairs,
* or any new error cases in future versions)
* @throws IllegalArgumentException if the timeout value is negative
* @throws IllegalStateException if the consumer is not subscribed to any topics
* @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
*/
ConsumerRecords<K, V> poll(Duration timeout);
/**
* Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
* The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
* {@link #poll(Duration)} call.
*
* <p>
* Records for each topic-partition must be acknowledged in the order they were returned from
* {@link #poll(Duration)}. By using this method, the consumer is using
* <b>explicit acknowledgement</b>.
*
* @param record The record to acknowledge
*
* @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
* used implicit acknowledgement
*/
void acknowledge(ConsumerRecord<K, V> record);
/**
* Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
* it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
* {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
* <b>explicit acknowledgement</b>.
*
* <p>
* Records for each topic-partition must be acknowledged in the order they were returned from
* {@link #poll(Duration)}.
*
* @param record The record to acknowledge
* @param type The acknowledge type which indicates whether it was processed successfully
*
* @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
* used implicit acknowledgement
*/
void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);
/**
* Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
* the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
* all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
* <p>
* This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
* encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires.
*
* @return A map of the results for each topic-partition for which delivery was acknowledged.
* If the acknowledgement failed for a topic-partition, an exception is present.
*
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException for any other unrecoverable errors
*/
Map<TopicIdPartition, Optional<KafkaException>> commitSync();
/**
* Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
* the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
* all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
* <p>
* This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
* encountered (in which case it is thrown to the caller), or the timeout expires.
*
* @param timeout The maximum amount of time to await completion of the acknowledgement
*
* @return A map of the results for each topic-partition for which delivery was acknowledged.
* If the acknowledgement failed for a topic-partition, an exception is present.
*
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException for any other unrecoverable errors
*/
Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout);
/**
* Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
* the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
* all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
*
* @throws KafkaException for any other unrecoverable errors
*/
void commitAsync();
/**
* Sets the acknowledge commit callback which can be used to handle acknowledgement completion.
*
* @param callback The acknowledge commit callback
*/
void setAcknowledgeCommitCallback(AcknowledgeCommitCallback callback);
/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* this specific client instance and will not change after it is initially generated.
* The ID is useful for correlating client operations with telemetry sent to the broker and
* to its eventual monitoring destinations.
* <p>
* If telemetry is enabled, this will first require a connection to the cluster to generate
* the unique client instance ID. This method waits up to {@code timeout} for the consumer
* client to complete the request.
* <p>
* Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
* configuration option.
*
* @param timeout The maximum time to wait for consumer client to determine its client instance ID.
* The value must be non-negative. Specifying a timeout of zero means do not
* wait for the initial request to complete if it hasn't already.
*
* @return The client's assigned instance id used for metrics collection.
*
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws IllegalStateException If telemetry is not enabled because config {@code enable.metrics.push}
* is set to {@code false}.
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException If an unexpected error occurs while trying to determine the client
* instance ID, though this error does not necessarily imply the
* consumer client is otherwise unusable.
*/
Uuid clientInstanceId(Duration timeout);
/**
* Get the metrics kept by the consumer
*/
Map<MetricName, ? extends Metric> metrics();
/**
* Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
* This will commit acknowledgements if possible within the default timeout.
* See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close.
*
* @throws InterruptException If the thread is interrupted before or while this method is called
* @throws KafkaException for any other error during close
*/
void close();
/**
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
* {@code timeout} for the consumer to complete acknowledgements and leave the group.
* If the consumer is unable to complete acknowledgements and gracefully leave the group
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
* used to interrupt close.
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
*
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws InterruptException If the thread is interrupted before or while this method is called
* @throws KafkaException for any other error during close
*/
void close(Duration timeout);
/**
* Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
* The thread which is blocking in an operation will throw {@link WakeupException}.
* If no thread is blocking in a method which can throw {@link WakeupException},
* the next call to such a method will raise it instead.
*/
void wakeup();
}
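The acknowledgement rules in the Javadoc above (explicit acknowledgements must follow delivery order within each topic-partition, explicit and implicit acknowledgement cannot be mixed, and acknowledgements are committed on the next commit or poll) can be illustrated with a small in-memory model. This is a hypothetical sketch, not KafkaShareConsumer: AckRulesSketch, its methods and the string encoding of acknowledgements are invented for illustration. It assumes the consumer falls into implicit mode when records from a poll are committed without any explicit acknowledge call, and it does not model what happens to records left unacknowledged in explicit mode.

```java
import java.util.*;

// Hypothetical in-memory model of the ShareConsumer acknowledgement rules; not Kafka code.
// A record is identified by a partition name and an offset; acknowledgement types are plain strings.
class AckRulesSketch {
    enum Mode { UNDECIDED, IMPLICIT, EXPLICIT }

    private Mode mode = Mode.UNDECIDED;
    // Offsets from the latest poll still waiting to be acknowledged, per partition, in delivery order.
    private final Map<String, Deque<Long>> awaiting = new HashMap<>();
    // Acknowledgements made since the last commit.
    private final List<String> pendingAcks = new ArrayList<>();
    // Committed acknowledgements, e.g. "orders-0@10:ACCEPT".
    final List<String> committed = new ArrayList<>();

    // Like poll(Duration): commits outstanding acknowledgements, then hands out a new batch.
    void poll(Map<String, List<Long>> batch) {
        commit();
        awaiting.clear();
        batch.forEach((partition, offsets) -> awaiting.put(partition, new ArrayDeque<>(offsets)));
    }

    // Like acknowledge(record, type): explicit, and strictly in order within a partition.
    void acknowledge(String partition, long offset, String type) {
        if (mode == Mode.IMPLICIT) {
            throw new IllegalStateException("implicit acknowledgement already in use");
        }
        Deque<Long> queue = awaiting.get(partition);
        if (queue == null || !queue.contains(offset)) {
            throw new IllegalStateException("record is not waiting to be acknowledged");
        }
        if (queue.peekFirst() != offset) {
            throw new IllegalArgumentException("records must be acknowledged in delivery order");
        }
        queue.pollFirst();
        mode = Mode.EXPLICIT;
        pendingAcks.add(partition + "@" + offset + ":" + type);
    }

    // Like commitSync()/commitAsync(): if nothing was acknowledged explicitly, every record
    // from the latest poll is accepted implicitly and the consumer stays in implicit mode.
    void commit() {
        if (mode != Mode.EXPLICIT) {
            for (Map.Entry<String, Deque<Long>> entry : new TreeMap<>(awaiting).entrySet()) {
                for (long offset : entry.getValue()) {
                    pendingAcks.add(entry.getKey() + "@" + offset + ":ACCEPT");
                    mode = Mode.IMPLICIT;
                }
                entry.getValue().clear();
            }
        }
        committed.addAll(pendingAcks);
        pendingAcks.clear();
    }

    public static void main(String[] args) {
        AckRulesSketch consumer = new AckRulesSketch();
        consumer.poll(Map.of("orders-0", List.of(10L, 11L)));
        consumer.acknowledge("orders-0", 10L, "ACCEPT");
        consumer.acknowledge("orders-0", 11L, "RELEASE");
        consumer.commit();
        System.out.println(consumer.committed); // [orders-0@10:ACCEPT, orders-0@11:RELEASE]
    }
}
```

Acknowledging offset 11 before offset 10 in this model raises IllegalArgumentException, and calling acknowledge after a poll has implicitly accepted the previous batch raises IllegalStateException, matching the exceptions documented on the two acknowledge methods.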
...
Method signature | Description |
---|---|
void onComplete(Map<TopicIdPartition, Set<OffsetAndMetadata>> offsets, Exception exception) | A callback method the user can implement to provide asynchronous handling of request completion. Parameters: offsets - A map of the offsets that this callback applies to. exception - The exception thrown during processing of the request, or null if the acknowledgement completed successfully. Exceptions: WakeupException - if KafkaShareConsumer.wakeup() is called. InterruptException - if the calling thread is interrupted. AuthorizationException - if not authorized to the topic or group. KafkaException - for any other unrecoverable errors. |
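An application might implement this callback, registered with setAcknowledgeCommitCallback, to keep track of which acknowledgements were committed and which failed. To stay compilable without Kafka on the classpath, the sketch uses a hypothetical interface, AckCommitCallbackSketch, with the same shape as onComplete but with String partitions and Long offsets in place of TopicIdPartition and OffsetAndMetadata; AckCommitCallbackSketch and TrackingCallback are illustrative names only.

```java
import java.util.*;

// Hypothetical stand-in for the callback above; the real signature uses
// Map<TopicIdPartition, Set<OffsetAndMetadata>>.
interface AckCommitCallbackSketch {
    void onComplete(Map<String, Set<Long>> offsets, Exception exception);
}

// Records committed offsets per partition and a message for each failed completion.
class TrackingCallback implements AckCommitCallbackSketch {
    final Map<String, Set<Long>> succeeded = new TreeMap<>();
    final List<String> failures = new ArrayList<>();

    @Override
    public void onComplete(Map<String, Set<Long>> offsets, Exception exception) {
        if (exception == null) {
            // A null exception means the acknowledgement completed successfully.
            offsets.forEach((partition, offs) ->
                    succeeded.computeIfAbsent(partition, k -> new TreeSet<>()).addAll(offs));
        } else {
            // e.g. an AuthorizationException when not authorized to the topic or group.
            failures.add(offsets.keySet() + ": " + exception.getMessage());
        }
    }

    public static void main(String[] args) {
        TrackingCallback callback = new TrackingCallback();
        callback.onComplete(Map.of("orders-0", Set.of(5L)), null);
        callback.onComplete(Map.of("orders-1", Set.of(9L)), new RuntimeException("not authorized"));
        System.out.println(callback.succeeded); // {orders-0=[5]}
        System.out.println(callback.failures);  // [[orders-1]: not authorized]
    }
}
```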
ConsumerRecord
Add the following method to the org.apache.kafka.clients.consumer.ConsumerRecord class.
Method signature | Description |
---|---|
Optional<Short> deliveryCount() | Get the delivery count for the record if available. |
The delivery count is available for records delivered using a share group, and is Optional.empty() otherwise.
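One use of the delivery count is to stop retrying a record that keeps failing. The following is a hypothetical sketch, not part of the KIP: DeliveryCountPolicySketch, decide and the MAX_ATTEMPTS threshold of 5 are illustrative, and the local Ack enum assumes that AcknowledgeType offers ACCEPT, RELEASE and REJECT, since the enum's definition is not part of this excerpt.

```java
import java.util.Optional;

// Hypothetical policy: choose an acknowledgement from the processing outcome and the delivery count.
class DeliveryCountPolicySketch {
    // Assumed to mirror AcknowledgeType (ACCEPT, RELEASE, REJECT).
    enum Ack { ACCEPT, RELEASE, REJECT }

    // Illustrative threshold: reject once a record has been delivered this many times.
    static final short MAX_ATTEMPTS = 5;

    static Ack decide(boolean processedOk, Optional<Short> deliveryCount) {
        if (processedOk) {
            return Ack.ACCEPT;
        }
        // No count available (Optional.empty()): conservatively make the record available again.
        if (deliveryCount.isEmpty()) {
            return Ack.RELEASE;
        }
        // Release for another attempt until the threshold is reached, then reject.
        return deliveryCount.get() >= MAX_ATTEMPTS ? Ack.REJECT : Ack.RELEASE;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, Optional.of((short) 2))); // RELEASE
        System.out.println(decide(false, Optional.of((short) 5))); // REJECT
    }
}
```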
A new constructor is also added:
/**
* Creates a record to be received from a specified topic and partition
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
* @param headers The headers of the record
* @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
* @param deliveryCount Optional delivery count of the record (may be empty when deliveries are not counted)
*/
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
int serializedKeySize,
int serializedValueSize,
K key,
V value,
Headers headers,
Optional<Integer> leaderEpoch,
Optional<Short> deliveryCount)
AcknowledgeType
The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.
...