...


Discussion thread: https://lists.apache.org/thread/581f2xq5d1tlwc8gcr27gwkp3zp0wrg6
Vote thread: https://lists.apache.org/thread/opbg0k2v2kdsyt6jt8c334kgv7mo8rk5
JIRA: FLINK-31953
Release: kafka-4.0.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Add this: "Use the specified offset for specified partitions, and use the committed offset or Earliest for unspecified partitions. A specified partition offset should be less than the latest offset; otherwise consumption will start from the earliest offset."
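The rule in the quoted text can be sketched as a small resolution function. This is only an illustration under assumptions: `StartingOffsetSketch` and `resolve` are hypothetical names, not part of the Kafka connector, and the order "committed offset first, then earliest" for unspecified partitions is one reading of "commit offset or Earliest".

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of the Kafka connector API. */
final class StartingOffsetSketch {

    /**
     * Picks a starting offset for one partition.
     *
     * @param specified offset the user specified for this partition, if any
     * @param committed committed consumer-group offset for this partition, if any
     * @param earliest earliest available offset of the partition
     * @param latest latest offset of the partition
     */
    static long resolve(
            OptionalLong specified, OptionalLong committed, long earliest, long latest) {
        if (specified.isPresent()) {
            long offset = specified.getAsLong();
            // A specified offset is only honoured when it lies below the latest offset.
            return offset < latest ? offset : earliest;
        }
        // Unspecified partition: committed offset if there is one, otherwise earliest.
        return committed.orElse(earliest);
    }
}
```

With `earliest = 0` and `latest = 10`, a specified offset of 5 is used directly, while a specified offset of 12 falls back to 0.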

...

KafkaSourceEnumerator

Add the attributes unassignedInitialPartitons and initialDiscoveryFinished to KafkaSourceEnumerator, which will be explained in the Proposed Changes section.

Code Block
language: java
title: KafkaSourceEnumerator
    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions,
            Set<TopicPartition> unassignedInitialPartitons,
            boolean initialDiscoveryFinished) {

    }

    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness) {
        this(
                subscriber,
                startingOffsetInitializer,
                stoppingOffsetInitializer,
                properties,
                context,
                boundedness,
                Collections.emptySet(),
                Collections.emptySet(),
                true
            );
    }

 



KafkaSourceEnumState

Depends on the proposed changes.


Code Block
public class KafkaSourceEnumState {
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    private final boolean initialDiscoveryFinished;
}

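class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    // Status code of the partition, see KafkaPartitionSplitAssignStatus.
    private int assignStatus;
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    // Found by the initial discovery but not yet assigned to a reader.
    UNASSIGNED_INITIAL
}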

 




Proposed Changes

Main Idea

...

Note: The current design only applies to cases where all existing partitions can be discovered at once. If not all old partitions can be discovered at once, the old partitions discovered later will be treated as new partitions, leading to message duplication. Keep this limitation in mind.


...

Add unassignedInitialPartitons and initialDiscoveryFinished to snapshot state

Add unassignedInitialPartitons (collection) and initialDiscoveryFinished (boolean) to the snapshot state. unassignedInitialPartitons is the collection of first-discovered partitions that have not yet been assigned. initialDiscoveryFinished indicates whether the first discovery has been done.

...

Why do we need the unassignedInitialPartitons collection? Because we cannot guarantee that all first-discovered partitions will be assigned before checkpointing. After a restart, all partitions are re-discovered, but the enumerator can only tell whether a partition has already been assigned, not whether an unassigned partition is a first-discovered partition or a new partition. Therefore, unassignedInitialPartitons is needed to record the first-discovered partitions that have not been assigned. It should be an empty set during normal operation and will not take up much storage.

Why do we need initialDiscoveryFinished? Relying only on the unassignedInitialPartitons attribute cannot distinguish between the following two cases (which often occur in pattern mode):

...

Code Block
language: java
public class KafkaSourceEnumState {
    private final Set<TopicPartition> assignedPartitions;
    private final Set<TopicPartition> unassignedInitialPartitons;
    private final boolean initialDiscoveryFinished;
}
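As a rough sketch of why both fields are needed, the following shows how partitions discovered after a restore might be classified. It rests on assumptions: partitions are plain strings instead of TopicPartition, `RestoreClassificationSketch`, `classify` and `StartFrom` are hypothetical names, and new partitions are taken to start from the earliest offset as described elsewhere in this proposal.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical illustration; partitions are modelled as plain strings. */
final class RestoreClassificationSketch {

    enum StartFrom { STARTING_OFFSETS_INITIALIZER, EARLIEST }

    /** Decides, for every not-yet-assigned partition, where consumption would start. */
    static Map<String, StartFrom> classify(
            Set<String> discovered,
            Set<String> assignedPartitions,
            Set<String> unassignedInitialPartitons,
            boolean initialDiscoveryFinished) {
        Map<String, StartFrom> result = new TreeMap<>();
        for (String partition : discovered) {
            if (assignedPartitions.contains(partition)) {
                continue; // already handed to a reader, nothing to decide
            }
            // A partition counts as first-discovered if the first discovery never
            // completed, or if it was recorded as unassigned in the snapshot.
            boolean initial =
                    !initialDiscoveryFinished
                            || unassignedInitialPartitons.contains(partition);
            result.put(
                    partition,
                    initial ? StartFrom.STARTING_OFFSETS_INITIALIZER : StartFrom.EARLIEST);
        }
        return result;
    }
}
```

With an empty unassignedInitialPartitons set, the same discovered partition is classified differently depending on initialDiscoveryFinished, which is the ambiguity the boolean is meant to resolve.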



Merge assignedPartitions and unassignedInitialPartitons collections

...

Code Block
language: java
public class KafkaSourceEnumState {
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    private final boolean initialDiscoveryFinished;
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private int assignStatus;
}

enum KafkaPartitionSplitAssignStatus{
    ASSIGNED,
    UNASSIGNED_INITIAL
}
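A minimal sketch of how the merged collection could be split back into the two logical sets after restore. The types here are simplified stand-ins (string partitions, an enum-typed status field instead of the int above), and `MergedStateSketch` and `withStatus` are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-ins for the merged snapshot state. */
final class MergedStateSketch {

    enum AssignStatus { ASSIGNED, UNASSIGNED_INITIAL }

    static final class PartitionWithStatus {
        final String topicPartition;
        final AssignStatus status;

        PartitionWithStatus(String topicPartition, AssignStatus status) {
            this.topicPartition = topicPartition;
            this.status = status;
        }
    }

    /** Returns the partitions in the merged collection that carry the wanted status. */
    static Set<String> withStatus(List<PartitionWithStatus> merged, AssignStatus wanted) {
        Set<String> result = new TreeSet<>();
        for (PartitionWithStatus p : merged) {
            if (p.status == wanted) {
                result.add(p.topicPartition);
            }
        }
        return result;
    }
}
```

Calling `withStatus(merged, AssignStatus.ASSIGNED)` and `withStatus(merged, AssignStatus.UNASSIGNED_INITIAL)` recovers the equivalents of assignedPartitions and unassignedInitialPartitons from a single serialized collection.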


...

Disadvantage: The partition discovery, partition assignment, and checkpoint operations are coupled together. If the partitions are not assigned before checkpointing, the SourceCoordinator's event-loop thread will be blocked, but partition assignment also requires the event-loop thread to execute, so the thread ends up waiting on itself (a deadlock). Therefore, the existing thread model would need to be changed, which is more complicated.
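The self-blocking problem can be reproduced in miniature with a plain single-threaded executor. This is not SourceCoordinator code; `EventLoopBlockingSketch` and its method are hypothetical, and the wait is bounded to 200 ms only so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo of a task waiting on work queued behind it on the same thread. */
final class EventLoopBlockingSketch {

    /** Returns true if the "assignment" task finished while the "checkpoint" task waited. */
    static boolean assignmentFinishesWhileCheckpointWaits() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> checkpoint = eventLoop.submit(() -> {
                // The "assignment" is queued on the same single thread that is
                // currently busy running this "checkpoint" task.
                Future<?> assignment = eventLoop.submit(() -> { });
                try {
                    assignment.get(200, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    return false; // the assignment never got a chance to run
                }
            });
            return checkpoint.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }
}
```

The method returns false: the queued task cannot start until the waiting task releases the only thread. Without the timeout, the wait would never end.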


Alternative to the initialDiscoveryFinished variable

As mentioned before, the reason for introducing initialDiscoveryFinished is that, after restarting, if unassignedInitialPartitons in the snapshot state is empty, it cannot be determined whether this is because no discovery has happened yet (non-discovery) or because the discovery found nothing (empty-discovery).

...

Using "Earliest" ensures that the same behavior of consuming offsets for new partitions is maintained.

The specified offset strategy only affects existing partitions and can start from a certain point in the middle of the offset range. It should not be used for new partitions.

...

For example, if the timestamp is set to tomorrow at 7:00, partitions created before 7:00 tomorrow will be consumed from the latest offset at the moment they are discovered, while partitions created after 7:00 tomorrow will be consumed from the earliest offset.

The timestamp strategy only affects existing partitions and can start from a certain point in the middle of the time range. It should not be used for new partitions.

...