RFC-28: Support Z-order curve

Table of Contents

Proposers

Approvers

  • @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • ...

Status

Current state:

  • Under Discussion (tick)
  • In Progress
  • Abandoned
  • Completed (tick)
  • Inactive


...

JIRA: here

Released: <Hudi Version>

Abstract

Z-order is a technique that maps multidimensional data onto a single dimension while preserving locality. We can use this property to improve query performance through better data skipping.


Background

Z-order is not currently supported by open source engines or open source data lake components, so it makes sense to introduce this feature to Hudi.


Introducing Z-order

Z-order is a technique that allows you to map multidimensional data to a single dimension.

...

It can be seen that if we sort the data by z-value and split it evenly into four files, then a point query filtering on either the X or the Y field can skip half of the irrelevant files; the larger the dataset, the greater the benefit. In other words, files laid out in z-order give good data-skipping behavior across multiple fields. Z-order is not limited to two-dimensional space: it can be generalized to any number of dimensions.

Implementation

At a high level there are three components to implementing z-order support: z-value generation, statistical info preservation, and query engine integration. The next three subsections discuss these in detail.


Z-value generation

The core mechanism underlying Z-ordering is the ability to translate a tuple of several field values into a single number (call it the z-value). Following the standard approach described on Wikipedia, we simply interleave the bits of each field in the record.
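
For illustration, a minimal sketch (not the final Hudi implementation) of interleaving two 32-bit integer fields into a single 64-bit z-value could look like this:

// Hedged sketch: interleave the raw bits of two Int fields into a Long z-value.
// In practice the fields would first be normalized (e.g. mapped to an unsigned
// byte representation) so that bit order matches value order.
def interleaveBits(x: Int, y: Int): Long = {
  var z = 0L
  var i = 0
  while (i < 32) {
    z |= ((x.toLong >> i) & 1L) << (2 * i)      // bits of x land on even positions
    z |= ((y.toLong >> i) & 1L) << (2 * i + 1)  // bits of y land on odd positions
    i += 1
  }
  z
}

Sorting records by this z-value keeps records that are close in both dimensions close together in the file layout.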

...

Answer: use an Array[Byte] to store the z-value (we limit the length of this array to 1024 bytes, just like Amazon DynamoDB). Note that HBase already sorts row keys as Array[Byte] internally, so we can copy that code directly as the comparator for z-values.
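
For reference, an unsigned lexicographic comparator over Array[Byte], in the spirit of HBase's row-key ordering, could look like the following sketch (illustrative only, not code lifted verbatim from HBase):

// Hedged sketch: order z-values byte by byte, treating each byte as unsigned,
// with the shorter array sorting first when it is a prefix of the longer one.
object ZValueComparator extends Ordering[Array[Byte]] {
  override def compare(a: Array[Byte], b: Array[Byte]): Int = {
    val len = math.min(a.length, b.length)
    var i = 0
    while (i < len) {
      val cmp = (a(i) & 0xff) - (b(i) & 0xff)  // unsigned byte comparison
      if (cmp != 0) return cmp
      i += 1
    }
    a.length - b.length
  }
}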


Statistical info preservation

Collecting min/max and null-count info is necessary once we have finished clustering the data by z-order. Using these statistics, Hudi can do file-level filtering directly.

...


/** Collect statistics (min, max, null count) per data file for the z-order columns. */

import org.apache.spark.sql.functions._

// `df` is the DataFrame that was just clustered, `cols` are the z-order columns.
val inputFiles = df.inputFiles
val conf = df.sparkSession.sparkContext.hadoopConfiguration

// Per-column aggregates: min, max and non-null count.
val values = cols.flatMap(c => Seq(
  min(col(c)).as(c + "_minValue"),
  max(col(c)).as(c + "_maxValue"),
  count(col(c)).as(c + "_noNullCount")))
val valueCounts = count("*").as("totalNum")

// Final projection: file name plus min/max/num_nulls for every z-order column.
val projectValues = Seq(col("file")) ++ cols.flatMap(c => Seq(
  col(c + "_minValue"),
  col(c + "_maxValue"),
  expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))

// Group the rows by the physical file they came from and aggregate the statistics.
val result = df.select(input_file_name().as("file"), col("*"))
  .groupBy(col("file"))
  .agg(valueCounts, values: _*)
  .select(projectValues: _*)
result

Example generated (z-order columns: col, col1):

| file | col_minValue | col_maxValue | col_num_nulls | col1_minValue | col1_maxValue | col1_num_nulls |
| xx-1 | 49152       | 65535       | 0             | 65536         | 98303         | 0              |
| xx-2 | 65536       | 90111       | 0             | 65536         | 98303         | 0              |

...

We can introduce an _index_ directory to store these statistics; furthermore, we can use an internal Hudi COW table to save them, similar to the Hudi metadata table.
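
As a rough sketch of that idea (the table name, key fields and target path below are illustrative assumptions, not a settled design), the statistics DataFrame produced above could be written out as a small non-partitioned Hudi COW table:

// Hedged sketch: persist the per-file statistics as an internal Hudi COW table
// under a hypothetical _index_ directory; the option values are placeholders.
result.write.format("hudi")
  .option("hoodie.table.name", "zindex")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "file")
  .option("hoodie.datasource.write.precombine.field", "file")
  .option("hoodie.datasource.write.keygenerator.class",
    "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
  .mode("append")
  .save(basePath + "/.hoodie/_index_")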

Integrations with query engines

Generally speaking, we do not need to do any extra work in the query engine, since most query engines automatically use the min/max values stored in Parquet/ORC.

However, if we can use the statistics during the SQL preparation (planning) phase, the effect of z-order will be even better.

How to apply the statistics in Spark SQL (a sketch of these steps follows the list):

  • Load the index table into indexDataFrame
  • Construct a data filter for indexDataFrame from the original query filter
  • Query indexDataFrame to select candidate files
  • Use those candidate files to reconstruct hudiMemoryIndex.
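
A minimal sketch of the first three steps, assuming the statistics are stored in an index table at indexPath with the schema shown earlier (indexPath and the literal predicate are illustrative):

// Hedged sketch: prune files for the query predicate `col = 65600`.
import org.apache.spark.sql.functions.col

// 1. Load the index table.
val indexDataFrame = spark.read.format("hudi").load(indexPath)

// 2. Rewrite the original filter into a filter over the min/max statistics.
val indexFilter = col("col_minValue") <= 65600 && col("col_maxValue") >= 65600

// 3. The matching rows name the candidate files the query actually has to scan.
val candidateFiles = indexDataFrame.filter(indexFilter)
  .select("file").collect().map(_.getString(0)).toSet

The candidate file set is then used to rebuild hudiMemoryIndex so that only those files are scanned.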

...


/** Convert an original query filter into an equivalent filter on the index table. */

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.functions.col

def createZindexFilter(condition: Expression): Expression = {

  // Resolve the statistics columns of the index table for a given data column.
  val minValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName).name + "_minValue").expr
  val maxValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName).name + "_maxValue").expr
  val num_nulls = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName).name + "_num_nulls").expr

  condition match {
    // col = literal  =>  the file may match only if minValue <= literal <= maxValue
    case EqualTo(attribute: AttributeReference, value: Literal) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case EqualTo(value: Literal, attribute: AttributeReference) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))

    // col <=> null  =>  check the null count recorded for the file
    case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(equalNullSafe.left)
      EqualTo(num_nulls(colName), equalNullSafe.right)
    ...

Rollout/Adoption Plan


  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>

...