Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Increased Efficiency : Ingesting data often needs to deal with updates (resulting from def~database-change-capture), deletions (due to def~data-privacy-regulations) and enforcing def~unique-key-constraints (to ensure def~data-quality of event streams/analytics). However, due to lack of standardized support for such functionality using a system like Hudi, data engineers often resort to big batch jobs that reprocess entire day's events or reload the entire upstream database every run, leading to massive waste of def~computational-resources. Since Hudi supports record level updates, it brings an order of magnitude improvement to these operations, by only reprocessing changes records and rewriting only the part of the def~table, that was updated/deleted, as opposed to rewriting entire def~table-partitions or even the entire def~table.
  • Faster ETL/Derived Pipelines : An ubiquitous next step, once the data has been ingested from external sources is to build derived data pipelines using Apache Spark/Apache Hive or any other data processing framework to  def~ETL the ingested data for a variety of use-cases like def~data-warehousing, def~machine-learning-feature-extraction, or even just def~analytics. Typically, such processes again rely on def~batch-processing jobs expressed in code or SQL, that process all input data in bulk and recompute all the output results. Such data pipelines can be sped up dramatically, by querying one or more input tables using an def~incremental-query instead of a regular def~snapshot-query, resulting once again in only processing the incremental changes from upstream tables and then def~upsert or delete the target derived table, like above.
  • Access to fresh data :  It's not everyday, that you will find that reduced resource usage also result in improved performance, since typically we add more resources (e.g memory) to improve performance metric (e.g query latency) . By fundamentally shifting away from how datasets have been traditionally managed for may be the first time since the dawn of the big data era, Hudi, in fact, realizes this rare combination. A sweet side-effect of incrementalizing def~batch-processing is that the pipelines also much much smaller amount of time to run, putting data into hands of organizations much much quickly, than it has been possible with def~data-lakes before.
  • Unified Storage : Building upon all the three benefits above, faster and lighter processing right on top of existing def~data-lakes mean lesser need for specialized storage or  def~data-marts, simply for purposes of gaining access to near real-time data.

System Overview

With an understanding of key technical motivations for the projects, let's now dive deeper into design of the system itself. At a high level,  components for writing Hudi tables are embedded into an Apache Spark job using one of the supported ways and it produces a set of files on def~backing-dfs-storage, that represents a Hudi def~table. Query engines like Apache Spark, Presto, Apache Hive can then query the table, with certain guarantees (that will discuss below).

There are three main components to a def~table

  1. Set of  def~data-files that actually contain the records that were written to the table.
  2. An def~index (which could be implemented in many ways), that maps a given record to a subset of the data-files that contains the record.
  3. Ordered sequence of def~timeline-metadata about all the write operations done on the table, akin to a database transaction log.

Image Removed

Hudi provides the following capabilities for writers, queries and on the underlying data, which makes it a great building block for large def~data-lakes.

...

Design Principles

Streaming Reads/Writes : Hudi is designed, from ground-up, for streaming records in and out of large datasets, borrowing principles from database design. To that end, Hudi provides def~index implementations, that can quickly map a record's key to the file location it resides at. Similarly, for streaming data out, Hudi adds and tracks record level metadata via def~hoodie-special-columns, that enables providing a precise incremental stream of all changes that happened. 

...

Everything is a log : Hudi also has an append-only, cloud data storage friendly design, that lets Hudi manage data on across all the major cloud providers seamlessly, implementing principles from def~log-structured-storage systems

System Overview

With an understanding of key technical motivations for the projects, let's now dive deeper into design of the system itself. At a high level,  components for writing Hudi tables are embedded into an Apache Spark job using one of the supported ways and it produces a set of files on def~backing-dfs-storage, that represents a Hudi def~table. Query engines like Apache Spark, Presto, Apache Hive can then query the table, with certain guarantees (that will discuss below).

There are three main components to a def~table

  1. Ordered sequence of def~timeline-metadata about all the write operations done on the table, akin to a database transaction log.
  2. Set of  def~data-files that actually contain the records that were written to the table.
  3. An def~index (which could be implemented in many ways), that maps a given record to a subset of the data-files that contains the record.

Image Added


Hudi provides the following capabilities for writers, queries and on the underlying data, which makes it a great building block for large def~data-lakes.

  • upsert() support with fast, pluggable indexing
  • Incremental queries that scan only new data efficiently
  • Atomically publish data with rollback support, Savepoints for data recovery
  • Snapshot isolation between writer & queries using def~mvcc style design
  • Manages file sizes, layout using statistics
  • Self managed def~compaction of updates/deltas against existing records.
  • Timeline metadata to audit changes to data
  • GDPR, Data deletions, Compliance.

Timeline

Excerpt Include
def~timeline
def~timeline
nopaneltrue

...

Data Files

Table Types 

The implementation specifics of the two def~table-types are detailed below.

Excerpt Include
def~table-type
def~table-type
nopaneltrue

Copy On

...

Write Table

def~copy-on-write

Excerpt Include
def~copy-on-write
def~copy-on-write
nopaneltrue




Merge On Read Table

def~merge-on-read

Excerpt Include
def~merge-on-read
def~merge-on-read
nopaneltrue

