Status

Current state: Under discussion

...

JIRA: KAFKA-10000, KAFKA-6080

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Motivation

It's been possible for sink connectors to implement exactly-once delivery for a while now, but the same cannot be said for source connectors. There are two key reasons for this:

Source offset accuracy: The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source record that they correspond to has been successfully sent to Kafka and ack'd by the producer. This provides at-least-once delivery guarantees for most source connectors, but allows for the possibility of duplicate writes in cases where source records are successfully written to Kafka but the worker is interrupted before it can write the offsets for those records to Kafka.

Zombies: Some scenarios can cause multiple tasks to run and produce data for the same source partition (such as a database table or a Kafka topic partition) at the same time. This can also cause the framework to produce duplicate records.

In order to support exactly-once delivery guarantees for source connectors, the framework should be expanded to atomically write source records and their source offsets to Kafka, and to prevent zombie tasks from producing data to Kafka.

With these changes, any source connector for which each source partition is assigned to at most one task at a time, and which is capable of resuming consumption from the upstream source on startup based on the source offsets that prior tasks have provided to the framework, should be capable of exactly-once delivery.

Public Interfaces

A single worker-level configuration property will be added for distributed mode:

| Name | Type | Default | Importance | Doc |
|------|------|---------|------------|-----|
| exactly.once.source.enabled | BOOLEAN | false | HIGH | Whether to enable exactly-once support for source connectors in the cluster by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed. |

And a single per-connector configuration property will be added:

| Name | Type | Default | Importance | Doc |
|------|------|---------|------------|-----|
| offsets.storage.topic | STRING | null | LOW | The name of a separate offsets topic to use for this connector. If empty or not specified, the worker’s global offsets topic name will be used. Will only apply in distributed mode; if used in a connector configuration on a standalone worker, will be ignored. |

Proposed Changes

Atomic offset writes

Offset reads

When exactly.once.source.enabled is set to true for a worker, the consumers used by that worker to read source offsets from offsets topics will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all. There are no additional ACL requirements for consumers configured this way.

Users will be allowed to explicitly override this in the worker configuration with the property consumer.isolation.level, or in the connector configuration with the property consumer.override.isolation.level. This aligns with the current behavior of the framework with regard to overriding default Kafka client properties: even if it's usually unnecessary and possibly a bad idea, we give users that flexibility just in case they know what they're doing.
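As a rough illustration of the override behavior described above, the following sketch layers the new default under worker-level and connector-level overrides. The class and method names are hypothetical, and the precedence shown (connector overrides applied after worker overrides) is an assumption rather than something this proposal states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Connect framework.
final class OffsetConsumerConfigSketch {

    // Builds the properties for a consumer that reads source offsets.
    static Map<String, String> offsetConsumerProps(Map<String, String> workerProps,
                                                   Map<String, String> connectorProps) {
        Map<String, String> props = new HashMap<>();
        // New default: skip records that belong to uncommitted transactions
        props.put("isolation.level", "read_committed");
        // Worker-level overrides use the "consumer." prefix
        copyWithPrefix(workerProps, "consumer.", props);
        // Connector-level overrides use the "consumer.override." prefix
        // (assumed here to take precedence over worker-level overrides)
        copyWithPrefix(connectorProps, "consumer.override.", props);
        return props;
    }

    // Copies every entry whose key starts with the prefix, stripping the prefix
    private static void copyWithPrefix(Map<String, String> source, String prefix,
                                       Map<String, String> target) {
        source.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                target.put(key.substring(prefix.length()), value);
            }
        });
    }
}
```

With no overrides present, the resulting map contains only isolation.level=read_committed; a connector that sets consumer.override.isolation.level=read_uncommitted would replace that value.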

With atomic source offset commit enabled

Source task producers

Each source task will use its own producer to write offsets to Kafka, as opposed to the single shared producer that the framework currently uses to perform offset writes for all tasks on the worker. This is required since multiple producers cannot share a transaction ID.

The producer for each source task will be configured with a few new default properties that are required for producer transactions: enable.idempotence=true and transactional.id=<id>. The <id> for the producer will be the group ID of the cluster, the name of the connector, and the task ID, all delimited by hyphens; for example, connect-cluster-reddit-source-12 for task number 12 of a connector named reddit-source running on a Connect cluster with a group ID of connect-cluster. The cluster group ID is used in order to allow different Connect clusters running against the same underlying Kafka cluster to use connectors with the same name.

In addition to those properties, in order to respect the offset.flush.timeout.ms worker property (which dictates how long the framework will wait for an offset commit to complete), the max.block.ms property will be changed from Integer.MAX_VALUE  to the value specified for offset.flush.timeout.ms, which will establish an upper bound for the amount of time that committing a producer transaction can take.
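The producer defaults described in this section could be assembled roughly as follows. This is a minimal sketch: the class and method names are hypothetical, and the property keys are the standard Kafka producer names enable.idempotence, transactional.id, and max.block.ms.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Connect framework.
final class TaskProducerConfigSketch {

    // Group ID, connector name, and task ID, delimited by hyphens
    static String transactionalId(String groupId, String connector, int taskId) {
        return groupId + "-" + connector + "-" + taskId;
    }

    // Default properties for the per-task transactional producer
    static Map<String, String> producerProps(String groupId, String connector, int taskId,
                                             long offsetFlushTimeoutMs) {
        Map<String, String> props = new HashMap<>();
        props.put("enable.idempotence", "true");
        props.put("transactional.id", transactionalId(groupId, connector, taskId));
        // Bound blocking producer calls by the worker's offset.flush.timeout.ms
        props.put("max.block.ms", Long.toString(offsetFlushTimeoutMs));
        return props;
    }
}
```

For task 12 of reddit-source on a cluster with group ID connect-cluster, transactionalId returns connect-cluster-reddit-source-12, matching the example above.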

Source task lifetime

  1. After a source task is initialized, the framework will call producer.initTransactions(). This will fence out any zombie instances of the same task and prevent those instances from being able to write to Kafka. Any fenced-out task will log an error letting the user know that it has been pre-empted by a new instance of the same task.
  2. Before polling for new records from the task, the framework will first block until any in-progress offset commits are completed. This is done in order to obey the contract for task.commit(), which is that the task should commit offsets "up to the offsets that have been returned by poll()". Polling for more records from the task might cause it to commit for records that have not actually been committed or even sent to Kafka yet.
  3. After any offset commits are completed and the framework receives records from the source task, it will:
    1. temporarily store the source offsets for those records,
    2. call producer.beginTransaction() if there is not a transaction that is already in progress, and
    3. pass those records to producer.send(...).
  4. When it is time to commit offsets for the source task, if there are no source offsets to be written, the framework will do nothing. Otherwise, it will:
    1. write those source offsets to Kafka (using the source task's producer), and then
    2. call producer.commitTransaction().
  5. Afterward, if there were offsets to commit, the framework will:
    1. invoke task.commitRecord(...) for each record that was sent and committed to Kafka, then finally,
    2. invoke task.commit().

If an attempt to commit a transaction fails, the framework will immediately call producer.abortTransaction() and fail the task. This differs from the framework's current behavior on a failed source offset commit, which is to simply log the error and move on. However, this does align with the framework's current behavior on a failed source record send, which is to fail the task as quickly as possible. Since a failure to commit source offsets also means a failure to commit the producer transaction containing source records for those offsets, failing the task is preferable.

This pseudocode snippet illustrates the proposed lifecycle for a source task:

Source task, with atomic offset commit enabled:
private final Map<Map<String, ?>, Map<String, ?>> sourceOffsets = new HashMap<>();
private final Map<SourceRecord, RecordMetadata> recordsToCommit = new ConcurrentHashMap<>();
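private volatile boolean transactionInProgress = false;
// Set to true on failure (see onFailure) or when the task is being shut down
private volatile boolean stopping = false;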

// The "work" loop whose major responsibility is to poll the source task for records and give them to the producer
public void pollProduceLoop() {

	// (1) Initialize the producer transaction, fencing out the producers of any previous abandoned task instance
	producer.initTransactions();

	while (!stopping) {
		List<SourceRecord> records;

		// (2) Synchronize to ensure we don't poll for records while committing offsets
		synchronized (this) {

			// Double-check here in case we failed to commit offsets before entering this synchronized block, which would cause the
			// "stopping" flag to be set to true
			if (stopping)
				break;

			// Poll the task for records
			records = task.poll();
			if (records == null || records.isEmpty())
				continue;

			// (3a) Store the source offsets for each record so that they can be committed later
			records.forEach(record -> sourceOffsets.put(record.sourcePartition(), record.sourceOffset()));

			// (3b) Start a new transaction if one hasn't yet been started
			if (!transactionInProgress) {
				try {
					producer.beginTransaction();
					transactionInProgress = true;
				} catch (ProducerFencedException e) {
					throw new ConnectException("Fenced out while attempting to begin transaction; likely caused by another, newer, instance of this task being created", e);
				}
			}

			// (3c) Send the records to the producer
			// No need to track the returned future as all records will be flushed when the producer transaction is committed and any
			// failures to send records will be surfaced then
			records.forEach(record -> {
				producer.send(
					serializeRecord(record),
					(metadata, exception) -> {
						// On a successful send, track the resulting record metadata so that it can be given to the source task for record commit
						if (exception == null)
							recordsToCommit.put(record, metadata);
					}
				);
			});
		}
	}
}

// The offset commit method that is invoked periodically by the framework
public void commitOffsets() {

	// Synchronize to ensure we don't commit offsets while polling for records
	synchronized (this) {

		// Short-circuit if there's no offsets to commit so we don't try to commit a producer transaction that hasn't been started yet
		if (sourceOffsets.isEmpty())
			return;

		// (4a) Write source offsets to Kafka
		sourceOffsets.forEach((sourcePartition, sourceOffset) -> producer.send(serializeSourceOffset(sourcePartition, sourceOffset)));
		sourceOffsets.clear();

		// (4b) Commit the producer transaction
		try {
			producer.commitTransaction();
			transactionInProgress = false;
		} catch (ProducerFencedException e) {
			throw new ConnectException("Fenced out while attempting to commit transaction; likely caused by another, newer, instance of this task being created", e);
		}

		// (5a) Allow the task to commit each record
		recordsToCommit.forEach((record, metadata) -> task.commitRecord(record, metadata));
		recordsToCommit.clear();

		// (5b) And allow the task to commit offsets, if it would like
		task.commit();
	}
}

// Called externally by the framework on uncaught exceptions in the poll-produce loop or during offset commit
private void onFailure(Throwable t) {
	log.error("Task failing due to uncaught error", t);
	stopping = true;
	if (transactionInProgress) {
		try {
			producer.abortTransaction();
		} catch (Exception e) {
			log.warn("Failed to abort producer transaction", e);
		}
	}
}

private ProducerRecord<byte[], byte[]> serializeRecord(SourceRecord record) {
	// Use the task's key, value, and header converter to serialize the record and return a producer record using the serialized
	// byte arrays for the record's key, value, and headers
}

private ProducerRecord<byte[], byte[]> serializeSourceOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
	// Use the worker's internal key and value converter to serialize the partition and offset respectively into byte arrays, then return
	// a producer record using those as its key and value
}

Drawbacks and pitfalls

This proposal makes assumptions about how Connect is run that should apply to most users, but possibly not all. These assumptions are:

  1. It is acceptable to give source connectors direct access to the offsets topic
  2. The Kafka cluster that source connectors are writing to is the same Kafka cluster that the Connect worker uses for its internal topics

Assumption 1 may not hold in extremely security-conscious and/or multitenant environments, since it technically becomes possible for malicious users to corrupt the content of the offsets topic for any/all connectors on the cluster if they are able to create a source connector with atomic source offset commit enabled that produces arbitrary source records.

Assumption 2 may not hold in multitenant environments where a single global Connect cluster hosts connectors that write to and read from a variety of Kafka clusters.

Compatibility, Deprecation, and Migration Plan

Enabling atomic source offset commit

Producer principal permissions

Before enabling atomic source offset commit for a connector, its producer principal must be given the following permissions on the Kafka cluster it writes to:

...


Offset (and record) writes

When exactly.once.source.enabled is set to true for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for offsets to be committed for a task, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.

Once an offset commit is complete, if the connector is configured with a custom offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task.

Tasks will be given a transactional ID of ${groupId}-${connector}-${taskId}, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero).

SourceTask record commit API

The SourceTask::commit and SourceTask::commitRecord methods will be invoked immediately after each successful offset commit. First, commitRecord will be invoked for every record in the committed batch, followed by a call to commit. It is possible that the worker or task may suddenly die in between a successful offset commit and the completion of these calls.

Because of this, it cannot be guaranteed that these methods will be invoked after every successful offset commit. However, as long as a task is able to accurately recover based solely on the offset information it provided to the Connect framework with each record it produced, this should not compromise exactly-once delivery guarantees. These methods can and should still be used to clean up and relinquish resources after record delivery is guaranteed, but using them to track offsets will prevent a connector from being viable for exactly-once support.

Per-connector offsets topics

Whether or not exactly.once.source.enabled is set to true for a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

Motivation

First, it closes a potential security loophole where malicious connectors can corrupt the offsets information available to other connectors on the cluster. This is preventable at the moment by configuring connectors with separate principals from that of the worker, and only granting write access to the offsets topic to the worker principal. However, since the same producer will now be used to both write offset data and write source task records, that approach will no longer work. Instead, allowing connectors to use their own offsets topics should allow administrators to maintain the security of their cluster, especially in multitenant environments.

Second, it allows users to limit the effect that hanging transactions on an offsets topic will have. If tasks A and B use the same offsets topic, and task A initiates a transaction on that offsets topic right before task B starts up, then task A dies suddenly without committing its transaction, task B will have to wait for that transaction to time out before it can read to the end of the offsets topic. If the transaction timeout is set very high for task A (to accommodate bursts of high throughput, for example), this will block task B from processing any data for a long time. Although this scenario may be unavoidable in some cases, using a dedicated offsets topic for each connector should allow cluster administrators to isolate the blast radius of a hanging transaction on an offsets topic. This way, although tasks of the same connector may still interfere with each other, they will at least not interfere with tasks of other connectors. This should be sufficient for most multitenant environments.

Finally, with the advent of KIP-458, it is now possible to bring up a single Connect cluster whose connectors each target a different Kafka cluster (via the producer.override.bootstrap.servers, consumer.override.bootstrap.servers, and/or admin.override.bootstrap.servers connector properties). At the moment, source task offsets are still tracked in a single global offsets topic for the entire Connect cluster; however, if the producer for a task is used to write source offsets, and that producer targets a different Kafka cluster than the one the Connect worker uses for its internal topics, that will no longer be possible. In order to support this case well, per-connector offsets topics must be reasoned about, to allow for smooth upgrades and downgrades where large gaps in offsets data are not created unnecessarily (see “Migration” below). Although users do not necessarily need to be given control over the name of the offsets topic for each connector in this case (the worker could simply create an offsets topic with the same name on the connector’s overridden Kafka cluster, for example), exposing this level of control adds very little complexity to this design once the necessity for per-connector offsets topics is realized. As a benefit, it should make it easier for users to configure connectors in secured environments where the connector principal may have limited access on the Kafka cluster it targets.

Migration

If a connector is configured to use a separate offsets topic but no data is present in that topic yet, the worker that is assigned the Connector instance for that connector can automatically migrate any offsets it finds for that connector in the global offsets topic into the connector’s separate offsets topic. The process for this will be:

  1. Worker is assigned Connector
  2. Worker checks connector configuration and observes that it should use a separate offsets topic
  3. Worker attempts to create it using the connector’s principal (no check for topic existence is made; if the topic already exists, the resulting error response from the broker is silently discarded in the same way that it is when a worker tries to create one of its internal topics today)
  4. Worker reads to the end of the custom offsets topic, failing the Connector if the offsets topic doesn’t exist by this point
  5. If no data is present in the custom offsets topic:
    1. Worker refreshes its view of the global offsets topic by reading to the end
    2. Worker begins a producer transaction on the custom offsets topic
    3. Worker copies all offset information for the connector in its latest view of the global offsets topic into the custom offsets topic
    4. If no offset information is available in the global offsets topic for the connector, worker writes a sentinel value into the offsets topic indicating that a migration has taken place
    5. Worker commits the producer transaction
    6. Worker reads to the end of the custom offsets topic (again)
  6. Worker starts the Connector
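Step 5 of the process above can be sketched as a pure function that decides which records the worker would write to the custom offsets topic inside its transaction. The class name, the in-memory map representation of the topics, and the use of a two-element list as the offset key are all assumptions made for illustration; topic creation, reading to the end, and the transaction itself are omitted.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Connect framework.
final class OffsetMigrationSketch {

    // Sentinel value indicating that a migration has taken place
    static final String SENTINEL_VALUE = "{\"migrated\": true}";

    // Keys are modeled as [connector, sourcePartition]; the sentinel key is [connector]
    static Map<List<String>, String> recordsToWrite(String connector,
                                                    Map<List<String>, String> customTopic,
                                                    Map<List<String>, String> globalTopic) {
        Map<List<String>, String> toWrite = new LinkedHashMap<>();
        // Migration only happens when the custom offsets topic has no data yet
        if (!customTopic.isEmpty()) {
            return toWrite;
        }
        // Copy every offset for this connector from the global offsets topic
        globalTopic.forEach((key, value) -> {
            if (key.size() == 2 && key.get(0).equals(connector)) {
                toWrite.put(key, value);
            }
        });
        // Nothing to copy: record that a migration was attempted anyway
        if (toWrite.isEmpty()) {
            toWrite.put(List.of(connector), SENTINEL_VALUE);
        }
        return toWrite;
    }
}
```

An empty result means no migration is needed; otherwise the returned records would be written and committed as a single transaction.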

If a source connector is configured to use a separate offsets topic, the worker will block task startup for as long as necessary (unless interrupted due to task shutdown) until the topic is created. Once the topic exists (after being created by the worker that owns the Connector, if necessary), the worker will read to the end of the offsets topic before starting the task. If there are no offsets for the connector present in the topic and there is no sentinel value in the offsets topic indicating that a migration attempt was made and found no data for the task, the worker will continue to read to the end of the topic until either offset information for the connector or a sentinel value is found. Once this step is completed, the worker will be free to start the task.

An example of a sentinel offsets record for a connector named “reddit-source” would look like:

    key: ["reddit-source"]
    value: {"migrated": true}

Workers are already very permissive about the content of their offsets topics and will not break or even log a warning message if they encounter anything that doesn’t appear to be a source offset. For the sake of cleanliness, we choose a key format that closely mirrors the existing key format for source offsets, which consists of an array containing the name of the connector and the source partition. The value only needs to be non-null so that record of a migration can be kept even after topic compaction has occurred; for readability’s sake (if the topic ever needs to be inspected for debugging, for example), we make it more explicit that this record refers to offset migration.

Zombie Fencing

With exactly.once.source.enabled set to true for a worker, the worker will begin taking steps to ensure that zombie source tasks in the cluster do not compromise exactly-once delivery support.

Task count records

A new type of record, a “task count record”, will be used in the config topic. This record explicitly tracks the number of task producers that will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task configurations, and will implicitly track whether it is necessary to perform a round of fencing before starting tasks for a connector.

An example of this record for a connector named “reddit-source” with 11 tasks would look like:

    key: "tasks-count-reddit-source"
    value: {"tasks": 11}

If the latest task count record for a connector comes after its latest set of task configurations in the config topic, workers will be allowed to create and run tasks for that connector. For example, if the contents of the config topic (represented by the keys of each record) are as follows:

    offset 0: task-reddit-source-0
    offset 1: task-reddit-source-1
    offset 2: commit-reddit-source
    offset 3: tasks-count-reddit-source

then workers will be able to freely bring up tasks for the reddit-source connector.

However, if the contents of the config topic are this:

    offset 0: tasks-count-reddit-source
    offset 1: task-reddit-source-0
    offset 2: task-reddit-source-1
    offset 3: commit-reddit-source

then workers will not bring up tasks for the connector.
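The ordering rule in the two examples above can be sketched as a check over the record keys of the config topic, in offset order. The class and method names are hypothetical, and the prefix matching is a simplification that does not guard against connectors whose names are prefixes of one another.

```java
import java.util.List;

// Hypothetical helper, not part of the Connect framework.
final class TaskCountCheckSketch {

    // Returns true if the latest task count record for the connector comes
    // after its latest set of task configurations in the config topic.
    static boolean canStartTasks(List<String> configTopicKeys, String connector) {
        String taskPrefix = "task-" + connector + "-";
        String commitKey = "commit-" + connector;
        String taskCountKey = "tasks-count-" + connector;

        int lastTaskConfig = -1;
        int lastTaskCount = -1;
        for (int offset = 0; offset < configTopicKeys.size(); offset++) {
            String key = configTopicKeys.get(offset);
            if (key.equals(taskCountKey)) {
                lastTaskCount = offset;
            } else if (key.equals(commitKey) || key.startsWith(taskPrefix)) {
                lastTaskConfig = offset;
            }
        }
        // With no task count record at all, tasks may not be started
        return lastTaskCount > lastTaskConfig;
    }
}
```

For the first key sequence above, canStartTasks returns true; for the second, it returns false.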

Fencing during rebalance

All workers

When a rebalance is triggered, before (re-)joining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Leader

During rebalance, the leader will take on a few extra responsibilities. Once all group members have joined and it is time to perform assignment, the leader will:

  1. For each to-be-assigned source connector, if there is a set of task configurations present in the config topic after the most recent task count record for that connector:
    1. Fence out all producers that may still be active for prior task instances of this connector by:
      1. Instantiating a producer with the transactional ID for each task, up to the number of tasks in the most recent task count record for the connector.* These producers will be configured specially to avoid allocating unused resources; specifically, their buffer.memory will be 0.
      2. Initializing transactions on each of these producers.*
      3. Immediately closing each of these producers.*
    2. Write the new task count record for all of the fenced-out connectors to the config topic
  2. Read to the end of the config topic to verify that all task count records have been written successfully, and use the new latest offset of the config topic as the offset in the assignment given out to members of the cluster.

* These steps will be done in parallel for all tasks of all connectors.
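The fencing step for a single connector might look like the sketch below. The class and method names are hypothetical. The fenceOne callback stands in for the real work of instantiating a producer with the given transactional ID, initializing transactions on it, and immediately closing it; it is left abstract so the example does not depend on the Kafka client library.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not part of the Connect framework.
final class LeaderFencingSketch {

    // One transactional ID per task, up to the count in the most recent task count record
    static List<String> transactionalIdsToFence(String groupId, String connector, int taskCount) {
        return IntStream.range(0, taskCount)
                .mapToObj(taskId -> groupId + "-" + connector + "-" + taskId)
                .collect(Collectors.toList());
    }

    // Fences all producers in parallel; fenceOne must be safe to call concurrently
    static void fenceAll(List<String> transactionalIds, Consumer<String> fenceOne) {
        transactionalIds.parallelStream().forEach(fenceOne);
    }
}
```

Only after fenceAll returns without error for a connector would the leader write that connector's new task count record to the config topic.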

Source task startup

When a worker is assigned a source task, it will check to see if a task count record is present in the config topic after the most recent task configs for that connector.

If that is the case, the worker can assume that all producers for all prior generations of tasks for that connector have been fenced out, and it will bring up the source tasks it has been assigned. 

Otherwise, the worker will emit a warning log message and refuse to bring the task up until a rebalance occurs and it has a chance to check again for a task count record after the most recent set of task configurations for the connector.

Leader access to config topic

After a rebalance, a worker may discover that it has become the leader of the cluster. When this happens, the worker will instantiate a transactional producer whose transactional ID is, by default, the group ID of the cluster (but may be overridden by users using the transactional.id worker property). The worker will use this producer for all writes it performs on the config topic. It will begin and commit a transaction for every record it writes. This may seem unusual--why use a transactional producer if transactions aren’t really necessary?--but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.

If, for example, a leader stalls while attempting to fence out a prior generation of task producers for a given connector during a rebalance, it may fall out of the group and become a zombie. If another worker is elected leader at this point and the connector is then reconfigured, it’s possible that the zombie leader may become unblocked and then attempt to write a task count record to the config topic after a new set of task configurations are written to the topic. This would corrupt the state of the config topic and violate a key assumption: a task count record present after task configurations for a connector means that it is not necessary to fence out a prior generation of producers for that connector’s tasks before starting them.

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then.

Addressed failure/degradation scenarios

These scenarios will not compromise a cluster’s exactly-once delivery guarantees.

Leader cannot write task count record to config topic during rebalance

The leader will try indefinitely to write the task count to the config topic during rebalance and verify that it has been written by reading it back from the topic after writing.

If this takes too long, the rebalance timeout will elapse and a new leader will be elected. If the (now-dethroned) leader becomes unblocked during this process and attempts to write to the config topic, it will have been fenced out by its replacement. If it has been blocked on reading from the config topic, it will wake up and attempt to complete the rebalance, only to be informed by the group coordinator that it is no longer a member of the group. It will then attempt to rejoin the group.

If this does not block but instead fails for some reason, the leader will immediately leave the group and then attempt to rejoin.

Cannot fence out producers during rebalance

The leader may be unable to fence out producers during a rebalance if, for example, it is unable to reach or locate a transaction coordinator.

Automatically failing tasks for that connector may seem appealing at first, but the user’s first instincts in this situation are likely to be to restart those tasks, which could cause them to come up without zombies from a prior generation being fenced out first.

Marking the entire rebalance as failed and forcing a follow-up round would be simpler to design and less likely to lead to compromised delivery guarantees, but it would also impact any other connectors and tasks that were due to be reassigned in that round.

To compromise, the leader will limit the time it blocks for producer fencing to complete to be at most half of the configured session timeout for the cluster. If that time elapses and the leader has not yet fenced out the producers for some tasks, it will continue with the rebalance and happily assign all connectors and tasks as normal. No worker will start up new task instances for not-yet-fenced connectors as there will be no task count record present in the config topic after the most recent set of task configurations. Once the leader is able to successfully complete fencing out all producers for a given connector, it will write the task count record to the config topic, read it back, and force a rebalance by rejoining the group. At this point, all workers will be able to (re-)start tasks for the newly-fenced-out connector.

Manual restart of fenced-out tasks

If a task fails because its producer has been fenced out, the user may try to manually restart that task through the REST API. If the worker’s view of the config topic is up-to-date, this will simply result in the restarted task replacing any existing instances of the same task, and exactly-once delivery guarantees will be preserved. However, if the worker has an outdated view of the config topic, restarting that task may compromise exactly-once guarantees if that task ends up producing data from a source partition that has already been assigned to a different task in the latest set of task configurations for the connector.

In order to prevent manual user restarts from compromising exactly-once delivery guarantees, Connect workers will read to the end of the config topic before proceeding with any source task restarts. If the worker is unable to read to the end of the config topic, the restart request will be met with a 500 timeout response. If the worker discovers during its read to the end of the topic that the task no longer exists (because the connector has been deleted or its task count reduced), it will return the usual 404 not found response. If a rebalance becomes expected because of new information read from the config topic, a 409 conflict response will be given.
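As a rough sketch of the checks above, the restart decision might look like the following. The names (RestartCheck, Outcome, decide) are hypothetical, and the order in which the three conditions are evaluated is an assumption based on the order in which they are described here.

```java
// Hypothetical sketch of the restart checks; not the framework's real classes.
final class RestartCheck {
    enum Outcome { TIMEOUT_500, NOT_FOUND_404, CONFLICT_409, PROCEED }

    static Outcome decide(boolean readToEndSucceeded,
                          boolean taskStillExists,
                          boolean rebalanceExpected) {
        // The worker could not catch up with the config topic in time.
        if (!readToEndSucceeded) {
            return Outcome.TIMEOUT_500;
        }
        // The connector was deleted or its task count was reduced.
        if (!taskStillExists) {
            return Outcome.NOT_FOUND_404;
        }
        // New information in the config topic means a rebalance is pending.
        if (rebalanceExpected) {
            return Outcome.CONFLICT_409;
        }
        // The worker's view is up to date, so the restart is safe.
        return Outcome.PROCEED;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, false));
        System.out.println(decide(false, true, false));
    }
}
```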

Accidental task commit of dropped messages

If a transactional producer is successfully instantiated, it will not attempt to contact the broker again until an attempt is made to actually send a record. This can lead to interesting behavior: the producer is brought up and able to make initial contact with the transaction coordinator, then the entire Kafka cluster dies, then the producer begins and commits a transaction without attempting to send any records, and no exception or other indication of failure is generated by the producer at any point in this sequence of events.

When might this occur? If a task is configured with an aggressive SMT that drops all the records in a given batch, its producer will never attempt to send any records before committing the current transaction. And, once this occurs, the worker will invoke SourceTask::commit, which may cause the task to drop the data from the upstream system (for example, by acknowledging a batch of messages from a JMS broker). Even if this occurs, it should not be a problem: it’s fine to drop messages from the upstream system that are meant to be dropped by the connector, regardless of whether source offsets for those records have been committed to Kafka, as the end result is the same either way.

Permitted failure scenarios

These scenarios can compromise a cluster’s exactly-once delivery guarantees.

Heterogeneous clusters

If a worker is active and does not have support for exactly-once delivery, the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

This failure scenario will need to be called out in public documentation so that users understand that they are responsible for the operational burden of ensuring that no non-compliant workers are active.

Non-worker producer to config topic

If anything besides a worker ends up writing to the config topic, all assumptions that the cluster relies upon to determine whether it is safe to start tasks and whether fencing is necessary during a rebalance can be invalidated. This should never happen in a reasonably-secured Connect cluster, and if it does, compromised delivery guarantees may pale in comparison to the other consequences of corrupting the config topic.

Compatibility, Deprecation, and Migration Plan

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Before enabling exactly-once source support on a worker, its producer principal must be given the following permissions on the Kafka cluster it writes to:

| Operation | Resource Type | Resource Name |
|---|---|---|
| Write | TransactionalId | The group.id of the cluster, or the transactional.id if one is specified in the worker configuration. |
| IdempotentWrite | Cluster | Kafka cluster targeted by the Connect cluster |

Connector principal permissions

Producer

Each source connector’s producer principal must be given the following permissions on the Kafka cluster it writes to:

| Operation | Resource Type | Resource Name |
|---|---|---|
| Write | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user. |
| Write | TransactionalId | ${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic. |
| IdempotentWrite | Cluster | Kafka cluster targeted by the Connect cluster |
| Write | Topic | Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not. |
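To make the naming scheme concrete, the following sketch enumerates the transactional IDs that would need Write permission for one connector. The class and method names (TransactionalIds, forConnector, prefixCovers) are hypothetical helpers for illustration only, and the group ID and connector names in the example are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for planning ACLs; not part of the Connect framework.
final class TransactionalIds {
    // One ID per task (task IDs start from zero), plus the connector-level ID
    // if the connector uses a separate offsets topic.
    static List<String> forConnector(String groupId, String connector,
                                     int taskCount, boolean separateOffsetsTopic) {
        List<String> ids = new ArrayList<>();
        for (int taskId = 0; taskId < taskCount; taskId++) {
            ids.add(groupId + "-" + connector + "-" + taskId);
        }
        if (separateOffsetsTopic) {
            ids.add(groupId + "-" + connector);
        }
        return ids;
    }

    // True if a wildcarded prefix of ${groupId}-${connector}* would match the ID.
    static boolean prefixCovers(String groupId, String connector, String transactionalId) {
        return transactionalId.startsWith(groupId + "-" + connector);
    }

    public static void main(String[] args) {
        System.out.println(forConnector("connect-cluster", "jdbc", 2, true));
        // A prefix for connector "jdbc" also matches IDs of a connector named "jdbc2".
        System.out.println(prefixCovers("connect-cluster", "jdbc", "connect-cluster-jdbc2-0"));
    }
}
```

The second call shows the kind of conflict the wildcard caveat refers to: a prefix grant for one connector can also cover the transactional IDs of another connector whose name begins with the same characters.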

Consumer

Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:

| Operation | Resource Type | Resource Name |
|---|---|---|
| Read | Topic | Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not. |

Admin

If the offsets topic for a connector does not exist yet, the connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from:

| Operation | Resource Type | Resource Name |
|---|---|---|
| Create | Topic | Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started. |

Rolling upgrade

Rolling upgrades that enable exactly-once support on a cluster will be possible. Users can stop each worker, upgrade to a later version if necessary, set exactly.once.source.enabled to true in the config, then restart, one by one.

This will have the effect that, until the leader is upgraded, no upgraded workers will start source tasks. This is necessary in order to guarantee that a task count record has been written to the config topic before starting a task with a transactional producer.

Additionally, no tasks for connectors configured to use separate offsets topics will start until that topic exists and the tasks can confirm that a migration has taken place (either implicitly, via the presence of offsets for that connector, or explicitly, via a sentinel migration record for the connector).

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.enabled to false for workers in the cluster (a “soft” downgrade), or workers can be reverted to older versions of the Connect framework that do not support exactly-once sources at all (a “hard” downgrade).

Soft downgrade

Soft downgrades should be possible via a cluster roll, where each worker is shut down, its configuration is altered to disable exactly-once source support, and then it is restarted. During the process, some workers that are still configured with exactly-once source support enabled may refuse to start tasks for connectors if a task count record is not found for those connectors after the most recent set of task configurations. This isn’t especially likely given that the most common cause of this would be connector reconfiguration during downgrade, but even if it occurs, it will only affect workers that have yet to be downgraded. There should be no long-lasting impact on the cluster once all workers have been downgraded.

Hard downgrade

Hard downgrades can be performed in the same way as soft downgrades. Because source task offsets on upgraded workers are still written to the worker’s global offsets topic, even if a downgraded worker does not support per-connector offsets topics, it will still pick up on relatively-recent source offsets for its connectors. Some of these offsets may be out-of-date or not match the ones in the connector’s separate offsets topic, but the only consequence of this would be duplicate writes by the connector, which will be possible on any cluster without exactly-once support enabled.

If a re-upgrade is desirable at this point, any separate per-connector offsets topics may need to be deleted beforehand. Otherwise, on startup, the worker will read from the existing separate offsets topic, see that there is offset data for the connector present in it, and not perform a migration, causing all data that was produced by the connector after the downgrade to be duplicated.
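The duplication risk on re-upgrade can be illustrated with a small worked example. This is a sketch under simplifying assumptions: source offsets are modeled as a single numeric position (real source offsets are arbitrary connector-defined maps), and the names ReupgradeOffsets, resumeOffset, and duplicatedRecords are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical model of offset resolution after a re-upgrade.
final class ReupgradeOffsets {
    // If the separate offsets topic already holds offsets for the connector,
    // no migration happens and those (possibly stale) offsets are used.
    // Otherwise the offsets from the worker's global topic are migrated.
    static long resumeOffset(OptionalLong separateTopicOffset, long globalTopicOffset) {
        return separateTopicOffset.orElse(globalTopicOffset);
    }

    // Records produced after the downgrade that would be produced again.
    static long duplicatedRecords(OptionalLong separateTopicOffset, long globalTopicOffset) {
        return Math.max(0L, globalTopicOffset - resumeOffset(separateTopicOffset, globalTopicOffset));
    }

    public static void main(String[] args) {
        // Separate topic left in place: stale offset 100, global topic at 150.
        System.out.println(duplicatedRecords(OptionalLong.of(100L), 150L));
        // Separate topic deleted before re-upgrade: migration picks up 150.
        System.out.println(duplicatedRecords(OptionalLong.empty(), 150L));
    }
}
```

In the first case the task resumes from the stale position and re-produces everything written while downgraded; in the second, deleting the separate topic beforehand lets the migration pick up the latest offsets from the global topic.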

Additional configuration properties

The two user-facing configuration properties introduced here should not violate backwards compatibility, as they both come with default values that preserve existing behavior.

Rejected Alternatives

Connector-defined transaction boundaries

Summary: allow connectors to dictate when a transaction should be started, committed, rolled back, etc.

Rejected because: out of scope; can be pursued as an additional feature later on.

Per-connector exactly-once property

Summary: either instead of or in addition to allowing exactly-once support to be configured on a worker-wide basis, allow it to be enabled on a per-connector basis.

Rejected because: even if well-behaved workers close the producers for tasks that lag on shutdown, zombie workers may still run tasks that the cluster has since reassigned to other workers. In this case, while the worker remains a zombie, manual intervention is required from the cluster administrator to shut down the tasks hosted on the zombie worker, either by shutting down the worker (possible today) or by explicitly revoking tasks from the worker (not possible today, but another option that would work). Requiring timely manual intervention on the part of the user in order to preserve delivery guarantees should be avoided if possible.

If a non-exactly-once source task were to become a zombie (because it, its converter, or one of its transforms became blocked, or because its worker became a zombie) and its connector were then reconfigured to enable exactly-once support, the zombie task might eventually cause duplicate writes to occur. To make this scenario less likely, exactly-once support is provided in an all-or-nothing fashion that makes it impossible for non-transactional producers to linger in a cluster after all of the workers have been upgraded and configured to provide exactly-once source support.

Deduplication of connector-provided records

Summary: track messages produced by source connectors and perform deduplication automatically.

Rejected because: this requires every message to be uniquely and deterministically identifiable, which is not possible in all cases (although this is likely possible in all cases where connectors properly utilize the offsets API provided by the Connect framework). When it is possible, deduplication can already be implemented by a downstream application such as Kafka Streams. Finally, it’s unclear how all prior IDs for records produced by a connector might be tracked and used efficiently by a Connect cluster without incurring a noticeable performance penalty.

Per-source partition producers

Summary: instead of using one producer for each task, use a producer for each source partition of the task.

Rejected because: although this would likely guarantee exactly-once delivery for the same connectors that the proposed design would (specifically, connectors that allocate each source partition to at most one task at a time), it would come with a moderate drawback. The number of producers created for a connector could grow quite large, at which point, allocating enough buffer memory, broker connections, and other resources for each of these may have adverse impacts on the performance of the worker. This would also go against the pattern established by the Connect framework that, to scale a connector, its task count should be increased, as it would make the number of producers set up for a connector the same no matter if it’s running a single task or hundreds. Finally, it would increase the operational burden on users in security-conscious environments, as the transactional IDs required to run a connector would be difficult to predict and could dynamically expand at runtime depending on the implementation of the connectors being run.

Cleanup of task count records on connector deletion

Summary: if a connector is deleted, perform a final round of producer fencing for its tasks before writing a tombstone task count record for the connector to the config topic.

Rejected because: in the worst case, the connector is never recreated and a worker (presumably the leader) will have expended unnecessary effort on fencing out producers that aren’t going to compromise delivery guarantees and won’t have any bigger consequences than such tasks would already have on a Connect cluster today. In the best case, the connector is recreated immediately, in which case a fencing would be performed anyway if no task count tombstone record were written to the config topic. Ultimately, no work is saved by doing this fencing on connector deletion, and some of that work may be unnecessary.

Rolling upgrade

Once the necessary permissions are granted to the producers of all source connectors on the cluster that will use atomic source offset commit, it should be possible to enable atomic source offset commit via a single rolling upgrade of the cluster. If enabling atomic source offset commit for the entire worker, atomic.source.offset.commit=true should be configured on each worker before restart. If enabling it for a specific set of connectors, atomic.source.offset.commit=true can be configured for these connectors either before, during, or after the rolling upgrade; it makes little difference either way.

During this process, it is possible that source offsets that have been written to Kafka by an upgraded worker but not yet committed will be visible to workers that have not yet been upgraded. This is because the default isolation.level for consumers set up by not-yet-upgraded workers is read_uncommitted, so those consumers will pick up all writes to Kafka regardless of transactional boundaries. This should be acceptable as there is no potential for data loss, no increased risk of duplicate delivery, and it will cease to be a problem entirely once the upgrade is complete.

Additional configuration properties

The new atomic.source.offset.commit worker configuration property is only at risk of conflicting with configuration properties for existing REST extensions. Given the length and specificity of the property name, this kind of conflict is highly unlikely.

Although the source connector configuration property with the same name is at heightened risk of conflict since it shares a namespace with connector configurations, this conflict is still unlikely given the length and specificity of the property name.

Rejected Alternatives

Connector-defined transaction boundaries

Summary: allow connectors to communicate to the framework when a transaction should be started, committed, rolled back, etc.

Rejected because: out of scope for this KIP. Can be pursued as an additional feature later on, but the proposed approach should cover most cases without requiring connector developers to alter their code.

Only worker-level atomic source offset property

Summary: instead of allowing atomic source offset commit to be configured on a per-connector basis, only allow it to be enabled in an all-or-nothing fashion for an entire worker.

Rejected because: would restrict configurability for connectors, require a worker restart in order to toggle, and make incremental upgrades and testing more difficult, as every source connector principal would have to be given permission to write to the offsets topic before enabling atomic source offset commit.

Only connector-level atomic source offset property

Summary: instead of allowing atomic source offset commit to be configured on a worker-wide basis, only allow it to be enabled on a per-connector basis.

Rejected because: would be inconvenient for users who want every (or nearly every) source connector to use atomic source offset commit, and doesn't align with precedent set by converter and Kafka client configurations where there are worker-wide defaults that can be selectively overridden on a per-connector basis.

Future Work

Finer control over offset commits

The framework periodically commits source offsets at a fixed interval. Since source offset commits are now tied to the commits of the transactions in which source records are sent, Connect users and connector developers may want to trigger offset commits differently. Options for this include allowing source tasks to define their own transaction boundaries, allowing per-connector source offset intervals, or allowing offsets to be committed after a certain number of records.