 RFC - 18 : Insert Overwrite API


Proposers

  • @satishkotha

Approvers

  • @varadarb : [APPROVED/REQUESTED_INFO/REJECTED]
  • @nagarwal : [APPROVED/REQUESTED_INFO/REJECTED]

Status

Current state: UNDER DISCUSSION

(Other possible states: IN PROGRESS, ABANDONED, COMPLETED, INACTIVE)

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

For tables or partitions where the majority of records change every cycle, it is inefficient to do an upsert or merge. We want to provide a Hive-like 'insert overwrite' API that ignores all existing data and creates a commit with just the new data provided. Doing this in Hoodie gives better snapshot isolation than Hive because of atomic commits. These APIs can also be used for certain operational tasks, such as fixing a specific corrupted partition: we can do an 'insert overwrite' on that partition with records from the source, which can be much faster than restore and replay for some data sources.

Background

<Introduce any background context which is relevant or necessary to understand the feature and design choices.>

Implementation


Hoodie supports multiple write operations, such as insert, upsert, and bulk_insert, on the target table. At a high level, we would like to add two new operations:

...

Also, note that my focus is primarily on COW tables. I tried giving examples for MOR tables too, but I don't fully grasp the complexity and edge cases of MOR tables, so please highlight any mistakes/bad assumptions.

Implementation Approach #1: Reuse existing file groups 


We reuse existing file groups in the partition and distribute new records among them. We create new file groups if the number of new records is much larger than the existing file groups can support. If the number of new records is much smaller, then there are two high-level strategies to distribute records.

...

Example for Copy On Write:

  • Files in partition before: file1-t0.parquet, file2-t0.parquet
  • Insert overwrite with a similar number of records: partition will add file1-t1.parquet, file2-t1.parquet
  • Insert overwrite with a much larger number of records: partition will add file1-t1.parquet, file2-t1.parquet, file3-t1.parquet, ..., fileN-t1.parquet
  • Insert overwrite with 1 record: partition will add file1-t1.parquet, file2-t1.parquet. One of these parquet files will be empty; the other will have one record.

Example for Merge On Read:

...

So assume the initial state of the partition has a commit at t0. ‘Insert overwrite’ is done at t2, and we schedule compaction at t1, before starting ‘insert overwrite’ at t2.

  • Files in partition before: file1-t0.parquet, file2-t0.parquet, .file1-t00.log
  • Insert overwrite with a similar number of records: partition will add file1-t2.parquet, file2-t2.parquet. Previous files continue to exist; after compaction runs, they will change to file1-t1.parquet, file2-t0.parquet.
  • Insert overwrite with a much larger number of records: partition will add file1-t2.parquet, file2-t2.parquet, file3-t2.parquet, ..., fileN-t2.parquet. Previous files continue to exist; after compaction runs, they will change to file1-t1.parquet, file2-t0.parquet.
  • Insert overwrite with 1 record: partition will add file1-t2.parquet, file2-t2.parquet. One of these parquet files will be empty; the other will have one record. Previous files continue to exist; after compaction runs, they will change to file1-t1.parquet, file2-t0.parquet.


Advantages:

  • Fits in well with the existing hudi implementation. Because the same set of fileIds is used, it is easy to reason about versioning. Rollback and restore will continue to work as-is.

...

  • This means the new parquet file size can be smaller than the previous version of the same file. We saw this happen earlier because of parquet version incompatibility, and we added guarding checks against that. So we may have to add exceptions to those rules.
  • In some cases, we create empty parquet files. In my testing, the smallest file we could create was 400KB, because we store metadata including the schema in the empty parquet file. These empty parquet files can be reused if the partition grows in subsequent writes. Otherwise, we need a strategy to delete these empty file groups cleanly to reclaim this space.
    • One option to reclaim this space is to extend BaseFileOnlyView to inspect small parquet files and ignore them when listing splits; see the sketch after this list. (Appreciate any other suggestions here, as I do not have a lot of experience reading this code.)
  • For MOR tables, this is somewhat awkward:
    • We have to schedule compaction before writing new data.
    • For ‘insert overwrite’, we are forced to write parquet files for performance reasons, as opposed to also writing inserts into log files. Additional work is required to support inserts going into log files.
  • An external index such as the HBase index needs to be updated to ‘delete’ keys that are no longer present in the partition. Otherwise, the index can become inconsistent and may affect record distribution.

Implementation Approach #2: Create new set of file groups in same partition


Existing file groups are marked for 'deletion'. New file groups are created based on the number of new records.

...

Example for Copy On Write:

  • Files in partition before: file1-t0.parquet, file2-t0.parquet
  • Insert overwrite with a similar number of records: partition will add file3-t1.parquet, file4-t1.parquet; file1, file2 are marked invalid in metadata after t1.
  • Insert overwrite with a much larger number of records: partition will add file3-t1.parquet, file4-t1.parquet, file5-t1.parquet, ..., fileN-t1.parquet; file1, file2 are marked invalid in metadata after t1.
  • Insert overwrite with 1 record: partition will add file3-t1.parquet; file1, file2 are marked invalid in metadata after t1.

