You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 30 Next »

Status

Current state: Under discussion

Discussion threadhere

JIRA: Unable to render Jira issues macro, execution error. Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Background and References

KIP-98

KIP-98 added support for exactly-once delivery guarantees with Kafka and its Java clients.

The idempotent producer

One common source of duplicate records from applications that write to Kafka was automatic producer retries, where a producer would re-send a batch to Kafka under certain circumstances even if that batch had already been committed by the broker. KIP-98 allowed users to configure their producer to perform these retries idempotently, such that downstream applications would only see one instance of any message in an automatically-resent producer batch, even if the batch were sent to Kafka several times by the producer.

The transactional producer

Another major component of the user-facing changes from that KIP was the introduction of the transactional producer, which can create transactions that span multiple topic partitions and perform atomic writes to them. Furthermore, the transactional producer is capable of "fencing" out other producers, which involves claiming ownership over a transactional ID and barring any prior producer instances with the same transactional ID from being able to make any subsequent writes to Kafka. This is especially useful in preventing zombie instances of a client application from sending duplicate messages.

Terminology

"Fencing" is used here as a general term to describe disabling an older instance or group of instances of some type of application.

  • "Fencing" out a transactional producer means disabling that producer from being able to perform any further writes to Kafka (one way to do this, for example, is to instantiate a new transactional producer with the same transactional ID)
  • "Fencing" out a generation of source tasks means disabling every source task in that generation from being able to produce any more source records to Kafka, or commit any more source offsets

"Source partition" has two meanings here. It can refer to:

  • A part of the external system that a source connector reads from and is able to assign to a single task. For example, a table in a database, or a topic partition in a Kafka source connector such as MirrorMaker 2
  • The key in a key/value pair that a source task provides to the Connect framework along with a source record (see the SourceRecord constructor, for example) to use for tracking progress in consuming from a source partition

"Source offset" can be used to refer to either the value in the above-mentioned key/value pair for which a "source partition" is the key, or it may refer to the entire key/value pair (in the same way that "offset" can be used to refer to either the combination of a topic partition and some position in it, or just the last coordinate in a topic/partition/offset trio)

Motivation

It's been possible for sink connectors to implement exactly-once delivery for a while now, and in systems with features like unique key constraints, even relatively easy. However, given Kafka's lack of such features that allow for easy remediation and/or prevention of duplicates, the same cannot be said for source connectors, and without framework-level adaptations, exactly-once delivery for source connectors remains impossible.

There are two key reasons for this:

Source offset accuracy: The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source records that they correspond to have been successfully sent to Kafka. This provides at-least-once delivery guarantees for most source connectors, but allows for the possibility of duplicate writes in cases where source records are written to Kafka but the worker is interrupted before it can write the offsets for those records to Kafka.

Zombies: Some scenarios can cause multiple tasks to run and produce data for the same source partition (such as a database table or a Kafka topic partition) at the same time. This can also cause the framework to produce duplicate records.

Another common source of duplicate records from source connectors is automatic producer retries. However, users can already address this by configuring source connectors to use an idempotent producer by the worker-level producer.enable.idempotence  or connector-level producer.override.enable.idempotence  properties.

In order to support exactly-once delivery guarantees for source connectors, the framework should be expanded to atomically write source records and their source offsets to Kafka, and to prevent zombie tasks from producing data to Kafka.

With these changes, any source connector for which each source partition is assigned to at most one task at a time, and which is capable of resuming consumption from the upstream source on startup based on the source offsets that prior tasks have provided to the framework, should be capable of exactly-once delivery.

Goals

  • Require as few per-connector changes as possible. The Connect ecosystem is fairly mature at this point and even something as lightweight as a new SourceTask  method would have to be applied to potentially dozens if not hundreds of connectors. The current proposal requires no such changes for existing connectors, as long as they use they use the source offsets API correctly.
  • Minimize configuration changes. The implementation of this feature may grow fairly complex, but it should be easy for users to enable and understand. KIP-98, for example, added exactly-once support for Kafka and its Java clients, and only exposed three producer properties and one consumer property to users. The current proposal only adds two user-facing configuration properties.
  • Minimize operational burden on users. It's likely that this feature will be used heavily and may even be enabled by default with a later release of Connect. A wide range of Connect deployment styles and environments should be supported with as little pain as possible, including but not limited to security-conscious and resource-conscious environments. The current proposal's requirements in security-conscious environments are well-defined and easy-to-anticipate, and it does not significantly alter the resource allocation story of Connect.
  • Minimize gotchas and potential footguns. Nobody likes fine print. If we can address a common use case for Connect users or Connect developers, we should; leaving things out for the sake of a simpler design or smaller changes should only be done as a last resort. Even if well-documented, known gaps in use cases (especially those that vary from connector to connector) may not be propagated to or understood by end users, and may be seen as a fault in the quality of this feature.
  • Overall, make this a feature that gives people joy to use, not pain. Jury's out on this one until the vote thread passes!

