
Background

SQL is a popular language for big data development. Building SQL extensions for Hudi will greatly reduce the cost of use. This document discusses the SQL extensions for Hudi on the Spark engine.

Extended SQL Syntax

Here is the SQL syntax we need to extend for Hudi.

DDL

As Hudi tables have primary keys, we add a primary key definition to the CREATE TABLE statement, which is not currently supported in Spark SQL.

syntax


CREATE TABLE [ IF NOT EXISTS ] [database_name.] table_name
[ ( columnTypeList
    [, PRIMARY KEY(column_list)]
  )
]
[ COMMENT table_comment ]
USING hudi
[ LOCATION location_path ]
[ OPTIONS (
  options_list
) ]
[ AS query_statement ]


example

create table h0 (
  id bigint,
  name string,
  price double,
  primary key(id)
) using hudi

create table h1 using hudi as select * from src

DML

MERGE INTO

Merge the data from a source table or query into the target Hudi table.

syntax


MERGE INTO [database_name.] table_name [AS target_alias]
USING (query_statement | table_name) [ AS source_alias ]
ON merge_condition
WHEN MATCHED [ AND condition ] THEN matched_action 
[ WHEN MATCHED [ AND condition ] THEN matched_action ]
[ WHEN NOT MATCHED [ AND condition ]  THEN not_matched_action ]

merge_condition = bool_expression

matched_action  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2...]

not_matched_action  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (expression1 [, expression2 ...])

example


merge into h0 as target
using (
  select 1 as id, 'a1' as name, 10.0 as price
) source
on target.id = source.id
when matched then update set *
when not matched then insert *


UPDATE

Update the data in a Hudi table according to the condition.

syntax


UPDATE [database_name.] table_name SET column1 = expression1 [,column2 = expression2...] [ WHERE bool_expression ]

example


update h0 set price = price + 10 where id > 100


DELETE

Delete the data in a Hudi table according to the condition.

syntax


DELETE FROM [database_name.] table_name  [ WHERE bool_expression ]

example


delete from h0 where id > 100


CONVERT

Convert tables or files in other formats to Hudi.

syntax


CONVERT TO ([database_name.] target_table_name | 'target_file_path')
FROM ( [database_name.] source_table_name | 'source_file_path' )


example


convert to h0 from tbl_parquet
convert to h0 from '/tmp/parquet/p0'
convert to '/tmp/hudi/h0' from tbl_parquet
convert to '/tmp/hudi/h0' from '/tmp/parquet/p0'


CLI COMMAND

Hudi has many CLI commands; we can bring them to SQL.


syntax


CLI_COMMAND [ (param_key1 = value1, param_key2 = value2...) ]

The CLI_COMMAND is the same as the Hudi CLI command, and the param keys should also stay the same.

example



commits show
commit showfiles (commit = '20210114221306', limit = 10)
show rollbacks
savepoint create (commit = '20210114221306')

Implement on Spark


A DDL/DML/CLI statement goes through the following stages in Spark SQL:

parse

In the SQL parse stage, we will inject a HoodieSqlParser into Spark SQL which will parse our extended DDL/DML/CLI syntax to a LogicalPlan. If the HoodieSqlParser fails to parse a SQL statement, it will be routed to Spark's own SQL parser. So we just need to implement our extended syntax in the HoodieSqlParser.
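
A minimal sketch of this delegation pattern is shown below. It assumes Spark's ParserInterface; only parsePlan is shown, the remaining parser methods simply forward to the delegate, and parseExtendedSyntax is a hypothetical hook for the generated ANTLR4 parser:

import scala.util.control.NonFatal

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

class HoodieSqlParser(session: SparkSession, delegate: ParserInterface)
  extends ParserInterface {

  // Try the extended Hudi syntax first; on failure route the statement
  // to Spark's own SQL parser.
  override def parsePlan(sqlText: String): LogicalPlan =
    try {
      parseExtendedSyntax(sqlText)
    } catch {
      case NonFatal(_) => delegate.parsePlan(sqlText)
    }

  // Hypothetical hook for the ANTLR4-generated parser that understands
  // the extended DDL/DML/CLI statements.
  private def parseExtendedSyntax(sqlText: String): LogicalPlan = ???

  // The remaining ParserInterface methods just forward to Spark's parser.
  override def parseExpression(sqlText: String) = delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String) = delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String) = delegate.parseFunctionIdentifier(sqlText)
  override def parseTableSchema(sqlText: String) = delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String) = delegate.parseDataType(sqlText)
  // (Depending on the Spark version a few more methods, such as
  // parseMultipartIdentifier, need the same forwarding.)
}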

resolution

In the resolution stage, some Hudi resolution rules will be injected into Spark SQL to resolve our extended LogicalPlan to a resolved plan, which is a command plan for DDL/DML/CLI.
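
For example, a resolution rule could look roughly like the sketch below (the plan and command names MergeIntoHoodieTable and MergeIntoHoodieTableCommand are illustrative placeholders, not final class names):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: resolves the parsed MERGE INTO plan to an executable command.
case class HoodieResolveReferences(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Once the target table and the source query are resolved, turn the
    // parsed MERGE INTO plan into a command plan.
    case merge: MergeIntoHoodieTable if merge.childrenResolved =>
      MergeIntoHoodieTableCommand(merge)
  }
}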

rewrite

We may need to rewrite some of the built-in Spark commands for Hudi. For example, we need to rewrite Spark's CreateDataSourceTableCommand to a Hudi implementation such as CreateHoodieTableCommand, which will do some initialization for the .hoodie directory and sync the metadata to the metastore.

So we will inject some rewrite rules into Spark SQL after the resolution.
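
A sketch of such a post hoc rule is shown below; it assumes CreateHoodieTableCommand takes the same arguments as Spark's built-in CreateDataSourceTableCommand, and the rule name and constructor shape are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand

case class HoodiePostAnalysisRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // Rewrite CREATE TABLE commands whose provider is hudi to the Hudi
    // command, which also initializes the .hoodie directory and syncs the
    // metadata to the metastore.
    case CreateDataSourceTableCommand(table, ignoreIfExists)
        if table.provider.exists(_.equalsIgnoreCase("hudi")) =>
      CreateHoodieTableCommand(table, ignoreIfExists)
    case other => other
  }
}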

command#run

The Command#run method will translate the logical plan to Hudi API calls. For example, InsertIntoHudiTableCommand will translate to Hudi's DataFrame API for inserting data into Hudi.
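
A rough sketch of such a command is shown below. It assumes the command is declared under the org.apache.spark.sql package (so that Dataset.ofRows, which is private[sql], is accessible) and that it carries the table name, base path and record key field; the field list is an assumption for illustration, while the option keys are standard Hudi datasource options:

package org.apache.spark.sql.hudi.command // assumed package, see note above

import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand

case class InsertIntoHudiTableCommand(
    tableName: String,       // assumed field: target table name
    tablePath: String,       // assumed field: base path of the hudi table
    recordKeyField: String,  // assumed field: the primary key column
    query: LogicalPlan) extends RunnableCommand {

  override def run(session: SparkSession): Seq[Row] = {
    // Execute the source query and hand the result to the hudi datasource,
    // i.e. Hudi's DataFrame write API.
    val df = Dataset.ofRows(session, query)
    df.write
      .format("hudi")
      .option("hoodie.table.name", tableName)
      .option("hoodie.datasource.write.recordkey.field", recordKeyField)
      .mode(SaveMode.Append)
      .save(tablePath)
    Seq.empty[Row]
  }
}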

HoodieSparkSessionExtension

The HoodieSparkSessionExtension is the main entrance for the extension, which will inject our SQL parser, resolution rules and rewrite rules into Spark SQL. It looks like the following code:

class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) {
  
  // Inject the hoodie sql parser
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser { (session, parser) =>
      new HoodieSqlParser(session, parser)
    }
    // Inject the hoodie resolution rule
    HoodieAnalyzer.customResolutionRules().foreach { rule =>
      extensions.injectResolutionRule { session =>
        rule(session)
      }
    }
    // Inject the post hoc rule to rewrite the resolved plan
    // (e.g. rewrite the CreateDataSourceTableCommand).
    HoodieAnalyzer.customPostHocResolutionRules().foreach { rule =>
      extensions.injectPostHocResolutionRule { session =>
        rule(session)
      }
    }
  }
}


Spark can use the extension when creating the SparkSession:

val spark = SparkSession
  .builder()
  .master(...)
  .withExtensions(new HoodieSparkSessionExtension)
  .config(....)
  .getOrCreate()
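
The extension could also be registered by class name through Spark's standard spark.sql.extensions configuration (useful for spark-sql or spark-shell); the fully qualified class name below is only a placeholder:

val spark = SparkSession
  .builder()
  // spark.sql.extensions is a standard Spark setting; substitute the real
  // fully qualified class name of HoodieSparkSessionExtension.
  .config("spark.sql.extensions", "<package>.HoodieSparkSessionExtension")
  .getOrCreate()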

Implementation plan

1. Implement a SQL parser with ANTLR4 to extend most of the SQL syntax, including the DDL and DML. There will be a JIRA for this.

2. Implement the resolution rule and rewrite rule for each DDL and DML logical plan, and translate the logical plan to Hudi API calls. There may be a JIRA for each DDL and DML statement.

After this stage is finished, we can use SQL to create tables and insert/update data in Hudi.

3. Finally, we extend the syntax for the rest of the Hudi CLI commands.

