...

Add the attributes unassignedInitialPartitions and initialDiscoveryFinished to KafkaSourceEnumerator; both are explained in the Proposed Changes section.

Code Block (Java): KafkaSourceEnumerator
    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions,
            Set<TopicPartition> unassignedInitialPartitions,
            boolean initialDiscoveryFinished) {

    }

    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness) {
        this(
                subscriber,
                startingOffsetInitializer,
                stoppingOffsetInitializer,
                properties,
                context,
                boundedness,
                Collections.emptySet(),
                Collections.emptySet(),
                false // a freshly created enumerator has not run initial discovery yet
            );
    }

 

...

Depends on the proposed changes.


Code Block
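public class KafkaSourceEnumState {
    // Partitions that are already assigned, plus initial partitions that are not yet assigned.
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    // Whether the initial partition discovery has finished.
    private final boolean initialDiscoveryFinished;
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    // Assignment status of the partition; see KafkaPartitionSplitAssignStatus.
    private int assignStatus;
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    UNASSIGNED_INITIAL
}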
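
To illustrate how a single set that carries an assignment status per partition can be separated again after restoring from a checkpoint, here is a minimal sketch. It is not the proposed implementation: TopicPartitionStub, AssignStatusSketch, PartitionWithStatusSketch, and EnumStateSketch.partitionsWithStatus are hypothetical names, the stub replaces Kafka's TopicPartition so the example has no dependencies, and the status is modelled as an enum rather than the int field shown above.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for Kafka's TopicPartition (assumption: topic name + partition id).
final class TopicPartitionStub {
    final String topic;
    final int partition;

    TopicPartitionStub(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionStub)) {
            return false;
        }
        TopicPartitionStub other = (TopicPartitionStub) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Mirrors the two statuses named in the state definition above.
enum AssignStatusSketch {
    ASSIGNED,
    UNASSIGNED_INITIAL
}

// Hypothetical pairing of a partition with its status.
final class PartitionWithStatusSketch {
    final TopicPartitionStub topicPartition;
    final AssignStatusSketch status;

    PartitionWithStatusSketch(TopicPartitionStub topicPartition, AssignStatusSketch status) {
        this.topicPartition = topicPartition;
        this.status = status;
    }
}

final class EnumStateSketch {
    // Returns the partitions in the combined set that carry the requested status.
    static Set<TopicPartitionStub> partitionsWithStatus(
            Set<PartitionWithStatusSketch> all, AssignStatusSketch wanted) {
        Set<TopicPartitionStub> result = new HashSet<>();
        for (PartitionWithStatusSketch p : all) {
            if (p.status == wanted) {
                result.add(p.topicPartition);
            }
        }
        return result;
    }
}
```

Calling partitionsWithStatus once with ASSIGNED and once with UNASSIGNED_INITIAL would yield the two sets that the enumerator constructor takes as assignedPartitions and unassignedInitialPartitions.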

 



Proposed Changes

Main Idea

...

Disadvantage: Partition discovery, partition assignment, and checkpointing are coupled together. If a partition has not been assigned before a checkpoint starts, the SourceCoordinator's event-loop thread blocks; but partition assignment itself needs the event-loop thread in order to run, so the thread deadlocks on itself. The existing thread model would therefore have to be changed, which is more complicated.
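
The self-blocking problem can be reproduced in isolation with any single-threaded executor. The sketch below is a generic illustration, not SourceCoordinator code: EventLoopSelfBlockSketch and its method are hypothetical, the single-thread executor stands in for the event loop, and a timeout is used so that the example terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class EventLoopSelfBlockSketch {

    // Returns true if the "checkpoint" task timed out while waiting for an
    // "assignment" task that was queued on the same single thread.
    static boolean checkpointBlocksOnAssignment() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> checkpoint = eventLoop.submit(() -> {
                // The assignment is queued behind the task that is currently running.
                Future<?> assignment = eventLoop.submit(() -> { });
                try {
                    // The only worker thread is busy right here, so this cannot complete.
                    assignment.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return checkpoint.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }
}
```

Without the timeout, the outer task would wait forever, which is why this alternative would require a different thread model.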


Alternative to the initialDiscoveryFinished variable

As mentioned before, initialDiscoveryFinished is introduced because, after a restart, an empty unassignedInitialPartitions set in the snapshot state is ambiguous: it cannot be determined whether initial discovery never ran (non-discovery) or ran and found no partitions (empty discovery).
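
The ambiguity, and how a boolean flag in the state resolves it, can be sketched as follows. This is a minimal illustration under assumptions: InitialDiscoverySketch, RestoredSituation, and classify are hypothetical names, and partitions are represented as plain strings.

```java
import java.util.Set;

final class InitialDiscoverySketch {

    // The three situations a restored enumerator may need to tell apart.
    enum RestoredSituation {
        NON_DISCOVERY,              // initial discovery never ran before the snapshot
        EMPTY_DISCOVERY,            // initial discovery ran and found no partitions
        INITIAL_PARTITIONS_PENDING  // initial partitions were found but not yet assigned
    }

    static RestoredSituation classify(
            Set<String> unassignedInitialPartitions, boolean initialDiscoveryFinished) {
        if (!unassignedInitialPartitions.isEmpty()) {
            return RestoredSituation.INITIAL_PARTITIONS_PENDING;
        }
        // With an empty set, only the flag separates the two remaining cases.
        return initialDiscoveryFinished
                ? RestoredSituation.EMPTY_DISCOVERY
                : RestoredSituation.NON_DISCOVERY;
    }
}
```

With the set alone, the last two branches would be indistinguishable, which is the case the flag is meant to cover.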

...