THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
public class FindCoordinatorRequest extends AbstractRequest { private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key"; private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type"; private static final String COORDINATOR_GROUPIDS_KEY_NAME = "coordinator_groupIds" private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID); private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema( new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " + "for transactional producers, this is the transactional id)"), new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)")); /** * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. */ private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1; private static final Schema FIND_COORDINATOR_REQUEST_V3 = new Schema( new Field(COORDINATOR_GROUPIDS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "All the coordinator ids " + "for this request")); ... private final Map<String, String> groupIds; // Use to store <coordinator(coordinator_key, coordinator_type>type) pair ... |
FindCoordinatorResponse
...
Code Block | ||
---|---|---|
| ||
public class FindCoordinatorResponse extends AbstractResponse {
private static final String COORDINATOR_KEY_NAME = "coordinator";
// coordinator level field names
private static final String NODE_ID_KEY_NAME = "node_id";
private static final String HOST_KEY_NAME = "host";
private static final String PORT_KEY_NAME = "port";
private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
ERROR_CODE,
new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " +
"for a consumer group."));
private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
THROTTLE_TIME_MS,
ERROR_CODE,
ERROR_MESSAGE,
new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));
private static final Schema FIND_COORDINATOR_RESPONSE_V3 = new Schema(
THROTTLE_TIME_MS,
new ArrayOf(
new Schema(
GROUP_ID,
ERROR_CODE,
ERROR_MESSAGE,
new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1;
public static Schema[] schemaVersions() {
return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2, FIND_COORDINATOR_RESPONSE_V3};
}
...
private final int throttleTimeMs;
private final String errorMessage;
private final Errors error;
private final Map<String, Node> nodes; //store (groupID, Node)
... |
handleFindCoordinatorRequest
Compatibility, Deprecation, and Migration Plan
...