You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

 RFC - 33 : Hudi support more comprehensive Schema Evolution

Proposers

  • @<proposer1 JIRA username>
  • @<proposer2 JIRA username>
  • ...

Approvers

  • @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • ...

Status

Current state


Current State

UNDER DISCUSSION

(tick)

IN PROGRESS


ABANDONED


COMPLETED


INACTIVE


Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Hudi currently supports limited schema evolution, which cannot meet the needs of actual scenarios. The purpose of this design is to support Hudi's evolution capability more comprehensively.

Background

With the continuous operation of the business, business adjustment or business system reconstruction will generally be carried out, and these operations may adjust the business system database, in the traditional data lake storage table schema adjustment is not easy.

Hudi schema evolution is in-place table evolution,does not require costly distractions,like rebuild table or rewrite new table.In addition, it can quickly adapt to the adjustment of the business system and write data into the data lake faster

Currently, HUDI only supports implicit schema sequential columns, which is very limited; So this design adds the ability of full Schema Evolution to HUDI.

Features

This design supports full schema evolutionthe following schema evolution features:

Support changes

  • Add : add a new column to the table or to a nested struct
  • Drop: remove an existing column from the table or a nested struct
  • Rename: rename an existing column or field in a nested struct
  • Update: widen the type of a column, struct field, map key, map value, or array element
  • Reorder: change the order of columns or fields in a nested struct

These operations are table schema metadata changes,the data files does not need to be rewrite

Not supports

  • map keys do not support adding or dropping struct fields that would change equality

Note: Partition evolution is not included in this design, Partition evolution will come soon after schema evolution.

Overall design

Based on the existing implementation, add table schema metadata, store business field information in the table metadata, read and write operations based on it.The existing extrametadata in commitfile is not adjusted for compatibility with existing operation interfaces.

Schema metadata format

Table schema metadata examples are as follows:

Schema Metadata Json element definition

  • max-column-id:Maximum column ID value for the current table,When a new columns are added,the value +1
  • type:schema type,default value is "struct"
  • version-id:DDL operation commit time, which is consistent with commitfile
  • fields:cloumn definintion
  • id:Id of column,the id will be store as column in parquet.
  • name:name of column,consistent with service system,Both DDL and DML through this field.
  • type:data type of column
  • required:Is NULL,if not
  • default:This column is missing in new data and is written by default

 

Table shcema metadata's multiple versions are available by "version-id"

 

DDL operations on metadata

  • Add: For Add operations that Add column types sequentially, Add column information append to the end of schema and assign a new ID. The ID value increases by +1. For the Add after/before operation, Add the column append to the corresponding position in the schema and give a new ID. The ID value is incremented by +1.
  • Change : directly Change the name, attribute, and type ID of the field corresponding to the column in the Schema
  • Reorder: adjusts the column position in the schema. Id is the same
  • Delete: Deletes fields corresponding to columns in the Schema

Schema storage policy


Currently, each write operation has a commit file and is saved in the./hoodie directory. Commit files such as commit, Deltacommit, and replacecommit contain AVRO Schema information generated by the write operation. This information is used to represent the current version of table schema.

In this design, the schema information stored in the previous COMMIT file will be changed to support multiple versions. The commit file will save all previous versions of the schema.In addition, due to the current clean mechanism, the historical COMMIT information is cleaned and the historical Schema information is also deleted. This operation causes historical data not to be read correctly and the DDL's historical operations not to be traceable

There may be a large number of historical DDL operations, and each commit file may contain more and more historical versions of schemas. This requires a cleanup mechanism to set the number of schema historical versions retained. If the schema changes occur again, the historical version beyond the policy will not be recorded in the commit file.The cleaning policy is specific to tables, and each table can set a different count of version retained.

Evolution of the data file:

The data file field name is mapped to the Schema Metada field ID.

 

The impact of evolution:

  • Write data:Through the metadata, the data is detected and written to the Datafile.
  • Query Data:Obtain the schema of the corresponding version from the metadata and complete the read operation
  • Table definition:DDL operations directly operate metadata, not data files.