Public Interfaces

A single worker-level configuration property will be added for distributed mode:

Name

Type

Default

Importance

Doc

exactly.once.source.enabled 

BOOLEAN 

false 

HIGH 

Whether to enable exactly-once support for source connectors in the cluster by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed.

And a single per-connector configuration property will be added:

Name

Type

Default

Importance

Doc

offsets.storage.topic 

STRING 

null 

LOW 

The name of a separate offsets topic to use for this connector. If empty or not specified, the worker’s global offsets topic name will be used.

Proposed Changes

Atomic offset writes

Offset reads

When exactly.once.source.enabled is set to true for a worker, the consumers used by that worker to read source offsets from offsets topics will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all.

Users will be not able to explicitly override this in the worker configuration with the property consumer.isolation.level  or in the connector configuration with the property consumer.override.isolation.level. If they attempt to do so, the user-provided value will be ignored and a message will be logged notifying them of this fact.

Offset (and record) writes

When exactly.once.source.enabled is set to true for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for offsets to be committed for a task, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.

The timing for offset commits will remain the same as it is today; they will be triggered periodically on a fixed interval that users can adjust via the offset.flush.interval.ms property. Transaction boundaries and record batches will be defined by these offset commits; expanded support for transaction boundary definition can be added later and is noted in the "future work" section.

Once an offset commit is complete, if the connector is configured with a custom offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.

Task producers will be given a transactional ID of ${groupId}-${connector}-${taskId}, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). Users will not be able to override this with the worker-level producer.transactional.id  or connector-level producer.override.transactional.id  property. If they attempt to do so, the user-provided value will be ignored and a message will be logged notifying them of this fact.

SourceTask record commit API

The SourceTask::commit and SourceTask::commitRecord methods will be invoked immediately after each successful offset commit. First, commitRecord will be invoked for every record in the committed batch, followed by a call to commit. It is possible that the worker or task may suddenly die in between a successful offset commit and the completion of these calls.

Because of this, it cannot be guaranteed that these methods will be invoked after every successful offset commit. However, as long as a task is able to accurately recover based solely off of the offset information provided to the Connect framework with each record it has produced, this should not compromise exactly-once delivery guarantees. These methods can and should still be used to clean up and relinquish resources after record delivery is guaranteed, but using them to track offsets will prevent a connector from being viable for exactly-once support.

Per-connector offsets topics

Whether or not exactly.once.source.enabled is set to true for a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

Motivation

First, with the advent of KIP-458, it is now possible to bring up a single Connect cluster whose connectors each target a different Kafka cluster (via the producer.override.bootstrap.servers, consumer.override.bootstrap.servers, and/or admin.override.bootstrap.servers connector properties). At the moment, source task offsets are still tracked in a single global offsets topic for the entire Connect cluster; however, if the producer for a task is used to write source offsets, and that producer targets a different Kafka cluster than the one the Connect worker uses for its internal topics, that will no longer be possible. In order to support this case well, per-connector offsets topics must be reasoned about, to allow for smooth upgrades and downgrades where large gaps in offsets data are not created unnecessarily (see “Migration” below). Although users do not necessarily need to be given control over the name of the offsets topic for each connector in this case (the worker could simply create an offsets topic with the same name on the connector’s overridden Kafka cluster, for example), exposing this level of control adds very little complexity to this design once the necessity for per-connector offsets topics is realized. As a benefit, it should make it easier for users to configure connectors in secured environments where the connector principal may have limited access on the Kafka cluster it targets. For users and developers familiar with Kafka Streams, this is the major differentiating factor that makes a global offsets topic impossible.

Second, it closes a potential security loophole where malicious connectors can corrupt the offsets information available to other connectors on the cluster. This is preventable at the moment by configuring connectors with separate principals from that of the worker, and only granting write access to the offsets topic to the worker principal. However, since the same producer will now be used to both write offset data and write source task records, that approach will no longer work. Instead, allowing connectors to use their own offsets topics should allow administrators to maintain the security of their cluster, especially in multitenant environments.

