Background
SQL is a popular language for big data development. Building SQL extensions for Hudi will greatly reduce the cost of use. This document discusses the SQL extension on Hudi for the Spark engine.
Extended SQL Syntax
Here is the sql syntax we need to extend for hudi.
DDL
Because Hudi tables have primary keys, we add a primary key definition to the CREATE TABLE statement, which current Spark SQL does not support.
syntax
CREATE TABLE [ IF NOT EXISTS ] [database_name.] table_name
  [ ( columnTypeList [, PRIMARY KEY(column_list)] ) ]
  [ COMMENT table_comment ]
  USING hudi
  [ LOCATION location_path ]
  [ OPTIONS ( options_list ) ]
  [ AS query_statement ]
example
create table h0 (
  id bigint,
  name string,
  price double,
  primary key(id)
) using hudi

create table h1 using hudi as select * from src
DML
MERGE INTO
Merge the data from source table to the target.
syntax
MERGE INTO [database_name.] table_name [AS target_alias]
USING (query_statement | table_name) [ AS source_alias ]
ON merge_condition
WHEN MATCHED [ AND condition ] THEN matched_action
[ WHEN MATCHED [ AND condition ] THEN matched_action ]
[ WHEN NOT MATCHED [ AND condition ] THEN not_matched_action ]

merge_condition = bool_expression

matched_action =
  DELETE
  | UPDATE SET *
  | UPDATE SET column1 = expression1 [, column2 = expression2...]

not_matched_action =
  INSERT *
  | INSERT (column1 [, column2 ...]) VALUES (expression1 [, expression2 ...])
example
merge into h0 as target
using (
  select 1 as id, 'a1' as name, 10.0 as price
) source
on target.id = source.id
when matched then update set *
when not matched then insert *
UPDATE
Update the data in hudi according to the condition.
syntax
UPDATE [database_name.] table_name SET column1 = expression1 [,column2 = expression2...] [ WHERE bool_expression ]
example
update h0 set price = price + 10 where id > 100
DELETE
Delete the data in hudi according to the condition.
syntax
DELETE FROM [database_name.] table_name [ WHERE bool_expression ]
example
delete from h0 where id > 100
CONVERT
Convert other file formats to hudi.
syntax
CONVERT TO ( [database_name.] target_table_name | 'target_file_path' )
FROM ( [database_name.] source_table_name | 'source_file_path' )
example
convert to h0 from tbl_parquet
convert to h0 from '/tmp/parquet/p0'
convert to '/tmp/hudi/h0' from tbl_parquet
convert to '/tmp/hudi/h0' from '/tmp/parquet/p0'
CLI COMMAND
Hudi has many CLI commands; we can expose them through SQL.
syntax
CLI_COMMAND [ (param_key1 = value1, param_key2 = value2...) ]
CLI_COMMAND is the same as the corresponding Hudi CLI command, and the parameter keys should also stay the same.
example
commits show
commit showfiles (commit = '20210114221306', limit = 10)
show rollbacks
savepoint create (commit = '20210114221306')
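To make the shape of the CLI syntax concrete, the following minimal sketch splits a statement into a command name and a parameter map. The names `CliStatement` and `parseCli` are hypothetical and not part of Hudi; a real implementation would live in the parser grammar, and this sketch does not handle values that contain commas.

```scala
object CliSyntaxSketch {
  // Hypothetical holder for a parsed statement: command name plus key/value parameters.
  final case class CliStatement(command: String, params: Map[String, String])

  // "<command words> (key = value, ...)"
  private val WithParams = """^\s*([A-Za-z][A-Za-z ]*?)\s*\((.*)\)\s*$""".r
  // "<command words>" without a parameter list
  private val NoParams = """^\s*([A-Za-z][A-Za-z ]*?)\s*$""".r

  private def normalize(cmd: String): String = cmd.trim.replaceAll("\\s+", " ")
  private def unquote(v: String): String = v.stripPrefix("'").stripSuffix("'")

  def parseCli(sql: String): Option[CliStatement] = sql match {
    case WithParams(cmd, body) =>
      // Split "k1 = v1, k2 = v2" into trimmed (key, value) arrays.
      val pairs = body.split(",").map(_.trim).filter(_.nonEmpty)
        .map(_.split("=", 2).map(_.trim))
      if (pairs.forall(_.length == 2))
        Some(CliStatement(normalize(cmd), pairs.map(p => p(0) -> unquote(p(1))).toMap))
      else None // a parameter without "=" is rejected
    case NoParams(cmd) =>
      Some(CliStatement(normalize(cmd), Map.empty))
    case _ => None
  }
}
```

For instance, `CliSyntaxSketch.parseCli("commit showfiles (commit = '20210114221306', limit = 10)")` yields the command `commit showfiles` with the parameters `commit` and `limit`.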
Implementation on Spark
A DDL/DML/CLI statement goes through the following stages in spark sql:
parse
In the SQL parse stage, we inject a HoodieSqlParser into Spark SQL, which parses our extended DDL/DML/CLI syntax into a LogicalPlan. If the HoodieSqlParser fails to parse a SQL statement, Spark routes it to Spark's own SQL parser, so we only need to implement our extended syntax in the HoodieSqlParser.
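The fallback behaviour described above can be sketched without any Spark dependency. Everything here is a simplified stand-in: `Plan`, `SqlParser`, `BuiltinParser` and `HoodieParser` are hypothetical names that only mimic the roles of Spark's LogicalPlan, its parser interface, its built-in parser and the proposed HoodieSqlParser.

```scala
import scala.util.{Failure, Success, Try}

object ParserFallbackSketch {
  // Toy stand-ins for logical plans (hypothetical, heavily simplified).
  sealed trait Plan
  final case class HoodiePlan(statement: String) extends Plan
  final case class BuiltinPlan(statement: String) extends Plan

  trait SqlParser { def parsePlan(sql: String): Plan }

  // Stand-in for Spark's own parser: accepts every statement.
  object BuiltinParser extends SqlParser {
    def parsePlan(sql: String): Plan = BuiltinPlan(sql)
  }

  // Extended parser: handles the new syntax and delegates everything else.
  class HoodieParser(delegate: SqlParser) extends SqlParser {
    private val extendedPrefixes = Seq("merge into", "update", "delete from", "convert to")

    // Throws when the statement is not part of the extended syntax.
    private def parseExtended(sql: String): Plan = {
      val s = sql.trim.toLowerCase
      if (extendedPrefixes.exists(p => s.startsWith(p))) HoodiePlan(sql.trim)
      else throw new IllegalArgumentException(s"not extended syntax: $sql")
    }

    def parsePlan(sql: String): Plan =
      Try(parseExtended(sql)) match {
        case Success(plan) => plan
        case Failure(_)    => delegate.parsePlan(sql) // fall back to the wrapped parser
      }
  }
}
```

The design choice is that the extended parser never needs to know the full Spark grammar: anything it rejects is handed to the wrapped parser unchanged.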
resolution
In the resolution stage, Hudi resolution rules are injected into Spark SQL to resolve our extended LogicalPlan into a resolved plan, which is a command plan for DDL/DML/CLI.
rewrite
We may need to rewrite some of the built-in Spark commands for Hudi. For example, we need to rewrite Spark's CreateDataSourceTableCommand to a Hudi implementation such as CreateHoodieTableCommand, which initializes the .hoodie directory and syncs the metadata to the metastore.
So we will inject some rewrite rules into Spark SQL after resolution.
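As a rough illustration of such a rewrite rule, the sketch below models a rule as a function over a toy command type. The case classes are hypothetical simplifications that only borrow the names mentioned above; they are not Spark's or Hudi's actual classes.

```scala
object RewriteRuleSketch {
  // Toy command plans (hypothetical); real Spark commands carry much more state.
  sealed trait Command
  final case class CreateDataSourceTable(table: String, provider: String) extends Command
  final case class CreateHoodieTable(table: String) extends Command

  // A rule maps a plan to a plan; plans it does not care about pass through unchanged.
  val rewriteCreateTable: Command => Command = {
    case CreateDataSourceTable(table, provider) if provider.equalsIgnoreCase("hudi") =>
      CreateHoodieTable(table)
    case other => other
  }
}
```

Only tables declared with the hudi provider are rewritten, so tables of other formats keep Spark's default behaviour.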
command#run
The Command#run method translates the logical plan into Hudi API calls. For example, InsertIntoHudiTableCommand translates to Hudi's DataFrame API for inserting data into Hudi.
Process for meta fields
Hudi prepends five meta fields to the table schema. These are table property fields, and users typically do not specify them in an INSERT statement. However, Spark SQL validates that the number of target table fields (which includes the meta fields) matches the number of output fields of the SELECT statement, which results in a size mismatch exception. For example, insert into h0 select 1, 'a1', 10 from s fails in the validation stage, because the SELECT statement does not have the meta fields while h0 contains them.
To solve this problem, we add a rewrite rule for the insert logical relation that prepends the five meta fields to the SELECT projection list.
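A minimal sketch of the alignment step follows. The five names are Hudi's meta columns; filling them with null placeholders is an assumption made for illustration, and `alignProjection` is a hypothetical helper working on plain strings rather than Spark expressions.

```scala
object MetaFieldSketch {
  // The five Hudi meta columns that sit at the head of the table schema.
  val metaFields: Seq[String] = Seq(
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name")

  // Prepend one placeholder per meta field so that the projection width
  // equals the table width (assumption: null placeholders are acceptable).
  def alignProjection(selectExprs: Seq[String]): Seq[String] =
    metaFields.map(f => s"null as $f") ++ selectExprs
}
```

With this, the three expressions of `select 1, 'a1', 10` become eight, matching the five meta fields plus id, name and price in h0.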
MergeIntoCommand implementation
The implementation of MergeIntoCommand is a little more complex than that of the other commands. Consider the following statement.
merge into h0 as target
using (
  select * from s0
) as source
on source.id = target.id
when matched and delete_flag % 10 != 0 then
  update set id = source.id, name = source.name, price = source.price + target.price
when matched and delete_flag % 10 = 0 then delete
when not matched and delete_flag % 10 != 0 then
  insert (id, name, price) values (id, name, price + 1)
There is a constraint on the merge ON condition: it must contain an equality expression on the rowKey, so that we can use the Hudi index to speed up the update and delete.
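One way to picture this constraint is a check that some conjunct of the ON condition equates the target's key column with a column of the source. The expression types and `hasKeyEquality` below are hypothetical and far simpler than Spark's expression tree.

```scala
object MergeConditionSketch {
  // Toy expression tree (hypothetical stand-in for Spark expressions).
  sealed trait Expr
  final case class Col(table: String, name: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Other(sql: String) extends Expr

  // Flatten "a AND b AND c" into Seq(a, b, c).
  def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => Seq(other)
  }

  // True if some conjunct equates the target key column with a column of another table.
  def hasKeyEquality(on: Expr, target: String, key: String): Boolean =
    conjuncts(on).exists {
      case EqualTo(Col(`target`, `key`), Col(t, _)) => t != target
      case EqualTo(Col(t, _), Col(`target`, `key`)) => t != target
      case _                                        => false
    }
}
```

A statement whose ON condition fails this check could be rejected during analysis, before any data is written.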
There are three write operations in the MergeIntoCommand: UPDATE, DELETE and INSERT. We combine the three operations into a single Hudi upsert write operation. We implement an ExpressionPayload which executes the update, insert and delete expressions and computes the result record to write. Here is the main code for this:
// Sketch only: bytesToAvro, toExpression, doCodeGen, join and IGNORED_RECORD are helpers not shown here.
// Option is Hudi's org.apache.hudi.common.util.Option.
class ExpressionPayload extends BaseAvroPayload {

  // do update
  override def combineAndGetUpdateValue(targetRecord: IndexedRecord, schema: Schema,
      properties: Properties): Option[IndexedRecord] = {
    val sourceRecord = bytesToAvro(recordBytes, schema) // the incoming record
    // first test if the sourceRecord matches the update condition (e.g. delete_flag % 10 != 0 in the case)
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_UPDATE_CONDITION_EXPRESSION, sourceRecord)) {
      // get the update expression (e.g. [source.id, source.name, source.price + target.price])
      // from the properties and convert it to a spark Expression.
      val updateExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_UPDATE_EXPRESSION))
      // doCodeGen for expression
      val expressionEvaluator = doCodeGen(updateExpression)
      // join the targetRecord with the sourceRecord, because the fields referred to
      // by the expression come from both of them.
      val joinRecord = join(targetRecord, sourceRecord)
      // execute the expression to compute the resultRecord
      val resultRecord = expressionEvaluator.eval(joinRecord)
      return Option.of(resultRecord)
    }
    // secondly test if the sourceRecord matches the delete condition
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_DELETE_CONDITION, sourceRecord)) {
      return Option.empty // An empty record means delete in HoodieMergeHandle
    }
    // if no condition matched, return an IGNORED_RECORD which will be ignored by the HoodieMergeHandle.
    return IGNORED_RECORD
  }

  // do insert
  override def getInsertValue(schema: Schema, properties: Properties): Option[IndexedRecord] = {
    val sourceRecord = bytesToAvro(recordBytes, schema)
    // first test if the sourceRecord matches the insert condition (e.g. delete_flag % 10 != 0 in the case)
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_INSERT_CONDITION_EXPRESSION, sourceRecord)) {
      // get the insert expression (e.g. [id, name, price + 1]) from the properties
      // and convert it to a spark Expression.
      val insertExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_INSERT_EXPRESSION))
      // doCodeGen for expression
      val expressionEvaluator = doCodeGen(insertExpression)
      // execute the expression to compute the resultRecord
      val resultRecord = expressionEvaluator.eval(sourceRecord)
      return Option.of(resultRecord)
    }
    return Option.empty
  }

  // test if the sourceRecord matches the given insert, update or delete condition.
  def matchCondition(properties: Properties, conditionExpressionKey: String,
      sourceRecord: IndexedRecord): Boolean = {
    val conditionExpression = toExpression(properties.get(conditionExpressionKey))
    val conditionExpressionEvaluator = doCodeGen(conditionExpression)
    return conditionExpressionEvaluator.eval(sourceRecord).asInstanceOf[Boolean]
  }
}
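The decision logic of the payload can be exercised in isolation with a toy model. The sketch below hard-codes the conditions of the example merge statement (assuming the update adds the target's price to the source's price) and uses hypothetical case classes in place of Avro records; `Ignore` plays the role of the ignored record.

```scala
object MergeDecisionSketch {
  // Hypothetical row types: h0 (target) and s0 (source, which carries delete_flag).
  final case class Target(id: Long, name: String, price: Double)
  final case class Source(id: Long, name: String, price: Double, deleteFlag: Int)

  // Outcome for a source record whose key already exists in the target.
  sealed trait MatchedResult
  final case class Write(row: Target) extends MatchedResult
  case object Delete extends MatchedResult
  case object Ignore extends MatchedResult

  // Mirrors the update path: update condition first, then delete condition, else ignore.
  def onMatched(target: Target, source: Source): MatchedResult =
    if (source.deleteFlag % 10 != 0)
      Write(Target(source.id, source.name, source.price + target.price))
    else if (source.deleteFlag % 10 == 0)
      Delete
    else
      Ignore // unreachable for this statement (the conditions are complementary), needed in general

  // Mirrors the insert path: a record is produced only if the insert condition holds.
  def onNotMatched(source: Source): Option[Target] =
    if (source.deleteFlag % 10 != 0) Some(Target(source.id, source.name, source.price + 1))
    else None
}
```

Because all three outcomes are computed per record, a single upsert pass over the joined keys is enough to apply the whole statement.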
HoodieSparkSessionExtension
The HoodieSparkSessionExtension is the main entry point of the extension. It injects our SQL parser, resolution rules and rewrite rules into Spark SQL. It looks like the following code:
import org.apache.spark.sql.SparkSessionExtensions

class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Inject the hoodie sql parser
    extensions.injectParser { (session, parser) =>
      new HoodieSqlParser(session, parser)
    }
    // Inject the hoodie resolution rule
    HoodieAnalyzer.customResolutionRules().foreach { rule =>
      extensions.injectResolutionRule { session =>
        rule(session)
      }
    }
    // Inject the post hoc rule to rewrite the resolved plan
    // (e.g. rewrite the CreateDataSourceTableCommand).
    HoodieAnalyzer.customPostHocResolutionRules().foreach { rule =>
      extensions.injectPostHocResolutionRule { session =>
        rule(session)
      }
    }
  }
}
Spark can use the extension when creating the SparkSession:
val spark = SparkSession
.builder()
.master(...)
.withExtensions(new HoodieSparkSessionExtension)
.config(....)
.getOrCreate()
Implementation plan
1. Implement a SQL parser with ANTLR4 to extend most of the SQL syntax, including the DDL and DML. There will be a JIRA for this.
2. Implement the resolution rule and rewrite rule for each DDL and DML logical plan, and translate the logical plan to Hudi API calls. There may be a JIRA for each DDL and DML statement.
After this stage is finished, we can use SQL to create tables and insert/update data in Hudi.
3. Finally, extend the syntax to cover the rest of the Hudi CLI commands.