
...

  1. Dynamic partition discovery is disabled by default, and users have to specify the interval of discovery in order to turn it on.
  2. The strategy used for new partitions is the same as the initial offset strategy. By definition, if the startup strategy is latest, the consumed data should include all data produced from the moment of startup, including all messages written to newly created partitions. However, the latest strategy may currently be applied to new partitions as well, which can lose data: a newly created partition might only be discovered by the Kafka source several minutes after it is created, and messages produced into it during that gap are skipped if "latest" is used as its initial offset strategy. If the data from new partitions is not read in full, the behavior does not meet users' expectations.



Therefore, this FLIP has two main objectives:

  1. Enable partition discovery by default.
  2. Provide a different offset strategy for partitions discovered later, so that users can customize the strategies for newly discovered partitions and initial partitions separately. For the provided OffsetsInitializer strategies:
     • EARLIEST startup offset strategy: use EARLIEST for newly discovered partitions.
     • LATEST startup offset strategy: use EARLIEST for newly discovered partitions.
     • TIMESTAMP startup offset strategy: use TIMESTAMP for newly discovered partitions.
     • SPECIFIC_OFFSET startup offset strategy: use SPECIFIC_OFFSET for newly discovered partitions.
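The mapping in the list above can be sketched as a small lookup. The nested enum StartupStrategy and the method newPartitionStrategy are hypothetical names used only for illustration; they are not part of the Kafka connector API.

```java
public class NewPartitionStrategy {
    // Hypothetical enum mirroring the four provided startup strategies.
    enum StartupStrategy { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET }

    // Returns the strategy applied to partitions discovered after startup.
    // Only LATEST differs: newly discovered partitions fall back to EARLIEST
    // so that records written before the partition was discovered are not skipped.
    static StartupStrategy newPartitionStrategy(StartupStrategy startup) {
        return startup == StartupStrategy.LATEST ? StartupStrategy.EARLIEST : startup;
    }

    public static void main(String[] args) {
        for (StartupStrategy s : StartupStrategy.values()) {
            System.out.println(s + " -> " + newPartitionStrategy(s));
        }
    }
}
```

The point of the design is that LATEST describes "everything from the moment the job starts", and for a partition created after startup that is its entire content.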

Public Interfaces

Kafka table source (Table / SQL API)

...

Both negative and zero will be interpreted as disabling the feature.
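A minimal sketch of how a configured discovery interval could be interpreted, assuming the value is read from java.util.Properties under the DataStream key partition.discovery.interval.ms. The key, the class name DiscoveryInterval and the defaultMs fallback are assumptions for illustration; the connector's actual default is not stated here.

```java
import java.util.Properties;

public class DiscoveryInterval {
    // Assumed property key for the discovery interval in milliseconds.
    static final String KEY = "partition.discovery.interval.ms";

    // Returns the effective interval, or -1 when discovery is disabled.
    // defaultMs is a hypothetical fallback used when the key is absent.
    static long effectiveIntervalMs(Properties props, long defaultMs) {
        String raw = props.getProperty(KEY);
        long interval = raw == null ? defaultMs : Long.parseLong(raw.trim());
        // Both negative and zero values disable periodic discovery.
        return interval > 0 ? interval : -1L;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // 60_000L is an arbitrary example default, not the connector's value.
        System.out.println(effectiveIntervalMs(props, 60_000L));
        props.setProperty(KEY, "0");
        System.out.println(effectiveIntervalMs(props, 60_000L));
    }
}
```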

OffsetsInitializer

...


KafkaSourceBuilder

Add an OffsetsInitializer for newly discovered partitions. Its default value is EARLIEST.

```java
// KafkaSourceBuilder
public class KafkaSourceBuilder<OUT> {
    // Offsets initializer for newly discovered partitions; defaults to EARLIEST.
    private OffsetsInitializer newDiscoveryOffsetsInitializer = OffsetsInitializer.earliest();

    // Other builder fields and methods omitted.

    public KafkaSourceBuilder<OUT> setNewDiscoveryOffsets(
            OffsetsInitializer newDiscoveryOffsetsInitializer) {
        this.newDiscoveryOffsetsInitializer = newDiscoveryOffsetsInitializer;
        return this;
    }
}
```
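The default-then-override behavior of setNewDiscoveryOffsets can be illustrated with a reduced, self-contained stand-in. The nested Builder and the Offsets enum here are hypothetical substitutes for KafkaSourceBuilder and OffsetsInitializer, used only so the sketch runs without Flink on the classpath.

```java
import java.util.Objects;

public class NewDiscoveryDefaults {
    // Hypothetical stand-in for OffsetsInitializer; only the strategy name matters here.
    enum Offsets { EARLIEST, LATEST }

    // Hypothetical, heavily reduced builder mirroring setNewDiscoveryOffsets.
    static final class Builder {
        // Proposed default for newly discovered partitions.
        private Offsets newDiscoveryOffsets = Offsets.EARLIEST;

        Builder setNewDiscoveryOffsets(Offsets offsets) {
            this.newDiscoveryOffsets = Objects.requireNonNull(offsets);
            return this;
        }

        Offsets newDiscoveryOffsets() {
            return newDiscoveryOffsets;
        }
    }

    public static void main(String[] args) {
        // Without an explicit call, new partitions start from EARLIEST.
        System.out.println(new Builder().newDiscoveryOffsets());
        // Users can still choose another strategy explicitly.
        System.out.println(new Builder().setNewDiscoveryOffsets(Offsets.LATEST).newDiscoveryOffsets());
    }
}
```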


KafkaSourceEnumerator

Add the fields newDiscoveryOffsetsInitializer, unassignedInitialPartitons and initialPartitionDone to KafkaSourceEnumerator; they are explained in the Proposed Changes section.

```java
// KafkaSourceEnumerator
public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        OffsetsInitializer newDiscoveryOffsetsInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness,
        Set<TopicPartition> assignedPartitions,
        Set<TopicPartition> unassignedInitialPartitons,
        boolean noDiscovery) {
    // Field assignments omitted.
}

public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        OffsetsInitializer newDiscoveryOffsetsInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness) {
    // Fresh start: no assigned partitions, no unassigned initial partitions,
    // and no partition discovery has run yet.
    this(
            subscriber,
            startingOffsetInitializer,
            stoppingOffsetInitializer,
            newDiscoveryOffsetsInitializer,
            properties,
            context,
            boundedness,
            Collections.emptySet(),
            Collections.emptySet(),
            true);
}
```

 



KafkaSourceEnumState

Depends on the proposed changes.

...

Add unassignedInitialPartitons and firstDiscoveryDone to snapshot state

Add unassignedInitialPartitons (a collection) and firstDiscoveryDone (a boolean) to the snapshot state. unassignedInitialPartitons holds the partitions found by the first discovery that have not yet been assigned; firstDiscoveryDone records whether the first discovery has completed.

The reason for these two attributes is explained below:

Why do we need the unassignedInitialPartitons collection? Because there is no guarantee that all partitions from the first discovery are assigned before a checkpoint is taken. After a restart, all partitions are discovered again, but the enumerator can only tell whether a partition has already been assigned; it cannot tell whether an unassigned partition belongs to the first discovery or is a newly created partition. unassignedInitialPartitons therefore records the first-discovered partitions that have not yet been assigned. During normal operation it is an empty set, so it takes up little storage.

Why do we need firstDiscoveryDone? Relying only on unassignedInitialPartitons cannot distinguish between the following two cases, which often occur in pattern mode:

  • The first partition discovery is slow, and a checkpoint is taken and the job restarted before it completes. The restored unassignedInitialPartitons is an empty set because no discovery has happened yet, so the next discovery should be treated as the first discovery.
  • The first discovery finds no partitions, and partitions only appear after several further discoveries. If a restart happens during this period, the restored unassignedInitialPartitons is also an empty set, this time because the first discovery was empty, so the next discovery should be treated as a discovery of new partitions.

Since a Kafka topic must have at least one partition when it is created, when can the second case occur? There are three ways to specify partitions in the Kafka source: by topic, by partition, and by matching topics with a regular expression. If the initial partition count is 0, the first two methods raise an error. A regular expression, however, is allowed to match zero topics, so the first discovery can legitimately return no partitions.
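The way the two pieces of state resolve both cases can be sketched as follows. This is a hypothetical, simplified model rather than the enumerator's actual code: partitions are plain "topic-partition" strings instead of Kafka's TopicPartition, the class DiscoveryClassifier and the enum Kind are invented names, and the assumed rule is that a discovered, unassigned partition counts as initial if the first discovery has not completed or if it is already in unassignedInitialPartitons; otherwise it is new.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DiscoveryClassifier {
    // INITIAL uses the startup offsets initializer; NEW uses the new-discovery one.
    enum Kind { INITIAL, NEW }

    // State as restored from a checkpoint (field names follow the proposal text).
    final Set<String> assignedPartitions = new HashSet<>();
    final Set<String> unassignedInitialPartitons = new HashSet<>();
    boolean firstDiscoveryDone;

    // Classifies the partitions returned by one discovery round.
    Map<String, Kind> classify(List<String> discovered) {
        Map<String, Kind> result = new LinkedHashMap<>();
        for (String tp : discovered) {
            if (assignedPartitions.contains(tp)) {
                continue; // already handed to a reader, nothing to decide
            }
            boolean initial = !firstDiscoveryDone || unassignedInitialPartitons.contains(tp);
            result.put(tp, initial ? Kind.INITIAL : Kind.NEW);
            if (initial) {
                unassignedInitialPartitons.add(tp); // kept until it is assigned
            }
        }
        // An empty first discovery still counts as done (the second case).
        firstDiscoveryDone = true;
        return result;
    }

    public static void main(String[] args) {
        // Case 1: restored before the first discovery ran.
        DiscoveryClassifier slowFirst = new DiscoveryClassifier();
        System.out.println(slowFirst.classify(List.of("t-0", "t-1")));

        // Case 2: the first discovery matched no topics, then a topic appeared.
        DiscoveryClassifier emptyFirst = new DiscoveryClassifier();
        emptyFirst.firstDiscoveryDone = true;
        System.out.println(emptyFirst.classify(List.of("t-0")));
    }
}
```

In both cases the restored unassignedInitialPartitons is empty; only the flag decides whether "t-0" starts from the startup offsets or from the new-discovery offsets.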

...

Under the plan above, the snapshot state needs to store both the assignedPartitions and unassignedInitialPartitons collections. Both hold TopicPartition items and differ only in their status, so a single collection that records each partition together with its status can hold both.

...

  • assigned: the partition has already been assigned to a reader; it is restored into assignedPartitions.
  • unassignedInitial: the partition was found by the first discovery but has not yet been assigned; it is restored into unassignedInitialPartitons.
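Merging the two sets into one status-tagged collection, and splitting it again on restore, can be sketched as below. The enum AssignmentStatus, the class EnumStateCodec and the use of a Map keyed by "topic-partition" strings are hypothetical choices for a self-contained illustration; they are not the actual KafkaSourceEnumState serialization format.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class EnumStateCodec {
    // Hypothetical status attached to each partition in the snapshot.
    enum AssignmentStatus { ASSIGNED, UNASSIGNED_INITIAL }

    // Merges both sets into one collection of (partition, status) entries.
    static Map<String, AssignmentStatus> toSnapshot(
            Set<String> assignedPartitions, Set<String> unassignedInitialPartitons) {
        Map<String, AssignmentStatus> state = new HashMap<>();
        for (String tp : unassignedInitialPartitons) {
            state.put(tp, AssignmentStatus.UNASSIGNED_INITIAL);
        }
        // Written last so that ASSIGNED wins if a partition is in both sets.
        for (String tp : assignedPartitions) {
            state.put(tp, AssignmentStatus.ASSIGNED);
        }
        return state;
    }

    // Splits the snapshot back into the set for one status.
    static Set<String> restore(Map<String, AssignmentStatus> state, AssignmentStatus status) {
        Set<String> out = new HashSet<>();
        for (Map.Entry<String, AssignmentStatus> e : state.entrySet()) {
            if (e.getValue() == status) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, AssignmentStatus> snapshot = toSnapshot(Set.of("t-0"), Set.of("t-1"));
        System.out.println(restore(snapshot, AssignmentStatus.ASSIGNED));
        System.out.println(restore(snapshot, AssignmentStatus.UNASSIGNED_INITIAL));
    }
}
```

Because each partition appears exactly once with one status, the snapshot cannot hold contradictory entries for the same partition.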



The following is a modification of KafkaSourceEnumState:

...