THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
/**
* The Coordinator interface defines group management for a single group member that interacts with
* a designated Kafka broker (the coordinator). Group semantics are provided by implementing this interface.
* See {@link ConsumerCoordinator} for example usage.
*
* From a high level, Kafka's group management protocol consists of the following sequence of actions:
*
* <ol>
* <li>Group Registration: Group members register with the coordinator providing their own metadata
* (such as the set of topics they are interested in).</li>
* <li>Group/Leader Selection: The coordinator selects the members of the group and chooses one member
* as the leader.</li>
* <li>State Assignment: The leader collects the metadata from all the members of the group and
* assigns state.</li>
* <li>Group Stabilization: Each member receives the state assigned by the leader and begins
* processing.</li>
* </ol>
*
* To leverage this protocol, an implementation must define the format of metadata provided by each
* member for group registration in {@link #metadata()} and the format of the state assignment, which is
* provided by the leader in {@link #performAssignment(String, String, List)} and becomes available to members in
* {@link #onJoinComplete(int, String, String, ByteBuffer)}.
*
* Note on locking: this class shares state between the caller and a background thread which is
* used for sending heartbeats after the client has joined the group. All mutable state as well as
* state transitions are protected with the class's monitor. Generally this means acquiring the lock
* before reading or writing the state of the group (e.g. generation, memberId) and holding the lock
* when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
*/
public interface Coordinator extends Closeable {
/**
* Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
* @return Non-null protocol type name
*/
String protocolType();
/**
* Get the current list of protocols and their associated metadata supported
* by the local member. The order of the protocols in the map indicates the preference
* of the protocol (the first entry is the most preferred). The coordinator takes this
* preference into account when selecting the generation protocol (generally more preferred
* protocols will be selected as long as all members support them and there is no disagreement
* on the preference).
* @return Non-empty map of supported protocols and metadata
*/
LinkedHashMap<String, ByteBuffer> metadata();
/**
* Invoked prior to each group join or rejoin. This is typically used to perform any
* cleanup from the previous generation (such as committing offsets for the consumer)
* @param generation The previous generation or -1 if there was none
* @param memberId The identifier of this member in the previous group or "" if there was none
*/
void onJoinPrepare(int generation, String memberId);
/**
* Perform assignment for the group. This is used by the leader to push state to all the members
* of the group (e.g. to push partition assignments in the case of the new consumer)
* @param leaderId The id of the leader (which is this member)
* @param protocol The protocol selected by the coordinator
* @param allMemberMetadata Metadata from all members of the group
* @return A map from each member to their state assignment
*/
Map<String, ByteBuffer> performAssignment(String leaderId,
String protocol,
List<AssignmentMetadata> allMemberMetadata);
/**
* Invoked when a group member has successfully joined a group.
*
* @param generation The generation that was joined
* @param memberId The identifier for the local member in the group
* @param protocol The protocol selected by the coordinator
* @param memberAssignment The assignment propagated from the group leader
*/
void onJoinComplete(int generation,
String memberId,
String protocol,
ByteBuffer memberAssignment);
/**
* Invoked prior to each leave group event. This is typically used to clean up assigned partitions;
* note it is triggered by the consumer's API caller thread (i.e. background heartbeat thread would
* not trigger it even if it tries to force leaving group upon heartbeat session expiration)
*/
default void onLeavePrepare() {}
/**
 * Wraps {@link JoinGroupResponseData.JoinGroupResponseMember} responses.
 *
 * Every accessor and fluent setter forwards to the wrapped member object, so this class
 * holds no state of its own beyond the delegate reference.
 */
class AssignmentMetadata {
    // Assigned once in the constructor and never replaced, hence final.
    private final JoinGroupResponseData.JoinGroupResponseMember delegate;

    /**
     * Creates an instance backed by a newly constructed response member.
     */
    public AssignmentMetadata() {
        // The member type is nested inside JoinGroupResponseData and only the outer class
        // is imported, so the simple name alone does not resolve; qualify it.
        this.delegate = new JoinGroupResponseData.JoinGroupResponseMember();
    }

    /**
     * @return the member id held by the wrapped member
     */
    public String memberId() {
        return delegate.memberId();
    }

    /**
     * @return the group instance id held by the wrapped member
     */
    public String groupInstanceId() {
        return delegate.groupInstanceId();
    }

    /**
     * @return a buffer wrapping the wrapped member's metadata array; the array is not copied,
     *         so the buffer and the delegate share the same backing bytes
     */
    public ByteBuffer metadata() {
        return ByteBuffer.wrap(delegate.metadata());
    }

    /**
     * Sets the member id on the wrapped member.
     *
     * @param memberId the member id to store
     * @return this instance, to allow call chaining
     */
    public AssignmentMetadata setMemberId(String memberId) {
        delegate.setMemberId(memberId);
        return this;
    }

    /**
     * Sets the group instance id on the wrapped member.
     *
     * @param groupInstanceId the group instance id to store
     * @return this instance, to allow call chaining
     */
    public AssignmentMetadata setGroupInstanceId(String groupInstanceId) {
        delegate.setGroupInstanceId(groupInstanceId);
        return this;
    }

    /**
     * Sets the metadata bytes on the wrapped member.
     *
     * @param metadata the metadata bytes to store
     * @return this instance, to allow call chaining
     */
    public AssignmentMetadata setMetadata(byte[] metadata) {
        delegate.setMetadata(metadata);
        return this;
    }

    /**
     * Two instances are equal when they are of the same class and their wrapped members are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        AssignmentMetadata that = (AssignmentMetadata) o;
        return Objects.equals(delegate, that.delegate);
    }

    /**
     * Uses the wrapped member's hash code, matching the delegate-based {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return delegate.hashCode();
    }

    /**
     * Renders the member id, group instance id and metadata bytes; string values are
     * single-quoted and absent values are printed as {@code null}.
     */
    @Override
    public String toString() {
        return "AssignmentMetadata("
            + "memberId=" + ((delegate.memberId() == null) ? "null" : "'" + delegate.memberId() + "'")
            + ", groupInstanceId=" + ((delegate.groupInstanceId() == null) ? "null" : "'" + delegate.groupInstanceId() + "'")
            + ", metadata=" + Arrays.toString(delegate.metadata())
            + ")";
    }
}
} |
...