...
Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- Dynamic partition discovery is disabled by default, and users have to specify a discovery interval in order to turn it on.
- The strategy used for new partitions is the same as the initial offset strategy, which does not match the expected semantics: if the startup strategy is "latest", the consumed data should include everything from the moment of startup, which also includes all messages from newly created partitions. However, the "latest" strategy may currently also be applied to new partitions, leading to data loss (consider a new partition that is created and only discovered by the Kafka source several minutes later: messages produced into that partition within the gap are dropped if, for example, "latest" is used as the initial offset strategy). If the data from new partitions is not fully read, it does not meet the user's expectations. Other problems are discussed in the final section "User specifies OffsetsInitializer for new partition".
- The current JavaDoc does not describe the full behavior of OffsetsInitializers: a different offset strategy is applied when a partition does not meet the initializer's condition (see the Public Interfaces section and the final section "User specifies OffsetsInitializer for new partition").

Therefore, this FLIP has three main objectives:
...
add this: "Use the specified offset for specified partitions, while using the committed offset or EARLIEST for unspecified partitions. A specified partition offset should be less than the latest offset, otherwise consumption will start from the earliest offset."
...
KafkaSourceEnumerator
Add the attributes newDiscoveryOffsetsInitializer, unassignedInitialPartitions, and initialDiscoveryFinished to KafkaSourceEnumerator, which will be explained in the Proposed Changes section.
Code Block

public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        OffsetsInitializer newDiscoveryOffsetsInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness,
        Set<TopicPartition> assignedPartitions,
        Set<TopicPartition> unassignedInitialPartitions,
        boolean initialDiscoveryFinished) {
}

public KafkaSourceEnumerator(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetInitializer,
        OffsetsInitializer stoppingOffsetInitializer,
        OffsetsInitializer newDiscoveryOffsetsInitializer,
        Properties properties,
        SplitEnumeratorContext<KafkaPartitionSplit> context,
        Boundedness boundedness) {
    this(
            subscriber,
            startingOffsetInitializer,
            stoppingOffsetInitializer,
            newDiscoveryOffsetsInitializer,
            properties,
            context,
            boundedness,
            Collections.emptySet(),
            Collections.emptySet(),
            true);
}
...
Depends on the proposed changes.
Proposed Changes
Main Idea
Currently, the offset of newly discovered partitions is initialized by the user-specified OffsetsInitializer, which could lead to potential data loss (consider a new partition that is created and only discovered by the Kafka source several minutes later: messages produced into that partition within the gap are dropped if, for example, "latest" is used as the initial offset strategy).
...
Note: The current design only applies to cases where all existing partitions can be discovered at once. If they cannot, the old partitions discovered later will be treated as new partitions, leading to message duplication, so this caveat needs particular attention.
...
Add unassignedInitialPartitions and initialDiscoveryFinished to the snapshot state

Add unassignedInitialPartitions (a collection) and initialDiscoveryFinished (a boolean) to the snapshot state. unassignedInitialPartitions represents the collection of first-discovered partitions that have not yet been assigned. initialDiscoveryFinished represents whether the first discovery has been completed.
...
Why do we need the unassignedInitialPartitions collection? Because we cannot guarantee that all first-discovered partitions will be assigned before checkpointing. After a restart, all partitions are re-discovered, but at that point we can only determine whether a partition has already been assigned, not whether an unassigned partition is a first-discovered partition or a new one. Therefore, unassignedInitialPartitions is needed to represent the first-discovered partitions that have not yet been assigned; it should be an empty set during normal operation and will not take up much storage.

Why do we need initialDiscoveryFinished? Relying on the unassignedInitialPartitions attribute alone cannot distinguish between the following two cases (which often occur in pattern mode):
...
Code Block

public class KafkaSourceEnumState {
    private final Set<TopicPartition> assignedPartitions;
    private final Set<TopicPartition> unassignedInitialPartitions;
    private final boolean initialDiscoveryFinished;
}
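The restore-time decision this state enables can be sketched as follows. This is an illustrative simplification, not the actual enumerator code: partition keys are plain strings here, and the method and enum names (`classify`, `StartFrom`) are hypothetical.

```java
import java.util.Set;

public class RestoreClassification {
    enum StartFrom { ALREADY_ASSIGNED, INITIAL_OFFSETS_INITIALIZER, EARLIEST }

    // Decide how a re-discovered partition should be treated after restore,
    // following the rule described in this FLIP.
    static StartFrom classify(String partition,
                              Set<String> assignedPartitions,
                              Set<String> unassignedInitialPartitions,
                              boolean initialDiscoveryFinished) {
        if (assignedPartitions.contains(partition)) {
            return StartFrom.ALREADY_ASSIGNED;            // splits already handed to readers
        }
        if (unassignedInitialPartitions.contains(partition)) {
            return StartFrom.INITIAL_OFFSETS_INITIALIZER; // first-discovered but not yet assigned
        }
        // Unknown partition: if the initial discovery has already finished, it must
        // be a newly created partition, so start from EARLIEST to avoid data loss.
        return initialDiscoveryFinished
                ? StartFrom.EARLIEST
                : StartFrom.INITIAL_OFFSETS_INITIALIZER;  // no discovery has happened yet
    }

    public static void main(String[] args) {
        Set<String> assigned = Set.of("t-0");
        Set<String> unassignedInitial = Set.of("t-1");
        System.out.println(classify("t-0", assigned, unassignedInitial, true)); // ALREADY_ASSIGNED
        System.out.println(classify("t-1", assigned, unassignedInitial, true)); // INITIAL_OFFSETS_INITIALIZER
        System.out.println(classify("t-2", assigned, unassignedInitial, true)); // EARLIEST
    }
}
```

Note how the last branch is exactly where initialDiscoveryFinished matters: without it, an unknown partition on an empty state could not be told apart from a first-discovered one.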
Merge the assignedPartitions and unassignedInitialPartitions collections
...
Code Block

public class KafkaSourceEnumState {
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    private final boolean initialDiscoveryFinished;
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private final KafkaPartitionSplitAssignStatus assignStatus;
}

enum KafkaPartitionSplitAssignStatus {
    ASSIGNED,
    UNASSIGNED_INITIAL
}
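One benefit of the merged representation is straightforward upgrades from older snapshot versions, which stored only the assigned partitions. A minimal sketch of that migration, assuming every legacy partition was assigned and the initial discovery had finished (class, record, and method names here are hypothetical, not the actual serializer code):

```java
import java.util.HashSet;
import java.util.Set;

public class KafkaEnumStateUpgrade {
    enum AssignStatus { ASSIGNED, UNASSIGNED_INITIAL }

    // Partition keys are plain strings for illustration.
    record PartitionWithStatus(String topicPartition, AssignStatus status) {}

    // A legacy snapshot only stored the assigned partitions, so when restoring
    // it into the merged representation we mark every partition ASSIGNED;
    // unassignedInitialPartitions is empty and initialDiscoveryFinished is true.
    static Set<PartitionWithStatus> fromLegacyAssigned(Set<String> legacyAssigned) {
        Set<PartitionWithStatus> merged = new HashSet<>();
        for (String tp : legacyAssigned) {
            merged.add(new PartitionWithStatus(tp, AssignStatus.ASSIGNED));
        }
        return merged;
    }

    public static void main(String[] args) {
        Set<PartitionWithStatus> merged = fromLegacyAssigned(Set.of("topic-0", "topic-1"));
        System.out.println(merged.size()); // 2, both marked ASSIGNED
    }
}
```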
...
Disadvantage: partition discovery, partition assignment, and the checkpoint operation are coupled together. If a partition is not assigned before checkpointing, the SourceCoordinator's event-loop thread will block waiting for the assignment, but the assignment itself also requires the event-loop thread to execute, causing the thread to lock itself. Avoiding this would require changing the existing threading model, which is more complicated.
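The self-locking hazard can be illustrated generically with a single-threaded executor standing in for the coordinator's event loop (this is not Flink code; names and the 500 ms timeout are purely illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SelfLockDemo {
    // A "checkpoint" task that waits for an "assignment" task submitted to the
    // same single thread can never see it complete: the only thread that could
    // run the assignment is busy running the checkpoint task itself.
    static String runCheckpointOnEventLoop(ExecutorService eventLoop) throws Exception {
        Future<String> checkpoint = eventLoop.submit(() -> {
            Future<?> assignment = eventLoop.submit(() -> { /* assign splits */ });
            try {
                assignment.get(500, TimeUnit.MILLISECONDS); // would block forever without a timeout
                return "assigned";
            } catch (TimeoutException e) {
                return "self-locked";
            }
        });
        return checkpoint.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        System.out.println(runCheckpointOnEventLoop(eventLoop)); // prints "self-locked"
        eventLoop.shutdownNow();
    }
}
```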
Alternative to the initialDiscoveryFinished variable

As mentioned before, the reason for introducing initialDiscoveryFinished is that, after restarting, if unassignedInitialPartitions in the snapshot state is empty, it cannot be determined whether that is because no discovery happened or because the discovery returned no partitions.
...
For example, if the specified offset of topic1-1 is set to 10: when partition-1 is discovered with an offsets range of [0, 5], all messages in [0, 5] will be consumed. When partition-1 already has more than 10 messages written by the time it is discovered, with an offsets range of [0, 15], consumption will begin from offset 10. If partition-2 has no specified offset, then when partition-2 is discovered with an offsets range of [0, 15], all messages in [0, 15] will be consumed.

Using EARLIEST ensures that the consumption behavior for new partitions stays consistent.

The specified offset strategy only affects existing partitions and can start from a point in the middle of the offset range. It should not be used for new partitions.
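The resolution rule behind the example above can be sketched as a small helper. This is a hypothetical function illustrating the documented semantics, not the connector's actual API: a specified offset is honored only if it is below the partition's latest offset, and everything else falls back to EARLIEST.

```java
import java.util.Map;

public class SpecifiedOffsetResolution {
    // Resolve the effective starting offset for one partition.
    // specified == null means the user gave no offset for this partition.
    static long resolveStart(Long specified, long earliest, long latest) {
        if (specified == null) {
            return earliest;                              // unspecified partition -> earliest
        }
        return specified < latest ? specified : earliest; // out-of-range offset -> earliest
    }

    public static void main(String[] args) {
        Map<String, Long> specified = Map.of("topic1-1", 10L);
        // partition-1 discovered with offsets [0, 5]: 10 is not below 5 -> start at 0
        System.out.println(resolveStart(specified.get("topic1-1"), 0, 5));  // 0
        // partition-1 discovered with offsets [0, 15]: 10 < 15 -> start at 10
        System.out.println(resolveStart(specified.get("topic1-1"), 0, 15)); // 10
        // partition-2 has no specified offset -> earliest
        System.out.println(resolveStart(specified.get("topic1-2"), 0, 15)); // 0
    }
}
```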
TimestampOffsetsInitializer
...
- For new partitions created before this timestamp, the latest offset at the moment of discovery is used as soon as they are discovered, instead of waiting until the timestamp. How much data is skipped then depends on how many messages were written between partition creation and discovery, which is unpredictable and unreasonable.
- For new partitions created after this timestamp, the strategy is equivalent to EARLIEST. (For continuous streaming jobs, this is the more common case.)

For example, if the timestamp is set to tomorrow at 7:00, partitions created before 7:00 tomorrow will be consumed from the latest offset at the moment of discovery, while partitions created after 7:00 tomorrow will be consumed from the earliest offset.
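The two cases above can be condensed into a small sketch. This is illustrative only (the method name and string labels are hypothetical, and timestamps are stand-in numbers): a partition with no record at or after the target timestamp resolves to its end offset, i.e. effectively "latest", while a partition created after the timestamp has every record newer than it, i.e. effectively "earliest".

```java
public class TimestampNewPartitionBehavior {
    // Which offset a timestamp-based initializer effectively picks for a
    // newly created partition, per the behavior described in this FLIP.
    static String effectiveStart(long partitionCreateTime, long targetTimestamp) {
        return partitionCreateTime < targetTimestamp
                ? "latest-at-discovery" // no data for the timestamp yet -> end offset
                : "earliest";           // every record is newer than the timestamp
    }

    public static void main(String[] args) {
        long sevenAmTomorrow = 100; // stand-in timestamp
        System.out.println(effectiveStart(99, sevenAmTomorrow));  // latest-at-discovery
        System.out.println(effectiveStart(101, sevenAmTomorrow)); // earliest
    }
}
```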
The timestamp strategy only affects existing partitions and can start from a point in the middle of the time range. It should not be used for new partitions.
Conclusion
All these problems are reproducible in the current version. The reason they have not been exposed is probably that users usually specify offsets or timestamps of existing partitions only, so the behavior appears to be EARLIEST in production.

After analyzing the above, using EARLIEST is the most reasonable option.