Example for Merge On Read:


  • Files in partition before: file1-t0.parquet, file2-t0.parquet, .file1-t00.log
  • Insert overwrite with a similar number of records: partition will add file3-t1.parquet, file4-t1.parquet; file1, file2 are marked invalid in metadata after t1.
  • Insert overwrite with a much larger number of records: partition will add file3-t1.parquet, file4-t1.parquet, ..., fileN-t1.parquet; file1, file2 are marked invalid in metadata after t1.
  • Insert overwrite with 1 record: partition will add file3-t1.parquet; file1, file2 are marked invalid in metadata after t1.

Advantages:

  • COW and MOR look very similar w.r.t. implementation. We don't have to interfere with MOR compaction.
  • The parquet file size reduction (mentioned in the disadvantages of approach #1) is not an issue.
  • We don't need to update the external index in the critical path. The index implementation can check whether a file group is invalid (similar to how we check whether a commit is invalid today in HBaseIndex).
  • We can extend the cleaner policy to delete old file groups after a certain time window.

...

  • We need to carry forward metadata from previous commits (see the sketch after this list):
    1. At t1, say file1 is marked as invalid. We store “invalidFiles=file1” in t1.commit (or deltacommit for MOR).
    2. At t2, say file2 is also marked invalid. We carry forward the previous files and store “invalidFiles=file1,file2” in t2.commit (or deltacommit for MOR).
  • This can be error prone, since ignoring parquet files present on disk is also new behavior in Hudi. We have to be cognizant of this new behavior and update all filesystem views to ignore these files. Overlooking this might cause issues when implementing other features.

Implementation Approach #3: Version partition paths


Add an extra level of versioning on the partition itself.

...

Example for Copy On Write:

  • Files in partition before: file1-t0.parquet, file2-t0.parquet
  • Insert overwrite with a similar number of records: the partition still contains file1-t0.parquet, file2-t0.parquet. A new directory ‘partition-t1’ is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet.
  • Insert overwrite with a much larger number of records: the partition still contains file1-t0.parquet, file2-t0.parquet. A new directory ‘partition-t1’ is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet, file5-t1.parquet, ..., fileN-t1.parquet.
  • Insert overwrite with 1 record: the partition still contains file1-t0.parquet, file2-t0.parquet. A new directory ‘partition-t1’ is created; partition-t1 will add file3-t1.parquet.

Example for Merge On Read:


  • Files in partition before: file1-t0.parquet, file2-t0.parquet, .file1-t00.log
  • Insert overwrite with a similar number of records: the partition still contains file1-t0.parquet, file2-t0.parquet, .file1-t00.log. A new directory ‘partition-t1’ is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet.
  • Insert overwrite with a much larger number of records: the partition still contains file1-t0.parquet, file2-t0.parquet, .file1-t00.log. A new directory ‘partition-t1’ is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet, ..., fileN-t1.parquet.
  • Insert overwrite with 1 record: the partition still contains file1-t0.parquet, file2-t0.parquet, .file1-t00.log. A new directory ‘partition-t1’ is created; partition-t1 will add file3-t1.parquet.

Advantages:

  • COW and MOR look very similar w.r.t. implementation. We don't have to interfere with MOR compaction.
  • The parquet file size reduction (mentioned in the disadvantages of approach #1) is not an issue.
  • We can ignore old partition entries for row keys in the HBase index.
  • We can extend the cleaner policy to delete old partition versions after a certain time window.
  • Likely easy to debug: logically, we are creating a new partition, so there is a low chance of data mixup. We can register the new partition in HMS and unregister the old partition.

...

  • public JavaRDD<WriteStatus> insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String commitTime)
  • public JavaRDD<WriteStatus> insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String commitTime)


Other considerations

  • Incremental reads with insert_overwrite
    • Let's say there is a pipeline set up using incremental reads: table1 --(transformation)--> table2.
    • If there is an insert overwrite on table1, previous records are considered 'deleted' in table1. But incremental reads on table1 will not send deletions with the current implementation.
  • Scaling insert overwrite
    • The Partitioner implementation today stores all affected file groups in memory. We may have to make the internal data structures of the Partitioner spillable if 'insert_overwrite_table' is done on a large table with a large number of file groups.

Recommendation:

After weighing the trade-offs, we are inclined to go with Option 1. Option 3 would add a lot of complexity. Option 2 looks like a cleaner approach, but it introduces the new behavior of ignoring file groups. Option 1 is more practical: it fits in with the existing framework and is relatively easier to implement.

Rollout/Adoption Plan

  • What impact (if any) will there be on existing users? None; these are new APIs.
  • If we are changing behavior how will we phase out the older behavior? NA
  • If we need special migration tools, describe them here. NA
  • When will we remove the existing behavior?  NA

Test Plan

  • We will add unit tests for all new code and try to achieve 90% coverage
  • Add integration tests using the hudi-test-suite framework
  • Load tests to verify scaling limitations

...