...

  1. Dynamic partition discovery is disabled by default, and users have to specify the interval of discovery in order to turn it on.
  2. New partitions use the same strategy as the initial offset strategy instead of the earliest strategy. Semantically, if the startup strategy is latest, the consumed data should include everything produced from the moment of startup onward, including all messages in newly created partitions. However, the latest strategy may currently be applied to new partitions as well, which loses data: a new partition might be discovered by the Kafka source several minutes after it is created, and any messages produced into it during that gap are dropped if, for example, "latest" is the initial offset strategy. If the data from new partitions is not read in full, the source does not meet the user's expectations.
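The gap described in item 2 can be made concrete with a small model. This is a minimal sketch rather than connector code: `DiscoveryGap`, `startingOffset`, and `recordsSkipped` are hypothetical names, and it assumes a freshly created partition whose log starts at offset 0 with nothing removed by retention.

```java
// Hypothetical illustration only; none of these names exist in the Kafka connector.
final class DiscoveryGap {
    enum Strategy { EARLIEST, LATEST }

    /** Offset the reader starts from at the moment the partition is discovered. */
    static long startingOffset(Strategy strategy, long recordsProducedBeforeDiscovery) {
        // Assumption: the new partition starts at offset 0, so "latest" resolves to
        // the log end offset, which equals the number of records written so far.
        return strategy == Strategy.EARLIEST ? 0L : recordsProducedBeforeDiscovery;
    }

    /** Records written between partition creation and discovery that are never read. */
    static long recordsSkipped(Strategy strategy, long recordsProducedBeforeDiscovery) {
        // Everything below the starting offset is skipped.
        return startingOffset(strategy, recordsProducedBeforeDiscovery);
    }
}
```

In this model, every record produced during the gap is skipped under `LATEST`, while `EARLIEST` skips none.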

...

  1. Enable partition discovery by default.
  2. Modify OffsetsInitializer to provide a different strategy for later-discovered partitions, which also lets users customize strategies for new and old partitions. For the provided OffsetsInitializer:

    1. EARLIEST startup offset strategy: use EARLIEST for the newly discovered partitions
    2. LATEST startup offset strategy: use EARLIEST for the newly discovered partitions
    3. TIMESTAMP startup offset strategy: use TIMESTAMP for the newly discovered partitions
    4. SPECIFIC_OFFSET startup offset strategy: use SPECIFIC_OFFSET for the newly discovered partitions
    Use earliest as the offset strategy for new partitions after the first discovery, regardless of the startup offset strategy. Except for the partitions found by the first discovery during a clean startup, which use the user-specified startup strategy, all subsequently discovered partitions (including partitions newly added after a restart from failure) should use earliest.
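The "earliest after the first discovery" rule can be sketched as a single decision function. This is only an illustration of that rule: `NewPartitionStrategy`, `OffsetStrategy`, and `choose` are hypothetical names, not part of the connector.

```java
// Hypothetical illustration of the rule; not the connector's implementation.
final class NewPartitionStrategy {
    enum OffsetStrategy { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET }

    /**
     * Picks the strategy for a batch of discovered partitions.
     *
     * @param startup        the user-specified startup offset strategy
     * @param firstDiscovery true only for the first discovery of a clean startup
     */
    static OffsetStrategy choose(OffsetStrategy startup, boolean firstDiscovery) {
        // Only the first discovery honors the startup strategy;
        // every later discovery falls back to EARLIEST.
        return firstDiscovery ? startup : OffsetStrategy.EARLIEST;
    }
}
```

For example, with a LATEST startup strategy, `choose(LATEST, true)` yields LATEST for the initial partitions, and `choose(LATEST, false)` yields EARLIEST for anything discovered afterwards.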

 

Public Interfaces

Kafka table source (Table / SQL API)

...

Both zero and negative values are interpreted as disabling the feature.
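As a minimal sketch of this interpretation, a configured interval could be normalized as follows. `DiscoveryInterval` and `effectiveInterval` are hypothetical names, and the interval is assumed to be given in milliseconds.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper; the names are illustrative and not part of the connector.
final class DiscoveryInterval {
    /**
     * Returns the discovery interval to use, or an empty Optional when the
     * configured value (assumed to be in milliseconds) disables discovery.
     */
    static Optional<Duration> effectiveInterval(long intervalMs) {
        // Zero and negative values both mean "partition discovery disabled".
        return intervalMs > 0
                ? Optional.of(Duration.ofMillis(intervalMs))
                : Optional.empty();
    }
}
```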


OffsetsInitializer

Modify OffsetsInitializer#getPartitionOffsets to adopt different offset strategies for the first and subsequent discoveries of partitions:

@PublicEvolving
public interface OffsetsInitializer extends Serializable {

    // firstDiscovery: true for the first partition discovery, false for later ones.
    Map<TopicPartition, Long> getPartitionOffsets(
            Collection<TopicPartition> partitions,
            PartitionOffsetsRetriever partitionOffsetsRetriever,
            boolean firstDiscovery);
}
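An implementation might branch on `firstDiscovery` roughly as sketched below. To stay self-contained, the sketch uses stand-ins: a `String` in place of `TopicPartition`, no `PartitionOffsetsRetriever`, and the sentinel values `-2` and `-1` as assumed markers for "earliest" and "latest". `FirstDiscoverySketch`, `SimpleOffsetsInitializer`, and `latestThenEarliest` are hypothetical names.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in types; this is not the connector's OffsetsInitializer.
final class FirstDiscoverySketch {
    // Assumed sentinel markers, used for this sketch only.
    static final long EARLIEST = -2L;
    static final long LATEST = -1L;

    /** Simplified stand-in for the proposed method with the firstDiscovery flag. */
    interface SimpleOffsetsInitializer {
        Map<String, Long> getPartitionOffsets(
                Collection<String> partitions, boolean firstDiscovery);
    }

    /** Starts from latest on the first discovery and from earliest afterwards. */
    static SimpleOffsetsInitializer latestThenEarliest() {
        return (partitions, firstDiscovery) -> {
            Map<String, Long> offsets = new HashMap<>();
            for (String partition : partitions) {
                offsets.put(partition, firstDiscovery ? LATEST : EARLIEST);
            }
            return offsets;
        };
    }
}
```

A custom initializer could return different values in the two branches, which is how users would customize strategies for old and new partitions under this proposal.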


KafkaSourceEnumerator

Add the fields unassignedInitialPartitions and initialPartitionDone to KafkaSourceEnumerator; both are explained in the Proposed Changes section.

...