...
Note: The current design only works when all existing partitions can be discovered in a single pass. If some existing partitions are missed by the first discovery, the ones found later are treated as new partitions, which leads to message duplication. Keep this limitation in mind when using the feature.
...
Add unassignedInitialPartitons and initialDiscoveryFinished to snapshot state
Add unassignedInitialPartitons (a collection) and initialDiscoveryFinished (a boolean) to the snapshot state. unassignedInitialPartitons holds the partitions found by the first discovery that have not yet been assigned. initialDiscoveryFinished records whether the first discovery has completed.
...
Why do we need the unassignedInitialPartitons collection? Because there is no guarantee that every partition found by the first discovery is assigned before a checkpoint is taken. After a restart, all partitions are re-discovered, and the restored state can only tell us whether a partition has been assigned. It cannot tell us whether an unassigned partition came from the first discovery or is a new partition. unassignedInitialPartitons therefore records the first-discovered partitions that have not yet been assigned. It should be empty during normal operation, so it adds little to the state size.
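The ambiguity described above can be illustrated with a minimal sketch. It assumes partitions are represented as plain strings and that the first discovery had already completed before the checkpoint was taken. `RestoreClassifier`, `Origin`, and `classify` are hypothetical names chosen for illustration and are not part of the Kafka connector.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Kafka connector.
final class RestoreClassifier {

    enum Origin { ALREADY_ASSIGNED, INITIAL, NEW }

    // Classifies each partition that is re-discovered after a restart.
    // Assumes the first discovery had completed before the checkpoint.
    static Map<String, Origin> classify(
            List<String> rediscovered,
            Set<String> assigned,
            Set<String> unassignedInitial) {
        Map<String, Origin> result = new LinkedHashMap<>();
        for (String partition : rediscovered) {
            if (assigned.contains(partition)) {
                result.put(partition, Origin.ALREADY_ASSIGNED);
            } else if (unassignedInitial.contains(partition)) {
                // Without the restored set, this case would look the same as NEW.
                result.put(partition, Origin.INITIAL);
            } else {
                result.put(partition, Origin.NEW);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Origin> origins = classify(
                List.of("topic-0", "topic-1", "topic-2"),
                Set.of("topic-0"),
                Set.of("topic-1"));
        System.out.println(origins);
    }
}
```

If the `unassignedInitial` set were dropped from the restored state, `topic-1` and `topic-2` would both fall into the same branch, which is the situation the collection is meant to prevent.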
Why do we need initialDiscoveryFinished? Relying on unassignedInitialPartitons alone cannot distinguish between the following two cases (which often occur in pattern mode):
...
```java
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceEnumState {
    // Assigned partitions plus first-discovered partitions that are still unassigned.
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    // Whether the first discovery has finished.
    private final boolean initialDiscoveryFinished;
    // Constructor and getters omitted.
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private int assignStatus;
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    UNASSIGNED_INITIAL
}
```
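Because the state keeps a single set tagged with a status, the enumerator would need to split it back into two groups on restore. The following is a rough sketch of one way that could look, not the connector's implementation. `EnumStateSplitSketch`, `Status`, `PartitionWithStatus`, and `partitionsWithStatus` are hypothetical stand-ins. A `String` replaces `TopicPartition` so the example has no Kafka dependency, and the status is stored as an enum rather than an `int` for brevity.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-ins for the state classes; for illustration only.
final class EnumStateSplitSketch {

    enum Status { ASSIGNED, UNASSIGNED_INITIAL }

    static final class PartitionWithStatus {
        final String topicPartition; // stand-in for TopicPartition
        final Status status;

        PartitionWithStatus(String topicPartition, Status status) {
            this.topicPartition = topicPartition;
            this.status = status;
        }
    }

    // Returns the partitions in the restored state that carry the given status.
    static Set<String> partitionsWithStatus(
            Collection<PartitionWithStatus> state, Status wanted) {
        Set<String> result = new TreeSet<>();
        for (PartitionWithStatus entry : state) {
            if (entry.status == wanted) {
                result.add(entry.topicPartition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<PartitionWithStatus> restored = List.of(
                new PartitionWithStatus("topic-0", Status.ASSIGNED),
                new PartitionWithStatus("topic-1", Status.UNASSIGNED_INITIAL));
        System.out.println(partitionsWithStatus(restored, Status.ASSIGNED));
        System.out.println(partitionsWithStatus(restored, Status.UNASSIGNED_INITIAL));
    }
}
```

Keeping one tagged set instead of two separate collections means a partition can only ever carry one status, at the cost of a filtering pass like this when the state is restored.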
...