RFC - 33: Hudi supports more comprehensive Schema Evolution

Table of Contents

Proposers

Approvers

Status

Current state: UNDER DISCUSSION (tick) | IN PROGRESS | ABANDONED | COMPLETED | INACTIVE

...

JIRA: here

Released: <Hudi Version>

Abstract

Hudi currently supports only limited schema evolution, which cannot meet the needs of real-world scenarios. The purpose of this design is to support schema evolution in Hudi more comprehensively.

Background

As a business continues to operate, business adjustments or business system refactoring are commonly carried out, and these changes often require adjusting the business system's databases. In traditional data lake storage, adjusting a table schema is not easy.

...

Currently, Hudi only supports implicit schema evolution that appends columns sequentially, which is very limited; this design therefore adds full schema evolution capability to Hudi.

Features

This design supports the following schema evolution features:

...

Note: partition evolution is not included in this design; it will come soon after schema evolution.

Overall design

Based on the existing implementation, this design adds table schema metadata that stores the business field information, and read and write operations are based on it. The existing extraMetadata in the commit file is not adjusted, to stay compatible with existing operation interfaces.

Schema metadata format

Table schema metadata examples are as follows:

...

  • Add: for a plain add-column operation, the column information is appended to the end of the schema and assigned a new ID, incremented by 1. For an add after/before operation, the column is inserted at the corresponding position in the schema and likewise assigned a new ID, incremented by 1.
  • Change: directly changes the name, attributes, and type of the field with the corresponding ID in the schema; the ID stays the same.
  • Reorder: adjusts the column's position in the schema; the ID stays the same.
  • Delete: deletes the field corresponding to the column from the schema.
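To make the change operations above concrete, here is a minimal sketch of an ID-based schema, assuming a simplified in-memory model; the class and method names (IdField, IdSchema, addColumn, and so on) are illustrative and are not the actual Hudi classes.

import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the ID-based table schema described above.
class IdField {
    final int id;      // stable column ID, never reused
    String name;       // column name, may change on rename
    String type;       // column type, e.g. "int", "long", "string"

    IdField(int id, String name, String type) {
        this.id = id;
        this.name = name;
        this.type = type;
    }
}

class IdSchema {
    private final List<IdField> fields = new ArrayList<>();
    private int maxId = 0;

    // Add: append (or insert at a before/after position) and assign a fresh ID (+1).
    void addColumn(int position, String name, String type) {
        fields.add(position, new IdField(++maxId, name, type));
    }

    // Change: the name/type of the field with the given ID changes; the ID itself does not.
    void changeColumn(int id, String newName, String newType) {
        for (IdField f : fields) {
            if (f.id == id) {
                f.name = newName;
                f.type = newType;
                return;
            }
        }
    }

    // Reorder: only the position changes; the ID stays the same.
    void reorderColumn(int id, int newPosition) {
        IdField moved = fields.stream()
            .filter(f -> f.id == id)
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("unknown column id " + id));
        fields.remove(moved);
        fields.add(newPosition, moved);
    }

    // Delete: the field is removed from the schema; its ID is never reassigned.
    void deleteColumn(int id) {
        fields.removeIf(f -> f.id == id);
    }
}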

Schema storage policy


Currently, each write operation produces a commit file saved in the .hoodie directory. Commit files such as commit, deltacommit, and replacecommit contain the Avro schema generated by the write operation, which represents the current version of the table schema.

...

There may be a large number of historical DDL operations, so each commit file may accumulate more and more historical schema versions. This requires a cleanup mechanism that sets the number of historical schema versions to retain: when the schema changes again, historical versions beyond the policy are no longer recorded in the commit file. The cleanup policy is per table, and each table can set a different retention count.
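A minimal sketch of this retention policy, assuming the schema history is kept as a list ordered from oldest to newest and that the retained count is configured per table; names and types are illustrative.

import java.util.List;

// Keep at most `retainedVersions` historical schemas per table; older versions are
// simply not carried forward into the next commit file.
class SchemaHistoryCleaner {
    static <T> List<T> trimHistory(List<T> historyOldestFirst, int retainedVersions) {
        int size = historyOldestFirst.size();
        if (size <= retainedVersions) {
            return historyOldestFirst;
        }
        // Drop the oldest entries beyond the configured retention count.
        return historyOldestFirst.subList(size - retainedVersions, size);
    }
}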

Evolution of the data file:

Data file field names are mapped to the field IDs in the schema metadata.

...

  • Write data: the data is validated against the metadata and written to the data file.
  • Query data: obtain the schema of the corresponding version from the metadata and complete the read operation.
  • Table definition: DDL operations directly operate on the metadata, not on the data files.

Solution scheme

Based on the above ideas, the detailed design is divided into the following parts:

...

  • The Hudi kernel supports schema evolution
  • Each engine adapts to the kernel's schema evolution
  • Clustering, compaction, clean, and rollback support schema evolution

DDL operation process


DDL operations mainly include CREATE TABLE and ALTER TABLE; partition changes are not included in this design.

CREATE TABLE

For the CREATE TABLE operation, the table schema is directly converted into text and saved to the commit file. Only one version of the schema is retained, because there is no historical version yet.

CREATE TABLE is an initialization operation, so the schema version ID and the commit file name are set to "00000000000000".

ALTER TABLE

The DDL execution flow is shown in the figure above. After DDL statements pass through the Spark engine, they are handed to the Hudi kernel, which classifies the DDL operations into schema changes. The classified changes are then passed to the Schema Evolver module, which applies each classified change to the schema and finally persists the new schema version.

...

The current DDL operation requires column type changes to be compatible (for example, int can be converted to long). If an incompatible change occurs, it fails directly by default. To make type modification more flexible, this design also provides an alternative scheme: when an incompatible change occurs, launch a new job to rewrite the full table. Of course, the cost of this is very high, so the default behavior remains to fail directly on incompatible changes.
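A small sketch of the default fail-on-incompatible behavior, assuming a simple whitelist of widening conversions; the exact compatibility matrix here is an assumption for illustration, not the definitive set of conversions Hudi allows.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class TypeChangeValidator {
    // Assumed widening conversions, modelled after Avro's promotion rules.
    private static final Set<String> ALLOWED = new HashSet<>(Arrays.asList(
        "int->long", "int->float", "int->double",
        "long->float", "long->double",
        "float->double"));

    static void validate(String fromType, String toType) {
        if (fromType.equals(toType) || ALLOWED.contains(fromType + "->" + toType)) {
            return; // compatible change: proceed with the metadata-only DDL
        }
        // Default policy: incompatible type changes fail directly. Rewriting the
        // full table with a new job is the costly, opt-in alternative.
        throw new IllegalArgumentException(
            "Incompatible type change: " + fromType + " -> " + toType);
    }
}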

Data write process

The overall write process is shown in the figure above:

...

For normal write operations, the written data will not be inconsistent with the current schema and no schema change occurs, so the schema in the commit file can simply inherit the last committed schema. However, Hudi itself supports implicitly adding columns during writes; in that scenario the write operation does change the schema. In that case the schema saving process is the same as after a DDL change operation: the newly generated schema version and the retained historical versions are saved.
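A minimal sketch of the implicit add-column path, assuming the table and write schemas are represented as ordered name-to-type maps; this representation is illustrative only.

import java.util.LinkedHashMap;
import java.util.Map;

class WriteSchemaReconciler {
    static Map<String, String> reconcile(Map<String, String> tableSchema,
                                         Map<String, String> writeSchema) {
        Map<String, String> evolved = new LinkedHashMap<>(tableSchema);
        boolean changed = false;
        for (Map.Entry<String, String> col : writeSchema.entrySet()) {
            if (!evolved.containsKey(col.getKey())) {
                // New column seen in the incoming data: append it, as a DDL "add" would.
                evolved.put(col.getKey(), col.getValue());
                changed = true;
            }
        }
        // If nothing changed, the commit inherits the previously committed schema;
        // if it did change, the new version plus retained history is persisted.
        return changed ? evolved : tableSchema;
    }
}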

Data Query process

Introduction to the overall process

The overall read process is shown in the figure above:

...

  • Step 3: the scan factory passes all files involved in this query to the scanner module; the scanner builds a scan task for each file group, and each scan task is responsible for reading the actual data.
  • Step 4: the scan task constructs the file schema for the file it is about to read. Building the file schema is similar to finding the query schema in step 2: the commit time of the file is used as the version ID to look up the corresponding schema version from the commit file (a small sketch follows this list).
  • Step 5: the scan task hands the file schema built in step 4 to the merge schema module, which merges it with the query schema from step 2, generates the read schema for the query engine, and builds pushed filters to push down to the data source side to filter data.
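A minimal sketch of the step 4 lookup, assuming schema versions are keyed by the commit time that produced them and that commit times compare lexicographically in chronological order (they are fixed-width timestamps); class and method names are illustrative.

import java.util.Map;
import java.util.TreeMap;

class FileSchemaResolver {
    // versionId (commit time) -> schema; the String payload stands in for the real schema object.
    private final TreeMap<String, String> schemaByCommitTime = new TreeMap<>();

    void register(String commitTime, String schema) {
        schemaByCommitTime.put(commitTime, schema);
    }

    // The file schema of a data file is the latest schema version whose commit time
    // is <= the file's commit time; TreeMap.floorEntry gives that lookup directly.
    String fileSchemaFor(String fileCommitTime) {
        Map.Entry<String, String> entry = schemaByCommitTime.floorEntry(fileCommitTime);
        return entry == null ? null : entry.getValue();
    }
}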

Construction of the read schema

The generation of the read schema depends on the specific query engine, because different query engines' file readers use different schemas. Spark is used as an example here; a simplified sketch of the merge logic follows the list below.

...

  • The first ID is 5. ID 5 does not exist in the file schema, but the name corresponding to id = 5 does exist in the file schema. This indicates that the column was deleted and a column with the same name was later added back; these are no longer the same column, so the data of this column in the file must not be read and should be filled with null. Therefore, a suffix is appended to the column name corresponding to this ID to prevent Spark from reading the file data for it.
  • The second ID is 2. The file schema has a column named data with id 2, while in the query schema id = 2 corresponds to a column named data1. Since the IDs are the same, a rename column operation has occurred, and the data of this column should still be read, because the user only renamed it. Why pass data rather than data1 to the Spark schema? Because the column name stored in the file (parquet, for example) is data; it cannot be read using the name data1.
  • The third column has id = 3. Its name is the same in the file schema and the query schema, and the order is the same, indicating that the column has not changed. It is inserted into the Spark schema as-is.
  • The fourth ID is 4. This ID does not exist in the file schema, and the corresponding column name does not exist in the file schema either, so this is a new column. It is inserted directly into the Spark schema; when Spark reads it, the column is automatically filled with null.
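A simplified sketch of the merge logic described by the four cases above, using a minimal (id, name, type) field model; the suffixing convention and the class names are assumptions for illustration, not the actual implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class Field {
    final int id; final String name; final String type; final boolean nullable;
    Field(int id, String name, String type, boolean nullable) {
        this.id = id; this.name = name; this.type = type; this.nullable = nullable;
    }
}

class ReadSchemaBuilder {
    // querySchema: current table schema projected by the query (id-based).
    // fileSchema: the schema version the file was written with (id-based).
    static List<Field> build(List<Field> querySchema, List<Field> fileSchema) {
        List<Field> readSchema = new ArrayList<>();
        for (Field q : querySchema) {
            Optional<Field> sameId = fileSchema.stream().filter(f -> f.id == q.id).findFirst();
            boolean nameExistsInFile = fileSchema.stream().anyMatch(f -> f.name.equals(q.name));
            if (sameId.isPresent()) {
                // Cases 2 and 3: the column exists in the file. On a rename (same id,
                // different name), read it under the *file* column name, because that
                // is the name physically stored in parquet.
                readSchema.add(new Field(q.id, sameId.get().name, q.type, q.nullable));
            } else if (nameExistsInFile) {
                // Case 1: new id but the name collides with an old, deleted column that
                // was re-added. Different columns, so the file data must not be read;
                // suffix the name so the reader returns null for it.
                readSchema.add(new Field(q.id, q.name + "_suffix", q.type, true));
            } else {
                // Case 4: a genuinely new column absent from the file; the reader
                // fills it with null automatically.
                readSchema.add(new Field(q.id, q.name, q.type, true));
            }
        }
        return readSchema;
    }
}

The key design point illustrated here is that columns are matched by ID rather than by name, so renames and delete-then-re-add sequences can be told apart.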

 

Construction of pushed filters

Expand the pushed filters generated by the query engine. For each leaf filter, such as EqualTo, EqualNullSafe, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, In, IsNull, IsNotNull, StringStartsWith, StringEndsWith, and StringContains (excluding And, Or, Not), turn it into a new filter according to the process in the figure below. For the complex filters And, Or, and Not, process their sub-filters recursively as shown in the figure below to generate new sub-filters, and then recombine them into And, Or, and Not filters.
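A sketch of the recursive rewrite, using Spark's org.apache.spark.sql.sources filter classes. Only a few leaf types are shown, and the leaf-level rewrite (remapping a query column name to the physical file column name) is an assumption about what the elided figure specifies.

import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.Or;

import java.util.Map;

class FilterRewriter {
    // query column name -> file column name, as produced by the read-schema step.
    private final Map<String, String> queryToFileName;

    FilterRewriter(Map<String, String> queryToFileName) {
        this.queryToFileName = queryToFileName;
    }

    Filter rewrite(Filter filter) {
        if (filter instanceof And) {
            And f = (And) filter;
            return new And(rewrite(f.left()), rewrite(f.right()));
        } else if (filter instanceof Or) {
            Or f = (Or) filter;
            return new Or(rewrite(f.left()), rewrite(f.right()));
        } else if (filter instanceof Not) {
            Not f = (Not) filter;
            return new Not(rewrite(f.child()));
        } else if (filter instanceof EqualTo) {
            EqualTo f = (EqualTo) filter;
            return new EqualTo(remap(f.attribute()), f.value());
        } else if (filter instanceof GreaterThan) {
            GreaterThan f = (GreaterThan) filter;
            return new GreaterThan(remap(f.attribute()), f.value());
        }
        // The remaining leaf types (LessThan, In, IsNull, StringStartsWith, ...) follow the same pattern.
        return filter;
    }

    private String remap(String queryName) {
        return queryToFileName.getOrDefault(queryName, queryName);
    }
}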

DDL concurrent operations

  • Multiple concurrent DDL operations: handled with distributed locking
  • DDL concurrent with query: reuses Hudi's existing isolation mechanism
  • DDL concurrent with write

...

The policy for checking whether schemas conflict needs to be provided as a plug-in, so that it can be customized by users.
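A sketch of what such a pluggable policy could look like; the interface and method names are illustrative, not an actual Hudi API.

// Users could supply their own implementation to decide whether a concurrent
// write's schema conflicts with a DDL change that committed in the meantime.
interface SchemaConflictResolutionStrategy {

    // writerSchema: the schema the concurrent writer started with.
    // committedSchema: the schema produced by the DDL that committed first.
    // Returns true if the write may proceed against the committed schema,
    // false if the write must be failed or retried.
    boolean canProceed(String writerSchema, String committedSchema);
}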

 

Various engine support capabilities

Some engines use the Hive metastore for metadata management, but the Hive metastore does not support multiple schema versions, so these engines cannot read data using a historical schema.

Old schema data compatibility

Scenario 1: the old Hudi table will never undergo any schema change

This scenario is relatively simple; we can fall back to the original read/write logic of Hudi.

Scenario 2: schema changes are made on an old Hudi table

schema evolution operation

If we perform the first schema change on an old Hudi table, the first id-schema will be created.

...

now rename operationTime to col1:


read operation:


Once we have made a schema change on an old Hudi table, the first id-schema is created and all the old files are bound to it. Since an old Hudi table only supports the add-column and modify-column-type operations, and Avro/Parquet support those changes natively, using the first id-schema to represent the old data is completely fine.
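A minimal sketch of that bootstrap step, assuming IDs are assigned sequentially to the existing columns in their current order; the representation is illustrative only.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IdSchemaBootstrap {
    // On the first schema change against an old table, assign sequential IDs to the
    // existing columns to form the first id-schema; all existing files are then bound
    // to this version. The List<String> of column names stands in for the real Avro schema.
    static Map<Integer, String> bootstrap(List<String> existingColumnNames) {
        Map<Integer, String> firstIdSchema = new LinkedHashMap<>();
        int id = 0;
        for (String name : existingColumnNames) {
            firstIdSchema.put(id++, name);
        }
        return firstIdSchema;
    }
}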

...

  2. When we read a new file, the latest id-schema is used as both the file schema and the query schema; the remaining process is the same as in (1).

write operation:

Now that we already have an id-schema, the write path is as described in the chapter Data write process.

Rollout/Adoption Plan

  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>

...