...

Table of Contents

Proposers

Approvers

Status

Current state:

  • Under Discussion (tick)
  • In Progress
  • Abandoned
  • Completed (tick)
  • Inactive

...

For tables or partitions where the majority of records change every cycle, it is inefficient to do upsert or merge. We want to provide a Hive-like 'insert overwrite' API that ignores all existing data and creates a commit with just the new data provided. Doing this in Hoodie will provide better snapshot isolation than Hive because of atomic commits. These APIs can also be used for certain operational tasks, such as fixing a specific corrupted partition: we can do 'insert overwrite' on that partition with records from the source, which can be much faster than restore and replay for some data sources.

Background

<Introduce any background context which is relevant or necessary to understand the feature and design choices.>

 

Implementation


Hoodie supports multiple write operations such as insert, upsert and bulk_insert on the target table. At a high level, we would like to add two new operations:

  1. insert_overwrite: overwrite only the partitions touched by the new records (default). Example: say a table has 3 partitions (p0, p1, p2) and the client performs insert overwrite with 10 records, all of which belong to p2. Then the overwrite is performed only on p2; all previous records in p0 and p1 will continue to exist as before.
  2. insert_overwrite_table: overwrite all partitions. For the above example, p0 and p1 will have 0 records after the write operation completes successfully; p2 will contain only the new records. (A rough usage sketch from the Spark datasource follows this list.)
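To make the semantics above concrete, here is a minimal sketch of how the operation could be invoked through the Spark datasource, assuming the proposed insert_overwrite value is accepted by the existing hoodie.datasource.write.operation option; the operation value, field names and table name below are illustrative assumptions, not a finalized API.

    // Hypothetical usage sketch: overwrite only the partitions touched by `newRecords`.
    // The "insert_overwrite" operation value is the proposal in this RFC; field names are illustrative.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class InsertOverwriteExample {
      public static void write(Dataset<Row> newRecords, String basePath) {
        newRecords.write()
            .format("hudi")
            .option("hoodie.datasource.write.operation", "insert_overwrite")    // proposed operation value
            .option("hoodie.datasource.write.recordkey.field", "key")           // illustrative record key field
            .option("hoodie.datasource.write.partitionpath.field", "partition") // illustrative partition field
            .option("hoodie.table.name", "my_table")
            .mode(SaveMode.Append)  // Append, so existing commits on the timeline are retained
            .save(basePath);
      }
    }

Using insert_overwrite_table instead would overwrite every partition of the table, not just the partitions present in newRecords.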

...

  • Fits in well with the existing Hudi implementation. Because the same set of fileIds is used, it is easy to reason about versioning. Rollback and restore will continue to work as-is.

Disadvantages:

...

  • In some cases, we create empty parquet files. In my testing, the smallest file we could create was ~400KB, because we store metadata, including the schema, even in an empty parquet file. These empty parquet files can be reused if the partition grows in subsequent writes. Otherwise, we need a strategy to delete these empty file groups cleanly to reclaim this space.
    • One option to reclaim this space is to extend BaseFileOnlyView to inspect small parquet files and ignore them when listing splits (a rough sketch of this filtering follows the list below). Appreciate any other suggestions here, as I do not have a lot of experience reading this code.
    • Another option is to extend metadata to mark these file groups as invalid, and update all operations to read metadata first and discard these empty files. There can be race conditions if this is not done carefully. (More details can be discussed after we finalize an approach.)
  • For MOR tables, this is somewhat awkward:
    • We have to schedule compaction before writing new data.
    • For 'insert overwrite', we are forced to write parquet files, as opposed to also writing inserts into log files, for performance reasons. Additional work is required to support inserts going into log files.
  • An external index such as the HBase index needs to be updated to 'delete' keys that are no longer present in the partitions. Otherwise, the index can become inconsistent and may affect record distribution.
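As a strawman for the first option above (ignoring small/empty parquet files when listing), here is a minimal sketch of the filtering itself; the 1MB threshold and helper name are assumptions for discussion, not the actual BaseFileOnlyView API.

    // Illustrative sketch only: drop "empty" base files (parquet files that carry
    // schema/metadata but no records) from a listing. Threshold and names are assumptions.
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.hadoop.fs.FileStatus;

    public class EmptyBaseFileFilter {
      // Anything smaller than this is treated as a metadata-only parquet file (assumption).
      private static final long EMPTY_FILE_SIZE_THRESHOLD_BYTES = 1024 * 1024;

      public static List<FileStatus> filterOutEmptyBaseFiles(List<FileStatus> latestBaseFiles) {
        return latestBaseFiles.stream()
            .filter(status -> status.getLen() > EMPTY_FILE_SIZE_THRESHOLD_BYTES)
            .collect(Collectors.toList());
      }
    }

A purely size-based heuristic like this is fragile (legitimately small files could be skipped), which is one reason a metadata-based approach may be preferable.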

...

  • Need to add versioning-related complexity to partitions. Example: a clean process for older partition versions.
  • Supporting incremental reads across different versions might be difficult.
  • Additional listing is required to figure out the latest version (or we rely on consolidated metadata to query the latest partition version).
  • Hudi uses user-specified partition paths. Adding a version to the path is a change of behavior, and additional complexity is required to support all query engines.


API

Regardless of the implementation approach chosen, we need to add/change the existing high-level API. At the moment, we are inclined towards creating two new operations on the Spark dataframe, instead of adding flags to existing operations, because the actions taken are equivalent to deleting all pre-existing data. But we are open to feedback.

...

Similarly, new high-level public methods are added on HoodieWriteClient (we can discuss merging these two into one as well); a rough usage sketch follows the list:

  • public JavaRDD<WriteStatus> insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String commitTime)
  • public JavaRDD<WriteStatus> insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String commitTime)
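For illustration, a minimal usage sketch of the proposed methods, assuming a HoodieWriteClient constructed the usual way from a HoodieWriteConfig; import paths may differ slightly across Hudi versions, and only insertOverwrite itself is new here.

    // Illustrative usage sketch of the proposed insertOverwrite API.
    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class InsertOverwriteClientExample {
      public static <T extends HoodieRecordPayload> void overwritePartitions(
          JavaSparkContext jsc, HoodieWriteConfig config, JavaRDD<HoodieRecord<T>> records) {
        HoodieWriteClient<T> client = new HoodieWriteClient<>(jsc, config);
        String commitTime = client.startCommit();
        // Replaces existing data in the partitions touched by `records`.
        JavaRDD<WriteStatus> statuses = client.insertOverwrite(records, commitTime);
        client.commit(commitTime, statuses);
        client.close();
      }
    }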

...

  • Incremental reads with insert_overwrite
    • Let's say there is a pipeline set up using incremental reads: table1 -- (transformation) --> table2
    • If there is an insert overwrite on table1, previous records are considered 'deleted' in table1. But incremental reads on table1 will not send deletions with the current implementation, so table2 will continue to have the previous records. This is not being tracked as part of this RFC, but is an important feature to implement IMO.
  • Scaling insert overwrite
    • The Partitioner implementation today stores all affected file groups in memory. We may have to make the internal data structures of the Partitioner spillable if 'insert_overwrite_table' is done on a large table with a large number of file groups.


Recommendation

...

After weighing trade-offs, we are inclined towards going with Option 2. Option 1 has a lot of complexity, especially with MOR tables, and Option 3 would add a lot of complexity in general. Option 2 looks like a cleaner approach, though it introduces the new behavior of ignoring file groups. With Option 2, we can take multiple steps:

  1. Before consolidated metadata launches, in the initial version, we keep the list of file groups to be filtered out in active commit metadata. We change the reader to query active commits and filter out the appropriate file groups. We also change the cleaner to archive active commits only after removing the corresponding file groups from disk. (This approach can also be used for RFC-19; a rough sketch of recording this in commit metadata follows the list.)
  2. After consolidated metadata launches, we can change the lookup to leverage it instead of reading from the active commit files.
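As a strawman for step 1, here is a minimal sketch of recording the replaced file group ids in commit metadata and filtering them on the read path. It uses the generic extraMetadata map on HoodieCommitMetadata; the key name and helper methods are assumptions for discussion, not the final design.

    // Strawman for step 1: track replaced file groups in active commit metadata and
    // filter them when listing. Key name and helpers are illustrative assumptions.
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.hudi.common.model.HoodieCommitMetadata;

    public class ReplacedFileGroupTracker {
      // Hypothetical key under which the writer stores replaced file group ids.
      private static final String REPLACED_FILE_IDS_KEY = "hoodie.replaced.file.ids";

      // Writer side: stash the replaced file group ids in the commit's extra metadata.
      public static void recordReplacedFileIds(HoodieCommitMetadata metadata, Set<String> replacedFileIds) {
        metadata.addMetadata(REPLACED_FILE_IDS_KEY, String.join(",", replacedFileIds));
      }

      // Reader side: drop file groups that an active commit marked as replaced.
      public static List<String> filterReplacedFileIds(List<String> allFileIds, HoodieCommitMetadata metadata) {
        String replaced = metadata.getExtraMetadata().getOrDefault(REPLACED_FILE_IDS_KEY, "");
        Set<String> replacedIds = new HashSet<>(Arrays.asList(replaced.split(",")));
        return allFileIds.stream()
            .filter(fileId -> !replacedIds.contains(fileId))
            .collect(Collectors.toList());
      }
    }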

Rollout/Adoption Plan

  • What impact (if any) will there be on existing users? None; these are new APIs.
  • If we are changing behavior, how will we phase out the older behavior? NA
  • If we need special migration tools, describe them here. NA
  • When will we remove the existing behavior? NA

...