...
- @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
- @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
- ...
Status
Current state:
Discussion thread: here
JIRA: here
...
At a high level, there are three components to implementing z-order support: z-value generation, statistical info preservation, and query engine integration. The next three subsections discuss these in detail.
Z-value generation
The core mechanism underlying Z-ordering is the ability to translate a tuple of several field values into a single number (call it the z-value). As described on Wikipedia, we simply interleave the bits of each field in the record.
...
We cannot convert a signed integer to bits directly, since in Java the first bit of a negative number represents the sign.
Decimal value | Two's complement signed integer
---|---
0 | 0000 0000
1 | 0000 0001
2 | 0000 0010
126 | 0111 1110
127 | 0111 1111
-128 | 1000 0000
-127 | 1000 0001
-126 | 1000 0010
-2 | 1111 1110
-1 | 1111 1111
From the table above we can see that the bit patterns of -1 and -2 compare larger than those of 1 and 2, which is wrong.
Fortunately, we only need to modify the transformation slightly to solve this problem: just flip the first (sign) bit, as shown in the table below.
Decimal value | Two's complement signed integer | With first bit flipped | Resulting unsigned integer decimal value
---|---|---|---
0 | 0000 0000 | 1000 0000 | 128
1 | 0000 0001 | 1000 0001 | 129
2 | 0000 0010 | 1000 0010 | 130
126 | 0111 1110 | 1111 1110 | 254
127 | 0111 1111 | 1111 1111 | 255
-128 | 1000 0000 | 0000 0000 | 0
-127 | 1000 0001 | 0000 0001 | 1
-126 | 1000 0010 | 0000 0010 | 2
-2 | 1111 1110 | 0111 1110 | 126
-1 | 1111 1111 | 0111 1111 | 127
Now the bit patterns of -1 and -2 compare smaller than those of 1 and 2, and the lexicographic order of the converted values matches the numeric order of the originals. This method therefore meets the requirements of z-ordering, as the sketch below illustrates.
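To make this concrete, here is a minimal Scala sketch (illustrative, not the final Hudi implementation) of the sign-bit flip together with interleaving two converted 32-bit values into a single 64-bit z-value:

```scala
// Map a signed int to an unsigned ordering by flipping the sign bit.
def toUnsignedOrder(v: Int): Int = v ^ Int.MinValue

// Interleave the bits of two 32-bit values into one 64-bit z-value:
// bits of x land on even positions, bits of y on odd positions.
def interleave(x: Int, y: Int): Long = {
  var z = 0L
  for (i <- 0 until 32) {
    z |= ((x >>> i) & 1L) << (2 * i)
    z |= ((y >>> i) & 1L) << (2 * i + 1)
  }
  z
}

// After the flip, -1 converts below 1, so z-values order correctly.
val z = interleave(toUnsignedOrder(-1), toUnsignedOrder(1))
```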
Float/Double/Long converter:
follow the same conversion method as the int type.
Decimal/Date/TimeStamp converter:
first convert the value to a long, then follow the same conversion method as the int type.
UTF-8 String converter:
the byte representation of a UTF-8 string has the same lexicographic order as the string itself, so we can produce the final result with the following steps (a short sketch follows the list):
- if the string value is exactly 8 bytes long, use its UTF-8 representation as is.
- if the string is shorter than 8 bytes, append zero bytes to its UTF-8 representation until it is exactly 8 bytes.
- if the string is longer than 8 bytes, use only the first 8 bytes of its UTF-8 representation.
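A minimal sketch of this 8-byte normalization (the helper name is ours, not Hudi's):

```scala
import java.nio.charset.StandardCharsets

// Normalize a string to exactly 8 bytes: Arrays.copyOf truncates a longer
// UTF-8 representation and zero-pads a shorter one.
def stringTo8Bytes(s: String): Array[Byte] =
  java.util.Arrays.copyOf(s.getBytes(StandardCharsets.UTF_8), 8)
```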
The conversion method above seems sound; however, it cannot solve the following problems:
Problem 1: if all dimension values are incremental positive integers starting from 0, the computed z-values will be ordered along the z-order curve.
However, if the dimension values involved in the calculation are not incremental data sets starting from 0, the computed z-values form only part of the complete z-order curve and cluster poorly.
The conversion method above obviously cannot guarantee that the converted results are incremental positive integers starting from 0.
Problem 2: for string values, the conversion method can cause strings with a common prefix to map to the same 8-byte value. The reason is that by truncating each UTF-8 string to 8 bytes, we lose a great deal of precision.
...
...
eg: "https://www.baidu.com" , "https:www.google.com" has the same 8-byte value
val newRDD = df.rdd.map { row => ... }
To solve these two problems, a boundary-based interleaved index looks like a good fit.
- Boundary-based Interleaved Index
Sample the data. From the sampled data, select the specified number of boundaries for each field participating in z-ordering and sort them.
Each field value is then mapped to the index of its position among those boundaries, and that index participates in the z-value calculation, as sketched below.
Since boundary indexes are necessarily continuous non-negative integers starting from 0, they fully meet the calculation requirements of the interleaved index.
val indexRdd = internalRdd.mapPartitionsInternal { iter => ... }
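A minimal sketch of the boundary mapping for a single int column (the sampling strategy, helper names, and boundary count are illustrative assumptions):

```scala
import java.util.Arrays

// Build evenly spaced, sorted boundaries from a sample of the column.
def buildBoundaries(sample: Array[Int], numBoundaries: Int): Array[Int] = {
  val sorted = sample.sorted
  val step = math.max(1, sorted.length / numBoundaries)
  sorted.grouped(step).map(_.head).toArray
}

// Map a value to the index of the last boundary <= value; the indexes are
// continuous non-negative integers starting from 0, as required above.
def boundaryIndex(boundaries: Array[Int], value: Int): Int = {
  val pos = Arrays.binarySearch(boundaries, value)
  if (pos >= 0) pos else math.max(0, -pos - 2)
}
```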
2. The bit widths of the dimension values may differ; for example, how should a short and an int cross-merge their bits? How do we deal with such values?
...
Answer: use an Array[Byte] to store the z-value (we limit the length of this array to 1024, just as Amazon DynamoDB does). Note that HBase already sorts row keys as Array[Byte] internally, so we can reuse that code directly as the comparator for z-values.
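A minimal sketch of such an unsigned lexicographic comparator, in the spirit of HBase's Bytes.compareTo:

```scala
// Compare two z-values byte by byte as unsigned values (0..255); a shorter
// array that is a prefix of a longer one sorts first.
def compareZValues(a: Array[Byte], b: Array[Byte]): Int = {
  val len = math.min(a.length, b.length)
  var i = 0
  while (i < len) {
    val cmp = (a(i) & 0xFF) - (b(i) & 0xFF)
    if (cmp != 0) return cmp
    i += 1
  }
  a.length - b.length
}
```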
Statistical info preservation
Collecting min/max and null-count statistics is necessary once we have finished clustering the data by z-order. With these statistics, Hudi can do file-level filtering directly.
Which information will be collected?
The min/max values and null counts for all z-order columns.
How will this information be collected?
There are two ways to achieve this:
- Collect the statistics by reading the Parquet footer, which stores the min/max values and null counts (a sketch follows below). This method is not universal: it only works for Parquet files, and not all statistics stored in the Parquet footer are correct. For example, Parquet stores wrong statistics for timestamp values.
val sc = df.sparkSession.sparkContext
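A minimal sketch of the footer-based collection using the parquet-hadoop API (error handling omitted; whether a given file carries trustworthy statistics must still be checked):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// Print min/max/null-count statistics for every column chunk of one file.
def readFooterStats(file: String, conf: Configuration): Unit = {
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), conf))
  try {
    for (block <- reader.getFooter.getBlocks.asScala;
         column <- block.getColumns.asScala) {
      val stats = column.getStatistics
      if (stats != null) // statistics may be absent for some writers
        println(s"${column.getPath}: min=${stats.genericGetMin}, " +
                s"max=${stats.genericGetMax}, nulls=${stats.getNumNulls}")
    }
  } finally reader.close()
}
```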
- Calculate the statistics with Spark SQL, which can collect statistics for all column types correctly, though less efficiently (as sketched below).
val inputFiles = df.inputFiles
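For instance, a minimal sketch of this aggregation over the df from the snippet above (column names follow the example table below and are illustrative):

```scala
import org.apache.spark.sql.functions._

// One pass over the data: group rows by their source file and aggregate
// min/max/null counts for each z-order column.
val statsDF = df
  .groupBy(input_file_name().as("file"))
  .agg(
    min("col").as("col_minValue"),
    max("col").as("col_maxValue"),
    sum(when(col("col").isNull, 1).otherwise(0)).as("col_num_nulls"),
    min("col1").as("col1_minValue"),
    max("col1").as("col1_maxValue"),
    sum(when(col("col1").isNull, 1).otherwise(0)).as("col1_num_nulls"))
```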
Example of the generated statistics (z-order columns: col, col1):
file | col_minValue | col_maxValue | col_num_nulls | col1_minValue | col1_maxValue | col1_num_nulls
---|---|---|---|---|---|---
xx-1 | 49152 | 65535 | 0 | 65536 | 98303 | 0
xx-2 | 65536 | 90111 | 0 | 65536 | 98303 | 0
- How to save the statistics
We can introduce an _index_ directory to store the statistics. Furthermore, we can use an internal Hudi COW table to save them, just like the Hudi metadata table.
Integrations with query engines
Generally speaking, we do not need to do any extra work for query engines, since most query engines automatically use the min/max values stored in Parquet/ORC files.
But if we can also use the statistics in the SQL planning phase, the effect of z-order will be even better.
How to apply the statistics in Spark SQL:
- Load the index table into an indexDataFrame
- Construct data filters for the indexDataFrame from the original query filters
- Query the indexDataFrame to select candidate files
- Use those candidate files to reconstruct hudiMemoryIndex.
The only remaining question is how to construct the data filters for the indexDataFrame: we simply rewrite the original predicates against the min/max and null-count columns, as sketched below.
def createZindexFilter(condition: Expression): Expression = { ... }
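A minimal sketch of how such a rewrite could look over Catalyst expressions (minCol/maxCol are hypothetical helpers that resolve the index table's <name>_minValue/<name>_maxValue attributes; only two predicate shapes are shown):

```scala
import org.apache.spark.sql.catalyst.expressions._

// Hypothetical helpers (assumed, not part of Hudi): resolve the index
// table's min/max attributes for a given data column.
def minCol(attr: AttributeReference): Expression = ???
def maxCol(attr: AttributeReference): Expression = ???

def createZindexFilter(condition: Expression): Expression = condition match {
  // "col = v" can only match files whose [min, max] range contains v.
  case EqualTo(attr: AttributeReference, v: Literal) =>
    And(LessThanOrEqual(minCol(attr), v), GreaterThanOrEqual(maxCol(attr), v))
  // "col < v" can only match files whose min value is below v.
  case LessThan(attr: AttributeReference, v: Literal) =>
    LessThan(minCol(attr), v)
  // Predicates we cannot translate keep every file as a candidate.
  case _ => Literal.TrueLiteral
}
```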
Rollout/Adoption Plan
- <What impact (if any) will there be on existing users?>
- <If we are changing behavior how will we phase out the older behavior?>
- <If we need special migration tools, describe them here.>
- <When will we remove the existing behavior?>
...