...

Preparation for rebalance

When exactly.once.source.support is set to enabled and a new set of task configurations for a connector is detected, workers will preemptively stop source tasks for that connector. In greater detail:

When a rebalance is triggered, before rejoining the cluster group, workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stop/start is already done for distributed workers). Any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms worker property) will be abandoned and allowed to continue running. They will be disabled later on if and when the leader fences out their producers during a round of zombie fencing (described below).

Because this work will be done in parallel, the default timeout for task shutdown is fairly low (five seconds), and eight threads are currently used to perform en-masse task stops, it is highly unlikely that this will hinder a worker's ability to rejoin a group before the rebalance timeout (which defaults to sixty seconds) elapses.
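To make the stop-in-parallel-then-abandon behavior described above concrete, here is a minimal, self-contained sketch. It is not the worker's actual code: the names ParallelTaskStopSketch, StoppableTask, sleepingTask and stopAll are hypothetical, and the use of a single deadline shared by all tasks is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ParallelTaskStopSketch {

    /** Hypothetical stand-in for a running source task. */
    interface StoppableTask {
        String id();
        void stop() throws InterruptedException;
    }

    /** Hypothetical helper: a task whose stop() takes roughly stopMillis to complete. */
    static StoppableTask sleepingTask(String id, long stopMillis) {
        return new StoppableTask() {
            @Override public String id() { return id; }
            @Override public void stop() throws InterruptedException { Thread.sleep(stopMillis); }
        };
    }

    /**
     * Stops all tasks in parallel and returns the IDs of tasks that did not
     * finish stopping before the shared deadline (assumption: one deadline
     * for the whole batch, measured from the start of this call).
     */
    static List<String> stopAll(List<StoppableTask> tasks, int threads, long timeoutMs) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        List<Future<?>> futures = new ArrayList<>();
        for (StoppableTask task : tasks) {
            futures.add(executor.submit(() -> {
                task.stop();
                return null;
            }));
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        List<String> abandoned = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i++) {
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                futures.get(i).get(remaining, TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                // Not stopped in time: abandon the task and let it keep running.
                abandoned.add(tasks.get(i).id());
            } catch (ExecutionException e) {
                // stop() threw; the stop attempt has finished, so the task is not abandoned.
            } catch (InterruptedException e) {
                // Restore the interrupt flag and treat the task as abandoned.
                Thread.currentThread().interrupt();
                abandoned.add(tasks.get(i).id());
            }
        }
        // shutdown() rather than shutdownNow(): in-flight stops are not interrupted.
        executor.shutdown();
        return abandoned;
    }
}
```

With the defaults quoted above, a call would look like stopAll(tasks, 8, 5000). Under the shared-deadline assumption the call returns within roughly five seconds regardless of the number of tasks, which is well inside the sixty-second rebalance timeout.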

This section was the result of a misunderstanding of the herder's rebalance mechanics, and is actually unnecessary. The worker already performs this preemptive stop for all reconfigured tasks (if using incremental rebalancing) or all tasks (if using eager rebalancing) before (re)joining the group during a rebalance.

Source task startup

When exactly.once.source.support is set to enabled for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

...

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID. The same effect can already be achieved by creating one or more transactional producers and invoking their initTransactions methods, but the functionality is replicated on the admin client to make it easier to use in other cases (for more details, see the Fencing-only producers rejected alternative):

Admin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options          The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }

}
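
The effect of fencing by transactional ID can be illustrated with a toy, in-memory model. This is only a sketch of the general idea that registering (or fencing) a transactional ID bumps its epoch, after which holders of an older epoch are rejected. It is not Kafka's broker or client code, and FencingModelSketch, initProducer, canWrite and the example ID are hypothetical names introduced for illustration.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class FencingModelSketch {

    // Latest epoch known for each transactional ID (toy stand-in for coordinator state).
    private final Map<String, Integer> epochs = new HashMap<>();

    /**
     * Models a producer calling initTransactions: the ID's epoch is bumped
     * (or initialized to 0) and the new epoch is handed to the caller.
     */
    int initProducer(String transactionalId) {
        return epochs.merge(transactionalId, 0, (current, unused) -> current + 1);
    }

    /**
     * Models the proposed admin call: bump the epoch for every ID without
     * handing the new epoch to any producer, so all existing holders are fenced.
     */
    void fenceProducers(Collection<String> transactionalIds) {
        for (String id : transactionalIds) {
            initProducer(id);
        }
    }

    /** A producer may write only while it holds the latest epoch for its ID. */
    boolean canWrite(String transactionalId, int epoch) {
        Integer current = epochs.get(transactionalId);
        return current != null && current == epoch;
    }
}
```

In this model, a producer that obtained epoch 0 for "example-txn-id-0" can write until fenceProducers is called with that ID. Afterwards canWrite returns false for epoch 0, which mirrors how a zombie task's producer would be rejected once the leader has fenced it out.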

...