Proposers

Approvers

Status

Current state: COMPLETED

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

For tables or partitions where the majority of records change every cycle, doing an upsert or merge is inefficient. We want to provide a Hive-like 'insert overwrite' API that ignores all existing data and creates a commit with just the new data provided. This API can also be used for certain operational tasks, such as fixing a specific corrupted partition: we can do an 'insert overwrite' on that partition with records from the source, which can be much faster than restore-and-replay for some data sources.

Implementation


Hoodie supports multiple write operations, such as insert, upsert and bulk_insert, on the target table. At a high level, we would like to add two new operations:

  1. insert_overwrite: overwrite only the partitions touched by the new records. Example: say a table has 3 partitions in total (p0, p1, p2) and a client performs insert overwrite with 10 records, all of which belong to p2. Then the overwrite is performed only on p2; all previous records in p0 and p1 continue to exist as before.
  2. insert_overwrite_table: overwrite all partitions. For the above example, p0 and p1 will have 0 records after the write operation completes successfully, and p2 will only have the new records.
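To make the difference in scope concrete, here is a minimal hypothetical Java sketch (the helper and its names are illustrative, not Hudi APIs): insert_overwrite replaces only the partitions that appear in the incoming records, while insert_overwrite_table replaces every existing partition.

```java
import java.util.Set;

// Hypothetical helper (not a Hudi API) illustrating the scope of the two
// proposed operations.
public class OverwriteScope {

  public static Set<String> partitionsToReplace(Set<String> existingPartitions,
                                                Set<String> partitionsInNewRecords,
                                                boolean overwriteWholeTable) {
    return overwriteWholeTable ? existingPartitions : partitionsInNewRecords;
  }

  public static void main(String[] args) {
    Set<String> existing = Set.of("p0", "p1", "p2");
    // insert_overwrite with records only in p2: replaces just p2
    System.out.println(partitionsToReplace(existing, Set.of("p2"), false));
    // insert_overwrite_table: replaces p0, p1 and p2
    System.out.println(partitionsToReplace(existing, Set.of("p2"), true));
  }
}
```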

Below, we discuss some high-level implementation choices. Most of the discussion is centered on a single partition; it is relatively easy to extend it to multiple partitions.

Also, note that my focus is primarily on COW tables. I tried to give examples for MOR tables too, but I don't fully grasp the complexity and edge cases in MOR tables, so please highlight any mistakes/bad assumptions.

Implementation Approach #1: Reuse existing file groups 

We reuse existing file groups in the partition and distribute new records among them, creating new file groups if the number of new records is much larger than the existing file groups can support. If the number of new records is much smaller, there are two high-level strategies to distribute records:

  1. Minimize the number of file groups required after insert overwrite: fill the first file group until it meets the size criteria specified in the config, then move to the second file group, and so on. Some file groups will be empty after distribution.
  2. Round-robin records across all existing file groups. None of the file groups will be empty (unless total new record size < number of file groups).

I am inclined towards #1 to minimize IO in subsequent writes. 
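A minimal sketch of strategy #1, with the size criterion simplified to a record count (the class and its names are hypothetical, not Hudi's actual partitioner):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of strategy #1: fill one file group up to a target
// size before moving to the next, creating new file groups only once the
// existing ones are exhausted.
public class FillFirstAssigner {

  public static Map<String, List<String>> assign(List<String> recordKeys,
                                                 List<String> existingFileGroups,
                                                 int targetRecordsPerGroup) {
    Map<String, List<String>> assignment = new LinkedHashMap<>();
    for (int i = 0; i < recordKeys.size(); i++) {
      int bucket = i / targetRecordsPerGroup; // index of the file group this record fills
      String fileGroup = bucket < existingFileGroups.size()
          ? existingFileGroups.get(bucket)    // reuse an existing file group id
          : "newFile" + (bucket - existingFileGroups.size() + 1); // spill into a new group
      assignment.computeIfAbsent(fileGroup, k -> new ArrayList<>()).add(recordKeys.get(i));
    }
    // Existing file groups beyond the last filled bucket receive no records;
    // under this approach they would be rewritten as empty base files.
    return assignment;
  }

  public static void main(String[] args) {
    // 3 records, 2 existing file groups, 2 records per group:
    // file1 gets [r1, r2], file2 gets [r3]
    System.out.println(assign(List.of("r1", "r2", "r3"), List.of("file1", "file2"), 2));
  }
}
```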


Example for Copy On Write:

| Files in partition before | Insert overwrite with similar number of records | Insert overwrite with much larger number of records | Insert overwrite with 1 record |
| --- | --- | --- | --- |
| Partition contains file1-t0.parquet, file2-t0.parquet | Partition will add file1-t1.parquet, file2-t1.parquet | Partition will add file1-t1.parquet, file2-t1.parquet, file3-t1.parquet, …, fileN-t1.parquet | Partition will add file1-t1.parquet, file2-t1.parquet. One of these parquet files will be empty; the other will have one record |

Example for Merge On Read:

Note that for MOR, we schedule compaction inline before doing 'insert overwrite'; we don't have to wait for compaction to complete.

So assume the initial state of the partition has a commit at t0: we schedule compaction at t1, then start 'insert overwrite' at t2.

| Files in partition before | Insert overwrite with similar number of records | Insert overwrite with much larger number of records | Insert overwrite with 1 record |
| --- | --- | --- | --- |
| Partition contains file1-t0.parquet, file2-t0.parquet, .file1-t00.log | Partition will add file1-t2.parquet, file2-t2.parquet. Previous files continue to exist; after compaction runs, they change to file1-t1.parquet, file2-t0.parquet | Partition will add file1-t2.parquet, file2-t2.parquet, file3-t2.parquet, …, fileN-t2.parquet. Previous files continue to exist; after compaction runs, they change to file1-t1.parquet, file2-t0.parquet | Partition will add file1-t2.parquet, file2-t2.parquet. One of these parquet files will be empty; the other will have one record. Previous files continue to exist; after compaction runs, they change to file1-t1.parquet, file2-t0.parquet |


Advantages:

  • Fits in well with the existing Hudi implementation. Because the same set of fileIds is used, it is easy to reason about versioning; rollback and restore will continue to work as-is.

Disadvantages:

  • In some cases, we create empty parquet files. In my testing, the smallest file we could create was 400KB, because we store metadata including the schema even in an empty parquet file. These empty parquet files can be reused if the partition grows in subsequent writes; otherwise, we need a strategy to delete these empty file groups cleanly to reclaim the space.
    • One option to reclaim this space is to extend BaseFileOnlyView to inspect small parquet files and ignore them when listing splits. (Appreciate any other suggestions here, as I do not have a lot of experience reading this code.)
    • The other option is to extend metadata to mark these file groups as invalid, and update all operations to read metadata first to discard these empty files. There can be race conditions if this is not done carefully. (More details can be discussed after we finalize an approach.)
  • For MOR tables, this is somewhat awkward.
    • We have to schedule compaction before writing new data.
    • For 'insert overwrite', we are forced to write parquet files, as opposed to also writing inserts into log files, for performance reasons. Additional work is required to support routing inserts into log files.
  • An external index such as the HBase index needs to be updated to 'delete' keys that are no longer present in the partitions; otherwise, the index can become inconsistent and may affect record distribution. A sketch of this cleanup follows.
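For that last point, a small hypothetical sketch of the index cleanup this approach would require: any key present before the overwrite but absent from the new records must be deleted from the index. This only shows the set arithmetic; it does not use the actual HBaseIndex API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: compute which record keys need index deletes
// after an insert overwrite replaces a partition's contents.
public class StaleIndexKeys {

  public static Set<String> staleKeys(Set<String> keysBeforeOverwrite,
                                      Set<String> newRecordKeys) {
    Set<String> stale = new HashSet<>(keysBeforeOverwrite);
    stale.removeAll(newRecordKeys); // keys that no longer exist in the partition
    return stale;                   // issue index deletes for this set
  }

  public static void main(String[] args) {
    // k1 and k3 were overwritten away and need index deletes
    System.out.println(staleKeys(Set.of("k1", "k2", "k3"), Set.of("k2")));
  }
}
```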

Implementation Approach #2: Create new set of file groups in same partition

Existing file groups are marked for 'deletion'. New file groups are created based on the number of new records.


Example for Copy On Write:

| Files in partition before | Insert overwrite with similar number of records | Insert overwrite with much larger number of records | Insert overwrite with 1 record |
| --- | --- | --- | --- |
| Partition contains file1-t0.parquet, file2-t0.parquet | Partition will add file3-t1.parquet, file4-t1.parquet. file1, file2 marked invalid in metadata after t1 | Partition will add file3-t1.parquet, file4-t1.parquet, file5-t1.parquet, …, fileN-t1.parquet. file1, file2 marked invalid in metadata after t1 | Partition will add file3-t1.parquet. file1, file2 marked invalid in metadata after t1 |

Example for Merge On Read:

| Files in partition before | Insert overwrite with similar number of records | Insert overwrite with much larger number of records | Insert overwrite with 1 record |
| --- | --- | --- | --- |
| Partition contains file1-t0.parquet, file2-t0.parquet, .file1-t00.log | Partition will add file3-t1.parquet, file4-t1.parquet. file1, file2 marked invalid in metadata after t1 | Partition will add file3-t1.parquet, file4-t1.parquet, …, fileN-t1.parquet. file1, file2 marked invalid in metadata after t1 | Partition will add file3-t1.parquet. file1, file2 marked invalid in metadata after t1 |

Advantages:

  • COW and MOR look very similar w.r.t. implementation. We don't have to interfere with MOR compaction.
  • The empty/small parquet file issue (mentioned in the disadvantages of approach #1) does not arise.
  • We don't need to update the external index in the critical path. The index implementation can check whether a file group is invalid (similar to how we check whether a commit is invalid today in HBaseIndex).
  • We can extend the cleaner policy to delete old file groups after a certain time window.


Disadvantages:

  • We need to carry forward metadata from previous commits (see the sketch after this list):
    1. At t1, say file1 is marked as invalid. We store "invalidFiles=file1" in t1.commit (or deltacommit for MOR).
    2. At t2, say file2 is also marked invalid. We carry forward the previous files and store "invalidFiles=file1,file2" in t2.commit (or deltacommit for MOR).
  • This can be error-prone, as ignoring parquet files present on disk is a new behavior in Hudi. We have to be cognizant of the new behavior and update all views on the filesystem to ignore these files; overlooking this might cause issues when implementing other features.
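A minimal sketch of the carry-forward logic from the first disadvantage above (the class name and the "invalidFiles" notion are illustrative, not Hudi's actual commit metadata classes):

```java
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch of carrying forward invalidated file groups
// across commits under approach #2.
public class InvalidFilesTracker {

  // Union of the previous commit's invalid set with the file groups
  // replaced by the current insert overwrite.
  public static Set<String> carryForward(Set<String> previousInvalidFiles,
                                         Set<String> replacedByThisCommit) {
    Set<String> merged = new TreeSet<>(previousInvalidFiles);
    merged.addAll(replacedByThisCommit);
    return merged;
  }

  public static void main(String[] args) {
    Set<String> t1 = carryForward(Set.of(), Set.of("file1"));  // t1.commit: invalidFiles=file1
    Set<String> t2 = carryForward(t1, Set.of("file2"));        // t2.commit: invalidFiles=file1,file2
    System.out.println(t2);                                    // [file1, file2]
  }
}
```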

Implementation Approach #3: Version partition paths

Add an extra level of versioning on the partition itself.
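For illustration, a hypothetical directory layout after an insert overwrite at t1 (the base path and partition names are placeholders):

```
basePath/partition/file1-t0.parquet      <- original partition version
basePath/partition/file2-t0.parquet
basePath/partition-t1/file3-t1.parquet   <- new partition version created at t1
basePath/partition-t1/file4-t1.parquet
```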


Example for Copy On Write:

| Files in partition before | Insert overwrite with similar number of records | Insert overwrite with much larger number of records | Insert overwrite with 1 record |
| --- | --- | --- | --- |
| Partition contains file1-t0.parquet, file2-t0.parquet | Partition continues to contain file1-t0.parquet, file2-t0.parquet. A new directory 'partition-t1' is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet | Partition continues to contain file1-t0.parquet, file2-t0.parquet. A new directory 'partition-t1' is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet, file5-t1.parquet, …, fileN-t1.parquet | Partition continues to contain file1-t0.parquet, file2-t0.parquet. A new directory 'partition-t1' is created; partition-t1 will add file3-t1.parquet |

Example for Merge On Read:

| Files in partition before | Insert overwrite with similar number of records | Insert overwrite with much larger number of records | Insert overwrite with 1 record |
| --- | --- | --- | --- |
| Partition contains file1-t0.parquet, file2-t0.parquet, .file1-t00.log | Partition continues to contain file1-t0.parquet, file2-t0.parquet, .file1-t00.log. A new directory 'partition-t1' is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet | Partition continues to contain file1-t0.parquet, file2-t0.parquet, .file1-t00.log. A new directory 'partition-t1' is created; partition-t1 will add file3-t1.parquet, file4-t1.parquet, …, fileN-t1.parquet | Partition continues to contain file1-t0.parquet, file2-t0.parquet, .file1-t00.log. A new directory 'partition-t1' is created; partition-t1 will add file3-t1.parquet |

Advantages:

  • COW and MOR look very similar w.r.t. implementation. We don't have to interfere with MOR compaction.
  • The empty/small parquet file issue (mentioned in the disadvantages of approach #1) does not arise.
  • Can ignore old partition entries for row keys in the HBase index.
  • We can extend the cleaner policy to delete old partition versions after a certain time window.
  • Likely easy to debug: logically, we are creating a new partition, so there is a low chance of data mixup. We can register the new partition in HMS and unregister the old partition.


Disadvantages:

  • Need to add versioning-related complexity to partitions. Example: the clean process for older versioned partitions.
  • Supporting incremental reads across different versions might be difficult.
  • Additional listing is required to figure out the latest version (or we rely on consolidated metadata to query the latest partition version).
  • Hudi uses user-specified partition paths, so adding a version is a change of behavior, and additional complexity is required to support all query engines.


API

Regardless of the implementation approach chosen, we need to add to/change the existing high-level API. At the moment, we are inclined towards creating two new operations on the Spark dataframe instead of adding flags to existing operations, because the actions taken are equivalent to deleting all pre-existing data. But we are open to feedback.

  • hoodie.datasource.write.operation: insert_overwrite
  • hoodie.datasource.write.operation: insert_overwrite_table
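A minimal usage sketch through the Hudi Spark datasource, assuming the proposed operation value lands as shown (the input path, table name, and field names are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Usage sketch of the proposed insert_overwrite operation via the
// Spark datasource write path.
public class InsertOverwriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("insert-overwrite-example")
        .master("local[2]")
        .getOrCreate();

    Dataset<Row> newRecords = spark.read().parquet("/tmp/source_snapshot"); // placeholder input

    newRecords.write()
        .format("org.apache.hudi")
        .option("hoodie.datasource.write.operation", "insert_overwrite") // proposed new value
        .option("hoodie.datasource.write.recordkey.field", "key")
        .option("hoodie.datasource.write.partitionpath.field", "partition")
        .option("hoodie.table.name", "my_table")
        .mode(SaveMode.Append) // Hudi handles the overwrite semantics internally
        .save("/tmp/hudi/my_table");
  }
}
```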

Similarly, new high-level public methods are added on HoodieWriteClient (we can discuss merging these two into one as well):

  • public JavaRDD<WriteStatus> insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String commitTime)
  • public JavaRDD<WriteStatus> insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String commitTime)

Other considerations

  • Incremental reads with insert_overwrite
    • Let's say there is a pipeline set up using incremental reads: table1 -- (transformation) --> table2.
    • If there is an insert overwrite on table1, previous records are considered 'deleted' in table1. But incremental reads on table1 will not send deletions with the current implementation, so table2 will continue to have the previous records. This is not being tracked as part of this RFC, but it is an important feature to implement IMO.
  • Scaling insert overwrite
    • The Partitioner implementation today stores all affected file groups in memory. We may have to make the internal data structures of the Partitioner spillable if 'insert_overwrite_table' is done on a large table with a large number of file groups.


Recommendation

After weighing the trade-offs, we are inclined towards going with Option 2. Option 1 has a lot of complexity, especially with MOR tables, and Option 3 would add a lot of complexity in general. With Option 2, we can take multiple steps:

  1. Before consolidated metadata launches, in the initial version, we keep the list of file groups to be filtered out in active commit metadata. We change the reader to query active commits and filter the appropriate file groups (see the sketch after this list). We also change the cleaner to archive active commits only after removing the corresponding file groups from disk. (This approach can also be used for RFC-19.)
  2. After consolidated metadata launches, we can change the metadata lookup to leverage that instead of reading from active commit files.
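A minimal sketch of the reader-side filtering in step 1 (names are illustrative; in reality, the per-commit invalid-file sets would come from parsing active commit metadata):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: union the invalid-file lists recorded in active
// commit metadata and drop those file groups from the listing before
// building splits.
public class ReplacedFileGroupFilter {

  public static List<String> filterVisible(List<String> allFileGroups,
                                           List<Set<String>> invalidFilesPerActiveCommit) {
    Set<String> invalid = invalidFilesPerActiveCommit.stream()
        .flatMap(Set::stream)
        .collect(Collectors.toSet());
    return allFileGroups.stream()
        .filter(fileGroup -> !invalid.contains(fileGroup))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(filterVisible(
        List.of("file1", "file2", "file3"),
        List.of(Set.of("file1"), Set.of("file1", "file2")))); // [file3]
  }
}
```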

Rollout/Adoption Plan

  • What impact (if any) will there be on existing users? None; these are new APIs.
  • If we are changing behavior how will we phase out the older behavior? NA
  • If we need special migration tools, describe them here. NA
  • When will we remove the existing behavior?  NA

Test Plan

  • We will add unit tests for all new code and  try to achieve 90% coverage
  • Add integration tests using hudi-test-suite framework
  • Load tests to verify scaling limitations







1 Comment

  1. > I changed the recommendation to approach 2 here since similar functionality is needed for RFC-19.

    > This approach also provides an easier transition into using consolidated metadata than option 1.

    > Please take a look. If you guys agree on the approach, I'll start implementing a POC for both COW and MOR tables. Let me know.


    In general, prefer approach 2.. I also left a comment on whether we should call the underlying action `replace()` instead and reuse the infrastructure for clustering as well. I think that's a better approach.. of course, your API at the WriteClient level can be called insertOverwrite(), which is then implemented at the table level using a `ReplaceActionExecutor`. Makes sense?


    Satish Kotha Nishith Agarwal Balaji Varadarajan In any case, it is good to start PoCing a bit to get a better understanding of the implementation challenges..

    There are implementation choices here.. Should replace be a top-level action or just a flag inside commit metadata, etc.? We need to make these choices prudently.