
Status

Current state: Accepted

Discussion thread: here

Voting thread: here

...

Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Background and References

...

Once an offset commit is complete, if the connector is (implicitly or explicitly) configured with a separate offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.
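
As a rough illustration of this behavior, the sketch below mirrors committed offsets to the global offsets topic on a separate thread, retrying forever without ever failing the task. The class, producer, topic, and serialized key/value names are illustrative stand-ins, not the framework's actual internals.

    import java.util.concurrent.ExecutorService;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Illustrative sketch only: mirror committed offsets to the worker's global offsets
    // topic on a separate thread, retrying indefinitely without ever failing the task.
    public class GlobalOffsetsMirrorSketch {
        public static void mirror(ExecutorService offsetWriteExecutor,
                                  Producer<byte[], byte[]> globalOffsetsProducer, // non-transactional, worker principal
                                  String globalOffsetsTopic,
                                  byte[] serializedOffsetKey,
                                  byte[] serializedOffsetValue) {
            offsetWriteExecutor.submit(() -> {
                while (true) {
                    try {
                        globalOffsetsProducer.send(new ProducerRecord<>(
                                globalOffsetsTopic, serializedOffsetKey, serializedOffsetValue)).get();
                        return; // success; the task was never blocked by this write
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // worker is shutting down
                    } catch (Exception e) {
                        // Retry indefinitely; a failure here never fails the task (backoff omitted for brevity)
                    }
                }
            });
        }
    }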

...

Because of this, it cannot be guaranteed that these methods will be invoked after every successful offset commit. However, as long as a task is able to accurately recover based solely off of the offset information provided to the Connect framework with each record it has produced, this should not compromise exactly-once delivery guarantees. These methods can and should still be used to clean up and relinquish resources after record delivery is guaranteed, but using them to track offsets will prevent a connector from being viable for exactly-once support.
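
As a sketch of what this looks like from the connector developer's side, the hypothetical task below carries everything needed for recovery in the source partition and offset of each record, so it never tracks progress in commit() or commitRecord(). The class name, config key, topic name, and readLine helper are made up for illustration.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Illustrative task: the source offset attached to each record is sufficient to resume.
    public class LineReaderSourceTask extends SourceTask {
        private String filename;
        private long nextLine;

        @Override
        public void start(Map<String, String> props) {
            filename = props.get("filename"); // hypothetical connector config key
            Map<String, Object> offset = context.offsetStorageReader()
                    .offset(Collections.singletonMap("filename", filename));
            nextLine = offset == null ? 0L : (Long) offset.get("line");
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            String line = readLine(filename, nextLine); // hypothetical helper
            if (line == null)
                return null;
            SourceRecord record = new SourceRecord(
                    Collections.singletonMap("filename", filename), // source partition
                    Collections.singletonMap("line", nextLine + 1), // source offset: the only recovery state
                    "lines", Schema.STRING_SCHEMA, line);
            nextLine++;
            return Collections.singletonList(record);
        }

        @Override
        public void stop() {
            // Relinquish resources here; do not rely on commit()/commitRecord() for offset tracking
        }

        @Override
        public String version() {
            return "sketch";
        }

        private String readLine(String file, long lineNumber) {
            return null; // omitted for brevity
        }
    }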

Per-connector offsets topics

Regardless of the value for the exactly.once.source.support property on a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

Motivation

First, with the advent of KIP-458, it is now possible to bring up a single Connect cluster whose connectors each target a different Kafka cluster (via the producer.override.bootstrap.servers, consumer.override.bootstrap.servers, and/or admin.override.bootstrap.servers connector properties). At the moment, source task offsets are still tracked in a single global offsets topic for the entire Connect cluster; however, if the producer for a task is used to write source offsets, and that producer targets a different Kafka cluster than the one the Connect worker uses for its internal topics, that will no longer be possible. In order to support this case well, the design must account for per-connector offsets topics, allowing for smooth upgrades and downgrades that do not unnecessarily create large gaps in offsets data (see “Migration” below). Although users do not necessarily need to be given control over the name of the offsets topic for each connector in this case (the worker could simply create an offsets topic with the same name on the connector’s overridden Kafka cluster, for example), exposing this level of control adds very little complexity to this design once the necessity for per-connector offsets topics is realized. As a benefit, it should make it easier for users to configure connectors in secured environments where the connector principal may have limited access on the Kafka cluster it targets. For users and developers familiar with Kafka Streams, this is the major differentiating factor that makes a global offsets topic impossible.

...

Finally, it allows users to limit the effect that hanging transactions on an offsets topic will have. If tasks A and B use the same offsets topic, task A initiates a transaction on that offsets topic right before task B starts up, and then task A dies suddenly without committing its transaction, task B will have to wait for that transaction to time out before it can read to the end of the offsets topic. If the transaction timeout is set very high for task A (to accommodate bursts of high throughput, for example), this will block task B from processing any data for a long time. Although this scenario may be unavoidable in some cases, using a dedicated offsets topic for each connector should allow cluster administrators to isolate the blast radius of a hanging transaction on an offsets topic. This way, although tasks of the same connector may still interfere with each other, they will at least not interfere with tasks of other connectors. This should be sufficient for most multitenant environments.

Configuration

Only the name of the per-connector offsets topic will be configurable by users. Other properties, such as the number of partitions and the replication factor, will be derived from the worker config using the behavior described in KIP-605.
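
For illustration, a connector configuration along the lines of the hypothetical sketch below (the connector name and class are made up) would only specify the topic name; the topic's partition count and replication factor would be derived from the worker config as described above.

    import java.util.Map;

    // Hypothetical connector configuration: only the offsets topic name is specified here;
    // all other topic settings are derived from the worker config as described above.
    public class ExplicitOffsetsTopicConfigSketch {
        static final Map<String, String> CONNECTOR_CONFIG = Map.of(
                "name", "example-source",                                // hypothetical connector name
                "connector.class", "org.example.ExampleSourceConnector", // hypothetical connector class
                "offsets.storage.topic", "example-source-offsets"        // per-connector offsets topic name
        );
    }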

Hosting Kafka cluster

Regardless of the value for the exactly.once.source.support property on a worker, if a connector configuration contains a value for the offsets.storage.topic property, it will use an offsets topic with that name on the Kafka cluster that it produces data to (which may be different from the one that hosts the worker's global offsets topic).

Implicit usage

A per-connector offsets topic might be implicitly configured under certain circumstances. Specifically, this will occur (when exactly-once source support is enabled) for connectors whose configurations do not contain the offsets.storage.topic property, but do contain an overridden bootstrap.servers value that causes the connector to target a different Kafka cluster than the one that hosts the worker's global offsets topic.
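
For instance, a hypothetical configuration like the sketch below contains no offsets.storage.topic property, but its overridden bootstrap.servers value would cause the worker to implicitly use a separate offsets topic on the connector's target cluster (all names are made up).

    import java.util.Map;

    // Hypothetical configuration that triggers an implicit per-connector offsets topic
    // (when exactly-once source support is enabled): no offsets.storage.topic is given,
    // but the producer targets a different Kafka cluster than the worker's internal topics.
    public class ImplicitOffsetsTopicConfigSketch {
        static final Map<String, String> CONNECTOR_CONFIG = Map.of(
                "name", "example-source",                                 // hypothetical connector name
                "connector.class", "org.example.ExampleSourceConnector",  // hypothetical connector class
                "producer.override.bootstrap.servers", "other-kafka:9092" // hypothetical target cluster
        );
    }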

...

The name of this topic will be the same as the name of the global offsets topic, which is controlled by the offsets.storage.topic property in the worker config.

Creation

If a connector is explicitly or implicitly configured to use a separate offsets topic but that topic does not exist yet, the worker will automatically try to create the topic before starting any task or Connector instances belonging to that connector. This will be done using an admin client constructed from the connector's configuration (using the various admin.override.* connector properties, admin.* worker properties, and top-level worker properties in descending order of precedence), using the same logic that the worker already uses for its internal topics.
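
A minimal sketch of that precedence, assuming the prefixes have already been stripped from each property set; the class, method, and variable names are illustrative, not the worker's actual code.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;

    // Illustrative sketch of the config precedence described above.
    public class ConnectorAdminClientSketch {
        public static Admin adminForConnector(Map<String, Object> topLevelWorkerProps,
                                              Map<String, Object> workerAdminProps,         // admin.* worker properties
                                              Map<String, Object> connectorAdminOverrides) { // admin.override.* connector properties
            Map<String, Object> adminConfig = new HashMap<>(topLevelWorkerProps); // lowest precedence
            adminConfig.putAll(workerAdminProps);
            adminConfig.putAll(connectorAdminOverrides);                          // highest precedence
            return Admin.create(adminConfig);
        }
    }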

Smooth migration

When a separate offsets topic is created for the first time, it will naturally be empty. In order to avoid losing offset information that may be stored in the global offsets topic, when a connector or task instance requests offsets from the worker, it will be given a combined view of the offset information present in both its separate offsets topic and the worker's global offsets topic. Precedence will be given to offset information present in the separate offsets topic.
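
A minimal sketch of that combined view, keyed by source partition as in Connect's offset storage APIs (the class and method names are illustrative only):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: offsets from the connector's separate topic override
    // any offsets for the same source partitions found in the global topic.
    public class CombinedOffsetViewSketch {
        public static Map<Map<String, Object>, Map<String, Object>> combine(
                Map<Map<String, Object>, Map<String, Object>> globalTopicOffsets,
                Map<Map<String, Object>, Map<String, Object>> separateTopicOffsets) {
            Map<Map<String, Object>, Map<String, Object>> combined = new HashMap<>(globalTopicOffsets);
            combined.putAll(separateTopicOffsets); // separate offsets topic takes precedence
            return combined;
        }
    }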

...

When a worker receives a request to this endpoint, it will:

  1. Check to see if a rebalance is pending and, if it is, serve an HTTP 409 CONFLICT response. (This step is actually unnecessary and has been removed from the implementation; see KAFKA-15059.)
  2. Check to see if it is the leader. If it is not the leader, either forward the request to the leader or fail the request, using at most two hops. This two-hop strategy is already implemented in Connect for other REST endpoints that require forwarding of requests, so it is not described in great detail here.
  3. Verify that the connector in the request URL both exists and is a source connector; if not, fail the request.
  4. Read to the end of the config topic.
  5. If there is a task count record for the connector in the config topic after the latest set of task configs for the connector, serve an empty-bodied 200 response.
  6. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
    1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector. This will be done in parallel.
    2. If the leader receives a request to write new task configs for the connector during this period, the round of fencing will be immediately cancelled and an HTTP 409 CONFLICT response will be served.
  7. Write the new task count record for the fenced-out connector to the config topic.
  8. Read to the end of the config topic to verify that the task count record has been written successfully.
  9. Serve an empty-bodied 200 response.

...

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. This same logic does not apply for multi-task connectors, even when the number of tasks is unchanged after a reconfiguration; for details on why, see the rejected alternative Non-generational task fencing.
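
As a rough sketch of the fencing performed in step 6a above, assuming the Admin::fenceProducers API described below; the transactional ID helper is a placeholder, not the task transactional ID naming scheme defined by this KIP.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;

    // Illustrative sketch of fencing out all producers of the previous task generation.
    public class ZombieFencingSketch {
        public static void fencePreviousGeneration(Admin admin, String connector, int previousTaskCount)
                throws ExecutionException, InterruptedException {
            List<String> transactionalIds = new ArrayList<>();
            for (int task = 0; task < previousTaskCount; task++) {
                transactionalIds.add(transactionalIdFor(connector, task));
            }
            // Blocks until every listed transactional producer has been fenced out
            admin.fenceProducers(transactionalIds).all().get();
        }

        private static String transactionalIdFor(String connector, int task) {
            // Placeholder only; the actual transactional ID format for tasks is defined elsewhere in this KIP
            return connector + "-" + task;
        }
    }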

Preparation for rebalance

When exactly.once.source.support is set to enabled and a new set of task configurations for a connector is detected, workers will preemptively stop source tasks for that connector. In greater detail:

When a rebalance is triggered, before rejoining the group, workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stops and starts are already performed this way for distributed workers), and any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms worker property) will be abandoned and allowed to continue running. They will be disabled later on if/when the leader fences out their producers during a round of zombie fencing (described below).

Because this work will be done in parallel, the default timeout for task shutdown is fairly low (five seconds), and the current number of threads used to perform en-masse task stops is eight, it is highly unlikely that this will hinder a worker's ability to rejoin the group before the rebalance timeout (which defaults to sixty seconds) expires.

This section was the result of a misunderstanding of the herder's rebalance mechanics, and is actually unnecessary. The worker already performs this preemptive stop for all reconfigured tasks (if using incremental rebalancing) or all tasks (if using eager rebalancing) before (re)joining the group during a rebalance.

Source task startup

When exactly.once.source.support is set to enabled for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

Before a worker starts a source task, it will first send a request to the leader's internal zombie fencing endpoint for the task's connector. If that request fails for any reason, the task will be marked as FAILED and startup will be aborted.
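
A rough sketch of that startup guard follows; requestZombieFencing, markTaskFailed, and startTask are hypothetical stand-ins for the worker's internal plumbing, not its actual methods.

    // Illustrative sketch only; helper methods are hypothetical stand-ins.
    public class SourceTaskStartupSketch {
        public void maybeStartSourceTask(String connector, int taskId) {
            try {
                // A successful round of zombie fencing, via the leader's internal endpoint,
                // is required before the task may run
                requestZombieFencing(connector);
            } catch (Exception e) {
                markTaskFailed(connector, taskId, e); // task status becomes FAILED
                return;                               // startup aborted
            }
            startTask(connector, taskId);
        }

        private void requestZombieFencing(String connector) throws Exception {
            // Hypothetical: forward a request to the leader's internal zombie fencing endpoint
        }

        private void markTaskFailed(String connector, int taskId, Exception cause) {
            // Hypothetical: publish a FAILED status for the task
        }

        private void startTask(String connector, int taskId) {
            // Hypothetical: begin polling the task for records
        }
    }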

...

Note that the IdempotentWrite ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

...

Before setting exactly.once.source.support to preparing or enabled, the producer principal for the worker must be given the following permissions on the Kafka cluster it writes to:

Operation | Resource Type | Resource Name
Write | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
Describe | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
IdempotentWrite * | Cluster | Kafka cluster targeted by the Connect cluster

* - Note that the IdempotentWrite ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.
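
As an illustration, the ACLs in the table above could be granted via the Admin API along the lines of the sketch below; the principal, group ID, and class names are hypothetical examples.

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    // Illustrative sketch of granting the worker producer principal the ACLs listed above.
    public class WorkerProducerAclSketch {
        public static void grant(Admin admin) throws ExecutionException, InterruptedException {
            String principal = "User:connect-worker";                      // hypothetical worker producer principal
            String transactionalId = "connect-cluster-my-connect-cluster"; // connect-cluster-${groupId}, hypothetical group ID
            ResourcePattern txnId = new ResourcePattern(
                    ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL);
            ResourcePattern cluster = new ResourcePattern(
                    ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
            admin.createAcls(List.of(
                    new AclBinding(txnId, allow(principal, AclOperation.WRITE)),
                    new AclBinding(txnId, allow(principal, AclOperation.DESCRIBE)),
                    // Only needed when the targeted Kafka cluster is older than 2.8
                    new AclBinding(cluster, allow(principal, AclOperation.IDEMPOTENT_WRITE))
            )).all().get();
        }

        private static AccessControlEntry allow(String principal, AclOperation operation) {
            return new AccessControlEntry(principal, "*", operation, AclPermissionType.ALLOW);
        }
    }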

...

* - Note that the IdempotentWrite ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

...

Two upgrades will be required in order to ensure that the internal fencing endpoint is available on every worker before it is required by any of them, and that no non-leader workers are able to write to the config topic during the upgrade.

...

Rejected because: there is no one-size-fits-all strategy for transaction boundaries that can be expected to accommodate every reasonable combination of connector and use case. Defining transactions on the batches returned from SourceTask::poll would heavily limit throughput for connectors that frequently produce small record batches. Defining transactions on an interval would add a latency penalty to the records at the beginning of these transactions and, in the case of very large transactions, would inflate the memory requirements of downstream consumers, which would have to buffer the entire transaction locally within each topic-partition before beginning to process any of the records in it (this is actually incorrect; consumers do not have to buffer transactions locally). And some connectors may have no reasonable way to define their own transaction boundaries at all.

...