Table of Contents

Proposers

Approvers

  • @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]

Status

Current State

UNDER DISCUSSION (tick)
IN PROGRESS
ABANDONED
COMPLETED (tick)
INACTIVE


Discussion thread:

...

 here

JIRA:

...

here

Released: <Hudi Version>

Abstract

Hudi tables support many operations, a very popular one being upsert(). To support upserts, Hudi relies on an indexing scheme to route each incoming record to the correct file.
Currently, the Hudi index implementation is pluggable and provides two options:

...

Write Implementation

How hash index works

In order to obtain the mapping between files and buckets, the filename is modified to `${bucketId}_${filegroupId}_${UUID}_${timestamp}.parquet/log`, e.g. `00000002-0000-4035-a392-22a91eafd130_0-2-0_20210527160855.parquet`. In this way, no extra space is needed to store the mapping.
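As a minimal illustration (not the actual implementation), the bucket id can be recovered from such a file name by reading its zero-padded prefix, assuming the first eight characters of the file group id carry the bucket id as in the example above:

// Illustrative sketch: recover the bucket id from a bucket-encoded file group id
// such as "00000002-0000-4035-a392-22a91eafd130" (assumes the first 8 characters
// are the zero-padded bucket id, as in the example above).
static int bucketIdFromFileGroupId(String fileGroupId) {
  return Integer.parseInt(fileGroupId.substring(0, 8));
}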

For the mapping between bucket and hoodie record, a record finds its bucket by taking the hash of its index key modulo N, `Hash(recordKey) % N`, where N is the number of buckets the Hudi table is bucketed into. If the table is partitioned, each partition is divided into N buckets, and the record needs to find its partition first and then its bucket.
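A minimal sketch of this routing (method and variable names are illustrative, not the actual Hudi code), assuming a non-negative hash of the index key:

// Illustrative sketch: route a record to its bucket with Hash(recordKey) % N.
static int bucketIdOfRecord(String recordKey, int numBuckets) {
  int hash = recordKey.hashCode() & Integer.MAX_VALUE; // keep the hash non-negative
  return hash % numBuckets;                            // N = bucket num of the table (or partition)
}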

Each bucket thus implicitly holds the records whose keys satisfy `Hash(recordKey) % N == bucketId`. There are two usage patterns for the relationship between bucket and file group:

...

  1. One file group per bucket: this is the simplest and most efficient way.

The bucketId calculated by the hash mod value is assigned to the fileGroupId, similar to other index methods. The only difference is the `tag location` process (see the sketch after this list).


  2. Multiple file groups per bucket: this is useful if the written data is skewed or the data grows a lot.

(todo)

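For pattern 1, a rough sketch of the `tag location` step referenced above (the helper and map names are hypothetical); since each bucket maps to exactly one file group, the bucket id computed from the key replaces the usual index lookup:

import java.util.Map;

// Hypothetical sketch of "tag location" under pattern 1: the bucket id alone
// identifies the target file group, so no per-record index lookup is needed,
// only a per-partition map from bucketId to the existing file group id.
static String tagFileGroup(String partition, String recordKey, int numBuckets,
                           Map<String, Map<Integer, String>> partitionBucketToFileGroup) {
  int bucketId = (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  return partitionBucketToFileGroup.get(partition).get(bucketId);
}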

Comparison


| | Pattern 1 | Pattern 2 |
| --- | --- | --- |
| Number of file groups per bucket | 1 | >1 |
| Can avoid random access | yes | no |
| Implementation complexity | simple | complex |
| Can avoid data skew when writing | no | yes |
| Good support for data growth | bad | great |

This proposal will implement pattern 1.

...

Because the number of buckets is calculated from the estimated amount of data, the size of a single bucket can become too large as the data grows rapidly, which reduces read and write performance.
Similar to the expansion of a hashmap, the number of buckets can be expanded by multiples; the strategies considered are described below.

Overwrite

A naive way is to reshuffle and rewrite the whole table when the bucket number is changed. It is suitable for small tables and does not care how the bucket number changed.

Multiple Rehash

Expanding the number of buckets by multiples is recommended. For a multiple expansion, the data is clustered by rehashing, so that the existing data can be redistributed in a lightweight manner without shuffling the whole table.

For example, expanding 2 buckets to 4 splits the 1st bucket and rehashes its data into two smaller buckets, the 1st and the 3rd, while the former 2nd bucket is likewise split into the new 2nd and 4th buckets.
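As an illustrative sketch of this rule (buckets are 0-indexed here, while the example above counts from 1): after doubling the bucket number, a record that lived in old bucket b is rehashed either back to b or to b + oldNumBuckets, so only the bucket being split has to be rewritten.

// Illustrative sketch: rehash when the bucket number is doubled.
// A record from old bucket b lands in either b or b + oldNumBuckets.
static int newBucketAfterDoubling(String recordKey, int oldNumBuckets) {
  int hash = recordKey.hashCode() & Integer.MAX_VALUE;
  int newBucket = hash % (2 * oldNumBuckets);
  // invariant: newBucket == hash % oldNumBuckets
  //         or newBucket == hash % oldNumBuckets + oldNumBuckets
  return newBucket;
}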


Extendable Hash

(todo)

  • Data skew

Data skew means that the data size of some buckets will be too small or too large, resulting in a long tail of reads and writes and increasing the end-to-end time.
It is difficult to find a better solution for this problem on the compute engine side.

Configuration

hoodie.index.type=BUCKET_HASH_INDEX
hoodie.hash.index.bucket.num=1024
hoodie.datasource.write.indexkey.field=colA (the index key should be a superset of the record key)
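As a usage sketch (not part of the proposal itself), the proposed options could be passed on a Spark datasource write roughly as follows; the table name, record key field, and path are placeholders:

// Sketch: enabling the proposed hash index on a Spark datasource write.
// "df" is assumed to be an existing Dataset<Row> of incoming records.
// (imports: org.apache.spark.sql.SaveMode)
df.write().format("hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.write.recordkey.field", "colA")
  .option("hoodie.index.type", "BUCKET_HASH_INDEX")
  .option("hoodie.hash.index.bucket.num", "1024")
  .option("hoodie.datasource.write.indexkey.field", "colA")
  .mode(SaveMode.Append)
  .save("/path/to/my_table");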

...