...

Approvers

Status

Current state: IN PROGRESS

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

...

Incremental fetch supports a limit on the number of records fetched in one round trip. The records are sorted by the incremental column in ascending order and then the limit is applied. This can lead to loss of records in the scenarios listed below:

Scenario 1: Suppose ‘ckpt’ is the incremental column, the last checkpoint (last_ckpt) is 10, and the fetch size is 100. Further suppose more than 120 records were written to the table before the next interval of incremental fetch started. Then a simple query like the one below

...

Code Block
languagejava
// Pseudocode: pull records in batches of fetchSize until the source is caught up.
// last_ckpt starts from the previously committed checkpoint.
fetchSize := 100

resultData := new DataSet()

do {

  // '>=' re-reads rows at the checkpoint boundary so none are skipped;
  // the resulting duplicates can be dropped during ingestion.
  dataset := records with ckpt >= last_ckpt order by ckpt limit fetchSize

  if (!dataset.isEmpty()) {
    last_ckpt := max(ckpt) in dataset
    resultData := resultData.union(dataset)
  }

// a full batch means there may be more records left to pull
} while (dataset.size() == fetchSize);

NOTE: The above algorithm could run indefinitely for a source with high write throughput. So we should put an upper bound on the pull, based on the sync interval or on the maximum checkpoint seen before the sync started. For Phase 1, as discussed in the comments, instead of running iteratively we are going to fetch just once.
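As a rough sketch of what that bounded, single-pass Phase 1 fetch could look like (the table name `source_table`, the numeric `ckpt` column and the use of plain JDBC are illustrative assumptions, not the actual JdbcSource implementation):

Code Block
languagejava
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class BoundedIncrementalFetch {

    // Pull at most fetchSize rows with ckpt greater than lastCkpt but no greater than
    // the maximum checkpoint observed before the sync started, so a source with high
    // write throughput cannot keep the puller busy forever.
    static List<Long> fetchOnce(Connection conn, long lastCkpt, long maxCkptAtSyncStart,
                                int fetchSize) throws SQLException {
        String sql = "SELECT ckpt FROM source_table"
                + " WHERE ckpt > ? AND ckpt <= ? ORDER BY ckpt LIMIT ?";
        List<Long> checkpoints = new ArrayList<>();
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, lastCkpt);
            ps.setLong(2, maxCkptAtSyncStart);
            ps.setInt(3, fetchSize);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    checkpoints.add(rs.getLong("ckpt"));
                }
            }
        }
        return checkpoints;
    }
}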

Scenario 2: Suppose ‘ckpt’ is the incremental column and the last checkpoint (last_ckpt) is 10. Further suppose a long-running transaction in the database wrote a record with ckpt=8 but committed only after the checkpoint had already advanced to 10. Incremental fetch will then pull only the records with ckpt > 10 and miss the one with ckpt=8. This scenario is not handled in Phase 1; we would likely need some kind of background sweeper to look for such records.
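One possible shape for such a sweeper is sketched below; it assumes a numeric `ckpt` column and a fixed lookback window, both of which are illustrative choices rather than part of this proposal. The idea is to periodically re-scan a window just below the committed checkpoint and reconcile whatever turns up against what has already been ingested.

Code Block
languagejava
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class LateRecordSweeper {

    // Re-scan a fixed window just below the committed checkpoint; records returned
    // here that are missing from the Hudi table were committed late (e.g. by a
    // long-running transaction) and should be re-ingested.
    static List<Long> sweep(Connection conn, long lastCkpt, long lookbackWindow)
            throws SQLException {
        String sql = "SELECT ckpt FROM source_table"
                + " WHERE ckpt > ? AND ckpt <= ? ORDER BY ckpt";
        List<Long> candidates = new ArrayList<>();
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, lastCkpt - lookbackWindow);
            ps.setLong(2, lastCkpt);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    candidates.add(rs.getLong("ckpt"));
                }
            }
        }
        return candidates;
    }
}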

Scenario 3: A record is deleted in the source table. This would make the downstream table inconsistent with the source table. This scenario is also not handled in Phase 1. One workaround is to run insert_overwrite_table at a regular interval. Another suggestion is to join with the Hudi dataset and find the non-intersecting records, which could be expensive. To support this kind of CDC we need to evaluate the pros and cons of both approaches, which we will take up in Phase 2.
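For reference, a rough sketch of the join-based approach using the Spark API is shown below; the record key column (`id`), the Hudi base path and the JDBC options are placeholders, not the final design.

Code Block
languagejava
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FindDeletedRecords {

    // Rows that exist in the Hudi table but not in the current source snapshot are
    // candidates for deletion downstream. "id" stands in for the record key column.
    static Dataset<Row> deletedRows(SparkSession spark, String hudiBasePath,
                                    Map<String, String> jdbcOptions) {
        Dataset<Row> hudiSnapshot = spark.read().format("hudi").load(hudiBasePath);
        Dataset<Row> sourceSnapshot = spark.read().format("jdbc").options(jdbcOptions).load();
        // left_anti keeps only the Hudi rows that have no matching key in the source.
        return hudiSnapshot.join(sourceSnapshot,
                hudiSnapshot.col("id").equalTo(sourceSnapshot.col("id")),
                "left_anti");
    }
}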

Scenario 4: If the user chooses an `auto_increment` column as the incremental column, how do we handle updates? Auto-increment only assigns a new value on insert, so an update leaves the incremental column unchanged and the updated row is never re-fetched. Updates are not handled in Phase 1. A workaround is to additionally use an `_updated_at` column, if the source table has one. Otherwise, we might have to do a full batch sync at a regular cadence for such use cases.
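A sketch of the `_updated_at` workaround is shown below, assuming the source exposes such a column, using the Spark JDBC source with the incremental predicate pushed down as a subquery; the connection settings, table and column names are placeholders.

Code Block
languagejava
import java.sql.Timestamp;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UpdatedAtIncrementalRead {

    // Use the table's _updated_at column instead of the auto_increment id as the
    // incremental column, so rows that were updated in place (their id does not
    // change) are still picked up.
    static Dataset<Row> readSince(SparkSession spark, Timestamp lastSyncTime) {
        // Push the incremental predicate down to the database as a subquery.
        String pushedQuery = "(SELECT * FROM source_table"
                + " WHERE _updated_at > '" + lastSyncTime + "') AS incr";
        return spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://host:3306/db")   // placeholder connection settings
                .option("dbtable", pushedQuery)
                .option("user", "user")
                .option("password", "password")
                .load();
    }
}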

From the above, we note that the current one-size-fits-all approach does not solve all of these problems, and we need to evaluate different data reconciliation strategies. However, Phase 1 brings immediate benefit to users and gives us a chance to gather feedback while we evaluate the alternatives.

Rollout/Adoption Plan

  • There won’t be any impact on existing users; this is a new feature.
  • New configurations will be added to the documentation.
  • A JdbcSource example will be added to the demo.

...