Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public class FindCoordinatorRequest extends AbstractRequest {
    private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
    private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
	private static final String COORDINATOR_GROUPIDS_KEY_NAME = "coordinator_groupIds"


	private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);

	private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
        new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
                        "for transactional producers, this is the transactional id)"),
        new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));


	/**
 	* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
 	*/
	private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;


	private static final Schema FIND_COORDINATOR_REQUEST_V3 = new Schema(
	new Field(COORDINATOR_GROUPIDS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "All the coordinator ids " +
        "for this request"));

	...


	private final Map<String, String> groupIds; // Use to store <coordinator(coordinator_key, coordinator_type>type) pair


	...

FindCoordinatorResponse

...

Code Block
languagejava
public class FindCoordinatorResponse extends AbstractResponse {
    private static final String COORDINATOR_KEY_NAME = "coordinator";

    // coordinator level field names
    private static final String NODE_ID_KEY_NAME = "node_id";
    private static final String HOST_KEY_NAME = "host";
    private static final String PORT_KEY_NAME = "port";

    private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
            new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));

    private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
            ERROR_CODE,
            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " +
                    "for a consumer group."));

    private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
            THROTTLE_TIME_MS,
            ERROR_CODE,
            ERROR_MESSAGE,
            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));


    private static final Schema FIND_COORDINATOR_RESPONSE_V3 = new Schema(
            THROTTLE_TIME_MS,
			new ArrayOf(
				new Schema(
					GROUP_ID,
        			ERROR_CODE,
      				ERROR_MESSAGE,
        			new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));));

    /**
     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
     */
    private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1;

    public static Schema[] schemaVersions() {
        return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2, FIND_COORDINATOR_RESPONSE_V3};
    }


	...


	private final int throttleTimeMs;
	private final String errorMessage;
	private final Errors error;
	private final Map<String, Node> nodes; //store (groupID, Node)


	...


handleFindCoordinatorRequest



Compatibility, Deprecation, and Migration Plan

...