RFC-08: Record level indexing mechanisms for Hudi datasets (Partition agnostic, similar to existing HbaseIndex)

Proposers

Approvers

Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Hudi tables support many operations, one of the most popular being upsert(). To support upserts, Hudi depends on an indexing scheme to route each incoming record to the correct file.

Currently, Hudi supports both partitioned and non-partitioned datasets. A partitioned dataset is one which groups files (data) into buckets called partitions. A Hudi dataset may be composed of N partitions, each with M files. This structure helps canonical Hive/Presto/Spark queries limit the amount of data read by using the partition as a filter. In most cases, the value of the partition/bucket is derived from the incoming data itself. The requirement is that once a record is mapped to a partition/bucket, this mapping should be a) known to Hudi and b) constant for the lifecycle of the dataset, for Hudi to perform upserts on it.

Consequently, for a non-partitioned dataset one can think of this problem as a record key <-> file id mapping that Hudi needs in order to perform upserts on a record.

The current solution is either a) for the client/user to provide the correct partition value as part of the payload, or b) to use a GlobalBloomIndex implementation to scan all the files under a given path (say, a non-partitioned table). In both cases, we are limited either by the user's ability to provide this information or by the performance overhead of scanning every file's bloom index.

The proposal is to implement a new index format that is a mapping of (recordKey <-> fileId). This mapping will be stored and maintained by Hudi as another implementation of HoodieIndex, and it addresses the two limitations mentioned above.
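Conceptually, each entry in such an index is just a (recordKey, partitionPath, fileId) triple. A minimal sketch of that shape, purely for illustration (the class and field names below are hypothetical, not part of the proposal):

public final class RecordIndexEntry {
  private final String recordKey;     // HoodieKey.getRecordKey()
  private final String partitionPath; // partition the record was written to
  private final String fileId;        // file group that holds the record

  public RecordIndexEntry(String recordKey, String partitionPath, String fileId) {
    this.recordKey = recordKey;
    this.partitionPath = partitionPath;
    this.fileId = fileId;
  }

  public String getRecordKey()     { return recordKey; }
  public String getPartitionPath() { return partitionPath; }
  public String getFileId()        { return fileId; }
}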

Background

Types of datasets


HUDI storage abstraction is composed of 2 main components: 1) the actual data stored, and 2) an index that helps look up the location (file_id) of a particular record key. Without this information, HUDI cannot perform upserts on datasets. We can broadly classify all datasets ingested into the data lake into 2 categories.

Append Only


An append-only dataset is one where every new entry written to the table is mutually disjoint from every entry written previously. More concretely, each row in the table is a new row and has no overlap with a previously written row. Imagine a table ingesting logs from an application: each new log is a new entry into the table, with little or no relation to the entries written before it. A new entry in such a dataset therefore does not require ANY context from the previous entries in the dataset to determine where it should be written.

Append + Update


An append + update, a.k.a. upsert, type of dataset presents a different challenge. Any entry written to such a dataset may depend on, or belong to, a previously written row. More concretely, each row in the table is NOT necessarily a new row and may overlap with a previously written row. In such a scenario, the system needs to determine which row this new update belongs to, and hence which file id the update should be routed to.

The 3 approaches used by consumers of HUDI are as follows:

  1. The data layout is divided into buckets called partitions, and each partition consists of N files. The client maintains a mapping of record key <-> file id for updatable tables and provides the partition information before passing the records off to Hudi for processing. The HoodieBloomIndex implementation scans the bloom indexes of all files under a partition, and on a match, verifies that the record key actually lies in that file. This way Hudi is able to “tag” the incoming records with the correct file ids.
  2. The data layout is a flat structure, where a single directory contains all the files for the updatable table. The GlobalHoodieBloomIndex implementation scans the bloom indexes of all files, and on a match, verifies that the record key actually lies in that file. This way Hudi is able to “tag” the incoming records with the correct file ids. The difference from the first approach is that if the number of files under this base path is very large, the indexing time can blow up.
  3. For append-only datasets which do not require updating, simply use a partition derived from the current payload, e.g. the current timestamp.

With a large amount of data being written to data warehouses, there is a need to partition this data better to improve query performance. Hive is a good tool for queries on large datasets, especially datasets that require full table scans, but quite often users need to filter the data on specific column values. Partitions make querying efficient because they limit the amount of data that needs to be read. In non-partitioned tables, Hive/Presto/Spark would have to read all the files in a table's data directory and only then apply filters, which is slow and expensive, especially for large tables.

Implementation

Option 1

Writer

The batch of ingested records is mapped into (HoodieKey, PartitionPath, FileID) triplets. Once this is done, the HoodieKey is hashed and a bucketID is assigned to each row; each bucket holds a set of (HoodieKey, PartitionPath, FileID) triplets. This bounds the number of entries written to each bucket and controls the growth/size of each bucket.

At a high level, each bucket stores these mappings as HFiles. For a batch, records are first tagged using the existing index. New inserts are assigned bucket IDs by hashing the HoodieKey. The records are then partitioned so that each Spark partition works on one bucket, opening an HFile and adding entries to it.
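A rough sketch of this bucketing step, assuming a fixed number of buckets and a simple modulo hash (a real implementation would do this as a Spark shuffle rather than with in-memory maps):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexBucketing {

  // Stable, non-negative hash of the record key -> bucket id
  static int bucketFor(String recordKey, int numBuckets) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  // Each triple is (recordKey, partitionPath, fileId); group them by bucket id
  static Map<Integer, List<String[]>> groupByBucket(List<String[]> triples, int numBuckets) {
    Map<Integer, List<String[]>> buckets = new HashMap<>();
    for (String[] t : triples) {
      buckets.computeIfAbsent(bucketFor(t[0], numBuckets), b -> new ArrayList<>()).add(t);
    }
    return buckets;
  }
}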

Reader

The incoming records (current batch) are partitioned into their respective buckets (by hashing the HoodieKey). Each Spark partition is responsible for one bucket. An HFile reader is opened in each partition and the HoodieKey is looked up in the HFile. Since HFiles are indexed, the lookup is relatively fast (POC tests are needed to get numbers).

After this process, one can tag the incoming records with HoodieRecordLocations. 
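As a rough illustration of the per-bucket lookup, here is a sketch against an HBase 1.x-style HFile API; the exact reader signatures vary across HBase versions, and whether seekTo() reports an exact hit depends on how the keys were encoded when the HFile was written:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

public class BucketLookup {

  // Returns true if the record key is present in the bucket's HFile
  static boolean containsKey(Configuration conf, Path hfilePath, byte[] recordKey) throws Exception {
    FileSystem fs = hfilePath.getFileSystem(conf);
    HFile.Reader reader = HFile.createReader(fs, hfilePath, new CacheConfig(conf), conf);
    try {
      HFileScanner scanner = reader.getScanner(true /* cacheBlocks */, true /* pread */);
      // seekTo returns 0 only on an exact match of the seek key; this assumes the
      // index writer stored keys with the same "first on row" encoding
      return scanner.seekTo(KeyValueUtil.createFirstOnRow(recordKey)) == 0;
    } finally {
      reader.close();
    }
  }
}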

A more detailed explanation of Option 1 is here.

Option 2

In this method, rows are hashed into nb buckets, so the total number of buckets is bounded. A Spark DAG is used to partition the rows and to sort them within each partition. The actual read and write operations happen within the executors.

Write Phase

Input rows are sorted and merged with the existing ones into a sequence file. Hence, every index write to a bucket re-writes the whole file. The file is named after the current Hoodie commit, so that only index files corresponding to successful commits are considered. Index files corresponding to failed commits are rolled back before the beginning of the job; thus partial failures are handled.

Read Phase

Input rows are sorted and read from an iterator; at the same time the sorted index file corresponding to the bucket is read to collect the locations of existing keys, which are then returned. Hence every index read of a bucket reads the file sequentially. One can iterate over the two sides, the sorted input rows and the sorted records from the index file, and compare all UUIDs in a single pass.
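A sketch of that single-pass merge over the two sorted sides (types are simplified to strings; names are illustrative):

import java.util.Iterator;
import java.util.Map;

public class SortedIndexMerge {

  // sortedInputKeys: record keys of the incoming batch, sorted
  // sortedIndexEntries: (recordKey -> location) pairs from the bucket's index file, sorted by key
  static void mergeLookup(Iterator<String> sortedInputKeys,
                          Iterator<Map.Entry<String, String>> sortedIndexEntries) {
    Map.Entry<String, String> current = sortedIndexEntries.hasNext() ? sortedIndexEntries.next() : null;
    while (sortedInputKeys.hasNext()) {
      String key = sortedInputKeys.next();
      // Advance the index side until it catches up with the current input key
      while (current != null && current.getKey().compareTo(key) < 0) {
        current = sortedIndexEntries.hasNext() ? sortedIndexEntries.next() : null;
      }
      if (current != null && current.getKey().equals(key)) {
        System.out.println(key + " -> existing location " + current.getValue());
      } else {
        System.out.println(key + " -> not indexed, treat as insert");
      }
    }
  }
}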

Estimated size of a large dataset (some back of the envelope calculation)

Number of records: 100 billion

Size of each entry = sizeOf(key) + sizeOf(partitionPath) + sizeOf(FileID)

Taking each component to be roughly the size of a UUID (36 bytes), each entry is ~100 bytes, so the total size of 100 billion entries = 100 billion * ~100 bytes ~= 10 TB.

Reduced size of a large dataset

(Optimized storage format)

PartitionPath -> {FileID1 FileID2}

FileID1 -> {UUID1, ….}

Total Size of Index = 100 billion * sizeOf(UUID) + Number of Partitions * sizeOf(partitionPath) + Number of Files * sizeOf(FileID)

As you can see, we have stopped the linear (per-record) growth of partitionPath and FileID, reducing the index to roughly one third (~1/3rd) of its original size.

So for 100 billion records, the size of the index ~= 3TB
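A quick check of both estimates (the per-partition and per-file terms in the optimized formula are negligible and are ignored here):

public class IndexSizeEstimate {
  public static void main(String[] args) {
    long records = 100_000_000_000L;   // 100 billion entries
    long naivePerEntry = 3 * 36L;      // key + partitionPath + fileId, each ~UUID-sized (36 bytes)
    long optimizedPerEntry = 36L;      // only the record UUID is stored per record

    System.out.println("Naive index     ~ " + records * naivePerEntry / 1e12 + " TB");     // ~10.8 TB, i.e. the ~10 TB above
    System.out.println("Optimized index ~ " + records * optimizedPerEntry / 1e12 + " TB"); // ~3.6 TB, i.e. the ~3 TB above
  }
}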

Detailed design of Option 1


Components of a Global Index

IndexCompactor -> Runs similarly to the cleaner(), based on minNumIndexFilesPerBucket and maxNumIndexFilesPerBucket. Each compaction operation will create a new logFile with a new baseCommit for each UUID (fileID).

NOTE : The INDEX_COMPACTION and INGESTION need to be part of the same commit to support atomic publish and rollback of both. 
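As a sketch of how the IndexCompactor might decide what to compact, using the two bounds named above (the selection rule itself is an assumption):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IndexCompactionPlanner {

  // Pick buckets whose number of index files has grown past maxNumIndexFilesPerBucket
  static List<Integer> bucketsToCompact(Map<Integer, Integer> indexFileCountPerBucket,
                                        int maxNumIndexFilesPerBucket) {
    return indexFileCountPerBucket.entrySet().stream()
        .filter(e -> e.getValue() > maxNumIndexFilesPerBucket)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }
}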

FileSystemView.getLatestIndexFiles() -> Gets all FileGroups/FileSlices under .index/buckets, checks their baseCommit against the Timeline to validate the eligible ones, and returns them to the index compactor or the LogIndexReader.
LogScanner -> Similar to the current log scanner, always lazy. This scanner rolls back invalid log blocks and finally provides the list of valid log blocks to be read.
CustomIndexLogReader -> Reads the lazy log blocks. The trick is to move the pointer to the correct logBlock location on HDFS and pass that to open an HFile.Reader (if using HFile) or something similar, instead of passing the file name. HFile.Reader does not have an API to read directly from an FSDataInputStream; a path needs to be passed every time.

One way to do this is to borrow concepts from LevelDB: keep a bloom filter for an existence check of a row key in a log file, plus a sorted string output format so that we can binary search within the file if the bloom filter returns a false positive (see the sketch below).

Evaluate using an embedded RocksDB output into the logblock.
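A sketch of the LevelDB-style check described above, with a stand-in bloom filter interface: the filter screens out definite misses, and the sorted key section is binary searched only on a (possibly false) positive:

import java.util.Arrays;

public class SortedBlockLookup {

  interface BloomFilter {
    boolean mightContain(String key);
  }

  static boolean contains(String[] sortedKeys, BloomFilter filter, String key) {
    if (!filter.mightContain(key)) {
      return false;                            // definite miss, no file seek needed
    }
    // Bloom filters can return false positives, so confirm with a binary search
    return Arrays.binarySearch(sortedKeys, key) >= 0;
  }
}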

HoodieRealTimeCompactor -> Will be refactored so that compaction can work for any type of workload; some abstractions will be required here.
IndexCompactorOutput -> Once the log blocks are read, a CustomIndexCompactorOutput will merge all log blocks and write 1 IndexFile into 1 LogBlock into 1 LogFile.
Rollback -> The IndexCompactor will perform rollback for index files similar to how ingestion and compaction do. Essentially, when an index.update() is taking place, a .inflight file is opened. We write the workload information (the bucketID, FileUUID) for the index RDD into the .inflight file (similar to what we do for the MergeOnRead workload). If the batch fails, we simply use the MergeOnRead rollback path to roll back the indexes as well; this means writing ROLLBACK_BLOCKS to the buckets and logFiles identified from the .inflight information.
Hashing Sequence (Simple version) -> First, Hash(row_key) % nb gives the bucketID. Next, open a LogFileWriter for a new logFile (which internally opens an IndexFileWriter in a LogBlock) and write out the hashed row_keys, partition_path and file_id. On tagLocation, each batch of row_keys for a bucketID is checked against all IndexFiles, which means all LogFiles and LogBlocks present; we may open multiple IndexFileReaders at the same time and parallelize this operation (see the sketch below). Finally, the idea is for the IndexCompactor to compact aggressively to bound the tagLocation time.
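A sketch of the tagLocation flow for one bucket, checking a batch of row keys against every IndexFile of that bucket (IndexFileReader is a stand-in for the HFile/RocksDB reader discussed above):

import java.util.ArrayList;
import java.util.List;

public class BucketTagLocation {

  interface IndexFileReader {
    // Returns "partitionPath,fileId" if the key is present in this IndexFile, null otherwise
    String lookup(String rowKey);
  }

  static List<String> tagBatch(List<String> rowKeys, List<IndexFileReader> indexFilesForBucket) {
    List<String> locations = new ArrayList<>();
    for (String key : rowKeys) {
      String location = null;
      for (IndexFileReader reader : indexFilesForBucket) {
        location = reader.lookup(key);
        if (location != null) {
          break;               // found in this IndexFile, skip the rest
        }
      }
      locations.add(location); // null means the record is a new insert
    }
    return locations;
  }
}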


Caveats 

  1. For large datasets, if the number of buckets is kept static (which it needs to be once the row_keys have been hashed), the size of each bucket (the size of the IndexFile) will keep growing, increasing the lookup time since more random IO will be done.
  2. We may run into IndexFile size limits at some point, and a large IndexFile might need to be split into multiple smaller IndexFiles, increasing the lookup time even further (signified by uuid1 and uuid3 in the above diagram) and adding complexity.
  3. How do we handle corruption of an IndexFile?


Terminology 

IndexFile -> A file that stores key-values in an efficient file format that allows quick random lookups using indexes, bloom filters, etc., with the fewest possible seeks.

Option 3 (discarded)


Some suggestions came up as part of the discussion thread. A UUID is generally composed of multiple components, one of them being a timestamp. One could use this timestamp ordering to maintain a (min, max) range for each file, which would help answer which file contains a given UUID, rather than maintaining a separate indexing system (some details here: https://www.percona.com/blog/2014/12/19/store-uuid-optimized-way/)

Update: on looking further into this, we found that there are multiple types and variants of UUID generation, the popular ones being a) random and b) time-based. The commonly used java.util.UUID is actually a random-based generator and hence does not guarantee any ordering across parts of the UUID. Although there is another variant of UUID that is time-based, it is possible for the UUID to be generated upstream (outside of Hudi), in which case UUID ordering would be broken anyway. Hence, we are discarding this solution.
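A quick way to verify the claim about java.util.UUID:

import java.util.UUID;

public class UuidVersionCheck {
  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();
    // randomUUID() produces version-4 (random) UUIDs, so no part of the key carries a usable timestamp
    System.out.println(uuid + " is a version " + uuid.version() + " UUID"); // prints 4
  }
}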

Rollout/Adoption Plan

  • Existing users will have to perform a one-time migration of the existing index (bloom, user-driven partition path, etc.) to this index. We will look into writing a tool for this.
  • The contracts of the new index and the old index will not change, so the switch should be seamless to the client.
  • We will need a migration tool to bootstrap the global index from the existing dataset.

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>
