 RFC - 28 : Support Z-order curve

Table of Contents

Proposers

  • @<proposer1 JIRA username>
  • @<proposer2 JIRA username>
  • ...

Approvers

  • @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • ...

Status

Current state

  • Under Discussion (tick)
  • In Progress
  • Abandoned
  • Completed
  • Inactive


...

JIRA: here

Released: <Hudi Version>

Abstract

Z-order is a technique that allows you to map multidimensional data to a single dimension. We can use this feature to improve query performance.


Background

Z-order is not currently supported by open-source engines or open-source data lake components, so it makes sense to introduce this feature to Hudi.


Introducing Z-order

Z-order is a technique that allows you to map multidimensional data to a single dimension.

...

It can be seen that if we sort the data by z-value and divide it evenly into four files, then no matter whether a point query filters on the X or the Y field, we can skip half of the irrelevant files. The larger the data volume, the better the effect. In other words, files written in z-order give better data skipping across multiple fields. Fortunately, Z-order is not limited to two-dimensional space; it can be extended to any number of dimensions.
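For intuition, here is a small self-contained sketch (not part of the proposed implementation) that z-orders a 4x4 grid of (x, y) points and splits it into four "files"; each file ends up covering a 2x2 square, so a point filter on either field alone can prune half of the files.

object ZOrderSketch {
  // Interleave the low 16 bits of x and y: bit i of x goes to bit 2i,
  // bit i of y goes to bit 2i + 1 of the result.
  def zValue(x: Int, y: Int): Long = {
    var z = 0L
    for (i <- 0 until 16) {
      z |= ((x >> i) & 1L) << (2 * i)
      z |= ((y >> i) & 1L) << (2 * i + 1)
    }
    z
  }

  def main(args: Array[String]): Unit = {
    val points = for (x <- 0 until 4; y <- 0 until 4) yield (x, y)
    val sorted = points.sortBy { case (x, y) => zValue(x, y) }
    // Four "files" of four points each; each file covers a 2x2 square,
    // so a predicate on either x or y alone prunes half of the files.
    sorted.grouped(4).zipWithIndex.foreach { case (file, i) =>
      val xs = file.map(_._1); val ys = file.map(_._2)
      println(s"file-$i x:[${xs.min},${xs.max}] y:[${ys.min},${ys.max}]")
    }
  }
}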

Implementation

At a high level there are three components needed to implement z-order support: z-value generation, statistical info preservation, and query engine integration. The next three subsections discuss each in detail.


Z-value generation

The core mechanism underlying Z-ordering is the ability to translate a tuple of several field values into a single number (call it the z-value). Following the Wikipedia definition, we simply interleave the bits of each field in the record.
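As an illustration of the idea only (a sketch, not the actual ZOrderingUtil.interleaveMulti8Byte implementation), interleaving the bits of an arbitrary number of fields that have each already been mapped to an unsigned 64-bit value could look like this:

// Interleave the bits of n 64-bit values into an n*8-byte array,
// taking one bit per field per round, starting from the most significant bit.
def interleaveBits(values: Array[Long]): Array[Byte] = {
  val n = values.length
  val result = new Array[Byte](n * 8)   // n * 64 bits in total
  var outBit = 0
  for (bit <- 63 to 0 by -1; v <- values) {
    val b = (v >>> bit) & 1L
    if (b == 1L) {
      result(outBit / 8) = (result(outBit / 8) | (0x80 >> (outBit % 8))).toByte
    }
    outBit += 1
  }
  result
}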

...

          We cannot convert it to bits directly, since in Java the first bit of a negative number represents the sign.

          

Decimal value | Two's complement signed integer
0             | 0000 0000
1             | 0000 0001
2             | 0000 0010
126           | 0111 1110
127           | 0111 1111
-128          | 1000 0000
-127          | 1000 0001
-126          | 1000 0010
-2            | 1111 1110
-1            | 1111 1111


           From the above table we find that the bit pattern of -1/-2 is larger than that of 1/2, which breaks the ordering.

           Fortunately, we only need to modify the transformation slightly to solve this problem: just flip the first (sign) bit, as shown in the table below.

Decimal value | Two's complement signed integer | With first bit flipped | Resulting unsigned integer decimal value
0             | 0000 0000                       | 1000 0000              | 128
1             | 0000 0001                       | 1000 0001              | 129
2             | 0000 0010                       | 1000 0010              | 130
126           | 0111 1110                       | 1111 1110              | 254
127           | 0111 1111                       | 1111 1111              | 255
-128          | 1000 0000                       | 0000 0000              | 0
-127          | 1000 0001                       | 0000 0001              | 1
-126          | 1000 0010                       | 0000 0010              | 2
-2            | 1111 1110                       | 0111 1110              | 126
-1            | 1111 1111                       | 0111 1111              | 127


          With the first bit flipped, the bit pattern of -1/-2 is smaller than that of 1/2, and the lexicographic order matches the original numeric order. This method therefore meets the requirements of z-ordering.
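A minimal sketch of this sign-bit flip for an Int (illustrative, not the exact ZOrderingUtil code): XOR-ing with Int.MinValue flips the sign bit, after which the unsigned byte-wise order matches the signed numeric order.

// Flip the sign bit and emit big-endian bytes so that byte-wise
// (unsigned) comparison agrees with signed numeric comparison.
def intToOrderedBytes(i: Int): Array[Byte] = {
  val flipped = i ^ Int.MinValue   // flip the sign bit
  Array(
    ((flipped >>> 24) & 0xFF).toByte,
    ((flipped >>> 16) & 0xFF).toByte,
    ((flipped >>> 8) & 0xFF).toByte,
    (flipped & 0xFF).toByte)
}
// e.g. -1 -> 7F FF FF FF and 1 -> 80 00 00 01, so the byte-wise
// comparison now agrees with -1 < 1.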

...

                          e.g. "https://www.baidu.com" and "https://www.google.com" have the same first 8 bytes


/** Generates z-value */

val newRDD = df.rdd.map { row =>
  // Map each participating field to a fixed 8-byte representation.
  val values = zFields.map { case (index, field) =>
    field.dataType match {
      case LongType =>
        ZOrderingUtil.longTo8Byte(row.getLong(index))
      case DoubleType =>
        ZOrderingUtil.doubleTo8Byte(row.getDouble(index))
      case IntegerType =>
        ZOrderingUtil.intTo8Byte(row.getInt(index))
      case FloatType =>
        ZOrderingUtil.doubleTo8Byte(row.getFloat(index).toDouble)
      case StringType =>
        ZOrderingUtil.utf8To8Byte(row.getString(index))
      case DateType =>
        ZOrderingUtil.longTo8Byte(row.getDate(index).getTime)
      case TimestampType =>
        ZOrderingUtil.longTo8Byte(row.getTimestamp(index).getTime)
      case ByteType =>
        ZOrderingUtil.byteTo8Byte(row.getByte(index))
      case ShortType =>
        ZOrderingUtil.intTo8Byte(row.getShort(index).toInt)
      case d: DecimalType =>
        ZOrderingUtil.longTo8Byte(row.getDecimal(index).longValue())
      case _ =>
        null
    }
  }.filter(v => v != null).toArray
  // Interleave the per-field bytes into the z-value and append it to the row.
  val zValues = ZOrderingUtil.interleaveMulti8Byte(values)
  Row.fromSeq(row.toSeq ++ Seq(zValues))
}.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)))



To solve those two problems, a boundary-based Interleaved Index seems like a good fit.

...

          Sample the data. From the sample, select the specified number of boundaries for each field participating in z-ordering and sort them.
          Each field value is then mapped to the index of its boundary, and that index participates in the z-value calculation.
          Since boundary indexes are contiguous non-negative integers starting from 0, they fully meet the requirements of interleaved-index calculation (see the sketch below).
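Before the full RDD-level code below, here is a minimal standalone sketch of the boundary lookup itself (a hypothetical helper, not the actual DecisionBound class): a value is mapped to the number of sorted boundaries that are less than or equal to it, so every field becomes a small non-negative Int suitable for interleaving.

// Upper-bound binary search: count how many boundaries are <= value.
// Indexes are contiguous from 0 up to sortedBounds.length.
def boundIndex(value: Long, sortedBounds: Array[Long]): Int = {
  var lo = 0
  var hi = sortedBounds.length
  while (lo < hi) {
    val mid = (lo + hi) >>> 1
    if (sortedBounds(mid) <= value) lo = mid + 1 else hi = mid
  }
  lo
}
// e.g. with bounds Array(10L, 20L, 30L): 5 -> 0, 15 -> 1, 25 -> 2, 100 -> 3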


/** Generates z-value */

val indexRdd = internalRdd.mapPartitionsInternal { iter =>
  val bounds = boundBroadCast.value
  // One projection per z-order field, used to extract that field from the row.
  val origin_Projections = sortingExpressions.map { se =>
    UnsafeProjection.create(Seq(se), outputAttributes)
  }

  iter.map { unsafeRow =>
    val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map { case ((rowProject, lazyOrdering), index) =>
      val row = rowProject(unsafeRow)
      val decisionBound = new DecisionBound(sampleRdd, lazyOrdering)
      if (row.isNullAt(0)) {
        // Null values are mapped past the last boundary index.
        bounds(index).length + 1
      } else {
        // Map the field value to the index of its boundary.
        decisionBound.getBound(row, bounds(index).asInstanceOf[Array[InternalRow]])
      }
    }.toArray.map(ZOrderingUtil.toBytes(_))
    // Interleave the boundary indexes of all fields to obtain the z-value.
    val zValues = ZOrderingUtil.interleaveMulti4Byte(interleaveValues)
    val mutablePair = new MutablePair[InternalRow, Array[Byte]]()

    mutablePair.update(unsafeRow, zValues)
  }
}.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)


2. The number of bits differs between dimension values; for example, how should a short and an int cross-merge their bits? How do we deal with those values?

...

Answer: use an Array[Byte] to store the z-value (we limit the length of this array to 1024, just like Amazon DynamoDB). Note that HBase already sorts row keys as Array[Byte] internally, so we can reuse that code directly as the comparator for z-values.
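For illustration, a comparator in that spirit could look like the following minimal sketch (written from scratch here, not copied from HBase):

// Lexicographic comparison of two z-values, treating each byte as unsigned.
def compareZValues(a: Array[Byte], b: Array[Byte]): Int = {
  val len = math.min(a.length, b.length)
  var i = 0
  while (i < len) {
    // Compare bytes as unsigned values (0..255), not as signed Java bytes.
    val ai = a(i) & 0xFF
    val bi = b(i) & 0xFF
    if (ai != bi) return ai - bi
    i += 1
  }
  a.length - b.length
}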


Statistical info preservation

Collecting min-max and null-count info is necessary once the data has been clustered by z-order. Using those statistics, Hudi can do file-level filtering directly.

...

Not all statistics stored in the Parquet footer are correct; for example, Parquet stores wrong statistics for timestamp values.


/** Collect statistic info by reading parquet footers */

val sc = df.sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(conf)
// Guard against 0 partitions when there are fewer than 3 input files.
val numParallelism = math.max(1, inputFiles.size / 3)
val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
try {
  val description = s"Listing parquet column statistics"
  sc.setJobDescription(description)
  sc.parallelize(inputFiles, numParallelism).mapPartitions { paths =>
    val hadoopConf = serializableConfiguration.value
    paths.map(new Path(_)).flatMap { filePath =>
      // Read the footer of each file and aggregate per-column
      // min/max/null-count across all row groups (blocks) of the file.
      val blocks = ParquetFileReader.readFooter(hadoopConf, filePath).getBlocks().asScala
      blocks.flatMap(b => b.getColumns().asScala.
        map(col => (col.getPath().toDotString(),
          FileStats(col.getStatistics().minAsString(), col.getStatistics().maxAsString(), col.getStatistics.getNumNulls.toInt))))
        .groupBy(x => x._1).mapValues(v => v.map(vv => vv._2)).
        mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max, value.map(_.num_nulls).max)).toSeq.
        map(x => ColumnFileStats(filePath.getName(), x._1, x._2.minVal, x._2.maxVal, x._2.num_nulls))
    }.filter(p => cols.contains(p.colName))
  }.collect()
} finally {
  sc.setJobDescription(previousJobDescription)
}
  • Calculate the statistics with Spark SQL, which can collect all column types correctly but is less efficient.


/** Collect statistic info via Spark SQL */

val inputFiles = df.inputFiles
val conf = df.sparkSession.sparkContext.hadoopConfiguration
// Min/max and non-null counts for every z-order column.
val values = cols.flatMap(c => Seq(min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCount")))
val valueCounts = count("*").as("totalNum")
// Derive the null count as totalNum - noNullCount for each column.
val projectValues = Seq(col("file")) ++ cols.flatMap(c =>
  Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))

// Group rows by the physical file they come from and aggregate per file.
val result = df.select(input_file_name() as "file", col("*"))
  .groupBy($"file")
  .agg(valueCounts, values: _*)
  .select(projectValues: _*)
result

Example output (z-order columns: col, col1):

file | col_minValue | col_maxValue | col_num_nulls | col1_minValue | col1_maxValue | col1_num_nulls
xx-1 | 49152        | 65535        | 0             | 65536         | 98303         | 0
xx-2 | 65536        | 90111        | 0             | 65536         | 98303         | 0

  • How to store the statistics

We can introduce an _index_ directory to store the statistics. What's more, we can use an internal Hudi COW table to save them, just like the Hudi metadata table.
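As an illustration only, persisting the per-file statistics DataFrame into such an internal COW table could look roughly like the sketch below; the table name, location, record key, and precombine field are assumptions here, not a finalized layout.

// statsDf is the per-file statistics DataFrame produced above;
// basePath and the ".index" sub-directory are illustrative assumptions.
statsDf.write.format("hudi")
  .option("hoodie.table.name", "hudi_zindex_stats")            // internal stats table name (assumed)
  .option("hoodie.datasource.write.recordkey.field", "file")   // one record per data file
  .option("hoodie.datasource.write.precombine.field", "file")
  .option("hoodie.datasource.write.operation", "upsert")       // refresh stats after re-clustering
  .mode("append")
  .save(basePath + "/.hoodie/.index")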

Integrations with query engines

Generally speaking, we do not need to do any extra work for query engines, since most of them automatically use the min-max values stored in Parquet/ORC footers.

However, if we can also use the statistics during the SQL planning phase, the effect of z-order will be even better.

How to apply the statistics in Spark SQL

  • Load the index table into an indexDataFrame.
  • Construct a data filter for the indexDataFrame from the original query filter.
  • Query the indexDataFrame to select candidate files.
  • Use those candidate files to rebuild the hudiMemoryIndex.

The only question is how to construct the data filter for the indexDataFrame: we simply use the min-max values and null counts to build it.


/** convert filter */

def createZindexFilter(condition: Expression): Expression = {

  // Helpers referencing the statistics columns of the index table.
  val minValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_minValue").expr
  val maxValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_maxValue").expr
  val num_nulls = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_num_nulls").expr

  condition match {
    // attribute = literal: keep files whose [min, max] range may contain the value.
    case EqualTo(attribute: AttributeReference, value: Literal) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case EqualTo(value: Literal, attribute: AttributeReference) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))

    // Null-safe equality against null is rewritten in terms of the null-count column.
    case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(equalNullSafe.left)
      EqualTo(num_nulls(colName), equalNullSafe.right)
............
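For completeness, a rough sketch of how the converted filter could then be used to select candidate files from the index table; names such as indexDf, queryFilter, and zindexBasePath are illustrative, not part of the proposal.

// Load the index table, apply the converted filter, and collect candidate files.
val indexDf = spark.read.format("hudi").load(zindexBasePath)
val pruningExpr = createZindexFilter(queryFilter)
val candidateFiles = indexDf
  .where(new org.apache.spark.sql.Column(pruningExpr))
  .select("file")
  .collect()
  .map(_.getString(0))
// candidateFiles is then used to rebuild the in-memory file index (hudiMemoryIndex).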

Rollout/Adoption Plan


  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>

...