...
Add the attributes unassignedInitialPartitions and initialDiscoveryFinished to KafkaSourceEnumerator; both are explained in the Proposed Changes section.
Code Block |
---|
public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness,
        Set<TopicPartition> assignedPartitions,
        Set<TopicPartition> unassignedInitialPartitions,
        boolean initialDiscoveryFinished) { }

public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness) {
    this(
            subscriber,
            startingOffsetInitializer,
            stoppingOffsetInitializer,
            properties,
            context,
            boundedness,
            Collections.emptySet(),
            Collections.emptySet(),
            // Fresh start: the initial discovery has not run yet.
            false);
}
|
...
Depends on the proposed changes.
Code Block |
---|
public class KafkaSourceEnumState {
    // Assigned partitions together with initially discovered partitions that are still unassigned.
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    // Whether the initial partition discovery has finished.
    private final boolean initialDiscoveryFinished;
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private int assignStatus;
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    UNASSIGNED_INITIAL
}
|
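To make the combined set in the state above more concrete, the following is a minimal sketch of how an enumerator could split it back into the assigned and the unassigned initial partitions after a restore. It is not the connector's code: the class EnumStateSplitSketch, the nested PartitionWithStatus type, and the helper partitionsWithStatus are hypothetical, and a plain String stands in for TopicPartition so that the example runs without the Kafka client on the classpath.

```java
import java.util.HashSet;
import java.util.Set;

public class EnumStateSplitSketch {

    /** Mirrors the two statuses listed in the state definition. */
    public enum AssignStatus { ASSIGNED, UNASSIGNED_INITIAL }

    /** Hypothetical stand-in for KafkaPartitionWithAssignStatus; the partition is a plain String. */
    public static final class PartitionWithStatus {
        final String partition;
        final AssignStatus status;

        public PartitionWithStatus(String partition, AssignStatus status) {
            this.partition = partition;
            this.status = status;
        }
    }

    /** Returns the partitions in the combined set that carry the given status. */
    public static Set<String> partitionsWithStatus(
            Set<PartitionWithStatus> combined, AssignStatus wanted) {
        Set<String> result = new HashSet<>();
        for (PartitionWithStatus entry : combined) {
            if (entry.status == wanted) {
                result.add(entry.partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<PartitionWithStatus> combined = new HashSet<>();
        combined.add(new PartitionWithStatus("orders-0", AssignStatus.ASSIGNED));
        combined.add(new PartitionWithStatus("orders-1", AssignStatus.UNASSIGNED_INITIAL));

        System.out.println("assigned: "
                + partitionsWithStatus(combined, AssignStatus.ASSIGNED));
        System.out.println("unassigned initial: "
                + partitionsWithStatus(combined, AssignStatus.UNASSIGNED_INITIAL));
    }
}
```

Keeping one set with a per-partition status, rather than two separate sets, means a partition can only ever be in one of the two states inside a snapshot.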
Proposed Changes
Main Idea
...
Disadvantage: Partition discovery, partition assignment, and checkpointing become coupled. If a partition has not been assigned before a checkpoint, the SourceCoordinator's event-loop thread is blocked, but partition assignment also needs that same event-loop thread to run, so the thread deadlocks on itself. Avoiding this would require changing the existing thread model, which is more complicated.
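The self-blocking described above can be reproduced with plain JDK classes. The sketch below is only an analogy, not Flink code: a single-threaded ExecutorService stands in for the SourceCoordinator's event loop, the outer task plays the role of the checkpoint, and the inner task plays the role of the partition assignment. The class and method names are hypothetical, and a 200 ms timeout is used so that the demo terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventLoopSelfBlockDemo {

    /**
     * Returns true if a "checkpoint" task running on a single-threaded event loop
     * times out while waiting for an "assignment" task queued on the same loop.
     */
    public static boolean checkpointBlocksOnAssignment() throws Exception {
        // Stand-in for the coordinator's single event-loop thread.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> checkpoint = eventLoop.submit(() -> {
                // The assignment needs the very thread that is running this task.
                Future<?> assignment = eventLoop.submit(() -> { });
                try {
                    assignment.get(200, TimeUnit.MILLISECONDS);
                    return false; // the assignment ran, so there was no self-blocking
                } catch (TimeoutException e) {
                    return true; // the assignment never got the thread
                }
            });
            return checkpoint.get();
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(
                "checkpoint blocked on assignment: " + checkpointBlocksOnAssignment());
    }
}
```

Without the timeout, the outer task would wait forever, which is why this alternative would force a change to the thread model.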
Alternative to the
...
initialDiscoveryFinished variable
As mentioned before, the reason for introducing initialDiscoveryFinished is that, after a restart, if unassignedInitialPartitions in the snapshot state is empty, it cannot be determined whether this is because discovery has not run yet (non-discovery) or because discovery ran and left no initial partitions pending (empty discovery).
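The ambiguity can be illustrated with a small sketch. An empty set alone maps to two different situations, and only the boolean flag separates them. The class RestoreDecisionSketch, the enum SnapshotMeaning, and the method interpret are hypothetical names used for illustration, and String again stands in for TopicPartition.

```java
import java.util.Collections;
import java.util.Set;

public class RestoreDecisionSketch {

    /** What a restored snapshot says about the initial discovery. */
    public enum SnapshotMeaning {
        DISCOVERY_NOT_RUN,          // non-discovery
        DISCOVERY_NOTHING_PENDING,  // empty discovery
        INITIAL_PARTITIONS_PENDING  // discovered, but some partitions are not assigned yet
    }

    /**
     * Interprets the restored state. The flag is checked first because an empty
     * set cannot distinguish "never discovered" from "discovered nothing".
     */
    public static SnapshotMeaning interpret(
            Set<String> unassignedInitialPartitions, boolean initialDiscoveryFinished) {
        if (!initialDiscoveryFinished) {
            return SnapshotMeaning.DISCOVERY_NOT_RUN;
        }
        return unassignedInitialPartitions.isEmpty()
                ? SnapshotMeaning.DISCOVERY_NOTHING_PENDING
                : SnapshotMeaning.INITIAL_PARTITIONS_PENDING;
    }

    public static void main(String[] args) {
        Set<String> empty = Collections.emptySet();
        // Same empty set, two different meanings depending on the flag.
        System.out.println(interpret(empty, false));
        System.out.println(interpret(empty, true));
    }
}
```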
...