

Proposers

  • Balaji Varadarajan (vbalaji)

Approvers

  • Vinoth Chandar @vinoth  : [APPROVED/REQUESTED_INFO/REJECTED]
  • Nishith Agarwal @nagarwal : [APPROVED/REQUESTED_INFO/REJECTED]
  • ...

Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Released: N/A

Abstract

With Apache Hudi growing in popularity, one of the fundamental challenges for users has been efficiently migrating their historical datasets to Apache Hudi. Apache Hudi maintains per-record metadata to perform core operations such as upserts and incremental pull. To take advantage of Hudi’s upsert and incremental processing support, users currently need to rewrite their whole dataset to make it an Apache Hudi table. This RFC proposes a mechanism to efficiently migrate existing datasets without rewriting the entire dataset.


Background

Here is some background information necessary to understand the design. Readers are expected to be familiar with the basic Hudi concepts described here.

Per Record Hudi Metadata


The above figure shows the layout of records in Hudi. Each record has five Hudi metadata fields:

  • _hoodie_commit_time : Commit time associated with the latest mutation of the record
  • _hoodie_commit_seqno : To be used in incremental pull for creating multiple windows within a single ingested batch. 
  • _hoodie_record_key : Hudi Record Key used for updates and deletes
  • _hoodie_partition_path : Partition-Path associated with the record
  • _hoodie_file_name : Name of the file where the record is stored
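To make the layout concrete, here is a minimal Python sketch (not Hudi's actual API) that prepends the five metadata fields to a record so they occupy the first five columns. The `HoodieRecordMeta` class and the `with_hudi_metadata` helper are hypothetical, and the field values in the usage lines are illustrative only.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class HoodieRecordMeta:
    """Hypothetical holder for the five per-record Hudi metadata fields."""
    _hoodie_commit_time: str
    _hoodie_commit_seqno: str
    _hoodie_record_key: str
    _hoodie_partition_path: str
    _hoodie_file_name: str


def with_hudi_metadata(record: dict, meta: HoodieRecordMeta) -> dict:
    """Return a new row: metadata fields first (indices 0-4), then the original columns."""
    row = asdict(meta)  # asdict preserves the field declaration order
    row.update(record)  # original data columns follow the metadata
    return row


# Illustrative values only.
meta = HoodieRecordMeta("001", "001_0", "evt-42", "year=2015/month=12/day=31", "h1_1-0-1_001.parquet")
row = with_hudi_metadata({"event_id": "evt-42", "amount": 7}, meta)
print(list(row))
```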

Current Bootstrap Process:

Hudi provides built-in support for migrating your entire dataset to Hudi as a one-time operation using the HDFSParquetImporter tool available from the hudi-cli. You could also do this via a simple read and write of the dataset using the Spark datasource APIs.

Once migrated, writes can be performed using the normal means discussed here. This topic is discussed in detail here, including ways of doing partial migrations. At the risk of repetition, here are the two current approaches:

Onboard Hudi for new partitions alone:


Apache Hudi partitions can coexist with other non-Hudi partitions. Apache Hudi's query engine integration is carefully implemented to handle queries that span these partitions. This lets users manage new partitions with Hudi while leaving older partitions untouched. In the example above, historical partitions from Jan 1 2010 to Nov 30 2019 are in non-Hudi format, while newer partitions starting from Dec 01 2019 support Apache Hudi capabilities. As the historical partitions are not managed by Apache Hudi, none of the primitives provided by Apache Hudi work on the data in those partitions. For append-only datasets (like a table built from reading mobile/time-series data from Kafka), this works well.

Rewriting Existing Dataset to Apache Hudi:

If users need Apache Hudi support for all portions of their datasets, they need to completely rewrite their data in Hudi. This is needed because Hudi maintains per-record metadata and index information. They can either perform this rewrite as a single unit or split the dataset by partitions and load the partitions separately. More concrete details are present here.

On Rewriting existing dataset to Hudi:

Even though this is a one-time operation, users with very large data-lake installations may find it challenging to perform this migration at full scale.

Large historical fact tables typically have a large number of columns, and nested columns are not uncommon. Rewriting a large volume of such records incurs high read and write costs and requires massive compute (for columnar file generation).

Providing an efficient mechanism for migrating historical tables is crucial for painless adoption of Apache Hudi. This RFC proposes such a mechanism.

Proposal

The figure below represents a conceptual layout for each record. For ease of visualization, it is represented in row format even though Parquet uses a columnar format. The figure also assumes that the index being used is the Bloom index, which is the de facto index deployed.


As you can see from the above diagram, an Apache Hudi physical file contains three things relevant to our discussion:

  1. For each record, five Hudi metadata fields with column indices 0 to 4
  2. For each record, the original data columns that comprise the record (Original Data)
  3. Additional metadata in the file footer for index lookup

Raw-data tables are usually bulky, with a large number of columns per record. Only (1) and (3) are the additional metadata that make a given Apache Hudi parquet file special.

For the purpose of this discussion, let us call the combination of (1) and (3) the “Hudi skeleton”. The Hudi skeleton contains the additional metadata that Hudi maintains in each physical parquet file to support Hudi primitives.

The conceptual idea is to decouple the Hudi skeleton from the original data (2). The Hudi skeleton can be stored in a Hudi file while the original data stays in an external non-Hudi file.
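The skeleton idea can be sketched in Python as follows. This is an illustration under stated assumptions, not Hudi's implementation: `build_skeleton`, the comma-joined record key, the seqno format and the toy `SimpleBloomFilter` are hypothetical stand-ins, and skeleton row i is assumed to correspond to row i of the original file so the two can later be stitched by position. The point is that only key columns are read and no data columns are rewritten.

```python
import hashlib


class SimpleBloomFilter:
    """Toy Bloom filter standing in for the one stored in the skeleton file footer."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # Python int used as a bit array

    def _positions(self, key: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


def build_skeleton(rows, key_columns, commit_time, partition_path, file_name):
    """Return (skeleton_rows, bloom) for one original file without copying data columns."""
    bloom = SimpleBloomFilter()
    skeleton = []
    for i, row in enumerate(rows):
        record_key = ",".join(str(row[c]) for c in key_columns)  # hypothetical key format
        bloom.add(record_key)
        skeleton.append({
            "_hoodie_commit_time": commit_time,
            "_hoodie_commit_seqno": f"{commit_time}_{i}",  # illustrative format
            "_hoodie_record_key": record_key,
            "_hoodie_partition_path": partition_path,
            "_hoodie_file_name": file_name,
        })
    return skeleton, bloom


original_rows = [{"event_id": "a", "payload": "x" * 100}, {"event_id": "b", "payload": "y" * 100}]
skeleton, bloom = build_skeleton(original_rows, ["event_id"], "001",
                                 "year=2015/month=12/day=31", "h1_1-0-1_001.parquet")
```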

As long as Hudi primitives can be made to understand this new file layout, bootstrapping an existing table only requires generating the Hudi skeleton. From initial experiments on some production-grade raw-data tables, we find this bootstrap mechanism to be an order of magnitude faster than the normal bootstrap. The new bootstrap process for a dataset containing around 3500 partitions, 250K files and around 60 billion rows, with a single field as row key, took around 1 hour with 500 executors, each with 1 core and 4 GB memory. The old bootstrap process, by contrast, required 4x the number of executors (2000) and about a day (~24 hrs) to finish.
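Using only the numbers quoted above, and comparing executor-hours (the core count per executor for the old run is not stated), the saving works out roughly as:

```latex
\begin{aligned}
\text{new} &\approx 500 \times 1\,\text{h} = 500\ \text{executor-hours}\\
\text{old} &\approx 2000 \times 24\,\text{h} = 48{,}000\ \text{executor-hours}\\
\text{old}/\text{new} &\approx 48{,}000 / 500 = 96
\end{aligned}
```

Wall-clock time alone dropped roughly 24x (from ~24 h to ~1 h) while using a quarter of the executors.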

New Bootstrap Process:

The new bootstrap process involves the following steps. Let us imagine a parquet dataset “fact_events” needs to be bootstrapped to a Hudi dataset. Assume that the root path of this dataset is “/user/hive/warehouse/fact_events” and there are several day-based partitions within fact_events. Within each partition, there are several parquet files. Please see below for a pictorial representation.


With the above setup, let us imagine that the user is using this new bootstrap mechanism to bootstrap this table to a new Hudi dataset “fact_events_hudi” located at “/user/hive/warehouse/fact_events_hudi”.

  1. The user initiates this one-time bootstrap either through delta-streamer or through a standalone tool. Users provide the following information as part of the bootstrap:
    1. Original (non-hudi) dataset base location
    2. Columns to be used for generating hudi keys.
    3. Parallelism for running this bootstrap
    4. New Hudi Dataset location 
  2. Hudi bootstrap scans partitions and files in the original root location “/user/hive/warehouse/fact_events” and performs the following operations:
    1. Creates similar hudi partitions in the new dataset location. In the above example, there will be day based partitions created under “/user/hive/warehouse/fact_events_hudi”
    2. Using Spark parallelism, it generates a unique file ID for each original parquet file and uses it to generate a corresponding Hudi skeleton parquet file. A special commit timestamp called “BOOTSTRAP_COMMIT” is used. For the remainder of this document, let us imagine BOOTSTRAP_COMMIT has the timestamp “001”. For example, suppose an original parquet file is at the path /user/hive/warehouse/fact_events/year=2015/month=12/day=31/file1.parquet. If the generated unique file ID is h1, then the skeleton file corresponding to this parquet file will be stored at the path /user/hive/warehouse/fact_events_hudi/year=2015/month=12/day=31/h1_1-0-1_001.parquet.
    3. Generates a special bootstrap index which maps each newly generated hudi skeleton file id to its corresponding original parquet file.
    4. Atomically commits the bootstrap operation using the same Hudi timeline state transitions. Standard rollback will also be supported to roll back both inflight and committed bootstraps.
  3. If hive syncing is enabled, creates a brand new hudi hive table pointing to the new location - “/user/hive/warehouse/fact_events_hudi”
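The planning part of step 2 can be sketched in Python as below. It is a sequential, single-process stand-in for the Spark job, under stated assumptions: `plan_bootstrap` and `skeleton_file_name` are hypothetical names, the `<fileId>_<writeToken>_<commitTime>.parquet` pattern is inferred from the example path above, and the write token `1-0-1` is copied from that example rather than derived. The result is the skeleton-to-original mapping grouped by partition, i.e. the content of the bootstrap index; writing skeleton files, committing and Hive sync are omitted.

```python
import uuid
from collections import defaultdict
from posixpath import join

BOOTSTRAP_COMMIT = "001"  # example bootstrap timestamp used in this RFC


def skeleton_file_name(file_id: str, write_token: str, commit_time: str) -> str:
    # Follows the example name h1_1-0-1_001.parquet
    return f"{file_id}_{write_token}_{commit_time}.parquet"


def plan_bootstrap(source_root, target_root, source_files, new_file_id=lambda: uuid.uuid4().hex):
    """Map every original file to a new skeleton file, grouped by partition.

    source_files: iterable of (partition_path, file_name) found under source_root.
    Returns {partition_path: [(skeleton_path, original_path), ...]}."""
    index = defaultdict(list)
    for partition, name in source_files:
        file_id = new_file_id()  # unique file ID per original file
        skeleton_path = join(target_root, partition,
                             skeleton_file_name(file_id, "1-0-1", BOOTSTRAP_COMMIT))
        original_path = join(source_root, partition, name)
        index[partition].append((skeleton_path, original_path))
    return dict(index)


ids = iter(["h1"])
index = plan_bootstrap("/user/hive/warehouse/fact_events",
                       "/user/hive/warehouse/fact_events_hudi",
                       [("year=2015/month=12/day=31", "file1.parquet")],
                       new_file_id=lambda: next(ids))
print(index)
```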

Bootstrap Index:

The purpose of this index is to map each Hudi skeleton file to its associated external parquet file containing the original data columns. This information is part of Hudi’s file-system view and is used when generating file slices. In this respect, the bootstrap index is similar to a compaction plan. But unlike a compaction plan, the bootstrap index could be large for big historical tables. Hence, its storage format must be chosen to be efficient for both reads and writes.

Hudi’s file-system view is an abstraction that translates physical file names to file groups and file slices. The APIs supported by this layer operate at partition granularity. Hence, the bootstrap index must support fast lookups of a single partition’s index information.

The suitable storage format is a variation of the Hadoop MapFile containing two types of files:

  • Bootstrap Log: A set of sequence files, with each entry containing the bootstrap index information for all files within a single partition. It is a log because any change to the bootstrap index for a given partition can be made by merely appending a new entry to these log files.
  • Index to Bootstrap Log: A sequence file mapping each Hudi partition to the file name and offset where the bootstrap log entry for that partition is present.


With this layout, the bootstrap process can use Spark parallelism to control the number of bootstrap log files, and consequently the speed of bootstrap index generation. Using the index to bootstrap log, both Hudi readers and writers can efficiently load the bootstrap index for just the partitions they care about. The index to bootstrap log has one entry per partition and can be read into memory or into RocksDB.
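A minimal Python sketch of this two-level layout follows. It is an in-memory illustration under stated assumptions, not the proposed on-disk format: `BootstrapIndexSketch` is hypothetical, bytearrays stand in for sequence files, JSON stands in for the real serialization, and partitions are assigned to log files round-robin. It shows why reading one partition costs one index probe plus one read at a known offset, and how `num_log_files` bounds write parallelism.

```python
import json
import struct


class BootstrapIndexSketch:
    """Toy MapFile-like layout: append-only bootstrap logs plus an index to them.

    Each log entry is a length-prefixed JSON record holding one partition's
    {skeleton file ID: original file path} map."""

    def __init__(self, num_log_files: int):
        self.logs = {f"bootstrap-log-{i}": bytearray() for i in range(num_log_files)}
        self.index = {}  # index to bootstrap log: partition -> (log name, offset)

    def _append(self, log_name: str, partition: str, mappings: dict) -> None:
        payload = json.dumps({"partition": partition, "mappings": mappings}).encode()
        log = self.logs[log_name]
        offset = len(log)
        log += struct.pack(">I", len(payload)) + payload  # 4-byte length, then payload
        self.index[partition] = (log_name, offset)        # newest entry wins

    def write_partitions(self, partitions: dict) -> None:
        # Round-robin partitions over the logs; in Spark each log could be one task's output.
        names = sorted(self.logs)
        for i, (partition, mappings) in enumerate(sorted(partitions.items())):
            self._append(names[i % len(names)], partition, mappings)

    def lookup(self, partition: str) -> dict:
        # One index probe, then a single read at a known offset in one log file.
        log_name, offset = self.index[partition]
        log = self.logs[log_name]
        (length,) = struct.unpack_from(">I", log, offset)
        return json.loads(log[offset + 4:offset + 4 + length])["mappings"]


idx = BootstrapIndexSketch(num_log_files=2)
idx.write_partitions({
    "day=01": {"h1": "/src/day=01/f1.parquet"},
    "day=02": {"h2": "/src/day=02/f2.parquet"},
    "day=03": {"h3": "/src/day=03/f3.parquet"},
})
print(idx.lookup("day=03"))
```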

Hudi cleaner operations typically remove old file slices that are no longer required. Since the Hudi skeleton file generated by bootstrap is part of a file slice, it is also subject to cleaning. Whenever such file slices are cleaned, the bootstrap index also needs to be updated to reflect the new state. Even though this cleanup is not necessary for correctness, it keeps the state consistent and gradually reduces the footprint of the bootstrap index. To support such mutations with ACID guarantees, an MVCC mechanism similar to that of timeline management will be supported, keeping the bootstrap index up to date while isolating concurrent readers from the updates.
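The MVCC requirement can be illustrated with a deliberately simple Python sketch in which every cleaner-driven update publishes a new immutable version and readers keep the version they started with. `VersionedBootstrapIndex` is hypothetical; it copies the whole index per version, which a real implementation would avoid, and it does not model how a timeline-style atomic commit would actually be implemented.

```python
from types import MappingProxyType


class VersionedBootstrapIndex:
    """Hypothetical MVCC wrapper: each update publishes a new read-only version."""

    def __init__(self, initial: dict):
        self._versions = [self._freeze(initial)]

    @staticmethod
    def _freeze(state: dict):
        # Read-only views so a published version cannot be mutated by later writers.
        return MappingProxyType({p: MappingProxyType(dict(m)) for p, m in state.items()})

    def latest_version(self) -> int:
        return len(self._versions) - 1

    def snapshot(self, version: int):
        return self._versions[version]

    def remove_cleaned(self, partition: str, cleaned_file_ids) -> int:
        """Publish a new version without the mappings of cleaned skeleton file IDs."""
        current = {p: dict(m) for p, m in self._versions[-1].items()}
        for file_id in cleaned_file_ids:
            current.get(partition, {}).pop(file_id, None)
        self._versions.append(self._freeze(current))
        return self.latest_version()


bootstrap_index = VersionedBootstrapIndex(
    {"day=01": {"h1": "/src/day=01/f1.parquet", "h2": "/src/day=01/f2.parquet"}})
reader_view = bootstrap_index.snapshot(bootstrap_index.latest_version())  # reader pins version 0
v1 = bootstrap_index.remove_cleaned("day=01", ["h1"])                     # cleaner publishes version 1
```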



Supporting Upserts and Read Use-Cases:

This section describes how the rest of the Hudi abstractions support this new file layout and how they work together to support Hudi primitives on the bootstrapped partitions.

From the concepts page, a “file slice” refers to one complete snapshot of a logical Hudi file. It contains one base file and zero or more delta (log) files. Conceptually, we encapsulate the bootstrap index information at the file-slice (data-file) level, so a file slice can provide the external location where the original columns reside.
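One way to picture a file slice that carries bootstrap information is the hypothetical Python model below. The `FileSlice` class, its field names, and the rule that a slice counts as bootstrapped when its base instant is `BOOTSTRAP_COMMIT` and an external file is known are assumptions for illustration, loosely following this section's description rather than Hudi's real classes.

```python
from dataclasses import dataclass, field
from typing import List, Optional

BOOTSTRAP_COMMIT = "001"  # example bootstrap timestamp used in this RFC


@dataclass
class FileSlice:
    """Hypothetical file-slice model carrying optional bootstrap information."""
    file_id: str
    base_instant: str
    base_file: str                                    # skeleton file if bootstrapped
    log_files: List[str] = field(default_factory=list)
    external_base_file: Optional[str] = None          # resolved from the bootstrap index

    def is_bootstrapped(self) -> bool:
        return self.base_instant == BOOTSTRAP_COMMIT and self.external_base_file is not None

    def files_for_full_records(self) -> List[str]:
        # A bootstrapped base needs both the skeleton and the original file to form full rows.
        base = [self.base_file]
        if self.is_bootstrapped():
            base.append(self.external_base_file)
        return base + self.log_files


slice_h1 = FileSlice(
    file_id="h1",
    base_instant=BOOTSTRAP_COMMIT,
    base_file="/user/hive/warehouse/fact_events_hudi/year=2015/month=12/day=31/h1_1-0-1_001.parquet",
    external_base_file="/user/hive/warehouse/fact_events/year=2015/month=12/day=31/file1.parquet",
)
print(slice_h1.files_for_full_records())
```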

With this model, if we need to update a batch of records “1-K” in old partitions of the newly bootstrapped table “fact_events_hudi”, the following steps are performed:

  • Let us say the commit time associated with this upsert operation is “C1”. Since the bootstrap commit precedes it, C1 is greater than the “BOOTSTRAP_COMMIT” (001).
  • With the Bloom index, index lookup happens directly on the Hudi skeleton files. Let’s say the Hudi skeleton file with file ID “h1” contains all the records.
  • In the following description, a “regular” Hudi file means a Hudi parquet file with the per-record Hudi metadata columns, the original columns and the Bloom index in a single file. For a Copy-On-Write table, the writing phase identifies, from the special bootstrap commit time, that the latest file slice for file ID “h1” was generated by bootstrap. It reads the original external file stored under the original root location “/user/hive/warehouse/fact_events”. The Hudi Merge Handle reads both this external file and the metadata-only Hudi file in parallel, stitching the records together and merging them with the incoming batch of records to create a “regular” Hudi file as a brand new version of file ID “h1”.

  • For a Merge-On-Read table, ingestion simply appends to a delta log file, and a subsequent compaction performs the same steps as for a Copy-On-Write table to generate a “regular” Hudi file as a brand new version of file ID “h1”.
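The Copy-On-Write path can be sketched in Python as follows. This is an illustrative in-memory stand-in for the Merge Handle, not its actual code: `merge_bootstrapped_slice` is hypothetical, rows are plain dicts, and stitching is assumed to be positional (row i of the skeleton matches row i of the original file). Unchanged records keep their original commit time, while updated ones take the new commit time (here `002`, standing in for C1); refreshing `_hoodie_file_name` on every row is a choice of this sketch.

```python
from typing import Dict, List


def merge_bootstrapped_slice(skeleton_rows: List[dict], original_rows: List[dict],
                             updates: Dict[str, dict], commit_time: str,
                             new_file_name: str) -> List[dict]:
    """Stitch skeleton and original rows, then merge an update batch into them.

    `updates` maps record key -> new data columns and is assumed to hold only
    records that the index lookup tagged to this file group."""
    if len(skeleton_rows) != len(original_rows):
        raise ValueError("skeleton and original file must have the same row count")
    merged = []
    for seq, (meta, data) in enumerate(zip(skeleton_rows, original_rows)):
        row = {**meta, **data}  # column stitching
        key = meta["_hoodie_record_key"]
        if key in updates:
            row.update(updates[key])  # incoming values replace the old ones
            row["_hoodie_commit_time"] = commit_time
            row["_hoodie_commit_seqno"] = f"{commit_time}_{seq}"  # illustrative format
        row["_hoodie_file_name"] = new_file_name  # every row now lives in the new version
        merged.append(row)
    return merged


def meta(key: str, seq: int) -> dict:
    return {"_hoodie_commit_time": "001", "_hoodie_commit_seqno": f"001_{seq}",
            "_hoodie_record_key": key, "_hoodie_partition_path": "year=2015/month=12/day=31",
            "_hoodie_file_name": "h1_1-0-1_001.parquet"}


merged = merge_bootstrapped_slice(
    skeleton_rows=[meta("a", 0), meta("b", 1)],
    original_rows=[{"event_id": "a", "amount": 1}, {"event_id": "b", "amount": 2}],
    updates={"b": {"event_id": "b", "amount": 20}},
    commit_time="002",                      # plays the role of C1
    new_file_name="h1_1-0-2_002.parquet",   # hypothetical name of the new version
)
```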

Hudi implements custom input formats to integrate with query engines. These existing custom input formats will recognize the special bootstrap commit and perform column stitching between the Hudi record-level metadata fields in the skeleton Hudi file and the other columns present in the external parquet file, providing the same views as existing Hudi tables. Note that only the projected columns required by the query will be read from the physical parquet files. Please see below for a pictorial representation of how query engine integration is done.
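The read-side column stitching can be sketched in Python as below. It is a hypothetical model, not the input format code: `read_projected` and the column-wise dict representation of parquet files are assumptions, and rows are again assumed to line up by position. It illustrates that only the requested columns are taken from each file, so a query projecting a few data columns does not pull the rest of a wide original file.

```python
from typing import Dict, List

HOODIE_META_COLUMNS = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
                       "_hoodie_partition_path", "_hoodie_file_name"}


def read_projected(skeleton: Dict[str, list], original: Dict[str, list],
                   columns: List[str]) -> List[dict]:
    """Column-stitched read of a bootstrapped base file.

    Files are modeled column-wise ({column: values}) to mimic parquet, and only
    the requested columns are pulled from each file."""
    meta_cols = [c for c in columns if c in HOODIE_META_COLUMNS]
    data_cols = [c for c in columns if c not in HOODIE_META_COLUMNS]
    chunks = {c: skeleton[c] for c in meta_cols}        # from the skeleton Hudi file
    chunks.update({c: original[c] for c in data_cols})  # from the external parquet file
    num_rows = len(next(iter(skeleton.values())))       # both files have the same row count
    return [{c: chunks[c][i] for c in columns} for i in range(num_rows)]


skeleton = {"_hoodie_commit_time": ["001", "001"], "_hoodie_record_key": ["a", "b"]}
original = {"event_id": ["a", "b"], "amount": [1, 2], "payload": ["x", "y"]}
rows = read_projected(skeleton, original, ["_hoodie_record_key", "amount"])
```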

Rollout/Adoption Plan

  • This will be rolled out as an experimental feature in 0.5.1 
  • Hudi Writers and Readers do not need special configuration to identify tables using this new bootstrap mechanism. The presence of special bootstrap commit and bootstrap index will automatically trigger correct handling of these tables.

Test Plan

    This change affects all use cases of Hudi and hence needs to be tested comprehensively through unit tests, integration tests and long-running tests.

TODOs

  • Spark DataSource integration for reading such bootstrapped tables is yet to be designed.