Finally, it allows users to limit the effect that hanging transactions on an offsets topic will have. If tasks A and B use the same offsets topic, and task A initiates a transaction on that offsets topic right before task B starts up, then task A dies suddenly without committing its transaction, task B will have to wait for that transaction to time out before it can read to the end of the offsets topic. If the transaction timeout is set very high for task A (to accommodate bursts of high throughput, for example), this will block task B from processing any data for a long time. Although this scenario may be unavoidable in some cases, using a dedicated offsets topic for each connector should allow cluster administrators to isolate the blast radius of a hanging transaction on an offsets topic. This way, although tasks of the same connector may still interfere with each other, they will at least not interfere with tasks of other connectors. This should be sufficient for most multitenant environments.

Finally

Migration

If a connector is configured to use a separate offsets topic but no data is present in that topic yet, the worker that is assigned the Connector instance for that connector can automatically migrate any offsets it finds for that connector in the global offsets topic into the connector’s separate offsets topic. The process for this will be:

  1. Worker is assigned Connector
  2. Worker checks connector configuration and observes that it should use a separate offsets topic
  3. Worker attempts to create it using the connector’s principal (no check for topic existence is made; if the topic already exists, the resulting error response from the broker is silently discarded in the same way that it is when a worker tries to create one of its internal topics today)
  4. Worker reads to the end of the custom offsets topic, failing the Connector if the offsets topic doesn’t exist by this point
  5. If no data is present in the custom offsets topic:
    1. Worker refreshes its view of the global offsets topic by reading to the end
    2. Worker begins a producer transaction on the custom offsets topic
    3. Worker copies all offset information for the connector in its latest view of the global offsets topic into the custom offsets topic
    4. If no offset information is available in the global offsets topic for the connector, worker writes a sentinel value into the offsets topic indicating that a migration has taken place
    5. Worker commits the producer transaction
    6. Worker reads to the end of the custom offsets topic (again)
  6. Worker starts the Connector

If a source connector is configured to use a separate offsets topic, the worker will block task startup for as long as necessary (unless interrupted due to task shutdown) until the topic is created. Once the topic exists (after being created by the worker that owns the Connector, if necessary), the worker will read to the end of the offsets topic before starting the task. If there are no offsets for the connector present in the topic and there is no sentinel value in the offsets topic indicating that a migration attempt was made and found no data for the task, the worker will continue to read to the end of the topic until either offset information for the connector or a sentinel value is found. Once this step is completed, the worker will be free to start the task.

An example of a sentinel offsets record for a connector named “reddit-source” would look like:

    key: [“reddit-source”]
value: {“migrated”: true}

Workers are already very permissive about the content of their offsets topics and will not break or even log a warning message if they encounter anything that doesn’t appear to be a source offset. For the sake of cleanliness, we choose a key format that closely mirrors the existing key format for source offsets, which consists of an array containing the name of the connector and the source partition. The value only needs to be non-null so that record of a migration can be kept even after topic compaction has occurred; for readability’s sake (if the topic ever needs to be inspected for debugging, for example), we make it more explicit that this record refers to offset migration.

Zombie Fencing

With exactly.once.source.enabled set to true for a worker, the worker will begin taking steps to ensure that zombie source tasks in the cluster do not compromise exactly-once delivery support.

Task count records

A new type of record, a “task count record”, will be used in the config topic. This record explicitly tracks the number of task producers that will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task configurations, and will implicitly track whether it is necessary to perform a round of fencing before starting tasks for a connector.

An example of this record for a connector named “reddit-source” with 11 tasks would look like:

    key: “tasks-count-reddit-source”
value: {“tasks”: 11}

If the latest task count record for a connector comes after its latest set of task configurations in the config topic, workers will be allowed to create and run tasks for that connector. For example, if the contents of the config topic (represented by the keys of each record) are as follows:

    offset 0: task-reddit-source-0
offset 1: task-reddit-source-1
offset 2: commit-reddit-source
offset 3: tasks-count-reddit-source

then workers will be able to freely bring up tasks for the reddit-source connector.

However, if the contents of the config topic are this:

    offset 0: tasks-count-reddit-source
offset 1: task-reddit-source-0
offset 2: task-reddit-source-1
offset 3: commit-reddit-source

then workers will not bring up tasks for the connector.

Fencing during rebalance

All workers

When a rebalance is triggered, before rejoining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stop/start is already done for distributed workers) using at most eight threads, and any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms  worker property) will be abandoned and allowed to continue running. They will be disabled later on when the leader fences out their producers during the rebalance (described below).

