THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Public Interfaces
This KIP will add includes the following code changes:
Add a new Coordinator interface to
...
org.apache.kafka.consumer.clients.consumer
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.clients.consumer; import java.io.Closeable; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.OffsetCommitRequest; /** * Coordinator handles group management for a single group member by interacting with a * designated Kafka broker (the coordinator). Group semantics are provided by extending the * {@link org.apache.kafka.clients.consumer.internals.AbstractCoordinator} class. * See {@link ConsumerCoordinator} for example usage. * <p> * From a high level, Kafka's group management protocol consists of the following sequence of * actions: * * <ol> * <li>Group Registration: Group members register with the coordinator providing their own metadata * (such as the set of topics they are interested in).</li> * <li>Group/Leader Selection: The coordinator select the members of the group and chooses one member * as the leader.</li> * <li>State Assignment: The leader collects the metadata from all the members of the group and * assigns state.</li> * <li>Group Stabilization: Each member receives the state assigned by the leader and begins * processing.</li> * </ol> * <p> * To leverage this protocol, an implementation must define the format of metadata provided by each * member for group registration in {@link #metadata()} and the format of the state assignment provided * by the leader in {@link #performAssignment(String, String, List)} and becomes available to members in * {@link #onJoinComplete(int, String, String, ByteBuffer)}. * <p> * Note on locking: this class shares state between the caller and a background thread which is * used for sending heartbeats after the client has joined the group. All mutable state as well as * state transitions are protected with the class's monitor. Generally this means acquiring the lock * before reading or writing the state of the group (e.g. generation, memberId) and holding the lock * when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup). */ public interface Coordinator extends Closeable { /** * Unique identifier for the class of supported protocols (e.g. "consumer" or "connect"). * @return Non-null protocol type name */ String protocolType(); /** * Get the current list of protocols and their associated metadata supported * by the local member. The order of the protocols in the list indicates the preference * of the protocol (the first entry is the most preferred). The coordinator takes this * preference into account when selecting the generation protocol (generally more preferred * protocols will be selected as long as all members support them and there is no disagreement * on the preference). * @return Non-empty map of supported protocols and metadata/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.kafka.clients.consumer; import java.io.Closeable; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; /** * Coordinator interface implements group management for a single group member by interacting with * a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. * See {@link ConsumerCoordinator} for example usage. * * From a high level, Kafka's group management protocol consists of the following sequence of actions: * * <ol> * <li>Group Registration: Group members register with the coordinator providing their own metadata * (such as the set of topics they are interested in).</li> * <li>Group/Leader Selection: The coordinator select the members of the group and chooses one member * as the leader.</li> * <li>State Assignment: The leader collects the metadata from all the members of the group and * assigns state.</li> * <li>Group Stabilization: Each member receives the state assigned by the leader and begins * processing.</li> * </ol> * * To leverage this protocol, an implementation must define the format of metadata provided by each * member for group registration in {@link #metadata()} and the format of the state assignment provided * by the leader in {@link #performAssignment(String, String, List)} and becomes available to members in * {@link #onJoinComplete(int, String, String, ByteBuffer)}. * * Note on locking: this class shares state between the caller and a background thread which is * used for sending heartbeats after the client has joined the group. All mutable state as well as * state transitions are protected with the class's monitor. Generally this means acquiring the lock * before reading or writing the state of the group (e.g. generation, memberId) and holding the lock * when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup). */ public interface Coordinator extends Closeable { /** * Unique identifier for the class of supported protocols (e.g. "consumer" or "connect"). * @return Non-null protocol type name */ JoinGroupRequestData.JoinGroupRequestProtocolCollectionString metadataprotocolType(); /** * InvokedGet priorthe tocurrent eachlist groupof joinprotocols or rejoin. This is typically used to perform any * cleanup from the previous generation (such as committing offsets for the consumer)and their associated metadata supported * by the local member. The order of the protocols in the list indicates the preference * @paramof generationthe Theprotocol previous(the generationfirst orentry -1is ifthe theremost was none * @param memberId The identifier of this member in the previous group or "" if there was nonepreferred). The coordinator takes this * preference into account when selecting the generation protocol (generally more preferred */ protocols will be selected void onJoinPrepare(int generation, String memberId); /**as long as all members support them and there is no disagreement * Perform assignment foron the grouppreference). This is used by the* leader@return toNon-empty pushmap stateof tosupported allprotocols theand membersmetadata */ of the group (eJoinGroupRequestData.g. to push partition assignments in the case of the new consumer) * @param leaderId The id of the leader (which is this member) * @param protocol The protocol selected by the coordinatorJoinGroupRequestProtocolCollection metadata(); /** * Invoked prior to each group join or rejoin. This is typically used to perform any * cleanup from the previous generation (such as committing offsets for the consumer) * @param generation The allMemberMetadataprevious Metadatageneration fromor all-1 membersif ofthere thewas groupnone * @return@param AmemberId mapThe fromidentifier of eachthis member to their state assignment in the previous group or "" if there was none */ Map<String, ByteBuffer> performAssignment(String leaderId, void onJoinPrepare(int generation, String memberId); /** * Perform assignment for the group. This is used by the leader to push state to all the members * of the group (e.g. to push partition assignments in the case of the new consumer) String protocol, * @param leaderId The id of the leader (which is this member) * @param protocol The protocol selected by the coordinator * @param allMemberMetadata Metadata from all members of the group * @return A map from each member to their state List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata); assignment */** Map<String, *ByteBuffer> Invoked when a group member has successfully joined a group. If this call fails with an exception, performAssignment(String leaderId, * then it will be retried using the same assignment state on the next call to {@link #ensureActiveGroup()}. * * @param generation The generation that was joined * @param memberId The identifier for theString localprotocol, member in the group * @param protocol The protocol selected by the coordinator * @param memberAssignment The assignment propagated from the group leader */ void onJoinComplete(int generation, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata); String memberId,/** * Invoked when a group member has successfully joined a group. If this call fails with an exception, * Stringthen protocol, it will be retried using the same assignment state on the next call to {@link #ensureActiveGroup()}. * * @param generation ByteBuffer memberAssignment); /**The generation that was joined * Invoked@param priormemberId toThe eachidentifier leavefor groupthe event.local Thismember isin typically used to cleanup assigned partitions;the group * @param noteprotocol itThe isprotocol triggeredselected by the consumer'scoordinator API caller thread (i.e. background* heartbeat@param threadmemberAssignment would The assignment propagated from the *group leader not trigger it even if*/ it tries to forcevoid leaving group upon heartbeat session expiration) onJoinComplete(int generation, */ default void onLeavePrepare() {} /** * Ensure that the groupString ismemberId, active (i.e. joined and synced) */ void ensureActiveGroup(); /** * Get the current generationString stateprotocol, regardless of whether it is currently stable. * Note that the generation information can be updated while we are still in the middle ByteBuffer memberAssignment); * of a rebalance, after the join-group response is received. * * @return the current generation/** * Invoked prior to each leave group event. This is typically used to cleanup assigned partitions; */ note it is triggered Generation generation(); /**by the consumer's API caller thread (i.e. background heartbeat thread would * Get the current generation state if the not trigger it even if it tries to force leaving group isupon stable,heartbeat otherwise return nullsession expiration) */ default * @return the current generation or null */ Generation generationIfStable(); String memberId(); class Generation { public static final Generation NO_GENERATION = new Generation( OffsetCommitRequest.DEFAULT_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, null); public final int generationId; public final String memberId; public final String protocolName; public Generation(int generationId, String memberId, String protocolName) { this.generationId = generationId; this.memberId = memberId; this.protocolName = protocolName; } /** * @return true if this generation has a valid member id, false otherwise. A member might have an id before * it becomes part of a group generation. */ public boolean hasMemberId() { return !memberId.isEmpty(); } @Override public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final Generation that = (Generation) o; return generationId == that.generationId && Objects.equals(memberId, that.memberId) && Objects.equals(protocolName, that.protocolName); } @Override public int hashCode() { return Objects.hash(generationId, memberId, protocolName); } @Override public String toString() { return "Generation{" + "generationId=" + generationId + ", memberId='" + memberId + '\'' + ", protocol='" + protocolName + '\'' + '}'; } } } |
Proposed Changes
This KIP proposes to:
void onLeavePrepare() {}
} |
Proposed Changes
This KIP proposes to:
- Pull the abstract methods from the AbstractCoordinator class into a new interface, which will be part of Kafka's public API.
- Change the visibility (from protected to public) of methods that have been added to this interface when needed.
- Move these classes from the org.apache.clients.consumer.internals package to org.apache.clients.consumer
- Pull the abstract methods from the AbstractCoordinator class into a new interface, which will be part of Kafka's public API.
- Change the visibility (from protected to public) of methods that have been added to this interface when needed
- This KIP does not include any changes to the Admin APIs. Potential changes to the Admin/KafkaAdminClient classes (such as adding methods to query for group metadata from brokers) will be addressed in a separate KIP.
...
- As all members of the new interface are public, clients who extend the existing AbstractCoordinator class will potentially have to change the visibility of the overridden abstract methods.
- Also, as classes are being relocated, the package names of clients extending AbstractCoordinator will have to change accordingly.
- No functional changes are planned as part of this KIP, so these changes won't impact the vast majority of clients.
...