
Background & Motivation 

<WIP>

System Overview

<WIP>

Implementation

Writing 

Hudi writing is implemented as a Spark library, which makes it easy to integrate into existing data pipelines or ingestion libraries (which we will refer to as `Hudi clients`). Hudi clients prepare an `RDD[HoodieRecord]` that contains the data to be upserted, and a Hudi upsert/insert is merely a Spark DAG that can be broken into two big pieces. A minimal usage sketch of the write client follows the list below.

  • Indexing : A big part of Hudi's efficiency comes from indexing the mapping from record keys to the file ids to which they belong. This index also helps the `HoodieWriteClient` separate upserted records into inserts and updates, so they can be treated differently. `HoodieReadClient` supports operations such as `filterExists` (used for de-duplication of the table) and an efficient batch `read(keys)` API that can read out the records corresponding to the keys using the index, much more quickly than a typical scan via a query. The index is atomically updated on each commit, and is also rolled back when commits are rolled back.
  • Storage : The storage part of the DAG is responsible for taking an `RDD[HoodieRecord]` that has been tagged as an insert or update via index lookup, and writing it out efficiently onto storage.
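
To make this flow concrete, below is a minimal, hedged usage sketch of driving an upsert through `HoodieWriteClient`. Method and builder names (`startCommit()`, `upsert()`, `withPath()`, `forTable()`) reflect the write client API at the time of writing, and imports are omitted since the package moved from `com.uber.hoodie` to `org.apache.hudi` across versions; treat this as a sketch, not the definitive API.

Code Block
// Illustrative only: a bare-bones upsert through the write client.
// Assumes `jsc` is an existing JavaSparkContext, `schemaStr` is the Avro
// schema of the records, and `records` is a JavaRDD<HoodieRecord> prepared
// by the Hudi client (ingestion job).
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/data/hudi/my_table")   // def~table-basepath on DFS (example path)
    .withSchema(schemaStr)
    .forTable("my_table")
    .build();
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);

String commitTime = writeClient.startCommit();                  // new commit on the timeline
JavaRDD<WriteStatus> statuses = writeClient.upsert(records, commitTime);
// Index lookup (tagging inserts vs updates) and storage (writing file groups)
// both run inside this single Spark DAG; `statuses` reports per-record results.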

Index

Hudi currently provides two choices of index, `BloomIndex` and `HBaseIndex`, to map a record key to the file id to which it belongs. This enables us to speed up upserts significantly, without scanning every record in the dataset. Hudi indices can be classified based on their ability to look up records across partitions. A `global` index does not need partition information to find the file id for a record key, but a `non-global` index does.

HBase Index (global)

Here, we use HBase in a straightforward way to store the mapping above. The challenge with using HBase (or any external key-value store for that matter) is performing rollback of a commit and handling partial index updates.
Since the HBase table is indexed by record key and not commit time, rolling back would require scanning all the entries, which would be prohibitively expensive. Instead, we store the commit time along with the value and ignore the index entry if it does not belong to a valid commit.
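
The commit-validity check can be sketched as below. This is a hypothetical illustration of the idea (the class and method names are not the actual `HBaseIndex` code): an index entry is trusted only if the commit time stored alongside the mapping belongs to a completed commit on the timeline.

Code Block
import java.util.Set;

// Hypothetical sketch of the lazy rollback-handling check for an external index.
class HBaseIndexEntrySketch {
  static class IndexEntry {
    String recordKey;
    String fileId;       // file id this record key maps to
    String commitTime;   // commit that wrote this mapping into HBase
  }

  // Entries written by failed or rolled-back commits are simply ignored,
  // avoiding an expensive scan-and-delete over the HBase table on rollback.
  static boolean isEntryValid(IndexEntry entry, Set<String> completedCommitTimes) {
    return completedCommitTimes.contains(entry.commitTime);
  }
}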

Bloom Index (non-global)

...

Storage

The implementation specifics of the two storage types are detailed below.

Copy On Write (COW)

The Spark DAG for this storage type is relatively simple. The key goal here is to group the tagged Hudi record RDD into a series of updates and inserts, using a partitioner. To achieve the goal of maintaining file sizes, we first sample the input to obtain a `workload profile` that captures the spread of inserts vs updates, their distribution among the partitions, etc. With this information, we bin-pack the records (see the illustrative sketch after this list) such that

  • For updates, the latest version of that file id is rewritten once, with new values for all records that have changed.
  • For inserts, the records are first packed onto the smallest file in each partition path, until it reaches the configured maximum size.
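
The insert bin-packing described above can be sketched as follows. This is illustrative pseudologic only (the types and helper names are invented for the example), not the actual partitioner implementation:

Code Block
import java.util.*;

// Hypothetical sketch of insert bin-packing for one partition path: route each
// insert to the smallest not-yet-full existing file; otherwise open a new file group.
class InsertBinPackingSketch {
  record SmallFile(String fileId, long sizeBytes) {}

  // Returns recordKey -> assigned fileId, given small files sorted by size (ascending).
  static Map<String, String> packInserts(List<String> recordKeys, List<SmallFile> smallFiles,
                                          long avgRecordSizeBytes, long maxFileSizeBytes) {
    Map<String, String> assignment = new HashMap<>();
    Deque<SmallFile> candidates = new ArrayDeque<>(smallFiles);
    long bytesAddedToHead = 0;
    for (String key : recordKeys) {
      // Skip past small files that would exceed the configured maximum size.
      while (!candidates.isEmpty()
          && candidates.peek().sizeBytes() + bytesAddedToHead + avgRecordSizeBytes > maxFileSizeBytes) {
        candidates.poll();
        bytesAddedToHead = 0;
      }
      if (!candidates.isEmpty()) {
        assignment.put(key, candidates.peek().fileId());   // pack into an existing small file
        bytesAddedToHead += avgRecordSizeBytes;
      } else {
        assignment.put(key, "new-file-group");             // simplification: a single new file group
      }
    }
    return assignment;
  }
}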

...

Compactor

...

In this page hierarchy, we explain the concepts, design and the overall architectural underpinnings of Apache Hudi. This content is intended to be the technical documentation of the project and will be kept up-to-date with 

Info: "def~" annotations

In an effort to keep this page crisp for reading, any concepts that we need to explain are annotated with a def~ prefix and hyperlinked off. You can contribute immensely to our docs by writing the missing pages for annotated terms. These are marked in brown. Please mention any PMC/Committers on these pages for review.

Introduction

Apache Hudi (Hudi for short, from here on) allows you to store vast amounts of data on top of existing def~hadoop-compatible-storage, while providing two primitives that enable def~stream-processing on def~data-lakes, in addition to typical def~batch-processing.

Specifically,

  • Update/Delete Records : Hudi provides support for updating/deleting records, using fine-grained file/record level indexes, while providing transactional guarantees for the write operation. Queries process the last such committed snapshot to produce results.
  • Change Streams : Hudi also provides first-class support for obtaining an incremental stream of all the records that were updated/inserted/deleted in a given table from a given point in time, unlocking a new def~incremental-query category.


These primitives work hand-in-glove and unlock stream/incremental processing capabilities directly on top of def~DFS-abstractions. If you are familiar with def~stream-processing, this is very similar to consuming events from a def~kafka-topic and then using a def~state-store to accumulate intermediate results incrementally.

It has several architectural advantages.

  • Increased Efficiency : Ingesting data often needs to deal with updates (resulting from def~database-change-capture), deletions (due to def~data-privacy-regulations) and enforcing def~unique-key-constraints (to ensure def~data-quality of event streams/analytics). However, without a system like Hudi that offers standardized support for such functionality, data engineers often resort to big batch jobs that reprocess an entire day's events or reload the entire upstream database every run, leading to massive waste of def~computational-resources. Since Hudi supports record level updates, it brings an order of magnitude improvement to these operations, by only reprocessing changed records and rewriting only the part of the def~table that was updated/deleted, as opposed to rewriting entire def~table-partitions or even the entire def~table.
  • Faster ETL/Derived Pipelines : A ubiquitous next step, once the data has been ingested from external sources, is to build derived data pipelines using Apache Spark/Apache Hive or any other data processing framework to def~ETL the ingested data for a variety of use-cases like def~data-warehousing, def~machine-learning-feature-extraction, or even just def~analytics. Typically, such processes again rely on def~batch-processing jobs expressed in code or SQL, which process all input data in bulk and recompute all the output results. Such data pipelines can be sped up dramatically by querying one or more input tables using an def~incremental-query instead of a regular def~snapshot-query, resulting once again in only processing the incremental changes from upstream tables and then applying a def~upsert or delete to the target derived table, as above.
  • Access to fresh data : It's not every day that you find that reduced resource usage also results in improved performance, since typically we add more resources (e.g memory) to improve a performance metric (e.g query latency). By fundamentally shifting away from how datasets have traditionally been managed, for maybe the first time since the dawn of the big data era, Hudi in fact realizes this rare combination. A sweet side-effect of incrementalizing def~batch-processing is that the pipelines also take a much smaller amount of time to run, putting data into the hands of organizations much more quickly than has been possible with def~data-lakes before.
  • Unified Storage : Building upon the three benefits above, faster and lighter processing right on top of existing def~data-lakes means less need for specialized storage or def~data-marts, simply for purposes of gaining access to near real-time data.

Design Principles

Streaming Reads/Writes : Hudi is designed, from the ground up, for streaming records in and out of large datasets, borrowing principles from database design. To that end, Hudi provides def~index implementations that can quickly map a record's key to the file location it resides at. Similarly, for streaming data out, Hudi adds and tracks record level metadata via def~hoodie-special-columns, which enables providing a precise incremental stream of all changes that happened.

Self-Managing : Hudi recognizes the differing expectations of data freshness (write friendliness) vs query performance (read/query friendliness) that users may have, and supports three different def~query-types that provide real-time snapshots, incremental streams, or purely columnar data that is slightly older. At each step, Hudi strives to be self-managing (e.g: auto-tunes the writer parallelism, maintains file sizes) and self-healing (e.g: auto rollbacks failed commits), even if it comes at the cost of slight additional runtime overhead (e.g: caching input data in memory to profile the workload). The core premise here is that, without such operational levers/self-managing features built in, the operational costs of these large data pipelines often dwarf the extra memory/runtime costs.

Everything is a log : Hudi also has an append-only, cloud data storage friendly design that lets Hudi manage data across all the major cloud providers seamlessly, implementing principles from def~log-structured-storage systems.

Key-value data model : On the writer side, a Hudi table is modeled as a key-value dataset, where each def~record has a unique def~record-key. Additionally, a record key may also include the def~partitionpath under which the record is partitioned and stored. This often helps in cutting down the search space during index lookups.
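
As a small illustration of this model, the write client identifies each record by a `HoodieKey` made up of a record key and a partition path. The class names below follow the Hudi client API at the time of writing (imports omitted, since the package moved from `com.uber.hoodie` to `org.apache.hudi` across versions); the values are examples only.

Code Block
// Each record is uniquely identified by (recordKey, partitionPath).
HoodieKey key = new HoodieKey("user_12345",   // def~record-key
                              "2020/01/31");  // def~partitionpath
// The payload (assumed to exist here) carries the actual field values.
HoodieRecord record = new HoodieRecord(key, payload);
// During an upsert with a non-global index, only file groups under the
// partition path "2020/01/31" need to be searched for this key.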

Table Layout

With an understanding of the key technical motivations for the project, let's now dive deeper into the design of the system itself. At a high level, components for writing Hudi tables are embedded into an Apache Spark job using one of the supported ways, and they produce a set of files on def~backing-dfs-storage that represents a Hudi def~table. Query engines like Apache Spark, Presto and Apache Hive can then query the table, with certain guarantees (that we will discuss below).

There are three main components to a def~table:

  1. An ordered sequence of def~timeline-metadata about all the write operations done on the table, akin to a database transaction log.
  2. A hierarchical layout of a set of def~data-files that actually contain the records written to the table.
  3. An def~index (which could be implemented in many ways) that maps a given record to a subset of the data-files that contains the record.



Hudi provides the following capabilities for writers, queries and on the underlying data, which makes it a great building block for large def~data-lakes.

  • upsert() support with fast, pluggable indexing
  • Incremental queries that scan only new data efficiently
  • Atomically publish data with rollback support, Savepoints for data recovery
  • Snapshot isolation between writer & queries using def~mvcc style design
  • Manages file sizes, layout using statistics
  • Self managed def~compaction of updates/deltas against existing records.
  • Timeline metadata to audit changes to data
  • Data deletions & compliance (e.g: GDPR)

Timeline

Excerpt Include: def~timeline

Excerpt Include: def~timeline-instant

Excerpt Include: def~instant-action

Excerpt Include: def~instant-state

Data Files

Hudi organizes a table into a folder structure under a def~table-basepath on DFS. If the table is partitioned by some columns, then there are additional def~table-partitions under the base path, which are folders containing data files for that partition, very similar to Hive tables. Each partition is uniquely identified by its def~partitionpath, which is relative to the basepath. Within each partition, files are organized into def~file-groups, uniquely identified by a def~file-id. Each file group contains several def~file-slices, where each slice contains a def~base-file (e.g: parquet) produced at a certain commit/compaction def~instant-time, along with a set of def~log-files that contain inserts/updates to the base file since the base file was last written. Hudi adopts an MVCC design, where the compaction action merges logs and base files to produce new file slices, and the cleaning action gets rid of unused/older file slices to reclaim space on DFS.


Fig : Shows four file groups 1,2,3,4 with base and log files, with a few file slices each
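
To make this layout concrete, a simplified, hypothetical directory listing for a partitioned table is sketched below. Exact file naming conventions (write tokens, log file versions, marker files) vary across Hudi versions, so the names are illustrative only.

Code Block
/data/hudi/my_table/                         <- def~table-basepath
  .hoodie/                                   <- def~timeline metadata
    20200131120000.commit
    20200131123000.deltacommit
    20200131130000.compaction.requested
  2020/01/31/                                <- def~partitionpath (one def~table-partition)
    fileId1_0_20200131120000.parquet         <- def~base-file of a def~file-slice in file group "fileId1"
    .fileId1_20200131120000.log.1            <- def~log-file with later inserts/updates to that slice
    fileId2_0_20200131120000.parquet         <- base file of another def~file-group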

Index

Excerpt Include: def~index

Table Types

The implementation specifics of the two def~table-types are detailed below.

Excerpt Include: def~table-type

Copy On Write Table

def~copy-on-write (COW)

Excerpt Include: def~copy-on-write (COW)





Merge On Read Table

def~merge-on-read (MOR)

Excerpt Include: def~merge-on-read (MOR)




Writing

Write Operations

Excerpt Include: def~write-operation

Compaction

Excerpt Include: def~compaction

Cleaning

Excerpt Include: def~cleaning

Optimized DFS Access

Hudi also performs several key storage management functions on the data stored in a def~table. A key aspect of storing data on DFS is managing file sizes and counts, and reclaiming storage space. For example, HDFS is infamous for its handling of small files, which exerts memory/RPC pressure on the Name Node and can potentially destabilize the entire cluster. In general, query engines provide much better performance on adequately sized columnar files, since they can effectively amortize the cost of obtaining column statistics etc. Even on some cloud data stores, there is often a cost to listing directories with a large number of small files.

Here are some ways in which Hudi writing efficiently manages the storage of data (an illustrative config sketch follows the list).

  • The small file handling feature in Hudi profiles the incoming workload and distributes inserts to existing def~file-groups, instead of creating new file groups which can lead to small files.
  • Employing a cache of the def~timeline in the writer, such that as long as the Spark cluster is not spun up every time, subsequent def~write-operations never list DFS directly to obtain the list of def~file-slices in a given def~table-partition.
  • Users can also tune the size of the def~base-file as a fraction of def~log-files & expected compression ratio, such that a sufficient number of inserts are grouped into the same file group, ultimately resulting in well-sized base files.
  • Intelligently tuning the bulk insert parallelism can again result in nicely sized initial file groups. It is in fact critical to get this right, since file groups once created cannot be deleted, but only expanded as explained before.
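
As a rough illustration of the sizing knobs mentioned above, the snippet below sets a few commonly documented file-sizing configs via plain Java properties. The config key names and values are assumptions that may differ across Hudi versions and deployment styles; consult the configuration reference for your release.

Code Block
import java.util.Properties;

// Illustrative file-sizing knobs (key names assumed; verify against your Hudi version).
Properties props = new Properties();
// Files below this size are treated as "small" and receive new inserts first.
props.setProperty("hoodie.parquet.small.file.limit", String.valueOf(100 * 1024 * 1024));
// Target maximum size for a base (parquet) file.
props.setProperty("hoodie.parquet.max.file.size", String.valueOf(120 * 1024 * 1024));
// Expected size ratio when row-based log data compacts into parquet (used for sizing).
props.setProperty("hoodie.logfile.to.parquet.compression.ratio", "0.35");
// Parallelism that splits the initial bulk insert into well-sized file groups.
props.setProperty("hoodie.bulkinsert.shuffle.parallelism", "200");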

Querying

Excerpt Include: def~query-type

Snapshot Queries

Excerpt Include: def~snapshot-query

Incremental Queries

Excerpt Include: def~incremental-query

Read Optimized Queries

Excerpt Include: def~read-optimized-query

...

Main API in HoodieWriteClient for running compaction:

Code Block
/**
 * Performs compaction corresponding to the given instant time.
 * @param compactionInstantTime Compaction Instant Time
 * @return RDD of WriteStatus from the compaction run
 * @throws IOException
 */
public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException;

To look up all pending compactions, use the API defined in HoodieReadClient:

Code Block
/**
 * Return all pending compactions with instant time for clients to decide what to compact next.
 * @return list of (compaction instant time, compaction plan) pairs
 */
public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions();

API for scheduling compaction:

Code Block
/**
 * Schedules a new compaction instant.
 * @param extraMetadata extra metadata to be stored as part of the compaction plan
 * @return Compaction instant timestamp, if a new compaction plan is scheduled
 * @throws IOException
 */
Optional<String> scheduleCompaction(Optional<Map<String, String>> extraMetadata) throws IOException;

Refer to the hoodie-client/src/test/java/HoodieClientExample.java class for an example of how compaction is scheduled and executed.
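
Putting these APIs together, a minimal, hedged sketch of an asynchronous compactor loop might look like the following. It assumes `writeClient` is a configured HoodieWriteClient exposing `scheduleCompaction()`/`compact()` as shown above, `readClient` is a HoodieReadClient exposing `getPendingCompactions()`, and that the returned pair exposes `getLeft()`/`getRight()` accessors; error handling and Spark setup are omitted.

Code Block
// Sketch only: schedule a compaction (typically done by the writer job), then
// pick up pending compactions and execute them (typically a separate compactor job).
Optional<String> scheduled = writeClient.scheduleCompaction(Optional.empty());  // no extra metadata
scheduled.ifPresent(instant -> System.out.println("Scheduled compaction at instant " + instant));

for (Pair<String, HoodieCompactionPlan> pending : readClient.getPendingCompactions()) {
  String compactionInstantTime = pending.getLeft();
  // compact() declares IOException; handle or propagate as appropriate.
  JavaRDD<WriteStatus> statuses = writeClient.compact(compactionInstantTime);
  // Inspect `statuses` / handle commit bookkeeping per your pipeline's needs.
}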

Deployment Mode

These are the typical Hudi writer and compaction deployment models:

  • Inline Compaction : In each round, a single Spark application ingests a new batch into the dataset. It then optionally decides to schedule a compaction run and executes it in sequence.
  • Single Dedicated Async Compactor : The Spark application which brings new changes to the dataset (the writer) periodically schedules compaction, but does not run it inline. A separate Spark application periodically probes for pending compactions and executes them.
  • Multi Async Compactors : This mode is similar to the `Single Dedicated Async Compactor` mode, the main difference being that there can be more than one Spark application picking up different compactions and executing them in parallel. To ensure compactors do not step on each other, they use a coordination service like Zookeeper to pick up unique pending compaction instants and run them.

The compaction process requires one executor per file-slice in the compaction plan. So, the best resource allocation strategy (both in terms of speed and resource usage) for clusters supporting dynamic allocation is to look up the compaction plan to be run, figure out the number of file slices being compacted, and choose that many executors. A sketch of this sizing logic follows.
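
The executor-sizing guideline above can be sketched as follows. This is illustrative only; it reuses `getPendingCompactions()` from the API shown earlier and assumes `HoodieCompactionPlan` exposes its per-file-slice operations via a `getOperations()` accessor (verify against your Hudi version).

Code Block
// Sketch: derive the compactor's executor count from the next pending compaction plan.
List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
if (!pendingCompactions.isEmpty()) {
  HoodieCompactionPlan plan = pendingCompactions.get(0).getRight();
  int fileSlicesToCompact = plan.getOperations().size();   // assumed accessor: one operation per file slice
  // Request roughly one executor per file slice when submitting the compactor application,
  // e.g: set spark.dynamicAllocation.maxExecutors to fileSlicesToCompact.
  System.out.println("Request ~" + fileSlicesToCompact + " executors for this compaction run");
}
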
Async Compaction Design Deep-Dive

For the purpose of this section, it is important to distinguish between 2 types of commits as they pertain to a file-group: a commit which generates a merged and read-optimized file-slice is called a `snapshot commit` (SC) with respect to that file-group, while a commit which merely appended the new/updated records assigned to the file-group into a new log block is called a `delta commit` (DC) with respect to that file-group.

The algorithm is described with an illustration. Let us assume a scenario where commits SC1, DC2, DC3 have already completed on a dataset. Commit DC4 is currently ongoing, with the writer (ingestion) process using it to upsert data. Let us also imagine there is a set of file-groups (FG1 … FGn) in the dataset whose latest version (`file-slice`) contains the base file created by commit SC1 (snapshot commit in columnar format) and a log file containing row-based log blocks of 2 delta commits (DC2 and DC3).

Image: async_compac_1.png

The writer (ingestion) that is going to commit "DC4" starts. The record updates in this batch are grouped by file-group and appended in row format to the corresponding log file as a delta commit. Let us imagine a subset of file-groups has this new log block (delta commit) DC4 added. Before the writer job completes, it runs the compaction strategy to decide which file-groups to compact, and creates a new compaction-requested commit SC5. This commit file is marked as "requested", with metadata denoting which fileIds to compact (based on the selection policy). The writer completes without running compaction (it will be run asynchronously).

Image: async_compac_2.png

The writer job runs again, ingesting the next batch. It starts with commit DC6. It reads the earliest inflight compaction request marker commit in timeline order and collects the (fileId, compaction commit id "CcId") pairs from its metadata. Ingestion DC6 ensures a new file-slice with base commit "CcId" gets allocated for the file-group. The writer simply appends records in row format to the first log file (as a delta commit), assuming the base file ("phantom base file") will eventually be created by the compactor.

Image: async_compac_3.png

The compactor runs at some time and commits at "Tc" (concurrently with, or before/after, ingestion DC6). It reads the commit timeline and finds the first unprocessed compaction request marker commit. The compactor reads that commit's metadata to find the file-slices to be compacted. It compacts each file-slice and creates the missing base file ("phantom base file") with "CcId" as the commit timestamp. The compactor then marks the compaction commit timestamp as completed. It is important to realize that, at the dataset level, different file-groups could be requesting compaction at different commit timestamps.

Image: async_compac_4.png

A near real-time reader interested in getting the latest snapshot will have 2 cases. Let us assume that the incremental ingestion (writer at DC6) happened before the compaction (at some time "Tc"). The description below treats compaction from the file-group perspective.

  • Reader querying between the ingestion completion time for DC6 and the compaction finish "Tc" : Hudi's implementation will be changed to become aware of file-groups currently waiting for compaction, and will merge the log files corresponding to DC2-DC6 with the base file corresponding to SC1. In essence, Hudi will create a pseudo file-slice by combining the 2 file-slices starting at base commits SC1 and SC5 into one. For file-groups not waiting for compaction, the reader behavior is essentially the same: read the latest file-slice and merge on the fly.
  • Reader querying after compaction has finished (> "Tc") : In this case, the reader will not find any pending compactions in the timeline and will simply have the current behavior of reading the latest file-slice and merging on the fly.
  • Read-optimized view readers will query against the latest columnar base file for each file-group.

The above algorithm explains async compaction with respect to a single compaction run on a single file-group. It is important to note that multiple compaction plans can be run concurrently, as they are essentially operating on different file-groups.