...

Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Background and References

...

Once an offset commit is complete, if the connector is (implicitly or explicitly) configured with a separate offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.
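
The retry behavior described above can be sketched roughly as follows. This is a minimal illustration using only the JDK, not the worker's actual code: the class name, the `BooleanSupplier` standing in for the non-transactional write to the global offsets topic, and the fixed backoff are all assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;

// Illustrative sketch only; names and the backoff policy are assumptions.
final class SecondaryOffsetWriteSketch {

    // Dedicated thread, separate from the task's work and offset commit threads.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "secondary-offset-writer");
        t.setDaemon(true);
        return t;
    });

    /**
     * Retries the write until it succeeds. A failed attempt is never propagated
     * to the task; the returned future completes with the number of attempts made.
     */
    CompletableFuture<Integer> writeWithRetries(BooleanSupplier write, long backoffMs) {
        return CompletableFuture.supplyAsync(() -> {
            int attempts = 0;
            while (true) {
                attempts++;
                boolean succeeded;
                try {
                    succeeded = write.getAsBoolean();
                } catch (RuntimeException e) {
                    succeeded = false; // treat an exception like any other failed attempt
                }
                if (succeeded) {
                    return attempts;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying", e);
                }
            }
        }, executor);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Because the loop runs on its own thread and its result is never awaited by the task, a persistently failing write delays only the secondary copy of the offsets.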

...

When a worker receives a request to this endpoint, it will:

  1. Check to see if a rebalance is pending and, if it is, serve an HTTP 409 CONFLICT response. (This step is actually unnecessary and has been removed from the implementation; see KAFKA-15059.)
  2. Check to see if it is the leader. If it is not the leader, either forward the request to the leader or fail the request, using at most two hops. This two-hop strategy is already implemented in Connect for other REST endpoints that require forwarding of requests, so it is not described in great detail here.
  3. Verify that the connector in the request URL both exists and is a source connector; if not, fail the request.
  4. Read to the end of the config topic.
  5. If there is a task count record for the connector in the config topic after the latest set of task configs for the connector, serve an empty-bodied 200 response.
  6. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
    1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector. This will be done in parallel.
    2. If the leader receives a request to write new task configs for the connector during this period, the round of fencing will be immediately cancelled and an HTTP 409 CONFLICT response will be served.
  7. Write the new task count record for the fenced-out connector to the config topic.
  8. Read to the end of the config topic to verify that the task count record has been written successfully.
  9. Serve an empty-bodied 200 response.

...

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. This same logic does not apply for multi-task connectors, even when the number of tasks is unchanged after a reconfiguration; for details on why, see the rejected alternative Non-generational task fencing.
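
The task count check in step 6 can be expressed as a small predicate. The sketch below is illustrative only: the class and method names are hypothetical, and the transactional ID format (group ID, connector name, and task index joined by hyphens) is an assumption made for the example, not something this section specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Illustrative sketch only; names and the ID format are assumptions.
final class ZombieFencingSketch {

    /**
     * Fencing is needed only when a previous task count record exists and
     * either the previous or the new generation has more than one task.
     */
    static boolean needsFencing(OptionalInt previousTaskCount, int newTaskCount) {
        return previousTaskCount.isPresent()
                && (previousTaskCount.getAsInt() > 1 || newTaskCount > 1);
    }

    /**
     * One transactional ID per task of the previous generation, i.e. as many
     * IDs as the most recent task count record says were active.
     */
    static List<String> transactionalIdsToFence(String groupId, String connector, int previousTaskCount) {
        List<String> ids = new ArrayList<>();
        for (int taskId = 0; taskId < previousTaskCount; taskId++) {
            ids.add(groupId + "-" + connector + "-" + taskId); // hypothetical format
        }
        return ids;
    }
}
```

With a previous count of 1 and a new count of 1, `needsFencing` returns false: the single new task reuses its predecessor's transactional ID and fences it out on startup. Any multi-task generation on either side of the reconfiguration requires the explicit fencing round.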

Preparation for rebalance

...

When exactly.once.source.support is set to enabled for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

Before a worker starts a source task, it will first send a request to the leader's internal zombie fencing endpoint for the task's connector. If that request fails for any reason, the task will be marked as FAILED and startup will be aborted.
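
The startup guard described above amounts to the following control flow. This is a hedged sketch, not the worker's implementation: `FencingRequest` is a hypothetical stand-in for the REST call to the leader's internal zombie fencing endpoint, and `TaskState` models only the two outcomes relevant here.

```java
// Illustrative sketch only; all names here are hypothetical.
final class SourceTaskStartupSketch {

    enum TaskState { RUNNING, FAILED }

    /** Stand-in for the request to the leader's internal zombie fencing endpoint. */
    interface FencingRequest {
        void send(String connectorName) throws Exception;
    }

    /**
     * Requests zombie fencing for the task's connector before starting the task.
     * Any failure of that request aborts startup and marks the task FAILED.
     */
    static TaskState startTask(String connectorName, FencingRequest fencing, Runnable taskStartup) {
        try {
            fencing.send(connectorName);
        } catch (Exception e) {
            return TaskState.FAILED; // startup is aborted; the task never runs
        }
        taskStartup.run();
        return TaskState.RUNNING;
    }
}
```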

...

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID. This same functionality is already possible by employing one or more transactional producers and invoking their initTransactions methods, but is replicated on the admin client to make it easier to use for other cases (for more details, see the Fencing-only producers rejected alternative):

Admin.java:
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options          The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }

}

...

Two upgrades will be required in order to ensure that the internal fencing endpoint is available on every worker before it is required by any of them, and that no non-leader workers are able to write to the config topic during the upgrade.

...

Rejected because: there is no one-size-fits-all strategy for transaction boundaries that can be expected to accommodate every reasonable combination of connector and use case. Defining transactions on the batches returned from SourceTask::poll would heavily limit throughput for connectors that frequently produce small record batches. Defining transactions on an interval would add a latency penalty to the records at the beginning of these transactions and, in the case of very large transactions, would inflate the memory requirements of downstream consumers, which would have to buffer the entire transaction locally within each topic-partition before beginning to process any of the records in it. (This last point is actually incorrect; consumers do not have to buffer transactions locally.) And some connectors may have no reasonable way to define their own transaction boundaries at all.

...