...

Workers are already very permissive about the content of their offsets topics and will not break or even log a warning message if they encounter anything that doesn’t appear to be a source offset. For the sake of cleanliness, we choose a key format that closely mirrors the existing key format for source offsets, which consists of an array containing the name of the connector and the source partition. The value only needs to be non-null so that a record of the migration can be kept even after topic compaction has occurred; for readability’s sake (if the topic ever needs to be inspected for debugging, for example), we make the value explicitly indicate that the record refers to an offset migration.
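
Purely for illustration (the exact contents of a migration record are not pinned down in this section), such a record for the “reddit-source” connector and a hypothetical source partition might look like:

    key: ["reddit-source", {"subreddit": "apachekafka"}]
    value: {"offsets-migrated": true}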

...

Zombie Fencing

With exactly.once.source.enabled set to true for a worker, the worker will begin taking steps to ensure that zombie source tasks in the cluster do not compromise exactly-once delivery support.

Task count records

A new type of record, a “task count record”, will be used in the config topic. This record explicitly tracks the number of task producers that will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task configurations, and will implicitly track whether it is necessary to perform a round of fencing before starting tasks for a connector.

An example of this record for a connector named “reddit-source” with 11 tasks would look like:

    key: "tasks-count-reddit-source"
    value: {"tasks": 11}

If the latest task count record for a connector comes after its latest set of task configurations in the config topic, workers will be allowed to create and run tasks for that connector. For example, if the contents of the config topic (represented by the keys of each record) are as follows:

    offset 0: task-reddit-source-0
    offset 1: task-reddit-source-1
    offset 2: commit-reddit-source
    offset 3: tasks-count-reddit-source

then workers will be able to freely bring up tasks for the reddit-source connector.

However, if the contents of the config topic are this:

    offset 0: tasks-count-reddit-source
    offset 1: task-reddit-source-0
    offset 2: task-reddit-source-1
    offset 3: commit-reddit-source

then workers will not bring up tasks for the connector.
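
To make this rule concrete, a minimal sketch of the check a worker might perform is shown below. The class and method names are purely illustrative stand-ins for the worker's real config topic tracking; they are not part of the proposed public API.

Code Block
languagejava
titleTaskStartupGateSketch.java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: tracks, per connector, the config topic offsets of the latest
// task count record and the latest set of task configurations as the worker reads the topic.
public class TaskStartupGateSketch {

    private final Map<String, Long> latestTaskCountRecordOffset = new HashMap<>();
    private final Map<String, Long> latestTaskConfigsOffset = new HashMap<>();

    // Called as the worker reads records from the config topic.
    public void onTaskCountRecord(String connector, long configTopicOffset) {
        latestTaskCountRecordOffset.put(connector, configTopicOffset);
    }

    public void onTaskConfigs(String connector, long configTopicOffset) {
        latestTaskConfigsOffset.put(connector, configTopicOffset);
    }

    // Tasks may only be brought up if the latest task count record for the connector
    // appears in the config topic after the latest set of task configurations.
    public boolean mayStartTasks(String connector) {
        Long taskCountOffset = latestTaskCountRecordOffset.get(connector);
        Long taskConfigsOffset = latestTaskConfigsOffset.get(connector);
        return taskCountOffset != null
                && (taskConfigsOffset == null || taskCountOffset > taskConfigsOffset);
    }
}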

Fencing during rebalance

All workers

When a rebalance is triggered, before (re-)joining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Leader

During rebalance, the leader will take on a few extra responsibilities. Once all group members have joined and it is time to perform assignment, the leader will take the following steps (a sketch of the fencing logic appears after the footnotes below):

  1. For each to-be-assigned source connector, if there is a set of task configurations present in the config topic after the most recent task count record for that connector:
    1. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
      1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector.** 
    2. Write the new task count record for all of the fenced-out connectors to the config topic
  2. Read to the end of the config topic to verify that all task count records have been written successfully, and use the new latest offset of the config topic as the offset in the assignment given out to members of the cluster.

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. 

** - This step will be done in parallel for all connectors.
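
As a rough illustration of the fencing step above (not a definitive implementation), the leader's work for a single connector might look like the following sketch. The helper methods and the admin client configuration are hypothetical placeholders, and the exact transactional ID format for task producers is defined elsewhere in this KIP.

Code Block
languagejava
titleLeaderFencingSketch.java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.admin.Admin;

// Illustrative sketch of the leader's fencing round for one connector. The helpers
// latestTaskCountRecord(), newTaskConfigs(), taskTransactionalId(), connectorAdminConfig(),
// and writeTaskCountRecord() are hypothetical placeholders for the worker's internals.
void fenceAndRecordTaskCount(String connector) throws Exception {
    int priorTaskCount = latestTaskCountRecord(connector);
    int newTaskCount = newTaskConfigs(connector).size();

    // Skip fencing for permanently single-task connectors (see footnote * above).
    if (priorTaskCount > 1 || newTaskCount > 1) {
        List<String> transactionalIds = new ArrayList<>();
        for (int task = 0; task < priorTaskCount; task++) {
            transactionalIds.add(taskTransactionalId(connector, task));
        }
        // Fence out every producer a prior task generation may still have running,
        // using an admin client configured with the connector's principal.
        try (Admin admin = Admin.create(connectorAdminConfig(connector))) {
            admin.fenceProducers(transactionalIds).all().get();
        }
    }

    // Record that fencing is complete so that workers may bring up the new tasks.
    writeTaskCountRecord(connector, newTaskCount);
}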

Source task startup

When a worker is assigned a source task, it will check to see if a task count record is present in the config topic after the most recent task configs for that connector.

If that is the case, the worker can assume that all producers for all prior generations of tasks for that connector have been fenced out, and it will bring up the source tasks it has been assigned. 

Otherwise, the worker will emit a warning log message and refuse to bring the task up until a rebalance occurs and it has a chance to check again for a task count record after the most recent set of task configurations for the connector.

Once a worker has instantiated a producer for a source task, it will read to the end of the config topic once more, and if a new set of task configurations for that connector has been generated, it will abort startup of the task.

To summarize, the startup process for a task will be:

  1. Worker is assigned a source task during rebalance
  2. If there is no task count record present in the config topic for that connector, or there is a set of task configurations present after the latest task count record, refuse to bring up the task and await a subsequent rebalance
  3. Otherwise:
    1. Instantiate a transactional producer for the task
    2. Read to the end of the config topic
    3. If a new set of task configurations has since been generated for the connector, abort startup
    4. Otherwise, begin polling the task for data
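
A sketch of this startup gate follows; the config topic helpers and the producer factory shown are hypothetical stand-ins for the worker's real internals, not proposed APIs.

Code Block
languagejava
titleSourceTaskStartupSketch.java
import org.apache.kafka.clients.producer.Producer;

// Illustrative sketch only: taskCountRecordAfterLatestTaskConfigs(), readConfigTopicToEnd(),
// newTaskConfigsSince(), newTransactionalProducerFor(), and beginPollingTask() are
// hypothetical placeholders for the worker's config topic tracking and task plumbing.
boolean maybeStartSourceTask(String connector, int taskId) {
    // Step 2: refuse to start until a task count record follows the latest task configs.
    if (!taskCountRecordAfterLatestTaskConfigs(connector)) {
        log.warn("Not starting task {}-{}; no task count record found after the latest "
                + "task configurations. Will retry after the next rebalance.", connector, taskId);
        return false;
    }

    // Step 3a: instantiate a transactional producer for the task.
    Producer<byte[], byte[]> producer = newTransactionalProducerFor(connector, taskId);

    // Steps 3b and 3c: re-read the config topic and abort startup if a new set of task
    // configurations has been generated for the connector in the meantime.
    readConfigTopicToEnd();
    if (newTaskConfigsSince(connector)) {
        producer.close();
        return false;
    }

    // Step 3d: begin polling the task for data.
    beginPollingTask(connector, taskId, producer);
    return true;
}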

Leader access to config topic

After a rebalance, a worker may discover that it has become the leader of the cluster. When this happens, the worker will instantiate a transactional producer whose transactional ID is connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster. If users try to override this by setting the transactional.id worker property, the user-provided value will be ignored and a message will be logged notifying them of this fact. The worker will use this producer for all writes it performs on the config topic, and it will begin and commit a transaction for every record it writes. This may seem unusual (why use a transactional producer if transactions aren’t really necessary?), but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) are not able to.

If, for example, a leader stalls while attempting to fence out a prior generation of task producers for a given connector during a rebalance, it may fall out of the group and become a zombie. If another worker is elected leader at this point and the connector is then reconfigured, it’s possible that the zombie leader may become unblocked and attempt to write a task count record to the config topic after the new set of task configurations has been written to the topic. This would corrupt the state of the config topic and violate a key assumption: a task count record present after task configurations for a connector means that it is not necessary to fence out a prior generation of producers for that connector’s tasks before starting them.

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then.
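
A minimal sketch of this producer setup and usage is shown below; the bootstrapServers, groupId, configTopic, keyBytes, and valueBytes variables are assumed to be supplied by the surrounding worker code.

Code Block
languagejava
titleLeaderConfigTopicProducerSketch.java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
// Fixed transactional ID derived from the Connect group ID; any user-supplied
// transactional.id override is ignored, as described above.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-" + groupId);

KafkaProducer<byte[], byte[]> configProducer = new KafkaProducer<>(props);
configProducer.initTransactions();

// Every write to the config topic is wrapped in its own transaction, so as soon as a
// newer leader initializes this transactional ID, a zombie leader's producer is fenced out.
configProducer.beginTransaction();
configProducer.send(new ProducerRecord<>(configTopic, keyBytes, valueBytes));
configProducer.commitTransaction();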

Admin API to Fence out Transactional Producers

Java API additions

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID:

Code Block
languagejava
titleAdmin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options          The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }

}

New classes for the API will be added as well.

The FenceProducersResult class:

Code Block
languagejava
titleFenceProducersResult.java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
        // Implementation goes here
    }

    /**
     * Returns a future that, once the request completes, provides the producer ID generated
     * while initializing the transaction for the given transactional ID.
     */
    public KafkaFuture<Long> producerId(String transactionalId) {
        // Implementation goes here
    }

    /**
     * Returns a future that, once the request completes, provides the producer epoch generated
     * while initializing the transaction for the given transactional ID.
     */
    public KafkaFuture<Short> epochId(String transactionalId) {
        // Implementation goes here
    }

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
        // Implementation goes here
    }

}

And the FenceProducersOptions class:

Code Block
languagejava
titleFenceProducersOptions.java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}
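
For illustration, a typical invocation of the new API might look like the following; the bootstrap servers and transactional IDs shown are placeholders.

Code Block
languagejava
titleFenceProducersUsageSketch.java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (Admin admin = Admin.create(adminProps)) {
    FenceProducersResult result =
            admin.fenceProducers(Arrays.asList("my-connector-0", "my-connector-1"));

    // Block until every requested fencing has succeeded (or surface the first failure).
    result.all().get();

    // Optionally inspect the producer ID and epoch that were bumped for one of the IDs.
    long producerId = result.producerId("my-connector-0").get();
    short epoch = result.epochId("my-connector-0").get();
}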

Implementation

Under the hood, this will be implemented by finding the transaction coordinator for each transactional ID via the FindCoordinator protocol, then initializing a new transaction with that ID by invoking the InitProducerId protocol with no producer ID and no producer epoch. In the words of KIP-98, this:

  1. Bumps up the epoch of the PID, so that any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.
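
From the client's perspective, this has roughly the same fencing effect as constructing a producer with the matching transactional ID and calling initTransactions() on it, which is how fencing can already be accomplished today; the new Admin API simply avoids having to spin up a full producer per transactional ID. A sketch of that pre-existing approach, with placeholder connection settings, is shown below.

Code Block
languagejava
titleInitTransactionsFencingSketch.java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Fencing without the new Admin API: initializing a producer with the same transactional ID
// bumps the epoch, fencing any zombie producer using that ID and resolving any transaction
// it left incomplete. bootstrapServers and transactionalId are placeholders here.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
    producer.initTransactions();
}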

Addressed failure/degradation scenarios

...

Additionally, the connector's admin principal must always be given the following permissions on the Kafka cluster it reads offsets from:

Operation: Describe
Resource Type: Topic
Resource Name: Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

This is necessary in order to ensure that the Connect worker has read to the end of the offsets topic, even when there are open transactions on the topic at the time the read begins. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted isolation level, and then consuming from that topic with the read_committed isolation level until at least those offsets listed by the admin client have been reached.
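
A sketch of that read-to-end procedure is shown below; it assumes an already-constructed admin client, a consumer configured with isolation.level=read_committed, and a known list of offsets topic partitions (offsetsTopicPartitions).

Code Block
languagejava
titleReadToEndOfOffsetsTopicSketch.java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

// List end offsets with read_uncommitted so that open transactions do not hide the true
// end of the topic from the admin client.
Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
for (TopicPartition tp : offsetsTopicPartitions) {
    latest.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
        admin.listOffsets(latest, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
             .all()
             .get();

// Consume with read_committed until every partition has reached the end offsets listed above,
// which guarantees the worker has seen all committed offset records.
consumer.assign(offsetsTopicPartitions);
boolean caughtUp = false;
while (!caughtUp) {
    consumer.poll(Duration.ofSeconds(1)); // returned records are handled elsewhere
    caughtUp = offsetsTopicPartitions.stream()
            .allMatch(tp -> consumer.position(tp) >= endOffsets.get(tp).offset());
}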

...