Status

Current state: Under discussion

Discussion thread: TODO

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

It's been possible for sink connectors to implement exactly-once delivery for a while now, but the same cannot be said for source connectors. The key reason for this is how source offsets are handled by the framework: they are periodically written to an internal Kafka topic at a configurable interval, once the source record that they correspond to has been successfully sent to Kafka and ack'd by the producer. This provides at-least-once delivery guarantees for most source connectors, but allows for the possibility of duplicate writes in cases where source records are successfully written to Kafka but the worker is interrupted before it can write the offsets for those records to Kafka.

In order to strengthen the delivery guarantees for source connectors, it should be possible to atomically write source records and their source offsets to Kafka. This should provide exactly-once delivery guarantees for any source connector that can deterministically resume consumption from its source based on the source offsets it provides to the Connect framework.

Public Interfaces

A single worker-level configuration property will be added for distributed mode:

Name | Type | Default | Importance | Doc
atomic.source.offset.commit | BOOLEAN | false | LOW | Whether to commit offsets for source tasks in a producer transaction with the records produced by that task. For some source connectors, this is all that is required for exactly-once delivery.

This property will apply by default to all source connectors created on this worker, but it can be overridden on a per-connector basis by including it in the configuration for a connector.

And a single per-connector configuration property will be added:

Name | Type | Default | Importance | Doc
atomic.source.offset.commit | BOOLEAN | false | LOW | Whether to commit offsets for this connector's source tasks in a producer transaction with the records produced by those tasks. For some source connectors, this is all that is required for exactly-once delivery.

This property only applies in distributed mode; if it is set in a connector configuration on a standalone worker, it is ignored.
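As a rough illustration of how the two properties could interact, the sketch below resolves the effective setting for one connector. The class and method names are hypothetical and the KIP does not prescribe this implementation; it only assumes what is stated above: the worker value is the default, a connector value overrides it, and standalone workers ignore the property.

```java
import java.util.Map;

// Hypothetical helper, for illustration only; not part of the Connect framework.
final class AtomicCommitSetting {
    static final String PROPERTY = "atomic.source.offset.commit";

    /**
     * Returns whether atomic source offset commit applies to a connector.
     * A connector-level value, when present, overrides the worker-level value;
     * on a standalone worker the property is ignored.
     */
    static boolean isEnabled(Map<String, String> workerConfig,
                             Map<String, String> connectorConfig,
                             boolean distributedMode) {
        if (!distributedMode) {
            return false; // ignored in standalone mode
        }
        // The documented default for the property is false
        String workerValue = workerConfig.getOrDefault(PROPERTY, "false");
        String effective = connectorConfig.getOrDefault(PROPERTY, workerValue);
        return Boolean.parseBoolean(effective);
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of(PROPERTY, "true");
        Map<String, String> optOut = Map.of("name", "reddit-source", PROPERTY, "false");
        System.out.println(isEnabled(worker, Map.of(), true));  // true: inherits the worker default
        System.out.println(isEnabled(worker, optOut, true));    // false: connector override wins
        System.out.println(isEnabled(worker, Map.of(), false)); // false: standalone worker
    }
}
```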

Proposed Changes

New source offset read behavior

The consumers used by the framework to read source offsets from the internal offsets topic will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all. This property will be used for every consumer instantiated by the framework for reading source offsets regardless of whether the connector has atomic source offset commits enabled, since the behavior for connectors with atomic source offsets disabled will not be affected. There are also no additional ACL requirements for consumers configured this way.
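To make the difference between the two isolation levels concrete, here is a toy model of a single partition of the offsets topic. It is not the Kafka consumer: the types and the sample log are invented for illustration. It does reflect one detail of Kafka's documented behavior that is easy to miss, namely that a read_committed consumer reads in offset order and only up to the first still-open transaction, so later records become visible once that transaction is committed or aborted.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model for illustration only; all names here are hypothetical.
final class IsolationLevelSketch {
    enum TxnState { NONE, COMMITTED, ABORTED, OPEN }

    static final class LogRecord {
        final long offset;
        final TxnState state;

        LogRecord(long offset, TxnState state) {
            this.offset = offset;
            this.state = state;
        }
    }

    /** Offsets visible to a consumer that reads the log in offset order. */
    static List<Long> visibleOffsets(List<LogRecord> log, boolean readCommitted) {
        List<Long> visible = new ArrayList<>();
        for (LogRecord r : log) {
            if (!readCommitted) {
                visible.add(r.offset); // read_uncommitted returns every record
                continue;
            }
            if (r.state == TxnState.OPEN) {
                break; // cannot read past the first open transaction
            }
            if (r.state != TxnState.ABORTED) {
                visible.add(r.offset); // non-transactional or committed
            }
        }
        return visible;
    }

    /** Sample log: offsets 0..5 with a mix of transactional states. */
    static List<LogRecord> sampleLog() {
        return List.of(
            new LogRecord(0, TxnState.NONE),
            new LogRecord(1, TxnState.COMMITTED),
            new LogRecord(2, TxnState.ABORTED),
            new LogRecord(3, TxnState.NONE),
            new LogRecord(4, TxnState.OPEN),
            new LogRecord(5, TxnState.NONE));
    }

    public static void main(String[] args) {
        System.out.println(visibleOffsets(sampleLog(), true));  // [0, 1, 3]
        System.out.println(visibleOffsets(sampleLog(), false)); // [0, 1, 2, 3, 4, 5]
    }
}
```

When no record in the log is transactional, both modes return the same offsets, which is why connectors with atomic source offset commit disabled are unaffected.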

Users will be allowed to explicitly override this in the worker configuration with the property consumer.isolation.level or in the connector configuration with the property consumer.override.isolation.level. This aligns with the current behavior of the framework with regard to overriding default Kafka client properties: even if it's usually unnecessary and possibly a bad idea, we give users that flexibility in case they know what they're doing.
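The override order could be sketched as follows. This is a minimal illustration, not framework code: the class and method names are hypothetical, and it assumes that a connector-level override takes precedence over a worker-level one, which the text above does not state explicitly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of building properties for the offsets-topic consumer.
final class OffsetConsumerConfigSketch {

    static Map<String, String> offsetConsumerProps(Map<String, String> workerConfig,
                                                   Map<String, String> connectorConfig) {
        Map<String, String> props = new HashMap<>();
        // New framework default proposed above
        props.put("isolation.level", "read_committed");
        // Worker-level overrides, e.g. consumer.isolation.level
        copyWithPrefix(workerConfig, "consumer.", props);
        // Connector-level overrides, e.g. consumer.override.isolation.level
        // (assumed here to win over worker-level values)
        copyWithPrefix(connectorConfig, "consumer.override.", props);
        return props;
    }

    // Copies every entry whose key starts with the prefix, stripping the prefix
    private static void copyWithPrefix(Map<String, String> source, String prefix,
                                       Map<String, String> target) {
        source.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                target.put(key.substring(prefix.length()), value);
            }
        });
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of("consumer.isolation.level", "read_uncommitted");
        Map<String, String> connector = Map.of("consumer.override.isolation.level", "read_committed");
        System.out.println(offsetConsumerProps(Map.of(), Map.of()).get("isolation.level"));    // read_committed
        System.out.println(offsetConsumerProps(worker, Map.of()).get("isolation.level"));      // read_uncommitted
        System.out.println(offsetConsumerProps(worker, connector).get("isolation.level"));     // read_committed
    }
}
```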

With atomic source offset commit enabled

Source task producers

Each source task will use its own producer to write offsets to Kafka, as opposed to the single shared producer that the framework currently uses to perform offset writes for all tasks on the worker. This is required since multiple producers cannot share a transaction ID.

The producer for each source task will be configured with a few new default properties that are required for producer transactions: enable.idempotence=true and transactional.id=<id>. The <id> for the producer will be the group ID of the cluster, the name of the connector, and the task ID, all delimited by hyphens; for example, connect-cluster-reddit-source-12 for task number 12 of a connector named reddit-source running on a Connect cluster with a group ID of connect-cluster. The cluster group ID is used in order to allow different Connect clusters running against the same underlying Kafka cluster to use connectors with the same name.

In addition to those properties, in order to respect the offset.flush.timeout.ms worker property (which dictates how long the framework will wait for an offset commit to complete), the max.block.ms property will be changed from Integer.MAX_VALUE to the value specified for offset.flush.timeout.ms, which will establish an upper bound for the amount of time that committing a producer transaction can take.
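Put together, the defaults for one source task's producer might look like the sketch below. The class and method names are hypothetical; the property keys are the standard Kafka producer names enable.idempotence, transactional.id, and max.block.ms, and the 5000 ms timeout in the usage is an arbitrary example value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the framework's actual implementation.
final class SourceTaskProducerConfigSketch {

    /** Group ID, connector name, and task ID, delimited by hyphens. */
    static String transactionalId(String groupId, String connectorName, int taskId) {
        return groupId + "-" + connectorName + "-" + taskId;
    }

    static Map<String, String> producerProps(String groupId, String connectorName,
                                             int taskId, long offsetFlushTimeoutMs) {
        Map<String, String> props = new HashMap<>();
        // Required for producer transactions
        props.put("enable.idempotence", "true");
        props.put("transactional.id", transactionalId(groupId, connectorName, taskId));
        // Bound blocking producer calls by the worker's offset.flush.timeout.ms
        props.put("max.block.ms", Long.toString(offsetFlushTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = producerProps("connect-cluster", "reddit-source", 12, 5000L);
        System.out.println(props.get("transactional.id")); // connect-cluster-reddit-source-12
        System.out.println(props.get("max.block.ms"));     // 5000
    }
}
```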

Source task lifetime

  1. After a source task is initialized, the framework will call producer.initTransactions(). This will fence out any zombie instances of the same task and prevent those instances from being able to write to Kafka. Any fenced-out task will log an error letting the user know that it has been pre-empted by a new instance of the same task.
  2. Before polling for new records from the task, the framework will first block until any in-progress offset commits are completed. This is done in order to obey the contract for task.commit(), which is that the task should commit offsets "up to the offsets that have been returned by poll()". Polling for more records while a commit is in progress might cause the task to commit offsets for records that have not actually been committed or even sent to Kafka yet.
  3. After any offset commits are completed and the framework receives records from the source task, it will:
    1. temporarily store the source offsets for those records,
    2. call producer.beginTransaction() if there is not a transaction that is already in progress, and
    3. pass those records to producer.send(...).
  4. When it is time to commit offsets for the source task, if there are no source offsets to be written, the framework will do nothing. Otherwise, it will:
    1. write those source offsets to Kafka (using the source task's producer), and then
    2. call producer.commitTransaction().
  5. Afterward, if there were offsets to commit, the framework will:
    1. invoke task.commitRecord(...) for each record that was sent and committed to Kafka, then finally,
    2. invoke task.commit().

If an attempt to commit a transaction fails, the framework will immediately call producer.abortTransaction() and fail the task. This differs from the framework's current behavior on a failed source offset commit, which is to simply log the error and move on. However, this does align with the framework's current behavior on a failed source record send, which is to fail the task as quickly as possible. Since a failure to commit source offsets also means a failure to commit the producer transaction containing source records for those offsets, failing the task is preferable.

This pseudocode snippet illustrates the proposed lifecycle for a source task:

Source task, with atomic offset commit enabled
private final Map<Map<String, ?>, Map<String, ?>> sourceOffsets = new HashMap<>();
private final Map<SourceRecord, RecordMetadata> recordsToCommit = new ConcurrentHashMap<>();
private volatile boolean transactionInProgress = false;
private volatile boolean stopping = false;

// The "work" loop whose major responsibility is to poll the source task for records and give them to the producer
public void pollProduceLoop() {

	// (1) Initialize the producer transaction, fencing out the producers of any previous abandoned task instance
	producer.initTransactions();

	while (!stopping) {
		List<SourceRecord> records;

		// (2) Synchronize to ensure we don't poll for records while committing offsets
		synchronized (this) {

			// Double-check here in case we failed to commit offsets before entering this synchronized block, which would cause the
			// "stopping" flag to be set to true
			if (stopping)
				break;

			// Poll the task for records
			records = task.poll();
			if (records == null || records.isEmpty())
				continue;

			// (3a) Store the source offsets for each record so that they can be committed later
			records.forEach(record -> sourceOffsets.put(record.sourcePartition(), record.sourceOffset()));

			// (3b) Start a new transaction if one hasn't yet been started
			if (!transactionInProgress) {
				try {
					producer.beginTransaction();
					transactionInProgress = true;
				} catch (ProducerFencedException e) {
					throw new ConnectException("Fenced out while attempting to begin transaction; likely caused by another, newer, instance of this task being created", e);
				}
			}

			// (3c) Send the records to the producer
			// No need to track the returned future as all records will be flushed when the producer transaction is committed and any
			// failures to send records will be surfaced then
			records.forEach(record -> {
				producer.send(
					serializeRecord(record),
					(metadata, exception) -> {
						// Track the resulting record metadata so that it can be given to the source task for record commit
						if (metadata != null)
							recordsToCommit.put(record, metadata);
					}
				);
			});
		}
	}
}

// The offset commit method that is invoked periodically by the framework
public void commitOffsets() {

	// Synchronize to ensure we don't commit offsets while polling for records
	synchronized (this) {

		// Short-circuit if there are no offsets to commit so we don't try to commit a producer transaction that hasn't been started yet
		if (sourceOffsets.isEmpty())
			return;

		// (4a) Write source offsets to Kafka
		sourceOffsets.forEach((sourcePartition, sourceOffset) -> producer.send(serializeSourceOffset(sourcePartition, sourceOffset)));
		sourceOffsets.clear();

		// (4b) Commit the producer transaction
		try {
			producer.commitTransaction();
			transactionInProgress = false;
		} catch (ProducerFencedException e) {
			throw new ConnectException("Fenced out while attempting to commit transaction; likely caused by another, newer, instance of this task being created", e);
		}

		// (5a) Allow the task to commit each record
		recordsToCommit.forEach((record, metadata) -> task.commitRecord(record, metadata));
		recordsToCommit.clear();

		// (5b) And allow the task to commit offsets, if it would like
		task.commit();
	}
}

// Called externally by the framework on uncaught exceptions in the poll-produce loop or during offset commit
private void onFailure(Throwable t) {
	log.error("Task failing due to uncaught error", t);
	stopping = true;
	if (transactionInProgress) {
		try {
			producer.abortTransaction();
		} catch (Exception e) {
			log.warn("Failed to abort producer transaction", e);
		}
	}
}

private ProducerRecord<byte[], byte[]> serializeRecord(SourceRecord record) {
	// Use the task's key, value, and header converter to serialize the record and return a producer record using the serialized
	// byte arrays for the record's key, value, and headers
}

private ProducerRecord<byte[], byte[]> serializeSourceOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
	// Use the worker's internal key and value converter to serialize the partition and offset respectively into byte arrays, then return
	// a producer record using those as its key and value
}


Drawbacks and pitfalls

This proposal makes assumptions about how Connect is run that should apply to most users, but possibly not all. These assumptions are:

  1. It is acceptable to give source connectors direct access to the offsets topic
  2. The Kafka cluster that source connectors are writing to is the same Kafka cluster that the Connect worker uses for its internal topics

Assumption 1 may not hold in extremely security-conscious and/or multitenant environments, since it technically becomes possible for malicious users to corrupt the content of the offsets topic for any or all connectors on the cluster if they are able to create a source connector with atomic source offset commit enabled that produces arbitrary source records.

Assumption 2 may not hold in multitenant environments where a single global Connect cluster hosts connectors that write to and read from a variety of Kafka clusters.

Compatibility, Deprecation, and Migration Plan

Enabling atomic source offset commit

Producer principal permissions

Before enabling atomic source offset commit for a connector, its producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation | Resource Type | Resource Name
Write | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A prefix of ${groupId}-${connector}- can be used for convenience if there is no risk of conflict with other transaction IDs or if conflicts are acceptable to the user.
IdempotentWrite | Cluster | Kafka cluster targeted by the Connect cluster
Write | Topic | Internal offsets topic used by the Connect cluster

Rolling upgrade

Once the necessary permissions are granted to the producers of all source connectors on the cluster that will use atomic source offset commit, it should be possible to enable atomic source offset commit via a single rolling upgrade of the cluster. If enabling atomic source offset commit for the entire worker, atomic.source.offset.commit=true should be configured on each worker before restart. If enabling it for a specific set of connectors, atomic.source.offset.commit=true can be configured for these connectors before, during, or after the rolling upgrade; it makes little difference either way.

During this process, it is possible that source offsets that have been written to Kafka by an upgraded worker but not yet committed will be visible to workers that have not yet been upgraded; this is because the default isolation.level for consumers set up by not-yet-upgraded workers will be read_uncommitted and will pick up all writes to Kafka regardless of transactional boundaries. This should be acceptable as there is no potential for data loss, no increased risk of duplicate delivery, and it will cease to be a problem entirely once the upgrade is complete.

Additional configuration properties

The new atomic.source.offset.commit worker configuration property is only at risk of conflicting with configuration properties for existing REST extensions. Given the length and specificity of the property name, this kind of conflict is highly unlikely.

Although the source connector configuration property with the same name is at heightened risk of conflict since it shares a namespace with connector configurations, this conflict is still unlikely given the length and specificity of the property name.

Rejected Alternatives

Connector-defined transaction boundaries

Summary: allow connectors to communicate to the framework when a transaction should be started, committed, rolled back, etc.

Rejected because: out of scope for this KIP. Can be pursued as an additional feature later on, but the proposed approach should cover most cases without requiring connector developers to alter their code.

Only worker-level atomic source offset property

Summary: instead of allowing atomic source offset commit to be configured on a per-connector basis, only allow it to be enabled in an all-or-nothing fashion for an entire worker.

Rejected because: would restrict configurability for connectors, require a worker restart in order to toggle, and make incremental upgrades and testing more difficult, as every source connector principal would have to be given permission to write to the offsets topic before enabling atomic source offset commit.

Only connector-level atomic source offset property

Summary: instead of allowing atomic source offset to be configured on a worker-wide basis, only allow atomic source offset to be enabled on a per-connector basis

Rejected because: would be inconvenient for users who want every (or nearly every) source connector to use atomic source offset commit, and doesn't align with precedent set by converter and Kafka client configurations where there are worker-wide defaults that can be selectively overridden on a per-connector basis.

Future Work

Finer control over offset commits

The framework periodically commits source offsets at a fixed interval. Since source offset commits are now tied to the commits of the transactions in which source records are sent, Connect users and connector developers may want to trigger offset commits differently. Options for this include allowing source tasks to define their own transaction boundaries, allowing per-connector source offset intervals, and allowing offsets to be committed after a certain number of records.
