...
Current state: Under discussion
Discussion thread: here (formerly here)
JIRA:
...
```java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
        // Implementation goes here
    }

    /**
     * Returns a future that provides the producer ID generated while initializing the given
     * transaction when the request completes.
     */
    public KafkaFuture<Long> producerId(String transactionalId) {
        // Implementation goes here
    }

    /**
     * Returns a future that provides the epoch ID generated while initializing the given
     * transaction when the request completes.
     */
    public KafkaFuture<Short> epochId(String transactionalId) {
        // Implementation goes here
    }

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
        // Implementation goes here
    }
}
```
And the FenceProducersOptions class:
```java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link AdminClient#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}
```
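To illustrate the result-object pattern that FenceProducersResult follows (a map of per-transactional-ID futures plus an all() future that succeeds only if every fencing succeeds), here is a minimal, self-contained stdlib sketch. The class and method names below are illustrative stand-ins, not the real Kafka classes, and the futures are completed immediately rather than by a broker round-trip.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Minimal stdlib sketch of the result-object pattern used by FenceProducersResult.
// These are illustrative stand-ins, NOT the real Kafka classes.
public class FenceResultSketch {
    final Map<String, CompletableFuture<Void>> fenced = new HashMap<>();

    CompletableFuture<Void> fence(String transactionalId) {
        // In the real API the broker round-trip completes this future;
        // here we complete it immediately for illustration.
        CompletableFuture<Void> f = CompletableFuture.completedFuture(null);
        fenced.put(transactionalId, f);
        return f;
    }

    // Mirrors FenceProducersResult.all(): succeeds only if every fencing succeeds.
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(fenced.values().toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        FenceResultSketch r = new FenceResultSketch();
        r.fence("my-group-my-connector-0");
        r.fence("my-group-my-connector-1");
        r.all().join(); // completes because both per-ID futures completed
        System.out.println("all fencings succeeded");
    }
}
```

The per-ID futures let callers react to individual failures, while all() gives a single success/failure signal for the whole batch.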
...
Operation | Resource Type | Resource Name
---|---|---
Write | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Write | Topic | Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector's configuration.
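The transactional ID scheme above is a simple concatenation. The sketch below shows how such an ID would be constructed; the helper name and example cluster/connector names are assumptions for illustration, not actual Connect code.

```java
// Hypothetical helper illustrating the ${groupId}-${connector}-${taskId} scheme
// described above. Names here are assumptions, not Kafka Connect internals.
public class TransactionalIds {
    static String transactionalId(String groupId, String connector, int taskId) {
        return groupId + "-" + connector + "-" + taskId;
    }

    public static void main(String[] args) {
        // A connector "db-source" with tasks 0 and 1 in Connect cluster "connect-cluster"
        System.out.println(transactionalId("connect-cluster", "db-source", 0));
        System.out.println(transactionalId("connect-cluster", "db-source", 1));
    }
}
```

Because the group ID and connector name form a fixed prefix, a prefixed ACL on ${groupId}-${connector} covers every task's transactional ID at once.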
Additionally, the connector's admin principal must always be given the following permissions on the Kafka cluster it reads offsets from:
Operation | Resource Type | Resource Name |
---|---|---|
Describe | Topic | Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started. |
This is necessary to ensure that the Connect worker has read to the end of the offsets topic, even when there are open transactions on the topic at the time the read begins. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted isolation level, and then consuming from that topic with the read_committed isolation level until at least those offsets listed by the admin client have been reached.
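The completion condition of that read-to-end loop can be sketched as a pure function: the worker is done once its read_committed position in every partition has reached the read_uncommitted end offset listed by the admin client. This is a self-contained sketch using plain String partition keys; real code would use TopicPartition and live admin/consumer clients.

```java
import java.util.Map;

// Sketch of the "read to the end" completion check described above: compare the
// consumer's read_committed positions against the end offsets the admin client
// listed with read_uncommitted. Partition keys are plain Strings to keep the
// example self-contained; this is not Connect worker code.
public class ReadToEndCheck {
    static boolean hasReadToEnd(Map<String, Long> endOffsets, Map<String, Long> positions) {
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            long pos = positions.getOrDefault(e.getKey(), 0L);
            if (pos < e.getValue())
                return false; // still behind the end offset listed by the admin client
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> ends = Map.of("offsets-0", 10L, "offsets-1", 5L);
        // Behind on partition offsets-1, so not done yet:
        System.out.println(hasReadToEnd(ends, Map.of("offsets-0", 10L, "offsets-1", 4L)));
        // Caught up on both partitions:
        System.out.println(hasReadToEnd(ends, Map.of("offsets-0", 10L, "offsets-1", 5L)));
    }
}
```

Listing end offsets with read_uncommitted matters because open transactions would otherwise hold the read_committed end offset (the last stable offset) below the true log end.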
Rolling upgrade
Rolling upgrades that enable exactly-once support on a cluster will be possible. One by one, users can stop each worker, upgrade it to a later version if necessary, set exactly.once.source.enabled to true in its configuration, and then restart it.
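As a concrete illustration, the change each worker picks up on restart is a single property in its distributed worker configuration file (shown here as a hypothetical excerpt; other properties are omitted):

```properties
# Distributed worker config (excerpt) - enable exactly-once source support
# on this worker before restarting it during the rolling upgrade.
exactly.once.source.enabled=true
```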
...