
Status

Current state: Accepted

Discussion thread: here

Voting thread: here

...

Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Background and References

...

  • Require as few per-connector changes as possible. The Connect ecosystem is fairly mature at this point and even something as lightweight as a new SourceTask method would have to be applied to potentially dozens if not hundreds of connectors. The current proposal requires no such changes for existing connectors, as long as they use the source offsets API correctly.
  • Minimize configuration changes. The implementation of this feature may grow fairly complex, but it should be easy for users to enable and understand. KIP-98, for example, added exactly-once support for Kafka and its Java clients, and only exposed three producer properties and one consumer property to users. The current proposal only adds five user-facing configuration properties.
  • Minimize operational burden on users. It's likely that this feature will be used heavily and may even be enabled by default with a later release of Connect. A wide range of Connect deployment styles and environments should be supported with as little pain as possible, including but not limited to security-conscious and resource-conscious environments. The current proposal's requirements in security-conscious environments are well-defined and easy-to-anticipate, and it does not significantly alter the resource allocation story of Connect.
  • Minimize gotchas and potential footguns. Nobody likes fine print. If we can address a common use case for Connect users or Connect developers, we should; leaving things out for the sake of a simpler design or smaller changes should only be done as a last resort. Even if well-documented, known gaps in use cases (especially those that vary from connector to connector) may not be propagated to or understood by end users, and may be seen as a fault in the quality of this feature.
  • Overall, make this a feature that gives people joy to use, not pain. Jury's out on this one until the vote thread passes!

...

If the transaction.boundary  connector configuration is set to connector , the connector's SourceConnector::canDefineTransactionBoundaries  method will be queried.

If the result is ConnectorTransactionBoundaries.UNSUPPORTED or null, then an error will be reported for the transaction.boundary property stating that the connector does not support defining its own transaction boundaries, and the user will be encouraged to configure the connector with a different transaction boundary type.

If the result is ConnectorTransactionBoundaries.SUPPORTED, no error will be reported.
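For illustration, a connector that opts in to defining its own transaction boundaries might look roughly like the following sketch (the connector class is hypothetical; the method and enum are the SourceConnector additions described elsewhere in this KIP):

import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical connector that defines its own transaction boundaries.
public class CustomBoundarySourceConnector extends SourceConnector {

    // Permits users to set transaction.boundary to connector for this connector;
    // the framework's default implementation returns UNSUPPORTED.
    @Override
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries() {
        return ConnectorTransactionBoundaries.SUPPORTED;
    }

    // Remaining SourceConnector methods (version, start, taskClass, taskConfigs,
    // stop, config) omitted for brevity.
}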

...

Offset reads

When exactly.once.source.support is set to enabled for a worker, the consumers used by that worker to read source offsets from offsets topics will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all.
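For reference, this is roughly equivalent to constructing the offsets consumer with a configuration like the sketch below (the bootstrap address is a placeholder, and the worker derives the rest of the consumer configuration as it does today):

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Map<String, Object> config = Map.of(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // placeholder
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class,
    // New default when exactly-once source support is enabled: skip records from
    // transactions that have not been committed yet, but still consume records
    // produced outside of any transaction.
    ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
);
KafkaConsumer<byte[], byte[]> offsetsConsumer = new KafkaConsumer<>(config);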

...

Offset (and record) writes

When exactly.once.source.support is set to enabled for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for that transaction to be committed, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.
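Conceptually, the per-transaction sequence looks roughly like the sketch below (record construction, serialization, and the offsets topic's key/value format are simplified placeholders rather than the framework's actual wire format):

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// 'producer' is the task's transactional producer; initTransactions() has
// already been called once during task startup.
void commitBatch(KafkaProducer<byte[], byte[]> producer,
                 List<ProducerRecord<byte[], byte[]>> sourceRecords,
                 ProducerRecord<byte[], byte[]> offsetsRecord) {
    producer.beginTransaction();
    // 1. Write the batch of records provided by the source task.
    for (ProducerRecord<byte[], byte[]> record : sourceRecords) {
        producer.send(record);
    }
    // 2. Write the corresponding source offsets to the offsets topic, inside
    //    the same transaction.
    producer.send(offsetsRecord);
    // 3. Commit: either everything above becomes visible to read_committed
    //    consumers, or none of it does.
    producer.commitTransaction();
}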

...

Once an offset commit is complete, if the connector is (implicitly or explicitly) configured with a separate offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.

...

Because of this, it cannot be guaranteed that these methods will be invoked after every successful offset commit. However, as long as a task is able to accurately recover based solely off of the offset information provided to the Connect framework with each record it has produced, this should not compromise exactly-once delivery guarantees. These methods can and should still be used to clean up and relinquish resources after record delivery is guaranteed, but using them to track offsets will prevent a connector from being viable for exactly-once support.

Per-connector offsets topics

Regardless of the value of the exactly.once.source.support property on a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

Motivation

First, with the advent of KIP-458, it is now possible to bring up a single Connect cluster whose connectors each target a different Kafka cluster (via the producer.override.bootstrap.servers, consumer.override.bootstrap.servers, and/or admin.override.bootstrap.servers connector properties). At the moment, source task offsets are still tracked in a single global offsets topic for the entire Connect cluster; however, if the producer for a task is used to write source offsets, and that producer targets a different Kafka cluster than the one the Connect worker uses for its internal topics, that will no longer be possible. In order to support this case well, per-connector offsets topics must be reasoned about, to allow for smooth upgrades and downgrades where large gaps in offsets data are not created unnecessarily (see “Migration” below). Although users do not necessarily need to be given control over the name of the offsets topic for each connector in this case (the worker could simply create an offsets topic with the same name on the connector’s overridden Kafka cluster, for example), exposing this level of control adds very little complexity to this design once the necessity for per-connector offsets topics is realized. As a benefit, it should make it easier for users to configure connectors in secured environments where the connector principal may have limited access on the Kafka cluster it targets. For users and developers familiar with Kafka Streams, this is the major differentiating factor that makes a global offsets topic impossible.

...

Finally, it allows users to limit the effect that hanging transactions on an offsets topic will have. If tasks A and B use the same offsets topic, task A initiates a transaction on that offsets topic right before task B starts up, and task A then dies suddenly without committing its transaction, task B will have to wait for that transaction to time out before it can read to the end of the offsets topic. If the transaction timeout is set very high for task A (to accommodate bursts of high throughput, for example), this will block task B from processing any data for a long time. Although this scenario may be unavoidable in some cases, using a dedicated offsets topic for each connector should allow cluster administrators to isolate the blast radius of a hanging transaction on an offsets topic. This way, although tasks of the same connector may still interfere with each other, they will at least not interfere with tasks of other connectors. This should be sufficient for most multitenant environments.

Configuration

Only the name of the per-connector offsets topic will be configurable by users. Other properties, such as the number of partitions and the replication factor, will be derived from the worker config using the behavior described in KIP-605.

Hosting Kafka cluster

Regardless of the value of the exactly.once.source.support property on a worker, if a connector configuration contains a value for the offsets.storage.topic property, it will use an offsets topic with that name on the Kafka cluster that it produces data to (which may be different from the one that hosts the worker's global offsets topic).

Implicit usage

A per-connector offsets topic might be implicitly configured under certain circumstances. Specifically, this will occur (when exactly-once source support is enabled) for connectors whose configurations do not contain the offsets.storage.topic  property, but do contain an overridden bootstrap.servers  value that causes the connector to target a different Kafka cluster than the one that hosts the worker's global offsets topic.
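As a hypothetical example, a connector configured along the lines of the sketch below (connector name, class, and broker address are placeholders) targets a different Kafka cluster than the worker and does not name an offsets topic, so with exactly-once source support enabled it would implicitly be given a per-connector offsets topic on that cluster:

import java.util.Map;

// Hypothetical connector configuration. The overridden bootstrap.servers points
// at a different cluster than the one hosting the worker's internal topics, and
// no offsets.storage.topic is specified, so a per-connector offsets topic is
// implicitly used on the other cluster.
Map<String, String> connectorConfig = Map.of(
    "name", "example-source",
    "connector.class", "com.example.ExampleSourceConnector",
    "tasks.max", "4",
    "producer.override.bootstrap.servers", "other-cluster:9092"
);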

...

The name of this topic will be the same as the name of the global offsets topic, which is controlled by the offsets.storage.topic  property in the worker config.

Creation

If a connector is explicitly or implicitly configured to use a separate offsets topic but that topic does not exist yet, the worker will automatically try to create the topic before startup for any task or Connector  instances belonging to that connector. This will be done using an admin client constructed from the connector's configuration (using the various admin.override.*  connector properties, admin.* worker properties, and top-level worker properties in descending order of precedence), using the same logic that the worker already uses for its internal topics.

Smooth migration

When a separate offsets topic is created for the first time, it will naturally be empty. In order to avoid losing offset information that may be stored in the global offsets topic, when a connector or task instance requests offsets from the worker, it will be given a combined view of the offset information present in both its separate offsets topic and the worker's global offsets topic. Precedence will be given to offset information present in the separate offsets topic.
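In effect, the combined view behaves like a simple map merge in which the separate topic takes precedence, as in this simplified sketch (real offset keys and values are structured source partitions and offsets, not arbitrary strings):

import java.util.HashMap;
import java.util.Map;

// Offsets read from the worker's global offsets topic and from the connector's
// separate offsets topic, keyed by source partition.
static Map<String, Map<String, Object>> combinedOffsets(
        Map<String, Map<String, Object>> globalTopicOffsets,
        Map<String, Map<String, Object>> separateTopicOffsets) {
    Map<String, Map<String, Object>> combined = new HashMap<>(globalTopicOffsets);
    // Offset information in the separate topic overrides the global topic.
    combined.putAll(separateTopicOffsets);
    return combined;
}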

...

This endpoint will be secured by the session key mechanism introduced in KIP-507: Securing Internal Connect REST Endpoints and will only be used for inter-worker communication; users should not query it directly. It will be available regardless of the exactly.once.source.support value for the worker.

When a worker receives a request to this endpoint, it will:

  1. Check to see if a rebalance is pending and, if it is, serve an HTTP 409 CONFLICT response. (This step is actually unnecessary and has been removed from the implementation; see KAFKA-15059.)
  2. Check to see if it is the leader. If it is not the leader, either forward the request to the leader or fail the request, using at most two hops. This two-hop strategy is already implemented in Connect for other REST endpoints that require forwarding of requests, so it is not described in great detail here.
  3. Verify that the connector in the request URL both exists and is a source connector; if not, fail the request.
  4. Read to the end of the config topic.
  5. If there is a task count record for the connector in the config topic after the latest set of task configs for the connector, serve an empty-bodied 200 response.
  6. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
    1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector. This will be done in parallel.
    2. If the leader receives a request to write new task configs for the connector during this period, the round of fencing will be immediately cancelled and an HTTP 409 CONFLICT response will be served.
  7. Write the new task count record for the fenced-out connector to the config topic.
  8. Read to the end of the config topic to verify that the task count record has been written successfully.
  9. Serve an empty-bodied 200 response.

...

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. This same logic does not apply for multi-task connectors, even when the number of tasks is unchanged after a reconfiguration; for details on why, see the rejected alternative Non-generational task fencing.
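A rough sketch of that decision, assuming the task transactional ID format described under Connector principal permissions below (${groupId}-${connector}-${taskId}):

import java.util.ArrayList;
import java.util.List;

// Transactional ID used by each source task of a connector.
static String taskTransactionalId(String groupId, String connector, int taskId) {
    return groupId + "-" + connector + "-" + taskId;
}

// A round of fencing is only required when more than one task may be involved;
// a single new task automatically fences out its single predecessor, since both
// use the same transactional ID.
static boolean fencingRoundRequired(int previousTaskCount, int newTaskCount) {
    return previousTaskCount > 1 || newTaskCount > 1;
}

// IDs to fence: one per task that may have been running under the most recent
// task count record for the connector.
static List<String> transactionalIdsToFence(String groupId, String connector, int previousTaskCount) {
    List<String> ids = new ArrayList<>();
    for (int taskId = 0; taskId < previousTaskCount; taskId++) {
        ids.add(taskTransactionalId(groupId, connector, taskId));
    }
    return ids;
}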

Preparation for rebalance

When exactly.once.source.support is set to enabled and a new set of task configurations for a connector is detected, workers will preemptively stop source tasks for that connector. In greater detail:

When a rebalance is triggered, before rejoining the group, workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stop/start is already done for distributed workers), and any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms  worker property) will be abandoned and allowed to continue running. They will be disabled later on if/when the leader fences out their producers during a round of zombie fencing (described below).

Because this work will be done in parallel, the default timeout for task shutdown is fairly low (five seconds), and the current number of threads used to perform en-masse task stops is eight, it is highly unlikely that this will hinder a worker's ability to rejoin the group before the rebalance timeout (which defaults to sixty seconds) elapses.

This section was the result of a misunderstanding of the herder's rebalance mechanics, and is actually unnecessary. The worker already performs this preemptive stop for all reconfigured tasks (if using incremental rebalancing) or all tasks (if using eager rebalancing) before (re)joining the group during a rebalance.

Source task startup

When exactly.once.source.support is set to enabled for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

Before a worker starts a source task, it will first send a request to the leader's internal zombie fencing endpoint for the task's connector. If that request fails for any reason, the task will be marked as FAILED and startup will be aborted.

...

Leader access to config topic

When exactly.once.source.support is set to preparing or enabled for a worker, if the worker is the leader of the cluster, it will now use a transactional producer to guarantee that at most one worker is capable of writing to the config topic at any time. In greater detail:

...
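To make this concrete, the leader's config-topic producer could be constructed roughly as in the sketch below, using the worker transactional ID listed under Worker principal permissions later in this document (the serializers and broker address are placeholders):

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

static KafkaProducer<byte[], byte[]> configTopicProducer(String bootstrapServers, String groupId) {
    Map<String, Object> config = Map.of(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class,
        // Only one producer with this transactional ID can be active at a time,
        // so writes to the config topic from a zombie leader are fenced out.
        ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-" + groupId
    );
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config);
    producer.initTransactions();
    return producer;
}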

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID. This same functionality is already possible by employing one or more transactional producers and invoking their initTransactions methods, but it is replicated on the admin client to make it easier to use for other cases (for more details, see the Fencing-only producers rejected alternative):

Admin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options  		   The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }

}
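A sketch of how a caller (such as the leader during a round of zombie fencing) might use the new API, assuming FenceProducersResult exposes an all() future in the style of other admin result classes:

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Fence out any producers that may still be running with the given transactional IDs.
static void fenceStaleProducers(String bootstrapServers, Collection<String> transactionalIds) throws Exception {
    try (Admin admin = Admin.create(Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
        admin.fenceProducers(transactionalIds).all().get();
    }
}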

...

The ACLs required for this new API will be the same as the ones required to use a transactional producer for each of the specified transactional IDs. Specifically, this amounts to grants for the Write  and Describe  operations on the TransactionalId  resource, and a grant for the IdempotentWrite operation on the Cluster  resource.

Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Limitations

  • Distributed mode is required; standalone mode will not be supported (yet) for exactly-once source connectors
  • Exactly-once source support can only be enabled for all source connectors or none; it cannot be toggled on a per-connector basis
  • In order to be viable for exactly-once delivery, connectors must assign source partitions to at most one task at a time (otherwise duplicate writes may occur)
  • In order to be viable for exactly-once delivery, connectors must use the Connect source offset API for tracking progress (otherwise duplicate writes or dropped records may occur); see the sketch below
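To illustrate that last requirement, a task viable for exactly-once delivery reports its progress through the source partition and source offset attached to each record it returns from poll, as in this simplified, hypothetical sketch:

import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical task that reads lines from a single logical "file".
public class ExampleSourceTask extends SourceTask {

    private long position = 0L;

    @Override
    public List<SourceRecord> poll() {
        String line = readNextLine(); // hypothetical helper
        if (line == null) {
            return null;
        }
        position++;
        return List.of(new SourceRecord(
            Map.of("file", "example"),    // source partition
            Map.of("position", position), // source offset, committed in the same transaction as the record
            "example-topic",
            Schema.STRING_SCHEMA,
            line));
    }

    private String readNextLine() {
        return null; // stub for illustration
    }

    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
    @Override public String version() { return "1.0"; }
}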

...

If a worker is active and does not have support for exactly-once delivery (either because exactly.once.source.support is not set to enabled, or because the worker is running an older version of Connect for which the feature is not available), the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

...

Compatibility, Deprecation, and Migration Plan


Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Before setting exactly.once.source.support to preparing or enabled for a worker, its producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation | Resource Type | Resource Name
Write | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
Describe | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
IdempotentWrite * | Cluster | Kafka cluster targeted by the Connect cluster


* - Note that the IdempotentWrite ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.
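For operators who manage ACLs programmatically, the worker grants above could be created with the admin client roughly as follows (the principal name, host wildcard, and bootstrap address are placeholders; the IdempotentWrite grant is omitted since it is only needed on pre-2.8 clusters):

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

static void grantWorkerTransactionalIdAcls(String bootstrapServers, String groupId) throws Exception {
    String transactionalId = "connect-cluster-" + groupId;
    ResourcePattern resource =
        new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL);
    // Grant Write and Describe on the worker's transactional ID to the worker's producer principal.
    AclBinding write = new AclBinding(resource,
        new AccessControlEntry("User:connect-worker", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
    AclBinding describe = new AclBinding(resource,
        new AccessControlEntry("User:connect-worker", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
    try (Admin admin = Admin.create(Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
        admin.createAcls(List.of(write, describe)).all().get();
    }
}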

Connector principal permissions

...

Operation | Resource Type | Resource Name
Write | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Describe | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Write | Topic | Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.
IdempotentWrite * | Cluster | Kafka cluster targeted by the connector.

* - Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Consumer

Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:

...

The first rolling upgrade will be to upgrade every worker to a version of Connect that can provide exactly-once source support, and to set exactly.once.source.support  to preparing .

The second rolling upgrade will be to actually enable exactly-once source support on each worker (by setting exactly.once.source.support to enabled).

Two upgrades will be required in order to ensure that the internal fencing endpoint is available on every worker before it is required by any of them, and that no non-leader workers are able to write to the config topic during the upgrade.

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.support to either disabled or preparing for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that do not support exactly-once sources at all (a “hard” downgrade).

...

The worker-level exactly.once.source.support and connector-level offsets.storage.topic, transaction.boundary, exactly.once.support, and transaction.boundary.interval.ms configuration properties introduced here should not violate backwards compatibility, as they all come with default values that preserve existing behavior. Technically, an existing connector may expose an offsets.storage.topic configuration property that will now conflict with this newly-introduced framework property, but the risk is low enough to be acceptable.
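As a hypothetical example, a connector opting in to all of the new connector-level properties might be configured like this sketch (the connector name, class, and topic name are placeholders; the exactly.once.support and transaction.boundary values shown are assumptions based on the property descriptions elsewhere in this KIP):

import java.util.Map;

// Hypothetical connector configuration exercising the new connector-level properties.
Map<String, String> connectorConfig = Map.of(
    "name", "example-source",
    "connector.class", "com.example.ExampleSourceConnector",
    // Fail preflight validation unless exactly-once delivery can be guaranteed for this connector.
    "exactly.once.support", "required",
    // Commit a transaction on a fixed interval rather than after every batch from poll().
    "transaction.boundary", "interval",
    "transaction.boundary.interval.ms", "60000",
    // Use a dedicated offsets topic for this connector.
    "offsets.storage.topic", "example-source-offsets"
);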

...

Rejected because: there is no one-size-fits-all strategy for transaction boundaries that can be expected to accommodate every reasonable combination of connector and use case. Defining transactions on the batches returned from SourceTask::poll would heavily limit throughput for connectors that frequently produce small record batches. Defining transactions on an interval would add a latency penalty to the records at the beginning of these transactions and, in the case of very large transactions, would inflate the memory requirements of downstream consumers, which would have to buffer the entire transaction locally within each topic-partition before beginning to process any of the records in it (this is actually incorrect; consumers do not have to buffer transactions locally). And some connectors may have no reasonable way to define their own transaction boundaries at all.

...