Because this work will be done in parallel using up to eight threads and the default timeout for task shutdown is fairly low (five seconds), it is highly unlikely that this will hinder a worker's ability to rejoin a group before the rebalance timeout (which defaults to sixty seconds) times out.

Leader

During rebalance, the leader will take on a few extra responsibilities. Once all group members have joined and it is time to perform assignment the leader will:

  1. For each to-be-assigned source connector, if there is a set of task configurations present in the config topic after the most recent task count record for that connector:
    1. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
      1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector.** 
    2. Write the new task count record for all of the fenced-out connectors to the config topic
  2. Read to the end of the config topic to verify that all task count records have been written successfully, and use the new latest offset of the config topic as the offset in the assignment given out to members of the cluster.

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. 

**  - This step will be done in parallel for all connectors

Source task startup

When a worker is assigned a source task, it will check to see if a task count record is present in the config topic after the most recent task configs for that connector.

If that is the case, the worker can assume that all producers for all prior generations of tasks for that connector have been fenced out, and it will bring up the source tasks it has been assigned. 

Otherwise, the worker will emit a warning log message and refuse to bring the task up until a rebalance occurs and it has a chance to check again for a task count record after the most recent set of task configurations for the connector.

Once a worker has instantiated a producer for a source task, it will read to the end of the config topic once more, and if a new set of task configurations for that connector has been generated, it will abort startup of the task.

To summarize, the startup process for a task will be:

  1. Worker is assigned a source task during rebalance
  2. If there is no task count record present in the config topic for that connector, or there is a set of task configurations present after the latest task count record, refuse to bring up the task and await a subsequent rebalance
  3. Otherwise:
    1. Instantiate a transactional producer for the task
    2. Read to the end of the config topic
    3. If a new set of task configurations has since been generated for the connector, abort startup
    4. Otherwise, begin polling the task for data

Leader access to config topic

After a rebalance, a worker may discover that it has become the leader of the cluster. When this happens, the worker will instantiate a transactional producer whose transactional ID is connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster. If users try to override this by setting the transactional.id worker property, the user-provided value will be ignored and a message will be logged notifying them of this fact. The worker will use this producer for all writes it performs on the config topic. It will begin and commit a transaction for every record it writes. This may seem unusual--why use a transactional producer if transactions aren’t really necessary?--but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.

If, for example, a leader stalls while attempting to fence out a prior generation of task producers for a given connector during a rebalance, it may fall out of the group and become a zombie. If another worker is elected leader at this point and the connector is then reconfigured, it’s possible that the zombie leader may become unblocked and then attempt to write a task count record to the config topic after a new set of task configurations are written to the topic. This would corrupt the state of the config topic and violate a key assumption: a task count record present after task configurations for a connector means that it is not necessary to fence out a prior generation of producers for that connector’s tasks before starting them.

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then.

Admin API to Fence out Transactional Producers

Java API additions

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID:

Admin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options  		   The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(groupIds, new FenceProducersOptions());
    }

}

New classes for the API will be added as well.

The FenceProducersResult  class:

