...

Background

SQL is a popular language for big data development. Building SQL extensions for Hudi will greatly reduce the cost of use. This document discusses the SQL extension on Hudi for the Spark engine.

...

In order to solve this problem, we add a rewrite rule for the insert logical relation which appends the five meta fields to the head of the select projection.
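
For illustration, a minimal sketch of such a rule is shown below, assuming Spark 3's InsertIntoStatement plan node; the rule name HoodieResolveInsertInto and the isHoodieTable helper are hypothetical names, not Hudi's actual implementation.

Code Block
languagescala
// A minimal sketch of such a rewrite rule, assuming Spark 3's InsertIntoStatement node.
// The rule name (HoodieResolveInsertInto) and the isHoodieTable helper are
// hypothetical and used here for illustration only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.StringType

case class HoodieResolveInsertInto(session: SparkSession) extends Rule[LogicalPlan] {

  // Null placeholders for the five Hudi meta fields, prepended to the query's
  // projection so that it lines up with the table schema.
  private def metaFieldLiterals: Seq[NamedExpression] =
    Seq("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
        "_hoodie_partition_path", "_hoodie_file_name")
      .map(name => Alias(Literal(null, StringType), name)())

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Only rewrite a resolved insert into a Hudi table whose query does not
    // already carry the meta fields.
    case insert: InsertIntoStatement
        if isHoodieTable(insert.table) && insert.query.resolved &&
           !insert.query.output.exists(_.name == "_hoodie_commit_time") =>
      insert.copy(query = Project(metaFieldLiterals ++ insert.query.output, insert.query))
  }

  // Hypothetical helper: decides whether the target relation is a Hudi table.
  private def isHoodieTable(table: LogicalPlan): Boolean = ???
}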

MergeIntoCommand implementation

The implementation of MergeIntoCommand is a little more complex than that of the other commands. Consider the following statement.

Code Block
languagesql
merge into h0 as target
using (
  select * from s0
) as source
on source.id = target.id
when matched and delete_flag % 10 != 0 then update set id = source.id, name = source.name, price = source.price + target.price
when matched and delete_flag % 10 = 0 then delete
when not matched and delete_flag % 10 != 0 then insert (id,name,price) values(id, name, price + 1)


There is a constraint on the merge ON condition: it must contain an equality expression on the row key, so that we can use the Hudi index to speed up the update and delete.
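
As a rough sketch of how this constraint could be checked (the object and method names below are hypothetical, not Hudi's actual implementation), the ON condition can be split on AND and scanned for an equality that references the record key column:

Code Block
languagescala
// A minimal sketch of validating the ON condition; MergeOnConditionValidator
// and validateOnCondition are assumed names used only for illustration.
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}

object MergeOnConditionValidator {

  // Split the ON condition on AND and require that at least one conjunct is an
  // equality referencing the record key column of the target table.
  def validateOnCondition(condition: Expression, recordKeyField: String): Unit = {
    val conjuncts = splitConjuncts(condition)
    val hasRecordKeyEqual = conjuncts.exists {
      case EqualTo(l: AttributeReference, r: AttributeReference) =>
        l.name == recordKeyField || r.name == recordKeyField
      case _ => false
    }
    require(hasRecordKeyEqual,
      s"Merge ON condition must contain an equality on the record key field '$recordKeyField'")
  }

  private def splitConjuncts(condition: Expression): Seq[Expression] = condition match {
    case And(left, right) => splitConjuncts(left) ++ splitConjuncts(right)
    case other => Seq(other)
  }
}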

There are three write operations in the MergeIntoCommand: UPDATE, DELETE and INSERT. We combine these three operations into a single Hudi upsert write operation. We implement an ExpressionPayload which executes the update, insert and delete expressions and computes the result record to write. Here is the main code for this:

Code Block
languagescala
class ExpressionPayload extends BaseAvroPayload {

  // do update
  override def combineAndGetUpdateValue(targetRecord: IndexedRecord, schema: Schema, properties: Properties): Option[IndexedRecord] = {
    val sourceRecord = bytesToAvro(recordBytes, schema) // the incoming record
    // first test if the sourceRecord matches the update condition (e.g. delete_flag % 10 != 0 in the case)
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_UPDATE_CONDITION_EXPRESSION, sourceRecord)) {

      // get the update expression (e.g. [source.id, source.name, source.price + target.price])
      // from the properties and convert it to a Spark Expression.
      val updateExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_UPDATE_EXPRESSION))

      // doCodeGen for the expression
      val expressionEvaluator = doCodeGen(updateExpression)

      // join the targetRecord with the sourceRecord, because the fields referred
      // to by the expression come from both of them.
      val joinRecord = join(targetRecord, sourceRecord)

      // execute the expression to compute the resultRecord
      val resultRecord = expressionEvaluator.eval(joinRecord)
      return Option.of(resultRecord)
    }
    // secondly test if it matches the delete condition
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_DELETE_CONDITION, sourceRecord)) {
      return Option.empty // an empty record means delete in HoodieMergeHandle
    }
    // if no condition matched, return an IGNORED_RECORD which will be ignored by the HoodieMergeHandle.
    IGNORED_RECORD
  }

  // do insert
  override def getInsertValue(schema: Schema, properties: Properties): Option[IndexedRecord] = {
    val sourceRecord = bytesToAvro(recordBytes, schema)

    // first test if the sourceRecord matches the insert condition (e.g. delete_flag % 10 != 0 in the case)
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_INSERT_CONDITION_EXPRESSION, sourceRecord)) {

      // get the insert expression (e.g. [id, name, price + 1]) from the properties and convert it to a Spark Expression.
      val insertExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_INSERT_EXPRESSION))

      // doCodeGen for the expression
      val expressionEvaluator = doCodeGen(insertExpression)

      // execute the expression to compute the resultRecord
      val resultRecord = expressionEvaluator.eval(sourceRecord)
      return Option.of(resultRecord)
    }
    Option.empty
  }

  // test if the sourceRecord matches the insert or update condition.
  def matchCondition(properties: Properties, conditionExpressionKey: String, sourceRecord: IndexedRecord): Boolean = {
    val conditionExpression = toExpression(properties.get(conditionExpressionKey))
    val conditionExpressionEvaluator = doCodeGen(conditionExpression)
    conditionExpressionEvaluator.eval(sourceRecord)
  }

}



HoodieSparkSessionExtension

...

Code Block
languagescala
class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) {

  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Inject the hoodie sql parser
    extensions.injectParser { (session, parser) =>
      new HoodieSqlParser(session, parser)
    }
    // Inject the hoodie resolution rules
    HoodieAnalyzer.customResolutionRules().foreach { rule =>
      extensions.injectResolutionRule { session =>
        rule(session)
      }
    }
    // Inject the post hoc rule to rewrite the resolved plan
    // (e.g. rewrite the CreateDataSourceTableCommand).
    HoodieAnalyzer.customPostHocResolutionRules().foreach { rule =>
      extensions.injectPostHocResolutionRule { session =>
        rule(session)
      }
    }
  }
}
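
To enable the extension, it can be registered through Spark's spark.sql.extensions configuration when building the session. A minimal usage example is shown below; the fully qualified class name assumes the extension lives under org.apache.spark.sql.hudi.

Code Block
languagescala
// Example of enabling the extension when building a SparkSession.
// The package shown for HoodieSparkSessionExtension is an assumption here.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-sql-demo")
  .config("spark.sql.extensions",
    "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()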

...