...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Add the variable attributes unassignedInitialPartitions and initialDiscoveryFinished to KafkaSourceEnumerator; both are explained in the Proposed Changes section.
```java
public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness,
        Set<TopicPartition> assignedPartitions,
        Set<TopicPartition> unassignedInitialPartitions,
        boolean initialDiscoveryFinished) {
    // ... (body omitted)
}

public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness) {
    // Fresh start: nothing assigned, no pending initial partitions,
    // and the initial discovery has not run yet.
    this(
            subscriber,
            startingOffsetInitializer,
            stoppingOffsetInitializer,
            properties,
            context,
            boundedness,
            Collections.emptySet(),
            Collections.emptySet(),
            false);
}
```
KafkaSourceEnumState
Depends on the proposed changes.
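```java
public class KafkaSourceEnumState {
    // Assigned partitions plus initially discovered partitions that are not yet assigned.
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    // Whether the initial partition discovery has completed.
    private final boolean initialDiscoveryFinished;
    // constructors and accessors omitted
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private KafkaPartitionSplitAssignStatus assignStatus;
    // constructors and accessors omitted
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    UNASSIGNED_INITIAL
}
```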
...
Depends on the proposed changes.
Proposed Changes
Main Idea
...
Note: The current design only works if all existing partitions are found by the initial discovery. If some existing partitions are discovered only later, they are treated as new partitions, which leads to duplicate messages. Keep this limitation in mind.
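To make the limitation concrete, here is a minimal sketch, assuming partitions are plain strings such as `"orders-0"`. The class `DiscoveryLabeler` and its methods are hypothetical and not part of Flink's API. Every partition returned by the first discovery is labeled INITIAL and every partition first seen afterwards is labeled NEW, so an existing partition that the first discovery missed ends up labeled NEW.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not Flink API): labels each discovered partition as INITIAL
 * (returned by the first discovery) or NEW (first seen in a later discovery).
 */
final class DiscoveryLabeler {
    enum Origin { INITIAL, NEW }

    private final Map<String, Origin> origins = new HashMap<>();
    private boolean initialDiscoveryFinished = false;

    /** Records one discovery round; partitions seen earlier keep their original label. */
    void onDiscovery(Set<String> discovered) {
        Origin origin = initialDiscoveryFinished ? Origin.NEW : Origin.INITIAL;
        for (String partition : discovered) {
            origins.putIfAbsent(partition, origin);
        }
        // Whatever the first round returned, all later rounds only produce NEW partitions.
        initialDiscoveryFinished = true;
    }

    /** Returns the label of a partition, or null if it was never discovered. */
    Origin originOf(String partition) {
        return origins.get(partition);
    }
}
```

If the first round returns only `"orders-0"` although `"orders-1"` already exists, a second round returning both labels `"orders-1"` as NEW. That mislabeling is the source of the duplication described in the note.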
...
Add unassignedInitialPartitions and
...
initialDiscoveryFinished to snapshot state
Add unassignedInitialPartitions (a collection) and initialDiscoveryFinished (a boolean) to the snapshot state. unassignedInitialPartitions is the set of partitions found by the initial discovery that have not yet been assigned. initialDiscoveryFinished indicates whether the initial discovery has completed.
...
Why do we need the unassignedInitialPartitions collection? Because there is no guarantee that all partitions from the initial discovery are assigned before a checkpoint is taken. After a restart, all partitions are rediscovered, but the existing state only records whether a partition has been assigned. For an unassigned partition, it cannot tell whether the partition came from the initial discovery or is a new partition. unassignedInitialPartitions therefore records the initially discovered partitions that have not been assigned yet. During normal operation it is an empty set, so it adds very little to the state size.
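The restore-time decision can be sketched as follows, assuming the checkpoint was taken after the initial discovery finished and that partitions are plain strings. `RestoreClassifier` and `Handling` are hypothetical names for illustration, not Flink classes. A rediscovered partition is either already assigned, an initial partition still waiting for assignment, or a new partition; the last two can only be told apart because unassignedInitialPartitions is in the snapshot.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch (not Flink API) of classifying rediscovered partitions after a
 * restart, assuming the snapshot was taken after the initial discovery finished.
 */
final class RestoreClassifier {
    enum Handling { ALREADY_ASSIGNED, INITIAL_STARTING_OFFSETS, NEW_PARTITION }

    private final Set<String> assignedPartitions;
    private final Set<String> unassignedInitialPartitions;

    RestoreClassifier(Set<String> assignedPartitions, Set<String> unassignedInitialPartitions) {
        this.assignedPartitions = new HashSet<>(assignedPartitions);
        this.unassignedInitialPartitions = new HashSet<>(unassignedInitialPartitions);
    }

    Handling classify(String rediscoveredPartition) {
        if (assignedPartitions.contains(rediscoveredPartition)) {
            // Already handed to a reader before the checkpoint; do not assign again.
            return Handling.ALREADY_ASSIGNED;
        }
        if (unassignedInitialPartitions.contains(rediscoveredPartition)) {
            // Found by the initial discovery but still unassigned at checkpoint time.
            return Handling.INITIAL_STARTING_OFFSETS;
        }
        // Without unassignedInitialPartitions, every unassigned partition would end up here.
        return Handling.NEW_PARTITION;
    }
}
```

For example, with `t-0` assigned and `t-1` recorded as an unassigned initial partition, a rediscovered `t-2` is the only one classified as new.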
Why do we need initialDiscoveryFinished? The unassignedInitialPartitions attribute alone cannot distinguish between the following two cases (which often occur in pattern mode):
...
```java
public class KafkaSourceEnumState {
    private final Set<TopicPartition> assignedPartitions;
    // Partitions found by the initial discovery that are not yet assigned.
    private final Set<TopicPartition> unassignedInitialPartitions;
    // Whether the initial partition discovery has completed.
    private final boolean initialDiscoveryFinished;
}
```
Merge assignedPartitions and unassignedInitialPartitions collections
...
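```java
public class KafkaSourceEnumState {
    // Assigned partitions plus initially discovered partitions that are not yet assigned.
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    // Whether the initial partition discovery has completed.
    private final boolean initialDiscoveryFinished;
    // constructors and accessors omitted
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private KafkaPartitionSplitAssignStatus assignStatus;
    // constructors and accessors omitted
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    UNASSIGNED_INITIAL
}
```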
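The merge loses no information as long as a partition is never in both sets at once: each original set can be recovered by filtering on the status. The sketch below shows this round trip under the assumption that partitions are plain strings. `SketchAssignStatus`, `SketchPartitionWithStatus`, and `StateMerging` are hypothetical stand-ins for the proposed types, not Flink code.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical mirror of the proposed KafkaPartitionSplitAssignStatus enum. */
enum SketchAssignStatus { ASSIGNED, UNASSIGNED_INITIAL }

/** Hypothetical stand-in for KafkaPartitionWithAssignStatus; the partition is a String here. */
final class SketchPartitionWithStatus {
    final String partition;
    final SketchAssignStatus status;

    SketchPartitionWithStatus(String partition, SketchAssignStatus status) {
        this.partition = partition;
        this.status = status;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchPartitionWithStatus)) return false;
        SketchPartitionWithStatus other = (SketchPartitionWithStatus) o;
        return partition.equals(other.partition) && status == other.status;
    }

    @Override
    public int hashCode() {
        return Objects.hash(partition, status);
    }
}

final class StateMerging {
    /** Builds the single merged collection from the two separate sets. */
    static Set<SketchPartitionWithStatus> merge(Set<String> assigned, Set<String> unassignedInitial) {
        Set<SketchPartitionWithStatus> merged = new HashSet<>();
        for (String p : assigned) {
            merged.add(new SketchPartitionWithStatus(p, SketchAssignStatus.ASSIGNED));
        }
        for (String p : unassignedInitial) {
            merged.add(new SketchPartitionWithStatus(p, SketchAssignStatus.UNASSIGNED_INITIAL));
        }
        return merged;
    }

    /** Recovers one of the original sets by filtering the merged collection on status. */
    static Set<String> withStatus(Set<SketchPartitionWithStatus> merged, SketchAssignStatus status) {
        Set<String> result = new HashSet<>();
        for (SketchPartitionWithStatus p : merged) {
            if (p.status == status) {
                result.add(p.partition);
            }
        }
        return result;
    }
}
```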
...
Disadvantage: partition discovery, partition assignment, and checkpointing become coupled. If a partition has not been assigned when a checkpoint is triggered, the checkpoint would have to block the SourceCoordinator's event-loop thread until assignment completes. But partition assignment also runs on that same event-loop thread, so the thread would end up waiting on itself (a self-deadlock). Avoiding this requires changing the existing threading model, which is considerably more complicated.
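The self-deadlock can be reproduced in isolation with a single-threaded executor. This is an illustration under an assumption, not Flink's coordinator code: the `ExecutorService` stands in for the SourceCoordinator event loop, and `EventLoopSelfDeadlock` is a hypothetical name. The "checkpoint" task waits for an "assignment" task queued behind it on the same and only thread, so the wait can never succeed; a timeout is used only so that the demonstration terminates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Illustrative only: a single-threaded executor stands in for the coordinator event loop.
 */
final class EventLoopSelfDeadlock {
    /** Returns true if the checkpoint task gave up because the assignment task never ran. */
    static boolean checkpointWaitTimesOut(long timeoutMillis) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> checkpoint = eventLoop.submit(() -> {
                // The assignment is queued on the thread that is busy running this task.
                Future<?> assignment = eventLoop.submit(() -> { /* assign splits */ });
                try {
                    assignment.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    // A blocking get() without a timeout would hang here forever.
                    return true;
                }
            });
            return checkpoint.get();
        } finally {
            eventLoop.shutdownNow();
        }
    }
}
```

Calling `EventLoopSelfDeadlock.checkpointWaitTimesOut(200)` returns `true`: the queued assignment cannot start until the checkpoint task finishes, which is exactly the coupling the disadvantage describes.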
Alternative to the
...
initialDiscoveryFinished variable
As mentioned before, initialDiscoveryFinished was introduced because, after a restart, an empty unassignedInitialPartitions in the snapshot state is ambiguous: the initial discovery may not have run yet (non-discovery), or it may have run and found no partitions (empty-discovery).
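The ambiguity shows up in a small sketch where partitions are plain strings and `SnapshotSketch` is a hypothetical class, not Flink's KafkaSourceEnumState. Two snapshots with the same empty unassignedInitialPartitions must make opposite decisions about a partition found after restart (for example, a topic created later that matches the subscription pattern), and only the boolean tells them apart.

```java
import java.util.Set;

/** Hypothetical snapshot holding the two fields proposed for the enumerator state. */
final class SnapshotSketch {
    final Set<String> unassignedInitialPartitions;
    final boolean initialDiscoveryFinished;

    SnapshotSketch(Set<String> unassignedInitialPartitions, boolean initialDiscoveryFinished) {
        this.unassignedInitialPartitions = Set.copyOf(unassignedInitialPartitions);
        this.initialDiscoveryFinished = initialDiscoveryFinished;
    }

    /**
     * After a restart, decides whether an unassigned partition found by the next
     * discovery counts as an initial partition (true) or a new partition (false).
     */
    boolean treatAsInitial(String partition) {
        if (!initialDiscoveryFinished) {
            // Non-discovery: the checkpoint predates the initial discovery,
            // so whatever the next discovery finds is the initial set.
            return true;
        }
        // The initial discovery already ran (possibly an empty-discovery):
        // only partitions it recorded are initial; anything else is new.
        return unassignedInitialPartitions.contains(partition);
    }
}
```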
...