RFC - 06 : Add indexing support to the log file format

Proposers

Approvers

Status

Current state: INACTIVE

Discussion thread: here

JIRA:

Released: TBD

Abstract

Currently, the Hudi log is not indexable. Hence, we cannot send inserts into the logs, forcing us to write out parquet files instead. If we can solve this, we can unlock truly near-real-time ingest on MOR storage, as well as take our file sizing story much further. Specifically, it will unlock a very common use case where users just want to log incoming data as quickly as possible and later compact it to larger files. Through this effort, we will also be laying the groundwork for improving RT view performance, as explained in detail below. 

Background

The current log format is depicted here. It's a simple yet flexible format that consists of a version, type (avro data, deletes, command), header, optional content, and footers. Headers and footers allow us to encode various metadata, like the commit instant time and schema, for processing that log block. On DFS implementations that support append, there will be multiple of these log blocks present in a single log file version. However, on cloud/object stores without append support, one log file version contains one log block. 

Coming back to the issue of indexing, the block of specific interest to us here is the `DataBlock`. BloomIndex is currently able to index the parquet/base files using bloom filters and range information stored inside the parquet footer. In copy-on-write storage, there are no log files, and since the base file is fully indexable, we are able to send even inserts into the smallest base file and have them be visible for index lookup in the next commit. However, when it comes to merge-on-read, the lack of indexing ability on log blocks means we cannot safely assign inserts to an existing small file group and write them to its log. Instead, we write out base files for inserts as well, and thus incur additional cost for near-real-time ingestion. Small file handling logic in MOR picks the file group with the smallest base file that has no log files (if any) and expands it with new inserts. If it cannot find such a base file, a new parquet file is written. Thus we could also create more file groups than are really needed in such cases. This RFC aims to address all of these issues.

Implementation

At a high level, we will introduce the following changes to a data block (a rough sketch follows the list).

  • Introduce a new version of the avro data block that also writes a sorted list of record keys alongside the avro data.
  • Introduce three new footers: bloom filter, min key, and max key for the avro data (all cumulative) 
    • the bloom filter is built with all keys encountered so far in the log, i.e., including all previously committed data blocks
    • similarly, the min/max key ranges cover the entire set of committed data blocks in that log file. 
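
Below is a minimal sketch of how a writer could accumulate this cumulative footer metadata per data block. It is illustrative only: the class and method names are hypothetical (not actual Hudi APIs), and Guava's BloomFilter is used purely as a stand-in for whatever bloom filter implementation the log format ends up using.

```java
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustrative writer that carries cumulative index metadata across the data blocks
// appended to one log file. Not actual Hudi code.
class IndexedDataBlockWriter {

  private final BloomFilter<CharSequence> cumulativeBloom =
      BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000);
  private String cumulativeMinKey = null;
  private String cumulativeMaxKey = null;

  /** Builds footer metadata for one new-format data block from its (non-empty) record keys. */
  FooterMetadata appendBlock(List<String> recordKeys) {
    TreeSet<String> sortedKeys = new TreeSet<>(recordKeys);   // keys are logged in sorted order
    for (String key : sortedKeys) {
      cumulativeBloom.put(key);                               // bloom covers all blocks so far
    }
    if (cumulativeMinKey == null || sortedKeys.first().compareTo(cumulativeMinKey) < 0) {
      cumulativeMinKey = sortedKeys.first();
    }
    if (cumulativeMaxKey == null || sortedKeys.last().compareTo(cumulativeMaxKey) > 0) {
      cumulativeMaxKey = sortedKeys.last();
    }
    // The footers carry the cumulative bloom and min/max, so reading only the
    // last data block is enough to answer index lookups for the whole log.
    return new FooterMetadata(cumulativeBloom, cumulativeMinKey, cumulativeMaxKey,
        new ArrayList<>(sortedKeys));
  }

  static class FooterMetadata {
    final BloomFilter<CharSequence> bloom;   // cumulative bloom filter footer
    final String minKey;                     // cumulative min key footer
    final String maxKey;                     // cumulative max key footer
    final List<String> sortedKeys;           // sorted key list written with the block

    FooterMetadata(BloomFilter<CharSequence> bloom, String minKey, String maxKey,
                   List<String> sortedKeys) {
      this.bloom = bloom;
      this.minKey = minKey;
      this.maxKey = maxKey;
      this.sortedKeys = sortedKeys;
    }
  }
}
```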

The diagram below shows one file group with several log file versions and multiple data blocks inside them. Old data blocks simply contain records, whereas new blocks contain the additional keys, bloom filter and ranges described above. Once a compaction runs, all logs will contain only new blocks. For index lookup, we will simply read the bloom filter and min/max from the last data block (similar overhead/performance as reading base file footers today). If there are matches or false positives, then we will need to read all data blocks in the logs. Thanks to logging keys separately, we don't pay the overhead of reading the entire avro records to check against the actual keys; it can be done by simply reading the keys we wrote in the new data blocks. Old data blocks would still pay this penalty until compaction runs at least once for that file group.
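
To make the lookup flow concrete, here is a rough sketch under the same assumptions as the writer sketch above (hypothetical types and method names, not actual Hudi code): prune with the last block's cumulative footer, and only on a possible hit scan the logged key lists (new blocks) or fall back to reading the records (old blocks).

```java
import java.util.Collections;
import java.util.List;

// Illustrative block reader interface; real names/APIs will differ.
interface DataBlock {
  boolean isNewFormat();
  List<String> readSortedKeyList();     // new blocks: keys logged separately, already sorted
  List<String> readKeysFromRecords();   // old blocks: must deserialize the avro records
}

class LogIndexLookup {
  /** Returns true if the record key may be present in this file group's log blocks. */
  static boolean mayContain(String recordKey,
                            IndexedDataBlockWriter.FooterMetadata lastBlockFooter,
                            List<DataBlock> allDataBlocks) {
    // Step 1: prune using only the last data block's cumulative footer
    // (comparable cost to reading base file footers today).
    if (recordKey.compareTo(lastBlockFooter.minKey) < 0
        || recordKey.compareTo(lastBlockFooter.maxKey) > 0) {
      return false;
    }
    if (!lastBlockFooter.bloom.mightContain(recordKey)) {
      return false;
    }
    // Step 2: possible hit or false positive, so confirm against actual keys.
    for (DataBlock block : allDataBlocks) {
      boolean found;
      if (block.isNewFormat()) {
        // Cheap: read only the sorted key list logged with the block.
        found = Collections.binarySearch(block.readSortedKeyList(), recordKey) >= 0;
      } else {
        // Old blocks pay the penalty of reading full records until compaction rewrites them.
        found = block.readKeysFromRecords().contains(recordKey);
      }
      if (found) {
        return true;
      }
    }
    return false;
  }
}
```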


[Diagram: hudi-log-indexing]


Rollout/Adoption Plan

  • This change does introduce a change to the log format, but indexing will seamlessly handle older log blocks that were written without this metadata, by computing the bloom filters and min/max keys from the actual data (a rough sketch follows this list).
  • One thing worth calling out: as users pick up this change, there might be a slowdown due to the fallback above, since a lot of avro data will be fully read out. But once sufficient compactions run, the overhead will approach zero and index lookup should perform equivalently to base file index lookup today.
  • Once log indexing is deemed stable in the next 1-2 releases, we will eventually get rid of all code paths that special-case based on index.canIndexLogFiles() 
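
The backward-compatibility fallback mentioned in the first bullet could look roughly like the following, reusing the hypothetical types sketched in the Implementation section; this is only an illustration of where the extra read cost comes from, not the actual implementation.

```java
import java.util.List;

// Illustrative fallback for pre-existing (old-format) log blocks: derive the missing
// bloom filter and key range from the actual records.
class OldBlockIndexFallback {
  static IndexedDataBlockWriter.FooterMetadata computeFooter(List<DataBlock> oldBlocks) {
    IndexedDataBlockWriter writer = new IndexedDataBlockWriter();
    IndexedDataBlockWriter.FooterMetadata footer = null;
    for (DataBlock block : oldBlocks) {
      // Full read of the avro records is the source of the slowdown called out above.
      footer = writer.appendBlock(block.readKeysFromRecords());
    }
    return footer;   // cumulative over all old blocks in the log
  }
}
```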

Test Plan

  • Unit tests including ones that cover a log with both old and new data blocks.
  • Testing with delta streamer in continuous mode, by running a test pipeline for a few days: checking that keys match, there are no duplicates, and so forth

5 Comments

  1. I want to understand how this interplays with compaction. Let's assume all logs are of the new format. 

    Once compaction completes, those log blocks/files that were not compacted will still have range info pertaining to the compacted ones, right? When will this get fixed? Won't the lookup return true for keys from the compacted log files? I have attached two diagrams depicting before and after compaction. If you look at the 2nd pic (after compaction), ideally min and max should have been 6 and 11. 

    In general, when will the key range pruning happen? And will the bloom filter also be adjusted? 

    Also, can you help me understand how often a partition will be left with no delta files? Conversely speaking, if there are always going to be a few delta files at any point in time, then the range pruning will never happen, right? 

    For eg,

    at t1 :  we have delta_file1 to delta_file5. 

    at t2 : compaction removes delta_file1 to delta_file4. delta_file5 will have key range from delta_file1 to delta_file5

    at t3: 3 more delta files are added. delta_file6 to delta_file8. delta_file8 will have key range from delta_file1 to delta_file8

    at t4: compaction removes delta_file4 to delta_file7. delta_file8 will have key range from delta_file1 to delta_file8

    at t5: 5 more are added. delta_file9 to delta_file13. delta_file13 will have key range from delta_file1 to delta_file13.

    If this pattern continues, then key range pruning will never happen at all. 

    So, is there any guarantee on how often a full clean up will happen? I mean, all files are compacted fully and only the base file exists. Only in this case will a new delta file have a fresh key range? 

    Let me know if my understanding is wrong anywhere. 

    // Looks like I need permission to attach images. But I don't find any link to request permission though. Can someone approve me. 



  2. Gave you perms. You are right.

    With inserts and deletes happening: 

    • The range information post compaction, i.e., the real range, can actually be smaller (higher min / lower max) than what the last log block contains. But we will safely prune using a larger range, which can include more files than it should.
    • The same goes for the bloom filter: it could contain hits for deleted records, leading to false positives.

    We could do special handling for this. But first, I want to get on the same page as to whether or not you think this is a correctness issue, i.e., leading to missing a file that could potentially contain the record, or the bloom filter not reflecting a key that could be present in the file group? 

  3. I feel that if the table is active for a sufficient amount of time, then the bloom will start to return true for every key, isn't it? 

    I don't see any issues w/ correctness, but what I am concerned about is that the effectiveness of introducing this might diminish over time. 

    Here is an idea that I feel could work out. 

    On a high level, the key range and bloom will cover only N log files at a time. At N+1 we start over again, but keep track of the prev log file to be read. Let me illustrate with an example. 

    // to keep things simple, I am considering only one block per log file and keys are evenly lined up with each log file having 100 keys. 


    Based on this example, once every 10 log files, we start over, but keep track of the prev log file to be looked up. For eg: log files 1 to 10 form a clique (I am not sure if we can call it a partition, but I don't want to overload the term, hence using clique). Log files 11 to 20 form a clique, and so on. What this means is that log file 10 will contain all key range info from log files 1 to 10. Log file 20 will contain key range info from log files 11 to 20, and so on. 

    In general, for bloom lookup, we just need to traverse the chain from the latest log file until the "prev log file for bloom look up" pointer is null or we don't find the log file (if it's compacted).

    Assuming no compaction, in the current state, traversing the chain will give us log files 30, 20 and 10. We do a bloom lookup in all of these. In fact, we could even search for keys only in the cliques which returned true, instead of the entire set of log files.
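
    A rough sketch of that traversal (all names here are made up, and the clique-scoped bloom is assumed to live in the clique's last log file footer) could look like:

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative view of a clique: its last log file holds a clique-scoped bloom/range
    // plus a pointer to the previous clique's anchor log file.
    interface LogFileClique {
      boolean stillExists();                    // false once all of its log files are compacted away
      boolean mightContain(String recordKey);   // clique-scoped bloom filter check
      LogFileClique prevCliqueAnchor();         // null for the very first clique
    }

    class CliqueChainLookup {
      /** Walks the chain from the latest clique and returns only the cliques whose bloom matched. */
      static List<LogFileClique> cliquesToScan(LogFileClique latest, String recordKey) {
        List<LogFileClique> candidates = new ArrayList<>();
        LogFileClique current = latest;
        while (current != null && current.stillExists()) {   // stop at null or a compacted clique
          if (current.mightContain(recordKey)) {
            candidates.add(current);                         // only these log files need a key scan
          }
          current = current.prevCliqueAnchor();
        }
        return candidates;
      }
    }
    ```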

    Let's say compaction compacts log files 1 to 15. 


    Traversing the chain will give log file 30 and log file 20, and we do a bloom lookup in these only. Since log file 10 doesn't exist, we can ignore it.

    Positives I see in this approach:

    - Keys are always trimmed, and key range info covers only the previous N log files. So our key range info will not be too outdated after many iterations of compaction. 

    - We also keep the bloom within bounds, since each clique contains key range info only for a certain number of log files and not everything that has ever been seen. 

    - We don't heavily rely on compaction compacting all delta files (a.k.a. major compaction) as in the current proposal. In the sense that, as per the proposal, if at least one log file exists at all times, then after 2 to 3 months of an active dataset, bloom lookup might start to return true for any key lookup.

    Note: I am not claiming that the current proposal says it relies heavily on major compaction, but I feel that the success of this relies on it. If not for major compaction, I don't see a major benefit of this, as it may not serve the purpose after a sufficiently long active period.

    - Since we have different cliques, each with its own bloom, we can look up the log files only in those cliques which returned true and not in all log files. For eg: from the figure attached, let's say log file 20's bloom alone returned true while log file 30's and log file 10's returned false; we could then look up keys only in log files 11 to 20 instead of all log files. 

    I am making certain assumptions w.r.t. key distributions, and hence feel that key pruning from time to time would give us a good benefit. Not sure what it's like in reality. 

    Anyways, let me know your thoughts and comments on this.




  4. If we feel that traversing the chain one file at a time would be costly, we can list the entire chain in every log file. 


    Whenever a new log file is generated, we can remove the already-compacted log files from the chain, hence keeping the list within bounds. 




  5. sivabalan narayanan  First of all, thanks for such a detailed write up! Really appreciate the time you have taken to illustrate the point... 

    The concern you raise is very valid. Your examples have used pure inserts, or at least non-overlapping ranges of keys for each log file, which may be far from reality. I am wondering if, conceptually, the clique can be mapped to a FileSlice? A new FileSlice is created when compaction is scheduled, and until the compaction is done, we pretend like the new log files written by ingestion are part of the old file slice. You are right that if there are always uncompacted log files (I think it's fair to make this assumption, even if it might be true only for actively written partitions), the bloom and range info will keep expanding, yes! However, the main problem is for deletes, i.e., the type of records that can cause the compacted file's bloom filter and range to be smaller than what's carried over in the log. For updates in the log, they don't cause any changes to range/bloom. For inserts, they can expand the range/bloom, and rightfully so, so that they can be located next time around. 


    I think we can take a smart approach and have ingestion add a field marking whether a log file (assume 1 log file contains 1 log block) was written while a compaction was pending; later, when the compacted base file is actually available, we can recompute the bloom/range accurately once and log that. This is a good perf optimization, and in any case, I think it makes sense to write both accumulated and individual bloom/range information, so these can be easily combined by just reading the footers vs actually reading the keys out. WDYT?
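
    For what it's worth, a minimal sketch of that last idea, writing per-block (individual) bloom/range footers so an accurate cumulative view can be rebuilt from footers alone after compaction, could look like the following. All names are hypothetical, Guava's BloomFilter is again only a stand-in, and merging via putAll assumes all filters were created with identical configuration.

    ```java
    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import java.nio.charset.StandardCharsets;
    import java.util.List;

    // Per-block (non-cumulative) footer metadata: covers only the keys in one data block.
    class PerBlockFooter {
      final BloomFilter<CharSequence> bloom;
      final String minKey;
      final String maxKey;

      PerBlockFooter(BloomFilter<CharSequence> bloom, String minKey, String maxKey) {
        this.bloom = bloom;
        this.minKey = minKey;
        this.maxKey = maxKey;
      }
    }

    class FooterMerger {
      // Rebuilds accurate cumulative metadata from the surviving (uncompacted) blocks only,
      // by reading and merging their footers; no record keys need to be read.
      static PerBlockFooter merge(List<PerBlockFooter> survivingBlockFooters) {
        BloomFilter<CharSequence> merged =
            BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000);
        String min = null;
        String max = null;
        for (PerBlockFooter footer : survivingBlockFooters) {
          merged.putAll(footer.bloom);   // requires filters built with the same funnel/size
          min = (min == null || footer.minKey.compareTo(min) < 0) ? footer.minKey : min;
          max = (max == null || footer.maxKey.compareTo(max) > 0) ? footer.maxKey : max;
        }
        return new PerBlockFooter(merged, min, max);
      }
    }
    ```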