solution scheme

According to the above ideas, the detailed design is divided into the following parts

 

  • The Hudi kernel supports schema evolution
  • Each engine adapter the schema evolution of the kernel
  • Clustering, compaction, Clean, rollback Supports schema evolution

DDL operation process


DDL operations mainly include create table and ALTER Table, but partition changes are not included in this design.

CREATE Table

For the CREATE TABLE operation, directly convert the table schema into text information and save it to the COMMIT file. Only one version of the schema is retained because there is no historical version of the schema.

Create table is an initialization operation, so the schema version ID and commit file name are set to "00000000000000".

Alter table

The DDL execution process is shown in the figure above. After the DDL statements pass through the Spark engine, they are passed to the HUDI kernel. The HUDI kernel classifies THE DDL operations as schema changes. Then, the classified changes are transferred to the Schema Evolvter module, which applies different types of schema changes to specific schema changes, and finally persi

sts the schema to the HDFS/ object storage. The key point of the whole process is the transformation process of Schema Evolvter and Schema Evolvter as follows:

Change column incompatibility handling policy:

The current DDL operation is required to be compatible with the operation that changes the column type (int can be converted to long, and vice versa). If incompatibility occurs, it will fail directly by default. In order to make the type modification operation more flexible, this design also provides another scheme as an alternative, that is, when incompatible situation occurs, directly pull up a new job to rewrite the full table, of course, this cost is very high, so the default scheme is still incompatible and directly failed.

Data write process

The overall writing process design is shown in the figure above:

  • Preparing data for writing
  • Check the schema to which data is being written. If the schema is not compatible with the current table schema, stop writing
  • Execute the original hudI write process (upsert, etc.) to write to the data file.
  • The latest schema information is obtained from the previous commit file, combine with the current schema information, and then written to the current commit file

The steps to focus on are:

For normal write operations, there will be no inconsistency between the written data and the current schema, and there will be no schemashcema change. The schema of the commit file can inherit the last submitted schema; However, Hudi itself supports the ability of writing to implicitly add columns. In this scenario, schema changes will occur in the write operation. In this case, the schema saving process is the same as that after the DDL change operation, that is, the newly generated new version of the schema and the historical version of the schema will be saved.

Data Query process

Introduction to the overall process

The whole reading process is shown in the figure above:

  • Step 1: judge the query method of data from SQL statements
  • Step 2: give the query method to the scan factory module. The scan factory module has two core functions to obtain the table files to be scanned according to different query methods (the logical Hudi kernel has been implemented); Build a query schema for different query methods (refer to the construction process of query schema for detailed design)

Query method

Query schema constructcountsruct

Snapshot query

Get the latest schemashcema from the latest commit file

Incremental query

The versioned schema is obtained from the specified committime by incremental query, and then the query schema is obtained from the commit file

Read Optimized query

Get the schema version ID from the file name of the basefile,,and then the query schema is obtained from the commit file

  • Step 3: scan factory passes all the files involved in this query to the scanner module; Scanner builds a scan task for each file group; Scan task is responsible for reading specific data.
  • Step4: scan task constructs the file schema of the file to be read for the file to be read. Here, the construction of file schema is similar to the process of finding schema in step 2. Use the committime of the file as the version ID to query the corresponding version of schema from the commit file.
  • Step 5: scan task gives the file schema generated in step 4 to the merge schema module. The merge schema will merge the file schema and the query schema generated in step 2, generate the read schema according to the query engine, and push filters will also be built to push down to the data source side to filter data.

Construction of readschema

The generation of read schema is related to the specific query engine, because different query engine file readers use different schemas. Here is an example of spark.

let's introduce the execution process in the figure above.

