THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;

/**
 * The Coordinator interface defines group management for a single group member by interacting with
 * a designated Kafka broker (the coordinator). Group semantics are provided by implementing this interface.
 * See {@link ConsumerCoordinator} for example usage.
 *
 * From a high level, Kafka's group management protocol consists of the following sequence of actions:
 *
 * <ol>
 *     <li>Group Registration: Group members register with the coordinator providing their own metadata
 *         (such as the set of topics they are interested in).</li>
 *     <li>Group/Leader Selection: The coordinator selects the members of the group and chooses one member
 *         as the leader.</li>
 *     <li>State Assignment: The leader collects the metadata from all the members of the group and
 *         assigns state.</li>
 *     <li>Group Stabilization: Each member receives the state assigned by the leader and begins
 *         processing.</li>
 * </ol>
 *
 * To leverage this protocol, an implementation must define the format of metadata provided by each
 * member for group registration in {@link #metadata()} and the format of the state assignment provided
 * by the leader in {@link #performAssignment(String, String, List)} and becomes available to members in
 * {@link #onJoinComplete(int, String, String, ByteBuffer)}.
 *
 * Note on locking: this class shares state between the caller and a background thread which is
 * used for sending heartbeats after the client has joined the group. All mutable state as well as
 * state transitions are protected with the class's monitor. Generally this means acquiring the lock
 * before reading or writing the state of the group (e.g. generation, memberId) and holding the lock
 * when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
 */
public interface Coordinator extends Closeable {

    /**
     * Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
     * @return Non-null protocol type name
     */
    String protocolType();

    /**
     * Get the current list of protocols and their associated metadata supported
     * by the local member. The order of the protocols in the map indicates the preference
     * of the protocol (the first entry is the most preferred). The coordinator takes this
     * preference into account when selecting the generation protocol (generally more preferred
     * protocols will be selected as long as all members support them and there is no disagreement
     * on the preference).
     * @return Non-empty map of supported protocols and metadata
     */
    LinkedHashMap<String, ByteBuffer> metadata();

    /**
     * Invoked prior to each group join or rejoin. This is typically used to perform any
     * cleanup from the previous generation (such as committing offsets for the consumer)
     * @param generation The previous generation or -1 if there was none
     * @param memberId The identifier of this member in the previous group or "" if there was none
     */
    void onJoinPrepare(int generation, String memberId);

    /**
     * Perform assignment for the group. This is used by the leader to push state to all the members
     * of the group (e.g. to push partition assignments in the case of the new consumer)
     * @param leaderId The id of the leader (which is this member)
     * @param protocol The protocol selected by the coordinator
     * @param allMemberMetadata Metadata from all members of the group
     * @return A map from each member to their state assignment
     */
    Map<String, ByteBuffer> performAssignment(String leaderId,
                                              String protocol,
                                              List<AssignmentMetadata> allMemberMetadata);

    /**
     * Invoked when a group member has successfully joined a group. If this call fails with an exception,
     * then it will be retried using the same assignment state on the next call to {@link #ensureActiveGroup()}.
     *
     * @param generation The generation that was joined
     * @param memberId The identifier for the local member in the group
     * @param protocol The protocol selected by the coordinator
     * @param memberAssignment The assignment propagated from the group leader
     */
    void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment);

    /**
     * Invoked prior to each leave group event. This is typically used to cleanup assigned partitions;
     * note it is triggered by the consumer's API caller thread (i.e. background heartbeat thread would
     * not trigger it even if it tries to force leaving group upon heartbeat session expiration)
     */
    default void onLeavePrepare() {}

    /**
     * Wraps {@link JoinGroupResponseData.JoinGroupResponseMember} responses.
     *
     * Every accessor and fluent setter forwards to the wrapped member object, so callers of
     * {@link #performAssignment(String, String, List)} work with this type instead of the
     * generated message class.
     */
    class AssignmentMetadata {
        // The wrapped response member; created once in the constructor and never replaced.
        private final JoinGroupResponseData.JoinGroupResponseMember delegate;

        /**
         * Creates an instance backed by a freshly constructed response member.
         */
        public AssignmentMetadata() {
            this.delegate = new JoinGroupResponseData.JoinGroupResponseMember();
        }

        /**
         * @return The member id held by the wrapped response member
         */
        public String memberId() {
            return delegate.memberId();
        }

        /**
         * @return The group instance id held by the wrapped response member
         */
        public String groupInstanceId() {
            return delegate.groupInstanceId();
        }

        /**
         * @return The wrapped member's metadata bytes, wrapped in a {@link ByteBuffer}
         */
        public ByteBuffer metadata() {
            return ByteBuffer.wrap(delegate.metadata());
        }

        /**
         * @param memberId The member id to store on the wrapped response member
         * @return This instance, to allow call chaining
         */
        public AssignmentMetadata setMemberId(String memberId) {
            delegate.setMemberId(memberId);
            return this;
        }

        /**
         * @param groupInstanceId The group instance id to store on the wrapped response member
         * @return This instance, to allow call chaining
         */
        public AssignmentMetadata setGroupInstanceId(String groupInstanceId) {
            delegate.setGroupInstanceId(groupInstanceId);
            return this;
        }

        /**
         * @param metadata The metadata bytes to store on the wrapped response member
         * @return This instance, to allow call chaining
         */
        public AssignmentMetadata setMetadata(byte[] metadata) {
            delegate.setMetadata(metadata);
            return this;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            AssignmentMetadata that = (AssignmentMetadata) o;
            return Objects.equals(delegate, that.delegate);
        }

        @Override
        public int hashCode() {
            return delegate.hashCode();
        }

        @Override
        public String toString() {
            return "AssignmentMetadata("
                + "memberId=" + ((delegate.memberId() == null) ? "null" : "'" + delegate.memberId() + "'")
                + ", groupInstanceId=" + ((delegate.groupInstanceId() == null) ? "null" : "'" + delegate.groupInstanceId() + "'")
                + ", metadata=" + Arrays.toString(delegate.metadata())
                + ")";
        }
    }
}
It also moves several classes from the org.apache.kafka.clients.consumer.internals package to the org.apache.kafka.clients.consumer package, as detailed in the Proposed Changes section below.
...