...

Note: The current design only works if all existing partitions are found by a single initial discovery. If the initial discovery misses some existing partitions, those partitions are treated as new partitions when a later discovery finds them, which can lead to duplicate messages. Keep this limitation in mind.
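To make the limitation concrete, here is a minimal sketch of a rule that treats only the partitions returned by the first discovery round as initial. The class `DiscoveryClassifier` is hypothetical, and partitions are modelled as plain strings such as `orders-0` instead of Kafka's `TopicPartition`.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: partitions are plain strings, not Kafka TopicPartitions.
class DiscoveryClassifier {
    enum Kind { INITIAL, NEW }

    private final Set<String> seen = new HashSet<>();
    private boolean initialDiscoveryFinished = false;

    // Returns the kind of every partition not seen in an earlier discovery round.
    Map<String, Kind> discover(List<String> partitions) {
        Map<String, Kind> unseen = new LinkedHashMap<>();
        for (String partition : partitions) {
            if (seen.add(partition)) {
                // Only partitions returned by the very first round count as initial.
                unseen.put(partition, initialDiscoveryFinished ? Kind.NEW : Kind.INITIAL);
            }
        }
        initialDiscoveryFinished = true;
        return unseen;
    }
}
```

If the first round returns only `orders-0` and `orders-1` although `orders-2` already existed, the second round reports `orders-2` as `NEW`, so its existing messages would be handled like those of a newly created partition.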


...

Add unassignedInitialPartitions and initialDiscoveryFinished to snapshot state

Add unassignedInitialPartitions (a collection) and initialDiscoveryFinished (a boolean) to the snapshot state. unassignedInitialPartitions holds the partitions returned by the initial discovery that have not yet been assigned to a reader. initialDiscoveryFinished indicates whether the initial discovery has finished.
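A minimal sketch of how the two fields could evolve during normal operation follows. `InitialDiscoveryState`, `onInitialDiscovery` and `onAssigned` are hypothetical names, and partitions are plain strings; the real enumerator's methods are not specified here.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the two proposed fields; partitions are plain strings.
class InitialDiscoveryState {
    private final Set<String> unassignedInitialPartitions = new HashSet<>();
    private boolean initialDiscoveryFinished = false;

    // Records the result of the initial discovery; later rounds must not call this.
    void onInitialDiscovery(Set<String> partitions) {
        if (initialDiscoveryFinished) {
            throw new IllegalStateException("initial discovery already finished");
        }
        unassignedInitialPartitions.addAll(partitions);
        initialDiscoveryFinished = true;
    }

    // Once a reader owns a partition, it no longer needs to be tracked here.
    void onAssigned(String partition) {
        unassignedInitialPartitions.remove(partition);
    }

    Set<String> unassignedInitialPartitions() {
        return Collections.unmodifiableSet(unassignedInitialPartitions);
    }

    boolean initialDiscoveryFinished() {
        return initialDiscoveryFinished;
    }
}
```

A checkpoint taken after `orders-0` is assigned but before `orders-1` is would store `{orders-1}`; once every initial partition has been assigned, the set is empty again.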

...

Why do we need the unassignedInitialPartitions collection? Because we cannot guarantee that all partitions from the initial discovery are assigned before a checkpoint is taken. After a restart, all partitions are discovered again, but the restored state only tells us whether a partition has been assigned; it cannot tell us whether an unassigned partition came from the initial discovery or is a new partition. unassignedInitialPartitions records the initially discovered partitions that have not yet been assigned. It should be empty during normal operation, so it adds little to the state size.
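The ambiguity can be sketched as a restore-time classification, assuming the initial discovery had finished before the checkpoint (the case where it had not is the subject of the next question). `RestoreClassifier` and its `Origin` values are hypothetical, and partitions are plain strings.

```java
import java.util.Set;

// Hypothetical sketch of classifying a re-discovered partition after a
// restore; partitions are plain strings.
final class RestoreClassifier {
    enum Origin { ALREADY_ASSIGNED, INITIAL, NEW }

    private RestoreClassifier() {}

    // With only the assigned set, every unassigned partition would fall
    // through to NEW; unassignedInitialPartitions lets initial ones be recognised.
    static Origin classify(String partition, Set<String> assigned, Set<String> unassignedInitialPartitions) {
        if (assigned.contains(partition)) {
            return Origin.ALREADY_ASSIGNED;
        }
        if (unassignedInitialPartitions.contains(partition)) {
            return Origin.INITIAL;
        }
        return Origin.NEW;
    }
}
```

Passing an empty `unassignedInitialPartitions` shows the problem: an initial partition that was still unassigned at checkpoint time is then reported as `NEW`.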

Why do we need initialDiscoveryFinished? The unassignedInitialPartitions attribute alone cannot distinguish between the following two cases, which often occur in pattern mode:

...

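```java
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceEnumState {
    // Assigned partitions plus initially discovered partitions not yet assigned.
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    // Whether the initial partition discovery has finished.
    private final boolean initialDiscoveryFinished;

    public KafkaSourceEnumState(Set<KafkaPartitionWithAssignStatus> partitions, boolean initialDiscoveryFinished) {
        this.assignedAndInitialPartitions = partitions;
        this.initialDiscoveryFinished = initialDiscoveryFinished;
    }
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private int assignStatus; // encodes a KafkaPartitionSplitAssignStatus value

    KafkaPartitionWithAssignStatus(TopicPartition topicPartition, int assignStatus) {
        this.topicPartition = topicPartition;
        this.assignStatus = assignStatus;
    }
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    UNASSIGNED_INITIAL
}
```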
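The proposed encoding keeps one set of partitions tagged with a status instead of two separate collections. The following minimal sketch shows how a restored enumerator might combine that set with initialDiscoveryFinished. It assumes that a partition found before the initial discovery has finished still counts as initial; `StatusBasedClassifier` is hypothetical, its `AssignStatus` enum mirrors `KafkaPartitionSplitAssignStatus`, and a `Map<String, AssignStatus>` stands in for `Set<KafkaPartitionWithAssignStatus>`.

```java
import java.util.Map;

// Hypothetical sketch: the status map stands in for the proposed
// Set<KafkaPartitionWithAssignStatus>; partitions are plain strings.
final class StatusBasedClassifier {
    enum AssignStatus { ASSIGNED, UNASSIGNED_INITIAL }
    enum Decision { SKIP, INITIAL, NEW }

    private StatusBasedClassifier() {}

    static Decision classify(String partition, Map<String, AssignStatus> restored, boolean initialDiscoveryFinished) {
        AssignStatus status = restored.get(partition);
        if (status == AssignStatus.ASSIGNED) {
            return Decision.SKIP; // already owned by a reader
        }
        if (status == AssignStatus.UNASSIGNED_INITIAL || !initialDiscoveryFinished) {
            // Either recorded as initial, or (by assumption) the initial
            // discovery never completed, so anything found now belongs to it.
            return Decision.INITIAL;
        }
        return Decision.NEW;
    }
}
```

With an empty restored set, the result for an unknown partition depends only on initialDiscoveryFinished, which is the distinction the status set alone cannot make.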


...