...
- Dynamic partition discovery is disabled by default, and users must specify a discovery interval to turn it on.
- The strategy used for newly discovered partitions is the same as the initial offset strategy, rather than the earliest strategy. By definition, if the startup strategy is LATEST, the consumed data should include all data produced from the moment of startup, which also includes all messages written to newly created partitions. However, the LATEST strategy may currently be applied to new partitions as well, which can lose data. A new partition might be discovered by the Kafka source several minutes after it is created, and if LATEST is the initial offset strategy, the messages produced into that partition during this gap are skipped. Failing to read all data from new partitions does not meet users' expectations.
...
- Enable partition discovery by default.
- Modify OffsetsInitializer to provide a different strategy for partitions discovered later, which also lets users customize the strategies for new and old partitions. For the provided OffsetsInitializer implementations:
  - EARLIEST startup offset strategy: use EARLIEST for newly discovered partitions
  - LATEST startup offset strategy: use EARLIEST for newly discovered partitions
  - TIMESTAMP startup offset strategy: use TIMESTAMP for newly discovered partitions
  - SPECIFIC_OFFSET startup offset strategy: use SPECIFIC_OFFSET for newly discovered partitions
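The mapping above can be sketched as a small Java function. The StartupStrategy enum and the NewPartitionStrategies helper are hypothetical names used only for illustration; they are not part of the Kafka connector's API.

```java
// Hypothetical enum naming the four provided startup offset strategies.
enum StartupStrategy { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET }

// Hypothetical helper, for illustration only (not connector API).
final class NewPartitionStrategies {
    private NewPartitionStrategies() {}

    // Returns the strategy applied to partitions discovered after startup.
    static StartupStrategy forNewPartitions(StartupStrategy startup) {
        switch (startup) {
            case EARLIEST:
            case LATEST:
                // LATEST falls back to EARLIEST so that records written to a new
                // partition before it was discovered are not skipped.
                return StartupStrategy.EARLIEST;
            case TIMESTAMP:
                return StartupStrategy.TIMESTAMP;
            case SPECIFIC_OFFSET:
                return StartupStrategy.SPECIFIC_OFFSET;
            default:
                throw new IllegalArgumentException("Unknown strategy: " + startup);
        }
    }
}
```

Only LATEST changes behavior; the other three strategies are reused unchanged for newly discovered partitions.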
Public Interfaces
Kafka table source (Table / SQL API)
...
Both negative and zero values are interpreted as disabling the feature.
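As a minimal sketch, assuming the discovery interval is supplied as a java.time.Duration, the rule that zero or negative values disable discovery could be expressed as follows. DiscoveryIntervalSketch and effectiveIntervalMs are hypothetical names, not connector API.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, for illustration only (not connector API).
final class DiscoveryIntervalSketch {
    private DiscoveryIntervalSketch() {}

    // Returns the interval in milliseconds when discovery is enabled, or an
    // empty Optional when the configured value is zero or negative.
    static Optional<Long> effectiveIntervalMs(Duration configured) {
        if (configured.isZero() || configured.isNegative()) {
            return Optional.empty();
        }
        return Optional.of(configured.toMillis());
    }
}
```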
OffsetsInitializer
Modify OffsetsInitializer#getPartitionOffsets to adopt different offset strategies for the first and subsequent discoveries of partitions:
```java
@PublicEvolving
public interface OffsetsInitializer extends Serializable {
    // firstDiscovery is true for partitions found by the first discovery and
    // false for partitions discovered later, so each can use its own strategy.
    Map<TopicPartition, Long> getPartitionOffsets(
            Collection<TopicPartition> partitions,
            PartitionOffsetsRetriever partitionOffsetsRetriever,
            boolean firstDiscovery);
}
```
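To illustrate how the firstDiscovery flag lets a single initializer apply different strategies, the following sketch implements a "LATEST at startup, EARLIEST afterwards" initializer. It uses simplified stand-ins rather than Flink's types: partitions are plain strings instead of TopicPartition, and OffsetsRetrieverSketch is a hypothetical interface whose beginningOffsets/endOffsets methods are only loosely modeled on PartitionOffsetsRetriever.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for PartitionOffsetsRetriever.
interface OffsetsRetrieverSketch {
    Map<String, Long> beginningOffsets(Collection<String> partitions);
    Map<String, Long> endOffsets(Collection<String> partitions);
}

// Hypothetical stand-in mirroring the proposed getPartitionOffsets signature.
interface OffsetsInitializerSketch {
    Map<String, Long> getPartitionOffsets(
            Collection<String> partitions,
            OffsetsRetrieverSketch retriever,
            boolean firstDiscovery);
}

// Uses the end offsets (LATEST) for the first discovery and the beginning
// offsets (EARLIEST) for partitions discovered afterwards.
final class LatestThenEarliestInitializer implements OffsetsInitializerSketch {
    @Override
    public Map<String, Long> getPartitionOffsets(
            Collection<String> partitions,
            OffsetsRetrieverSketch retriever,
            boolean firstDiscovery) {
        Map<String, Long> offsets = firstDiscovery
                ? retriever.endOffsets(partitions)
                : retriever.beginningOffsets(partitions);
        return new HashMap<>(offsets);
    }
}
```

Keeping the choice inside the initializer means callers only report whether a discovery is the first one; the per-strategy policy stays in one place.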
KafkaSourceEnumerator
Add the fields unassignedInitialPartitions and initialPartitionDone to KafkaSourceEnumerator; they are explained in the Proposed Changes section.
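The exact role of these fields is defined in Proposed Changes, which is not reproduced here. Assuming that unassignedInitialPartitions holds the partitions found by the first discovery until they are assigned, and that initialPartitionDone becomes true once the first discovery has completed, the bookkeeping might look like the following hypothetical sketch. EnumeratorStateSketch and its methods are illustrative names, and partitions are plain strings rather than TopicPartition.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical bookkeeping only; the assumed semantics may differ from the FLIP.
final class EnumeratorStateSketch {
    // Assumed: partitions from the first discovery that are not yet assigned.
    private final Set<String> unassignedInitialPartitions = new HashSet<>();
    // Assumed: becomes true once the first discovery has completed.
    private boolean initialPartitionDone = false;

    // Records a discovery round and returns the firstDiscovery flag that would be
    // passed to OffsetsInitializer#getPartitionOffsets for these partitions.
    boolean onPartitionsDiscovered(Collection<String> partitions) {
        boolean firstDiscovery = !initialPartitionDone;
        if (firstDiscovery) {
            unassignedInitialPartitions.addAll(partitions);
            initialPartitionDone = true;
        }
        return firstDiscovery;
    }

    // Called once a split for the partition has been handed to a reader.
    void onPartitionAssigned(String partition) {
        unassignedInitialPartitions.remove(partition);
    }

    Set<String> unassignedInitialPartitions() {
        return Collections.unmodifiableSet(unassignedInitialPartitions);
    }

    boolean initialPartitionDone() {
        return initialPartitionDone;
    }
}
```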
...