...

When the number of Kafka partitions is increased, users want new partitions to be discovered dynamically, without restarting the Flink job. Although Flink already provides dynamic partition discovery, there are still some problems:

  1. Dynamic partition discovery is disabled by default; users have to specify a discovery interval explicitly in order to turn it on (see the sketch after this list).
  2. The strategy used for new partitions is the same as the initial offset strategy. Semantically, if the startup strategy is LATEST, the consumed data should include everything from the moment of startup, including all messages from newly created partitions. However, LATEST may currently also be applied to new partitions, which can lose data: a new partition may only be discovered by the Kafka source several minutes after it is created, and messages produced into it during that gap are dropped if, for example, LATEST is used as the initial offset strategy. If the data from new partitions is not fully read, it does not meet the user's expectations.
  3. OffsetsInitializers do not behave as described in their current JavaDoc: a different offset strategy is applied when a partition does not meet the stated condition (see the Public Interfaces section).
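For reference, this is roughly how discovery has to be enabled today, via the connector's partition.discovery.interval.ms property; the broker, topic and group names below are placeholders.

Code Block
languagejava
titleEnabling partition discovery today (sketch)
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder broker
                .setTopics("input-topic")             // placeholder topic
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Discovery stays off unless this interval is set explicitly.
                .setProperty("partition.discovery.interval.ms", "10000")
                .build();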


Therefore, this FLIP has three main objectives:

  1. Enable partition discovery by default.
  2. Provide a different strategy for later-discovered partitions, with which users can also customize the strategies for newly discovered and initial partitions.
  3. Clean up the logic of the current built-in OffsetsInitializers and update their JavaDoc so that users know the actual behavior.

 

Public Interfaces

JavaDoc of SpecifiedOffsetsInitializer

add this: "Use Specified offset for specified partitions while  use Earliest for unspecified partitions. Specified partitions should less than latest offset."


JavaDoc of TimestampOffsetsInitializer

add this: "Initialize the offsets based on a timestamp. If the message meeting the requirement of the timestamp have not been produced to Kafka yet, just use the latest offset"


Kafka table source (Table / SQL API)

...

Both negative and zero will be interpreted as disabling the feature.
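As a sketch of the Table / SQL side, assuming the existing 'scan.topic-partition-discovery.interval' option (table name, schema and connection options below are illustrative), disabling discovery would look like this:

Code Block
languagejava
titleDisabling discovery in a Kafka table (sketch)
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
        "CREATE TABLE kafka_source (id BIGINT, msg STRING) WITH (\n"
                + " 'connector' = 'kafka',\n"
                + " 'topic' = 'input-topic',\n"
                + " 'properties.bootstrap.servers' = 'broker:9092',\n"
                + " 'format' = 'json',\n"
                + " 'scan.startup.mode' = 'latest-offset',\n"
                // Zero (or a negative value) disables partition discovery.
                + " 'scan.topic-partition-discovery.interval' = '0'\n"
                + ")");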

KafkaSourceBuilder

Add an OffsetsInitializer for newly discovered partitions. Its default value is EARLIEST.

...

Code Block
languagejava
titleKafkaSourceBuilder

...


...


KafkaSourceEnumerator

Add the member variables newDiscoveryOffsetsInitializer, unassignedInitialPartitons, and initialPartitionDone to KafkaSourceEnumerator; they are explained in the Proposed Changes section.

...

However, this approach only saves one boolean variable in the snapshot state at the cost of blocking the event-loop thread, which is not worth it. Partition discovery can be a heavily time-consuming operation, especially when pattern-matching a large number of topics. In that case, the SourceCoordinator cannot process other events, such as reader registration, during the waiting period.


User specifies OffsetsInitializer for new partition

Plan

Add an OffsetsInitializer for newly discovered partitions, with a default value of EARLIEST. If needed, users can adopt a different strategy for new partitions.

Code Block
languagejava
titleKafkaSourceBuilder
public class KafkaSourceBuilder<OUT> {
    public KafkaSourceBuilder<OUT> setNewDiscoveryOffsets(
            OffsetsInitializer newDiscoveryOffsetsInitializer) {
        this.newDiscoveryOffsetsInitializer = newDiscoveryOffsetsInitializer;
        return this;
    }
}
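
Under this (ultimately rejected) plan, usage would have looked roughly like the following; setNewDiscoveryOffsets is the method proposed above and the rest of the builder chain is illustrative:

Code Block
languagejava
titlesetNewDiscoveryOffsets usage (sketch)
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setStartingOffsets(OffsetsInitializer.latest())
                // Proposed here: newly discovered partitions start from EARLIEST
                // (the default), independent of the initial strategy above.
                .setNewDiscoveryOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();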


Reject Reason

First, let's take a look at what happens in the current SpecifiedOffsetsInitializer. When SpecifiedOffsetsInitializer is used, the following strategy is adopted for unspecified partitions (see SpecifiedOffsetsInitializer#getPartitionOffsets):

  1. Use the committed offset first. However, new partitions do not have committed offsets.
  2. If there is no committed offset, use Kafka's OffsetResetStrategy. Currently, Kafka's OffsetResetStrategy is set to Earliest. (See KafkaDynamicSource#createKafkaSource -> OffsetsInitializer#offsets)

That is, specified partitions use their specified offsets, and unspecified partitions use EARLIEST.

For continuous streaming jobs, the latter case (partitions that are not in the specified set, which is what newly discovered partitions are) is the more common one.

The problem with the former (applying a specified offset to a new partition) is that the latest offset of a new partition is generally 0 or a small value at discovery time, so if the specified offset is greater than that value, an error occurs when consuming. Whether this happens depends on how many messages are written between partition creation and discovery, which is unpredictable and therefore unreasonable. Using EARLIEST guarantees that the starting position always exists.
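
A simplified sketch of that resolution order (not the actual implementation; names are illustrative):

Code Block
languagejava
titleOffset resolution for one partition (sketch)
// Resolution order under SpecifiedOffsetsInitializer, per partition:
// specified offset -> committed offset -> OffsetResetStrategy (currently EARLIEST).
static long resolveStartingOffset(
        TopicPartition tp,
        Map<TopicPartition, Long> specifiedOffsets,
        Map<TopicPartition, Long> committedOffsets,
        long earliestOffset) {
    Long specified = specifiedOffsets.get(tp);
    if (specified != null) {
        return specified;
    }
    Long committed = committedOffsets.get(tp);
    if (committed != null) {
        return committed;
    }
    return earliestOffset;
}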


Next, let's examine what happens in the current TimestampOffsetsInitializer. If the timestamp is later than every message in a new partition, no matching message is found and the offset is set to the latest offset (see TimestampOffsetsInitializer#getPartitionOffsets).

It would be difficult to estimate a suitable timestamp for new partitions. Suppose we configure a timestamp for new partitions at startup:

  • Partitions created before this timestamp will, once discovered, immediately start from the latest offset instead of waiting until the timestamp. Again, the outcome depends on how many messages are written between partition creation and discovery, which is unpredictable and unreasonable.
  • Partitions created after this timestamp will effectively start from the earliest offset. (For continuous streaming jobs, this is the more common case.)

For example, if the timestamp is set to 7:00 tomorrow, partitions created before 7:00 tomorrow will be consumed from their latest offset at the moment of discovery, while partitions created after 7:00 tomorrow will be consumed from their earliest offset.

A timestamp is meaningful for existing partitions, where it lets consumption start from a certain point in time; it should not be used for new partitions.


After the analysis above, using EARLIEST for newly discovered partitions is the most reasonable option.