Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under discussion

Discussion threadhere (formerly here)

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-10000
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-6080

...

Code Block
languagejava
titleFenceProducersResult.java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
		// Implementation goes here
    }

    	/**
	 * Returns a future *that Returnprovides athe futureproducer whichID succeedsgenerated onlywhile if allinitializing the producergiven fencingstransaction succeed.
when the request  completes.
	 */
    	public KafkaFuture<Void>KafkaFuture<Long> allproducerId(String transactionalId) {
		// Implementation goes here
	}

	/**
 	 * Returns a future that provides  }

}

And the FenceProducersOptions  class:

the epoch ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Short> epochId(String transactionalId) {
		// Implementation goes here
	}

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
		// Implementation goes here
    }

}

And the FenceProducersOptions  class:

Code Block
Code Block
languagejava
titleFenceProducersOptions.java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link AdminClient#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}

...

Create

Operation

Resource Type

Resource Name

WriteTransactionalId${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user..

Create

Topic

Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

Additionally, the connector's admin principal must be given the following permissions on the Kafka cluster it reads offsets from no matter what:

OperationResource TypeResource Name
DescribeTopicOffsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

This is necessary in order to ensure that the Connect worker has read up to the end of the offsets topic, even when there are open transactions on the topic at the start of the read to the end. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted  isolation level, and then consuming from that topic with the read_committed  isolation level until at least those offsets listed by the admin client.

Rolling upgrade

Rolling upgrades that enable exactly-once support on a cluster will be possible. Users can stop each worker, upgrade to a later version if necessary, set exactly.once.source.enabled to true in the config, then restart, one by one.

...