Firstly, prune the query schema according to the prune columns generated by query engine. Take the corresponding id in the pruned query schema one by one.

  • The first ID is 5. It is found that 5 does not exist in the file schema, but the name corresponding to id = 5 exists in the file schema. This indicates that the ID column is deleted first and then added back; However, these two columns are no longer the same column, so the data of this column in the file cannot be read out and should be assigned null. Therefore, this place should add a suffix to the column name corresponding to the ID to prevent spark from reading it out
  • The second id is 2. There is a column named data in the file schema with id 2. In the query schema, id = 2 corresponds to the column named data1. This indicates that the rename column operation has occurred in the table since their IDs are the same. In this case, the data of this column should be read out theoretically, because the user only renames the column. Why do you choose to transfer data to spark schema instead of data1? The reason is that the column name stored in file (parquet as an example) is data. You can't read data with data1, which is wrong.
  • The third column with id=3. The name of the third column with ID 3 is the same in file schema and query schema, and the order is the same, indicating that there is no change in this column. Just insert it into the schema of spark
  • The fourth ID is 4. This ID does not exist in file schema, and the column name corresponding to this ID does not exist in file schema, so this column is a new column. Just insert the schema directly into spark. When spark reads it, it will be automatically given null

 

Contruction of pushed filters

Expand the pushed filters generated by the query engine. For each sub filter, such as EqualTo、EqualNullSafe、GreaterThan、GreaterThanOrEqual、LessThan、LessThanOrEqual、In、IsNull、IsNotNull、StringStartsWith、StringEndsWith、StringContains(exlude and、or、not), turn it into a new filter according to the process in Figure below。For the complex filters `and`, `or`, `not`; process the sub filters recursively as shown in figure below to generate new sub filters, and then recombine them into and, or, not filters.

DDL concurrent operations

  • Multi DDL Concurrency: distributed locking processing
  • DDL and query Concurrency: reusing Hudi's isolation mechanism
  • DDL and write Concurrency

Optimistic locking mechanism is adopted. DDL and write operations are directly executed. When the final commit is submitted, the lock is added。

  • The DDL operation can be submitted directly after obtaining the lock
  • The write operation cannot be submitted directly after obtaining the lock,need to check whether the schema has changed.If the change is caused by the implicit addition of field capability, the write operation can be committed. If not, the operation is cancelled

The policy of checking whether the schema conflicts needs to be made in the form of a plug-in, which can be customized by users.

 

Various engine support capabilities

Some engines use hive meta store as metadata management, but hive meta does not support multiple versions, so these engines cannot support reading data from historical schema.

Old schema data compatibility

scene1: old hudi table will not do any schema change in the future

This scence is relatively simple, we can fallback to the original read/write logic of hudi.

scene2: do schema change on old hudi table.

schema evolution operation

if we do first schema change on a old hudi table. the first id-schema will be created

1) convert old hudi table's latest avro schema to id-schema as the first id-schema.

2) any schema change will directly applied to this first id-schema and saved with commit file

let's give a exmaple:


now rename operationTime to col1:

read operation:

once we have done schema change on old hudi table.  first id-schema will be created and all the old files are bound to the first id-schema. since old hudi table only support add/modify column type operation and avro/parquet support those change native. so use the first id-schema to represent the old data is completely fine.

now all files in hudi table are bound to the id-schema.  the query operation can refer chapter Data query process.

Let us follow the above example to give an explanation :

now old table exists two files.   one is old file which bound to the the first id-scheam. the other one is new file which writed after we do add column change and this file bound to the latest id-schema.

follow the step on chapter Data query process.

  1. when we read the old file, the first id-schema will be used as file-schema. lastest id-schema will be used as qurey-schema. then we use merge module to merge file-schema and query-schema to produce the final read-schema. once read-schema is produced, we can read the old files correctly.how to merge file-schema and query-schema pls see the chapter Data query process

   2. when we read the new file, the lastest id-schema will be used as file-schema and qurey-schema the remaining process are same as (1)

write operation:

now we aready has id-schema. just see the chapter Data Write process 

Rollout/Adoption Plan

  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>







  • No labels