...

Public Interfaces

This KIP includes the following code changes:

Add a new Coordinator interface to the org.apache.kafka.clients.consumer package:

Coordinator.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;

/**
 * Coordinator handles group management for a single group member by interacting with a
 * designated Kafka broker (the coordinator). Group semantics are provided by extending the
 * {@link org.apache.kafka.clients.consumer.internals.AbstractCoordinator} class.
 * See {@link ConsumerCoordinator} for example usage.
 * <p>
 * From a high level, Kafka's group management protocol consists of the following sequence of
 * actions:
 *
 * <ol>
 *     <li>Group Registration: Group members register with the coordinator providing their own metadata
 *         (such as the set of topics they are interested in).</li>
 *     <li>Group/Leader Selection: The coordinator selects the members of the group and chooses one member
 *         as the leader.</li>
 *     <li>State Assignment: The leader collects the metadata from all the members of the group and
 *         assigns state.</li>
 *     <li>Group Stabilization: Each member receives the state assigned by the leader and begins
 *         processing.</li>
 * </ol>
 * <p>
 * To leverage this protocol, an implementation must define the format of metadata provided by each
 * member for group registration in {@link #metadata()} and the format of the state assignment provided
 * by the leader in {@link #performAssignment(String, String, List)}, which becomes available to members in
 * {@link #onJoinComplete(int, String, String, ByteBuffer)}.
 * <p>
 * Note on locking: this class shares state between the caller and a background thread which is
 * used for sending heartbeats after the client has joined the group. All mutable state as well as
 * state transitions are protected with the class's monitor. Generally this means acquiring the lock
 * before reading or writing the state of the group (e.g. generation, memberId) and holding the lock
 * when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
 */
public interface Coordinator extends Closeable {

    /**
     * Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
     * @return Non-null protocol type name
     */
    String protocolType();

    /**
     * Get the current list of protocols and their associated metadata supported
     * by the local member. The order of the protocols in the list indicates the preference
     * of the protocol (the first entry is the most preferred). The coordinator takes this
     * preference into account when selecting the generation protocol (generally more preferred
     * protocols will be selected as long as all members support them and there is no disagreement
     * on the preference).
     * @return Non-empty map of supported protocols and metadata
     */
    JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata();

    /**
     * Invoked prior to each group join or rejoin. This is typically used to perform any
     * cleanup from the previous generation (such as committing offsets for the consumer)
     * @param generation The previous generation or -1 if there was none
     * @param memberId The identifier of this member in the previous group or "" if there was none
     */
    void onJoinPrepare(int generation, String memberId);

    /**
     * Perform assignment for the group. This is used by the leader to push state to all the members
     * of the group (e.g. to push partition assignments in the case of the new consumer)
     * @param leaderId The id of the leader (which is this member)
     * @param protocol The protocol selected by the coordinator
     * @param allMemberMetadata Metadata from all members of the group
     * @return A map from each member to their state assignment
     */
    Map<String, ByteBuffer> performAssignment(String leaderId,
                                              String protocol,
                                              List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);

    /**
     * Invoked when a group member has successfully joined a group. If this call fails with an exception,
     * then it will be retried using the same assignment state on the next call to {@link #ensureActiveGroup()}.
     *
     * @param generation The generation that was joined
     * @param memberId The identifier for the local member in the group
     * @param protocol The protocol selected by the coordinator
     * @param memberAssignment The assignment propagated from the group leader
     */
    void onJoinComplete(int generation,
                        String memberId,
                        String protocol,
                        ByteBuffer memberAssignment);

    /**
     * Invoked prior to each leave group event. This is typically used to cleanup assigned partitions;
     * note it is triggered by the consumer's API caller thread (i.e. background heartbeat thread would
     * not trigger it even if it tries to force leaving group upon heartbeat session expiration)
     */
    default void onLeavePrepare() {}

    /**
     * Ensure that the group is active (i.e. joined and synced)
     */
    void ensureActiveGroup();

    /**
     * Get the current generation state, regardless of whether it is currently stable.
     * Note that the generation information can be updated while we are still in the middle
     * of a rebalance, after the join-group response is received.
     *
     * @return the current generation
     */
    Generation generation();

    /**
     * Get the current generation state if the group is stable, otherwise return null
     *
     * @return the current generation or null
     */
    Generation generationIfStable();

    String memberId();

    class Generation {
        public static final Generation NO_GENERATION = new Generation(
                OffsetCommitRequest.DEFAULT_GENERATION_ID,
                JoinGroupRequest.UNKNOWN_MEMBER_ID,
                null);

        public final int generationId;
        public final String memberId;
        public final String protocolName;

        public Generation(int generationId, String memberId, String protocolName) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.protocolName = protocolName;
        }

        /**
         * @return true if this generation has a valid member id, false otherwise. A member might have an id before
         * it becomes part of a group generation.
         */
        public boolean hasMemberId() {
            return !memberId.isEmpty();
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            final Generation that = (Generation) o;
            return generationId == that.generationId &&
                    Objects.equals(memberId, that.memberId) &&
                    Objects.equals(protocolName, that.protocolName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(generationId, memberId, protocolName);
        }

        @Override
        public String toString() {
            return "Generation{" +
                    "generationId=" + generationId +
                    ", memberId='" + memberId + '\'' +
                    ", protocol='" + protocolName + '\'' +
                    '}';
        }
    }
}
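
The State Assignment and Group Stabilization steps described in the Javadoc above can be illustrated with a small sketch that has no Kafka dependency. Everything in it is an assumption made for illustration: the class name RoundRobinSketch, the round-robin policy, and the comma-separated UTF-8 encoding are hypothetical and are not part of the proposed interface. It only shows the shape of the data flow, in which the leader returns one ByteBuffer per member and each member later decodes its own buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the leader-side and member-side callbacks; not the Kafka API. */
class RoundRobinSketch {

    /**
     * Leader side (mirrors the role of performAssignment): spread the work items over the
     * member ids in round-robin order and encode each share as a comma-separated UTF-8 string.
     */
    static Map<String, ByteBuffer> performAssignment(List<String> memberIds, List<String> items) {
        if (memberIds.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<String>> shares = new LinkedHashMap<>();
        for (String memberId : memberIds) {
            shares.put(memberId, new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            String owner = memberIds.get(i % memberIds.size());
            shares.get(owner).add(items.get(i));
        }
        Map<String, ByteBuffer> assignment = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : shares.entrySet()) {
            byte[] bytes = String.join(",", e.getValue()).getBytes(StandardCharsets.UTF_8);
            assignment.put(e.getKey(), ByteBuffer.wrap(bytes));
        }
        return assignment;
    }

    /** Member side (mirrors the role of onJoinComplete): decode the state assigned by the leader. */
    static List<String> decodeAssignment(ByteBuffer memberAssignment) {
        // duplicate() leaves the caller's buffer position untouched
        String text = StandardCharsets.UTF_8.decode(memberAssignment.duplicate()).toString();
        List<String> items = new ArrayList<>();
        if (!text.isEmpty()) {
            for (String item : text.split(",")) {
                items.add(item);
            }
        }
        return items;
    }
}
```

A real implementation would use a versioned binary format rather than a delimited string, so that members running different releases can still decode each other's state.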

Proposed Changes

This KIP proposes to:

  1. Pull the abstract methods from the AbstractCoordinator class into a new interface, which will be part of Kafka's public API.
  2. Change the visibility (from protected to public) of methods that have been added to this interface when needed.
  3. Move these classes from the org.apache.kafka.clients.consumer.internals package to org.apache.kafka.clients.consumer:
    1. org.apache.kafka.clients.consumer.internals.AbstractCoordinator → org.apache.kafka.clients.consumer.AbstractCoordinator
    2. org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient → org.apache.kafka.clients.consumer.ConsumerNetworkClient
    3. org.apache.kafka.clients.consumer.internals.Heartbeat → org.apache.kafka.clients.consumer.Heartbeat
    4. org.apache.kafka.clients.consumer.internals.RequestFuture → org.apache.kafka.clients.consumer.RequestFuture
    5. org.apache.kafka.clients.consumer.internals.RequestFutureAdapter → org.apache.kafka.clients.consumer.RequestFutureAdapter
    6. org.apache.kafka.clients.consumer.internals.RequestFutureListener → org.apache.kafka.clients.consumer.RequestFutureListener
  4. Adjust the method visibility for the moved classes when required.
  5. This KIP does not include any changes to the Admin APIs. Potential changes to the Admin/KafkaAdminClient classes (such as adding methods to query for group metadata from brokers) will be addressed in a separate KIP.
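
The refactoring pattern behind the first two items (pulling abstract methods into an interface and widening their visibility) can be sketched in plain Java. The names GroupCallbacks, BaseGroupMember and ConnectLikeMember are hypothetical stand-ins, assumed here for Coordinator, AbstractCoordinator and a client subclass; they are not the actual Kafka types. The point the sketch makes is a language rule: interface methods are implicitly public, so any override in a subclass must also be declared public.

```java
/** Hypothetical public interface extracted from an abstract base class. */
interface GroupCallbacks {
    String protocolType();            // implicitly public, as with all interface methods
    default void onLeavePrepare() {}  // optional hook with a no-op default
}

/** Hypothetical base class; it inherits the abstract hooks from the interface. */
abstract class BaseGroupMember implements GroupCallbacks {
    String describe() {
        return "member of a '" + protocolType() + "' group";
    }
}

/** A subclass must declare the override as public; 'protected' would no longer compile. */
class ConnectLikeMember extends BaseGroupMember {
    @Override
    public String protocolType() {
        return "connect";
    }
}
```

Callers can then depend on the interface type alone, for example `GroupCallbacks callbacks = new ConnectLikeMember();`, without referring to the base class.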

...

  • As all members of the new interface are public, clients who extend the existing AbstractCoordinator class will potentially have to change the visibility of the overridden abstract methods.
  • Also, as classes are being relocated, the package names of clients extending AbstractCoordinator will have to change accordingly.
  • No functional changes are planned as part of this KIP, so these changes won't impact the vast majority of clients.

...