...

  • Indexing : A big part of Hudi's efficiency comes from indexing the mapping from record keys to the file ids, to which they belong to. This index also helps the `HoodieWriteClient` separate upserted records into inserts and updates, so they can be treated differently. `HoodieReadClient` supports operations such as `filterExists` (used for de-duplication of table) and an efficient batch `read(keys)` api, that can read out the records corresponding to the keys using the index much quickly, than a typical scan via a query. The index is also atomically updated each commit, and is also rolled back when commits are rolled back.
  • Storage : The storage part of the DAG is responsible for taking an `RDD[HoodieRecord]`, that has been tagged as an insert or update via index lookup, and writing it out efficiently onto storage.

File Layout

<WIP>

Indexing

<WIP>

Hudi currently provides two choices for indexes : `BloomIndex` and `HBaseIndex` to map a record key into the file id to which it belongs to. This enables us to speed up upserts significantly, without scanning over every record in the table. Hudi Indices can be classified based on their ability to lookup records across partition. A `global` index does not need partition information for finding the file-id for a record key but a `non-global` does.

HBase Index (global)

Here, we just use HBase in a straightforward way to store the mapping above. The challenge with using HBase (or any external key-value store for that matter) is performing rollback of a commit and handling partial index updates.
Since the HBase table is indexed by record key and not commit Time, we would have to scan all the entries which will be prohibitively expensive. Instead, we store the commit time with the value and discard its value if it does not belong to a valid commit.

Bloom Index (non-global)

...

Data Files

Hudi organizes a datasets into a directory structure under a `basepath` on DFS. Dataset is broken up into partitions, which are folders containing data files for that partition, very similar to Hive tables. Each partition is uniquely identified by its `partitionpath`, which is relative to the basepath.

Within each partition, files are organized into `file groups`, uniquely identified by a `file id`. Each file group contains several `file slices`, where each slice contains a base columnar file (`*.parquet`) produced at a certain commit/compaction instant time, along with set of log files (`*.log.*`) that contain inserts/updates to the base file since the base file was produced. Hudi adopts a MVCC design, where compaction action merges logs and base files to produce new file slices and cleaning action gets rid of unused/older file slices to reclaim space on DFS.

Hudi provides efficient upserts, by mapping a given hoodie key (record key + partition path) consistently to a file group, via an indexing mechanism. This mapping between record key and file group/file id, never changes once the first version of a record has been written to a file. In short, the mapped file group contains all versions of a group of records.

Index

Excerpt Include
def~index
def~index
nopaneltrue

Writing

Write Operations

It may be helpful to understand the 3 different write operations provided by Hudi datasource or the delta streamer tool and how best to leverage them. These operations can be chosen/changed across each commit/deltacommit issued against the dataset.

def~upsert-operation: This is the default operation where the input records are first tagged as inserts or updates by looking up the index and  the records are ultimately written after heuristics are run to determine how best to pack them on storage to optimize for things like file sizing. This operation is recommended for use-cases like database change capture where the input almost certainly contains updates.
def~insert-operation: This operation is very similar to upsert in terms of heuristics/file sizing but completely skips the index lookup step. Thus, it can be a lot faster than upserts for use-cases like log de-duplication (in conjunction with options to filter duplicates mentioned below). This is also suitable for use-cases where the dataset can tolerate duplicates, but just need the transactional writes/incremental pull/storage management capabilities of Hudi.
def~bulk-insert-operation Both upsert and insert operations keep input records in memory to speed up storage heuristics computations faster (among other things) and thus can be cumbersome for initial loading/bootstrapping a Hudi dataset at first. Bulk insert provides the same semantics as insert, while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs of initial load. However, this just does a best-effort job at sizing files vs guaranteeing file sizes like inserts/upserts do.

Compaction

<WIP>

Cleaning

<WIP>

File Sizing

Hudi also performs several key storage management functions on the data stored in a Hudi dataset. A key aspect of storing data on DFS is managing file sizes and counts and reclaiming storage space. For e.g HDFS is infamous for its handling of small files, which exerts memory/RPC pressure on the Name Node and can potentially destabilize the entire cluster. In general, query engines provide much better performance on adequately sized columnar files, since they can effectively amortize cost of obtaining column statistics etc. Even on some cloud data stores, there is often cost to listing directories with large number of small files.

Here are some ways to efficiently manage the storage of your Hudi datasets.

  • The [small file handling feature](configurations.html#compactionSmallFileSize) in Hudi, profiles incoming workload and distributes inserts to existing file groups instead of creating new file groups, which can lead to small files.
  • Cleaner can be [configured](configurations.html#retainCommits) to clean up older file slices, more or less aggressively depending on maximum time for queries to run & lookback needed for incremental pull
  • User can also tune the size of the [base/parquet file](configurations.html#limitFileSize), [log files](configurations.html#logFileMaxSize) & expected [compression ratio](configurations.html#parquetCompressionRatio), such that sufficient number of inserts are grouped into the same file group, resulting in well sized base files ultimately.
  • Intelligently tuning the [bulk insert parallelism](configurations.html#withBulkInsertParallelism), can again in nicely sized initial file groups. It is in fact critical to get this right, since the file groups once created cannot be deleted, but simply expanded as explained before.
  • For workloads with heavy updates, the [merge-on-read storage](concepts.html#merge-on-read-storage) provides a nice mechanism for ingesting quickly into smaller files and then later merging them into larger base files via compaction.

...

Compaction

<WIP>

File Sizing

...

Querying

<WIP>


Snapshot Queries

...

Read Optimized Queries

<WIP>


Hive Integration

<wip>