...

add this: "Use the specified offset for specified partitions and Earliest for unspecified partitions. The specified offset should be less than the partition's latest offset; otherwise consumption starts from the earliest offset."


JavaDoc of TimestampOffsetsInitializer

...

KafkaSourceBuilder

public class KafkaSourceBuilder<OUT> {
    public KafkaSourceBuilder<OUT> setNewDiscoveryOffsets(
            OffsetsInitializer newDiscoveryOffsetsInitializer) {
        this.newDiscoveryOffsetsInitializer = newDiscoveryOffsetsInitializer;
        return this;
    }
}


Reject Reason

SpecifiedOffsetsInitializer

First, let's take a look at what happens in the current SpecifiedOffsetsInitializer. When SpecifiedOffsetsInitializer is used, the following strategy is adopted for unspecified partitions (see SpecifiedOffsetsInitializer#getPartitionOffsets):

  1. Use the committed offset first. However, new partitions don’t have committed offsets.
  2. If there is no committed offset, use Kafka's OffsetResetStrategy. Currently, the OffsetResetStrategy is set to Earliest (see KafkaDynamicSource#createKafkaSource -> OffsetsInitializer#offsets).


That is, for specified partitions, use Specified offset, and for unspecified partitions, use Earliest.
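The resolution order described above can be illustrated with a minimal sketch. It is a simplified model, not the connector's code: the class `SpecifiedOffsetsSketch`, the method `resolve`, the `EARLIEST` sentinel value, and the use of plain strings for partitions are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the strategy described in the text.
// Partitions are plain strings here, not Kafka TopicPartition objects.
class SpecifiedOffsetsSketch {
    // Arbitrary sentinel meaning "start from the earliest available offset".
    static final long EARLIEST = -2L;

    static Map<String, Long> resolve(
            List<String> partitions,
            Map<String, Long> specifiedOffsets,
            Map<String, Long> committedOffsets) {
        Map<String, Long> result = new TreeMap<>();
        for (String partition : partitions) {
            if (specifiedOffsets.containsKey(partition)) {
                // Specified partition: use the specified offset.
                result.put(partition, specifiedOffsets.get(partition));
            } else if (committedOffsets.containsKey(partition)) {
                // Unspecified partition with a committed offset: use it.
                result.put(partition, committedOffsets.get(partition));
            } else {
                // Unspecified partition without a committed offset
                // (the usual case for a newly discovered partition).
                result.put(partition, EARLIEST);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> resolved = resolve(
                List.of("topic-0", "topic-1", "topic-2"),
                Map.of("topic-0", 42L),
                Map.of("topic-1", 7L));
        System.out.println(resolved);
    }
}
```

In this model a newly discovered partition always lands in the last branch, because it has neither a specified nor a committed offset.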

For continuous streaming jobs, the latter case (unspecified partitions) is more common.

The problem with the former case (specified partitions) is that the latest offset of a new partition is generally 0 or a small value at discovery time. If the specified offset is greater than that value, the offset is out of range and the Kafka client consumes according to the OffsetResetStrategy, that is, EARLIEST (see org.apache.kafka.clients.consumer.internals.Fetcher#handleOffsetOutOfRange). Whether this happens depends on how many messages are written between partition creation and discovery, which is very unpredictable and unreasonable.
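The effect of an out-of-range specified offset on a new partition can be sketched as follows. This is only an illustration of the reset behavior under an EARLIEST strategy; the class `OutOfRangeSketch` and the method `effectiveStart` are hypothetical names and do not correspond to Kafka client code.

```java
// Hypothetical illustration of an EARLIEST reset on an out-of-range offset.
class OutOfRangeSketch {
    /**
     * Returns the offset consumption would effectively start from,
     * assuming the reset strategy is EARLIEST.
     *
     * @param specified the offset requested for the partition
     * @param earliest  the first available offset in the partition
     * @param latest    the end offset of the partition at discovery time
     */
    static long effectiveStart(long specified, long earliest, long latest) {
        boolean outOfRange = specified < earliest || specified > latest;
        return outOfRange ? earliest : specified;
    }

    public static void main(String[] args) {
        // A new partition discovered after only 3 messages were written.
        System.out.println(effectiveStart(100L, 0L, 3L)); // out of range
        System.out.println(effectiveStart(2L, 0L, 3L));   // within range
    }
}
```

The same specified offset therefore yields different starting points depending on how much data the partition already holds when it is discovered.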


Using "Earliest" ensures that the same behavior of consuming offsets for new partitions is maintained.


TimestampOffsetsInitializer 

Next, let's examine what happens in the current TimestampOffsetsInitializer. If the timestamp is later than every message in the new partition, no message matches it. In this case, the offset is set to the latest offset (see TimestampOffsetsInitializer#getPartitionOffsets).
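A minimal sketch of this timestamp lookup, under simplifying assumptions: the partition is modeled as an array in which index i holds the timestamp of the message at offset i, and offsets start at 0. The class `TimestampLookupSketch` and the method `startingOffset` are hypothetical and only mirror the behavior described in the text.

```java
// Hypothetical model of a timestamp-based starting offset lookup.
class TimestampLookupSketch {
    /**
     * Returns the first offset whose message timestamp is greater than or
     * equal to the target. If no message qualifies, returns the latest
     * offset, i.e. the position just past the last message.
     */
    static long startingOffset(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return offset;
            }
        }
        // Target is later than every message: fall back to the latest offset.
        return timestamps.length;
    }

    public static void main(String[] args) {
        long[] newPartition = {1000L, 1010L, 1020L};
        System.out.println(startingOffset(newPartition, 1005L)); // a message matches
        System.out.println(startingOffset(newPartition, 5000L)); // no message matches
    }
}
```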

...