FenceProducersResult.java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
		// Implementation goes here
    }

	/**
	 * Returns a future that provides the producer ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Long> producerId(String transactionalId) {
		// Implementation goes here
	}

	/**
 	 * Returns a future that provides the epoch ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Short> epochId(String transactionalId) {
		// Implementation goes here
	}

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
		// Implementation goes here
    }

}

And the FenceProducersOptions  class:

FenceProducersOptions.java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link AdminClient#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}

Implementation

Under the hood, this will be implemented by finding the transaction coordinator for each transactional ID via the FindCoordinator protocol, then initializing a new transaction with that ID by invoking the InitProducerId protocol with no producer ID and no producer epoch. In the words of KIP-98, this:

  1. Bumps up the epoch of the PID, so that the any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.

Addressed failure/degradation scenarios

These scenarios will not compromise a cluster’s exactly-once delivery guarantees.

Leader cannot write task count record to config topic during rebalance

The leader will try indefinitely to write the task count to the config topic during rebalance and verify that it has been written by reading it back from the topic after writing.

If this takes too long, the rebalance timeout will elapse and a new leader will be elected. If the (now-dethroned) leader becomes unblocked during this process and attempts to write to the config topic, it will have been fenced out by its replacement. If it has been blocked on reading from the config topic, it will wake up and attempt to complete the rebalance, only to be informed by the group coordinator that it is no longer a member of the group. It will then attempt to rejoin the group.

If this does not block but instead fails for some reason, the leader will immediately leave the group and then attempt to rejoin.

Cannot fence out producers during rebalance

The leader may be unable to fence out producers during a rebalance if, for example, it is unable to reach or locate a transaction coordinator.

Automatically failing tasks for that connector may seem appealing at first, but the user’s first instincts in this situation are likely to be to restart those tasks, which could cause them to come up without zombies from a prior generation being fenced out first.

Marking the entire rebalance as failed and forcing a follow-up round would be simpler to design and less likely to lead to compromised delivery guarantees, but it would also impact any other connectors and tasks that were due to be reassigned in that round.

To compromise, the leader will limit the time it blocks for producer fencing to complete to be at most half of the configured session timeout for the cluster. If that time elapses and the leader has not yet fenced out the producers for some tasks, it will continue with the rebalance and happily assign all connectors and tasks as normal. No worker will start up new task instances for not-yet-fenced connectors as there will be no task count record present in the config topic after the most recent set of task configurations. Once the leader is able to successfully complete fencing out all producers for a given connector, it will write the task count record to the config topic, read it back, and force a rebalance by rejoining the group. At this point, all workers will be able to (re-)start tasks for the newly-fenced-out connector.

Manual restart of fenced-out tasks

If a task fails because its producer has been fenced out, the user may try to manually restart that task through the REST API. If the worker’s view of the config topic is up-to-date, this will simply result in the restarted task replacing any existing instances of the same task, and exactly-once delivery guarantees will be preserved. However, if the worker has an outdated view of the config topic, restarting that task may compromise exactly-once guarantees if that task ends up producing data from a source partition that has already been assigned to a different task in the latest set of task configurations for the connector.

In order to prevent manual user restarts from compromising exactly-once delivery guarantees, Connect workers will read to the end of the config topic before proceeding with any source task restarts. If the worker is unable to read to the end of the config topic, the restart request will be met with a 500 timeout response. If the worker discovers during its read to the end of the topic that the task no longer exists (because the connector has been deleted or its task count reduced), it will return the usual 404 not found response. If a rebalance becomes expected because of new information read from the config topic, a 409 conflict response will be given.

Worker lag after being assigned source task but before instantiating producer

If a worker is assigned a source task and sees a task count record in the config topic after the latest set of task configurations for that connector, it comes with the assumption that all producers for all prior task generations of that connector have been fenced out. And, at the time of the fencing performed by the leader, this assumption likely holds. However, it's possible that a worker may be assigned a source task, observe a task count record in the config topic that indicates that it is safe to start the task, then block for some large period of time before it is able to construct a producer for that task. For example, consider a scenario with workers F (follower), O (other follower), and L (leader), operating on task T of source connector C:

  1. Connector C is reconfigured and new task configurations for it are written to the config topic
  2. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker F
  3. Worker F receives its assignment and is able to read the most-recent task count record for connector C from the config topic
  4. Worker F blocks, before it is able to instantiate a transactional producer for task T
  5. Connector C is reconfigured and new task configurations for it are written to the config topic
  6. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker O as F has fallen out of the group
  7. Worker O receives its assignment and is able to read the most-recent task count record for connector C from the config topic, instantiate a transactional producer for task T, and then begin processing data from that task
  8. Worker F becomes unblocked, instantiates a transactional producer for task T, and then begins processing data for that task

In this scenario, if a source partition initially assigned to task T is reassigned to a different task during the reconfiguration in step 5, the task instance created by worker F in step 8 will begin producing duplicate data and exactly-once delivery guarantees will be compromised.

This is possible because it is not guaranteed at the time of the fencing performed by the leader that all producers that could be active for a connector have been created yet. If a producer has not yet been created, it cannot be fenced out; if it is created after the fencing occurs, a different approach must be taken to ensure that no duplicate data is produced.

This case should be covered by the additional read to the end of the config topic by the worker after it has brought up a transactional producer for a source task. If the connector is reconfigured after the worker became blocked and a round of producer fencing has occurred, then the worker will bring up a transactional producer for its source task, but then discover the connector reconfiguration and abort startup. If the worker has brought up a transactional producer and then become blocked before completing its read to the end of the config topic, the round of producer fencing by the leader should fence out that producer and any subsequent restart attempts will block until/unless the worker is able to complete a read to the end of the config topic (and handle any rebalances necessitated by new information read from the topic in the process).

Accidental task commit of dropped messages

If a transactional producer is successfully instantiated, it will not attempt to contact the broker again until an attempt is made to actually send a record. This can lead to interesting behavior: the producer is brought up and able to make initial contact with the transaction coordinator, then the entire Kafka cluster dies, then the producer begins and commits a transaction without attempting to send any records, and no exception or other indication of failure is generated by the producer at any point in this sequence of events.

When might this occur? If a task is configured with an aggressive SMT that drops all the records in a given batch, its producer will never attempt to send any records before committing the current transaction. And, once this occurs, the worker will invoke SourceTask::commit, which may cause the task to drop the data from the upstream system (for example, by acknowledging a batch of messages from a JMS broker). Even if this occurs, it should not be a problem: it’s fine to drop messages from the upstream system that are meant to be dropped by the connector, regardless of whether source offsets for those records have been committed to Kafka, as the end result is the same either way.

Permitted failure scenarios

These scenarios can compromise a cluster’s exactly-once delivery guarantees.

Heterogeneous clusters

If a worker is active and does not have support for exactly-once delivery, the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

This failure scenario will need to be called out in public documentation so that users understand that they are responsible for the operational burden of ensuring that no non-compliant workers are active.

Non-worker producer to config topic

If anything besides a worker ends up writing to the config topic, all assumptions that the cluster relies upon to determine whether it is safe to start tasks and whether a fencing is necessary during rebalance can be invalidated. This should never happen in a reasonably-secured Connect cluster and if it does, compromised delivery guarantees may pale in comparison to the other consequences of corrupting the config topic.

Compatibility, Deprecation, and Migration Plan

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Producer

Before enabling exactly-once source support on a worker, its producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation

Resource Type

Resource Name

Write

TransactionalId

connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster

IdempotentWrite

Cluster

Kafka cluster targeted by the Connect cluster

Connector principal permissions

Producer

Each source connector’s producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation

Resource Type

Resource Name

Write

TransactionalId

${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

Write

TransactionalId

${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic.

IdempotentWrite

Cluster

Kafka cluster targeted by the Connect cluster

Write

Topic

Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

Consumer

Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:

Operation

Resource Type

Resource Name

Read

Topic

Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

Admin

If the offsets topic for a connector does not exist yet, the connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from:

Operation

Resource Type

Resource Name

WriteTransactionalId${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

Create

Topic

Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

Additionally, the connector's admin principal must be given the following permissions on the Kafka cluster it reads offsets from no matter what:

OperationResource TypeResource Name
DescribeTopicOffsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

This is necessary in order to ensure that the Connect worker has read up to the end of the offsets topic, even when there are open transactions on the topic at the start of the read to the end. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted  isolation level, and then consuming from that topic with the read_committed  isolation level until at least those offsets listed by the admin client.

Rolling upgrade

Rolling upgrades that enable exactly-once support on a cluster will be possible. Users can stop each worker, upgrade to a later version if necessary, set exactly.once.source.enabled to true in the config, then restart, one by one.

This will have the effect that, until the leader is upgraded, no upgraded workers will start source tasks. This is necessary in order to guarantee that a task count record has been written to the config topic before starting a task with a transactional producer.

Additionally, no tasks for connectors configured to use separate offsets topics will start until that topic exists and the tasks can confirm that a migration has taken place (either implicitly, via the presence of offsets for that connector, or explicitly, via a sentinel migration record for the connector).

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.enabled to false for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that does not support exactly-once sources at all (a “hard” downgrade).

Soft downgrade

Soft downgrades should be possible via a cluster roll, where each worker is shut down, its configuration is altered to disable exactly-once source support, and then it is restarted. During the process, some workers that are still configured with exactly-once source support enabled may refuse to start tasks for connectors if a task count record is not found for those connectors after the most recent set of task configurations. This isn’t especially likely given that the most common cause of this would be connector reconfiguration during downgrade, but even if it occurs, it will only affect workers that have yet to be downgraded. There should be no long-lasting impact on the cluster once all workers have been downgraded.

Hard downgrade

Hard downgrades can be performed in the same way as soft downgrades. Because source task offsets on upgraded workers are still written to the worker’s global offsets topic, even if a downgraded worker does not support per-connector offsets topics, it will still pick up on relatively-recent source offsets for its connectors. Some of these offsets may be out-of-date or not match the ones in the connector’s separate offsets topic, but the only consequence of this would be duplicate writes by the connector, which will be possible on any cluster without exactly-once support enabled. Without any writes to the global offsets topic, all records processed by a connector since a switch to a dedicated offsets topic would be re-processed after the downgrade and would likely result in a flood of duplicates. While technically permissible given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience in this case could be quite bad, so a little extra effort on the part of Connect to significantly reduce the fallout of downgrades in this case is warranted.

If a re-upgrade is desirable at this point, any separate per-connector offsets topics may need to be deleted beforehand. Otherwise, on startup, the worker will read from the existing separate offsets topic, see that there is offset data for the connector present in it, and not perform a migration, causing all data that was produced by the connector after the downgrade to be duplicated.

Additional configuration properties

The two user-facing configuration properties introduced here should not violate backwards compatibility, as they both come with default values that preserve existing behavior.

New Admin API

The new Admin  methods for fencing producers will be implemented using existing protocols (specifically, FindCoordinator and InitProducerId), so no changes to the Kafka binary protocol will be required. As a result, these new methods should work on any broker that supports those protocols.

Rejected Alternatives

Non-generational task fencing

Summary: instead of proactively fencing out task producers during rebalance, allow each task to bring up its own producer and let it fence out older instances of itself in the process.

Rejected because:

  • This doesn't work for cases where a source partition is reassigned across tasks. For example, if task T1 of a connector is responsible for reading table A, and then the connector reassigns A to task T2, task T1 needs to be shut down completely before task T2 starts up. This can be referred informally to as the "source partition reshuffling problem".
  • This doesn't work for cases where the number of tasks in a connector is reduced. For example, if the number of tasks is brought down from 8 to 5, then tasks 6, 7, and 8 will never be fenced out by successors since none will be brought up.

Connector-defined transaction boundaries

Summary: allow connectors to dictate when a transaction should be started, committed, rolled back, etc.

Rejected because: out of scope; can be pursued as an additional feature later on.

Per-connector exactly-once property

Summary: either instead of or in addition to allowing exactly-once support to be configured on a worker-wide basis, allow it to be enabled on a per-connector basis.

Rejected because: even if well-behaved workers close the producers for tasks that lag on shutdown, zombie workers may still run tasks that the cluster has since reassigned to other workers. In this case, while the worker remains a zombie, manual intervention is required from the cluster administrator to shut down the tasks hosted on the zombie worker, either by shutting down the worker (possible today) or by explicitly revoking tasks from the worker (not possible today, but another option that would work). Requiring timely manual intervention on the part of the user in order to preserve delivery guarantees should be avoided if possible.

If a non-exactly-once source task were to become a zombie because it, its converter, one of its transforms, etc. became blocked, or because its worker became a zombie, then its connector were reconfigured to enable exactly-once support, the zombie task might eventually cause duplicate writes to occur. To make this scenario less likely, exactly-once support is provided in an all-or-nothing fashion that makes it impossible for non-transactional producers to stay lying around in a cluster after all of the workers have been upgraded and configured to provide exactly-once source support.

Deduplication of connector-provided records

Summary: track messages produced by source connectors and perform deduplication automatically.

Rejected because: this requires every message to be uniquely and deterministically identifiable, which is not possible in all cases (although this is likely possible in all cases where connectors properly utilize the offsets API provided by the Connect framework). When it is possible, deduplication can already be implemented by a downstream application such as Kafka Streams. Finally, it’s unclear how all prior IDs for records produced by a connector might be tracked and used efficiently by a Connect cluster without incurring a noticeable performance penalty.

Per-source partition producers

Summary: instead of using one producer for each task, use a producer for each source partition of the task.

Rejected because:

  • Although this would provide exactly-once support for most of the same connectors that the proposed design would (specifically, connectors that allocate each source partition to at most one task at a time), there are some cases that this would miss. This is because the assumption that the connector-specified source partitions that it gives to the Connect framework correspond to the actual source partitions that it consumes from (such as database tables) doesn't always hold. If, for example, a connector stores a version field in its source partitions, a change in that version field (either across connector versions or even during runtime for the connector) would allow zombie task instances for that connector to continue running and processing data, compromising exactly-once delivery.
  • Another issue with this approach is that the framework would only be able to instantiate a transactional producer for a source partition after a task communicates its intent to use that partition. There is no explicit mechanism for this communication right now. One implicit mechanism might be to create a transactional producer for a source partition the first time that a task produces data for that partition, but that wouldn't fence out zombie task instances early enough as they might be able to produce data in between when newer task instances read offset data (and choose where to begin consuming from the upstream system) and when they produce the first record for that data, which would then lead to duplicate record delivery. Another implicit mechanism might be to create a transactional producer for a source partition the first time that a task tries to read the offset for that source partition, but this would only provide exactly-once delivery guarantees for tasks that query the framework for source offsets of every source partition they write to before they perform that write. One common use case where this may not happen is if a task discovers a new object in the upstream system that it knows (by creation timestamp, for example) was created after the task was started; in this case, it's possible and even likely that a connector developer wouldn't see a need to check for source offsets for that object since it was just created.
  • The number of producers created for a connector could grow quite large, at which point, allocating enough buffer memory, broker connections, and other resources for each of these may have adverse impacts on the performance of the worker. This would also go against the pattern established by the Connect framework that, to scale a connector, its task count should be increased, as it would make the number of producers set up for a connector the same no matter if it’s running a single task or hundreds. This would be especially problematic if, for example, MirrorMaker 2 were set up to consume from 1000 slowly-moving topics with a single of task; if the average partition count per topic were 5, that'd lead to 5000 producers being created. This approach was initially used by Kafka Streams, but due to the scalability issues it presented, the follow-up KIP-447 has since been introduced to eliminate excessive construction of producers.
  • It's unclear exactly how a transactional ID could be derived from a source partition, since source partitions consist of structured data (specifically, a Java Map<String, ?> ). We could try to serialize these as JSON and then use the resulting string as the transactional ID (likely in conjunction with the name of the connector in order to avoid collisions with other connectors), but that would lead to some extremely ugly transactional IDs. A hash of the JSON-serialized source partition could be taken, but that would make it even harder to predict, and might even lead to collisions which, though unlikely, would be preferable to avoid.
  • The operational burden on users in security-conscious environments would be significantly heavier, as the transactional IDs required to run a connector would be difficult to predict and could dynamically expand at runtime depending on the implementation of the connectors being run. At best, per-connector documentation would be required to expose a mostly-internal detail so that users could understand exactly which transactional IDs their connector's producer would need.
  • Because no guarantee is made that prior generations of tasks are fenced out, it becomes possible for a task running on a worker with an outdated configuration for the connector to fence out a task running with a correct configuration. This would not compromise delivery guarantees, but may lead to some serious user ire if, for example, the connector were reconfigured with a different transformation chain, converter, or another property that altered the format of data produced by it.

Throwaway fencing-only producers

Summary: instead of expanding the AdminClient  API to add producer-fencing functionality, just instantiate a throwaway producer for the sole purpose of calling Producer::initTransactions .

Rejected because: this is a bit of an abuse of the producer API and using a dedicated admin API is cleaner. It requires no broker-side changes and will be fairly lightweight to implement. There also may be other legitimate use cases for fencing out producers without wanting to instantiate one in other client applications.

Cleanup of task count records on connector deletion

Summary: if a connector is deleted, perform a final round of producer fencing for its tasks before writing a tombstone task count record for the connector to the config topic.

Rejected because: in the worst case, the connector is never recreated and a worker (presumably the leader) will have expended unnecessary effort on fencing out producers that aren’t going to compromise delivery guarantees and won’t have any bigger consequences than such tasks would already have on a Connect cluster today. In the best case, the connector is recreated immediately, in which case, a fencing would be performed anyways if no task count tombstone record were written to the config topic. Ultimately, no work is saved by doing this fencing on connector deletion, and some work may be unnecessary.

Future Work

Finer control over offset commits

The framework periodically commits source offsets at a fixed interval. Since source offset commits are now tied to the commits of the transactions in which source records are sent, Connect users and connector developers may want to trigger offset commits differently. Options for this include allowing source tasks to define their own transaction boundaries, allowing per-connector source offset intervals, or allowing offsets to be committed after a certain number of records.

Standalone mode support

Since the design for atomic writes of source records and their offsets relies on source offsets being stored in a Kafka topic, standalone mode is not eligible. If there is sufficient demand, we may add this capability to standalone mode in the future.

More conservative task fencing

It may not be necessary to fence out all tasks from a previous generation if the division of source partitions among those tasks doesn't change. However, it's impossible to know the actual assignment of source partitions to tasks since there's no API for connectors to be able to provide this information to the Connect framework. An extension to the Connect framework to add this API could be made, but is left for future work. One important question to consider with such an API would be whether it would take into account configuration changes that are invisible to the connector itself, such as converter or transformation chain changes.

  • No labels