...
Workers are already very permissive about the content of their offsets topics and will not break or even log a warning if they encounter anything that doesn’t appear to be a source offset. For the sake of cleanliness, we choose a key format that closely mirrors the existing key format for source offsets, which consists of an array containing the name of the connector and the source partition. The value only needs to be non-null so that a record of a migration can be kept even after topic compaction has occurred; for readability’s sake (if the topic ever needs to be inspected for debugging, for example), we make it explicit that the record refers to offset migration.
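As an illustration, a migration record under this scheme might look like the following. This is only a sketch: the connector name, source partition, and value contents are hypothetical, and the real serialization would follow whatever converter the offsets topic uses.

```java
public class MigrationRecord {
    // Build a key mirroring Connect's source-offset key format: a JSON array
    // of [connector name, source partition]. The names used below are
    // illustrative, not prescribed by this document.
    static String migrationKey(String connector, String sourcePartitionJson) {
        return String.format("[\"%s\", %s]", connector, sourcePartitionJson);
    }

    public static void main(String[] args) {
        String key = migrationKey("my-connector", "{\"filename\":\"test.txt\"}");
        // Any non-null, self-describing value works: it survives topic
        // compaction and is readable when inspecting the topic for debugging.
        String value = "{\"offsets-migrated\": true}";
        System.out.println(key + " -> " + value);
    }
}
```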
Admin API to Fence out Transactional Producers
Java API additions
The Admin interface will be expanded to include new methods for fencing out producers by transactional ID:
```java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options          The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }
}
```
New classes for the API will be added as well.
The FenceProducersResult class:

```java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
        // Implementation goes here
    }

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
        // Implementation goes here
    }
}
```
And the FenceProducersOptions class:

```java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}
```
Implementation
Under the hood, this will be implemented by finding the transaction coordinator for each transactional ID via the FindCoordinator protocol, then initializing a new transaction with that ID by invoking the InitProducerId protocol with no producer ID and no producer epoch. In the words of KIP-98, this:
- Bumps up the epoch of the PID, so that any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
- Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.
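The fencing guarantee can be illustrated with a small toy model (this is not the broker implementation, and the class and method names here are invented for illustration): a coordinator tracks the latest epoch per transactional ID, re-initialization bumps that epoch, and writes from older epochs are rejected.

```java
import java.util.HashMap;
import java.util.Map;

public class EpochFencingModel {
    // Toy "transaction coordinator": tracks the latest epoch per transactional ID.
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // Models InitProducerId with no producer ID and no producer epoch: bumps
    // the epoch so any zombie holding the previous epoch is fenced off.
    int initProducerId(String transactionalId) {
        return latestEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A write is only accepted from the holder of the latest epoch.
    boolean tryWrite(String transactionalId, int epoch) {
        return latestEpoch.getOrDefault(transactionalId, -1) == epoch;
    }

    public static void main(String[] args) {
        EpochFencingModel coordinator = new EpochFencingModel();
        int zombieEpoch = coordinator.initProducerId("connect-cluster-my-connector-0");
        // Re-initializing the same transactional ID bumps the epoch...
        int freshEpoch = coordinator.initProducerId("connect-cluster-my-connector-0");
        // ...so the zombie's writes are rejected while the new instance's succeed.
        System.out.println("zombie write allowed: "
                + coordinator.tryWrite("connect-cluster-my-connector-0", zombieEpoch));
        System.out.println("fresh write allowed: "
                + coordinator.tryWrite("connect-cluster-my-connector-0", freshEpoch));
    }
}
```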
Zombie Fencing
With exactly.once.source.enabled
set to true for a worker, the worker will begin taking steps to ensure that zombie source tasks in the cluster do not compromise exactly-once delivery support.
...
- For each to-be-assigned source connector, if there is a set of task configurations present in the config topic after the most recent task count record for that connector, fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers. The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector.*
- Write the new task count record for all of the fenced-out connectors to the config topic
- Read to the end of the config topic to verify that all task count records have been written successfully, and use the new latest offset of the config topic as the offset in the assignment given out to members of the cluster.

* This step will be done in parallel for all tasks of all connectors
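As a sketch, the transactional IDs targeted for fencing could be derived as follows, using the `${groupId}-${connector}-${taskId}` format described in the permissions tables; the group ID and connector name shown are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalIds {
    // Derive the transactional IDs that prior task instances of a connector
    // would have used, based on the connector's most recent task count record.
    static List<String> idsToFence(String groupId, String connector, int taskCount) {
        List<String> ids = new ArrayList<>();
        for (int taskId = 0; taskId < taskCount; taskId++) {
            ids.add(groupId + "-" + connector + "-" + taskId);
        }
        return ids;
    }

    public static void main(String[] args) {
        // e.g., a Connect cluster with group ID "connect-cluster" whose
        // connector "my-connector" last ran with 3 tasks; these IDs would
        // then be passed to Admin::fenceProducers.
        System.out.println(idsToFence("connect-cluster", "my-connector", 3));
    }
}
```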
Source task startup
...
Worker principal permissions
Producer
Before enabling exactly-once source support on a worker, its producer principal must be given the following permissions on the Kafka cluster it writes to:
...
Connector principal permissions
Producer
Each source connector’s producer principal must be given the following permissions on the Kafka cluster it writes to:
Operation | Resource Type | Resource Name
---|---|---
Write | TransactionalId | `${groupId}-${connector}-${taskId}`, for each task that the connector will create, where `${groupId}` is the group ID of the Connect cluster, `${connector}` is the name of the connector, and `${taskId}` is the ID of the task (starting from zero). A wildcarded prefix of `${groupId}-${connector}*` can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Describe | TransactionalId | The same as above.
IdempotentWrite | Cluster | Kafka cluster targeted by the Connect cluster
Write | Topic | Offsets topic used by the connector, which is either the value of the `offsets.storage.topic` property in the connector’s configuration if provided, or the value of the `offsets.storage.topic` property in the worker’s configuration if not.
Consumer
Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:
Operation | Resource Type | Resource Name
---|---|---
Read | Topic | Offsets topic used by the connector, which is either the value of the `offsets.storage.topic` property in the connector’s configuration if provided, or the value of the `offsets.storage.topic` property in the worker’s configuration if not.
Admin
Each source connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from:
Operation | Resource Type | Resource Name
---|---|---
Write | TransactionalId | `${groupId}-${connector}-${taskId}`, for each task that the connector will create, where `${groupId}` is the group ID of the Connect cluster, `${connector}` is the name of the connector, and `${taskId}` is the ID of the task (starting from zero). A wildcarded prefix of `${groupId}-${connector}*` can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Describe | TransactionalId | The same as above.

Additionally, if the offsets topic for a connector does not exist yet, the connector’s admin principal must be given the following permission:
Operation | Resource Type | Resource Name
---|---|---
Create | Topic | Offsets topic used by the connector, which is the value of the `offsets.storage.topic` property in the connector’s configuration.
...
The two user-facing configuration properties introduced here should not violate backwards compatibility, as they both come with default values that preserve existing behavior.
New Admin API
The new Admin methods for fencing producers will be implemented using existing protocols (specifically, FindCoordinator and InitProducerId), so no changes to the Kafka binary protocol will be required. As a result, these new methods should work on any broker that supports those protocols.
Rejected Alternatives
Connector-defined transaction boundaries
...
Since the design for atomic writes of source records and their offsets relies on source offsets being stored in a Kafka topic, standalone mode is not eligible. If there is sufficient demand, we may add this capability to standalone mode in the future.
Admin API to fence out transactional IDs
In order to fence out producers by transactional ID, the leader spins up an otherwise-unused producer and calls initTransactions on it. It may be better to add an API to the admin interface that can perform the task of fencing out one or more transactional IDs, to obviate the need for these non-producing producers.