...

  1. Dynamic partition discovery is disabled by default, and users have to specify a discovery interval in order to turn it on.
  2. The strategy used for new partitions is the same as the initial offset strategy, instead of the earliest strategy. Semantically, if the startup strategy is latest, the consumed data should include all data from the moment of startup, including all messages from newly created partitions. However, the latest strategy may currently be applied to new partitions, which leads to data loss: a new partition might only be discovered by the Kafka source several minutes after it is created, and the messages produced into the partition during that gap are dropped if, for example, "latest" is used as the initial offset strategy. If the data from all new partitions is not read, the behavior does not meet the user's expectations.
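The gap described in item 2 can be illustrated with a small, self-contained simulation. It does not use the Kafka or Flink APIs: the class `NewPartitionGapSketch`, the `Strategy` enum, and the offsets are hypothetical and only model where reading would start on a partition that is discovered some time after it was created.

```java
import java.util.ArrayList;
import java.util.List;

public class NewPartitionGapSketch {
    /** Hypothetical stand-in for an offset strategy; not a Flink or Kafka type. */
    public enum Strategy { EARLIEST, LATEST }

    /**
     * Returns the offset at which reading starts on a partition whose log
     * currently holds the offsets [0, logEndOffset).
     */
    public static long startingOffset(Strategy strategy, long logEndOffset) {
        return strategy == Strategy.LATEST ? logEndOffset : 0L;
    }

    /** Returns the offsets that are never read because they lie before the starting offset. */
    public static List<Long> skippedOffsets(Strategy strategy, long logEndOffsetAtDiscovery) {
        long start = startingOffset(strategy, logEndOffsetAtDiscovery);
        List<Long> skipped = new ArrayList<>();
        for (long offset = 0; offset < start; offset++) {
            skipped.add(offset);
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Assumption: 3 messages were produced between partition creation and discovery.
        long logEndOffsetAtDiscovery = 3;
        System.out.println("latest skips:   " + skippedOffsets(Strategy.LATEST, logEndOffsetAtDiscovery));
        System.out.println("earliest skips: " + skippedOffsets(Strategy.EARLIEST, logEndOffsetAtDiscovery));
    }
}
```

In this model, "latest" skips every message written before the partition was discovered, while "earliest" skips none.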

When does the second case occur? Currently, there are three ways to specify partitions in Kafka: by topic, by partition, and by matching topics with a regular expression. If the initial number of partitions is 0, an error occurs for the first two methods. However, when a regular expression is used to match topics, it is allowed to have 0 matched topics.
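The difference between an explicit topic list and a topic pattern can be sketched as follows. This is a simplified model rather than the connector's subscriber code: `SubscriptionModeSketch`, its methods, and the `cluster` map (topic name to partition count) are assumptions made for illustration, and the by-partition mode is left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class SubscriptionModeSketch {
    /** Explicit topic list: modeled as an error when no partition can be resolved. */
    public static Set<String> byTopics(List<String> topics, Map<String, Integer> cluster) {
        Set<String> partitions = new TreeSet<>();
        for (String topic : topics) {
            addPartitions(topic, cluster.getOrDefault(topic, 0), partitions);
        }
        if (partitions.isEmpty()) {
            throw new IllegalStateException("No partitions found for topics " + topics);
        }
        return partitions;
    }

    /** Topic pattern: an empty result is allowed, matching topics may appear later. */
    public static Set<String> byPattern(Pattern topicPattern, Map<String, Integer> cluster) {
        Set<String> partitions = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : cluster.entrySet()) {
            if (topicPattern.matcher(entry.getKey()).matches()) {
                addPartitions(entry.getKey(), entry.getValue(), partitions);
            }
        }
        return partitions;
    }

    private static void addPartitions(String topic, int count, Set<String> out) {
        for (int partition = 0; partition < count; partition++) {
            out.add(topic + "-" + partition);
        }
    }

    public static void main(String[] args) {
        // Hypothetical cluster with a single topic "orders" that has 2 partitions.
        Map<String, Integer> cluster = Collections.singletonMap("orders", 2);
        System.out.println(byPattern(Pattern.compile("payments.*"), cluster)); // empty, no error
        try {
            byTopics(Arrays.asList("payments"), cluster);
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```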


Therefore, this FLIP has two main objectives:

...

KafkaSourceEnumerator

Add the fields unassignedInitialPartitons and initialPartitionDone to KafkaSourceEnumerator; they are explained in the Proposed Changes section.

Code Block
languagejava
    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions,
            // First-discovered partitions that have not been assigned yet.
            Set<TopicPartition> unassignedInitialPartitons,
            boolean noDiscovery) {

    }

    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness) {
        this(
                subscriber,
                startingOffsetInitializer,
                stoppingOffsetInitializer,
                properties,
                context,
                boundedness,
                Collections.emptySet(),
                Collections.emptySet(),
                false
            );
    }

...

Note: The current design only applies to cases where all existing partitions can be discovered at once. If some old partitions are not found by the first discovery, they will be treated as new partitions when they are discovered later, leading to message duplication. Users should pay particular attention to this limitation.


Add unassignedInitialPartitons and firstDiscoveryDone to snapshot state

Add unassignedInitialPartitons (a collection) and firstDiscoveryDone (a boolean) to the snapshot state. unassignedInitialPartitons is the collection of first-discovered partitions that have not yet been assigned. firstDiscoveryDone indicates whether the first discovery has been done.

The reason for these two attributes is explained below:

Why do we need the unassignedInitialPartitons collection? Because we cannot guarantee that all first-discovered partitions are assigned before a checkpoint is taken. After a restart, all partitions are re-discovered, but at that point we can only tell whether a partition has been assigned, not whether an unassigned partition is a first-discovered partition or a new one. Therefore, unassignedInitialPartitons is needed to record the first-discovered partitions that have not been assigned yet. It should be an empty set during normal operation, so it will not take up much storage.

Why do we need firstDiscoveryDone? Relying only on the unassignedInitialPartitons attribute cannot distinguish between the following two cases (which often occur in pattern mode):

  • The first partition discovery is so slow that a checkpoint is taken before it completes, and the job is then restarted. The restored unassignedInitialPartitons is an empty set, which here means that no discovery has happened yet (non-discovery). The next discovery should be treated as the first discovery.
  • The first discovery finds no partitions, and new partitions are only found after several more discoveries. If a restart occurs in between, the restored unassignedInitialPartitons is also an empty set, but here it means that the discovery came back empty (empty-discovery). The next discovery should be treated as a discovery of new partitions.
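How the two attributes separate these cases can be sketched as follows. This is a minimal model, not the proposed enumerator implementation: `DiscoverySketch` and `OffsetRule` are hypothetical names, partitions are plain strings, and the sketch assumes that new partitions start from the earliest offset while first-discovered partitions use the configured initial strategy.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DiscoverySketch {
    /** Hypothetical marker for which offset rule a partition gets. */
    public enum OffsetRule { INITIAL_STRATEGY, EARLIEST }

    private final Set<String> assignedPartitions;
    private final Set<String> unassignedInitialPartitons;
    private boolean firstDiscoveryDone;

    /** Builds the enumerator-side state, for example from a restored snapshot. */
    public DiscoverySketch(
            Set<String> assignedPartitions,
            Set<String> unassignedInitialPartitons,
            boolean firstDiscoveryDone) {
        this.assignedPartitions = new HashSet<>(assignedPartitions);
        this.unassignedInitialPartitons = new HashSet<>(unassignedInitialPartitons);
        this.firstDiscoveryDone = firstDiscoveryDone;
    }

    /** Handles one discovery round and returns the rule for each unassigned partition. */
    public Map<String, OffsetRule> onDiscovery(Set<String> discovered) {
        Map<String, OffsetRule> rules = new TreeMap<>();
        for (String partition : discovered) {
            if (assignedPartitions.contains(partition)) {
                continue; // already handed out to a reader
            }
            if (!firstDiscoveryDone) {
                // Everything found by the first discovery counts as an initial partition.
                unassignedInitialPartitons.add(partition);
            }
            rules.put(
                    partition,
                    unassignedInitialPartitons.contains(partition)
                            ? OffsetRule.INITIAL_STRATEGY
                            : OffsetRule.EARLIEST); // assumption: new partitions use earliest
        }
        firstDiscoveryDone = true;
        return rules;
    }

    /** Moves a partition from the unassigned initial set to the assigned set. */
    public void markAssigned(String partition) {
        unassignedInitialPartitons.remove(partition);
        assignedPartitions.add(partition);
    }

    public boolean isFirstDiscoveryDone() {
        return firstDiscoveryDone;
    }
}
```

Restoring with an empty set and `firstDiscoveryDone = false` (non-discovery) makes the next round behave as the first discovery, while an empty set with `firstDiscoveryDone = true` (empty-discovery) makes every partition found afterwards count as new.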

...

Code Block
languagejava
public class KafkaSourceEnumState {
    private final Set<TopicPartition> assignedPartitions;
    private final Set<TopicPartition> unassignedInitialPartitons;
    private final boolean firstDiscoveryDone;
}



Merge assignedPartitions and unassignedInitialPartitons collections

Following the previous plan, the snapshot state needs to save both the assignedPartitions and unassignedInitialPartitons collections. The items of both collections are TopicPartition instances, but with different statuses. Therefore, we can use a single collection to hold all of them.

With an explicit status, the state is also easier to extend later with rules for discovering more kinds of partitions.
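One possible shape for such a merged collection is sketched below. The names `MergedStateSketch`, `PartitionWithStatus`, and `AssignmentStatus` are hypothetical and are not taken from the proposal; partitions are represented as plain strings so that the sketch needs no Kafka dependency.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class MergedStateSketch {
    /** Hypothetical status values for a partition held in the snapshot state. */
    public enum AssignmentStatus { ASSIGNED, UNASSIGNED_INITIAL }

    /** A partition identifier together with its status. */
    public static final class PartitionWithStatus {
        private final String topicPartition;
        private final AssignmentStatus status;

        public PartitionWithStatus(String topicPartition, AssignmentStatus status) {
            this.topicPartition = topicPartition;
            this.status = status;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof PartitionWithStatus)) {
                return false;
            }
            PartitionWithStatus that = (PartitionWithStatus) other;
            return topicPartition.equals(that.topicPartition) && status == that.status;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicPartition, status);
        }
    }

    /** Merges the two former collections into a single one with a status per item. */
    public static Set<PartitionWithStatus> merge(Set<String> assigned, Set<String> unassignedInitial) {
        Set<PartitionWithStatus> merged = new HashSet<>();
        for (String partition : assigned) {
            merged.add(new PartitionWithStatus(partition, AssignmentStatus.ASSIGNED));
        }
        for (String partition : unassignedInitial) {
            merged.add(new PartitionWithStatus(partition, AssignmentStatus.UNASSIGNED_INITIAL));
        }
        return merged;
    }

    /** Recovers one of the former collections by filtering on the status. */
    public static Set<String> filter(Set<PartitionWithStatus> all, AssignmentStatus status) {
        return all.stream()
                .filter(p -> p.status == status)
                .map(p -> p.topicPartition)
                .collect(Collectors.toSet());
    }
}
```

Under this layout, supporting another kind of partition would only require a new enum constant rather than another collection in the state.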

Every Kafka partition in the Kafka snapshot state will have a "type" attribute indicating whether it is assigned or initial:

...