...
Background
SQL is a popular language for big data development. Building SQL extensions for Hudi will greatly reduce the cost of use. This document discusses the SQL extension on Hudi for the Spark engine.
...
There are three write operations in the MergeIntoCommand: UPDATE, DELETE and INSERT. We combine the UPDATE and INSERT operators together into one Hudi upsert write operator, because Hudi will perform an insert when the key does not match. Another write operator is needed for DELETE. In the upsert write operator, we implement an ExpressionPayload which executes the update, insert and delete expressions and computes the result record to write. Here is the main code for this:
Code Block:
/**
 * Payload used by the Hudi upsert write operator for MergeIntoCommand.
 * It evaluates the MERGE INTO update / insert / delete condition expressions
 * (passed in via `properties`) against each incoming record to decide what
 * result record, if any, should be written.
 */
class ExpressionPayload extends BaseAvroPayload {

  /**
   * Called for records whose key matched an existing record: the
   * UPDATE / DELETE path of the merge.
   *
   * @param targetRecord the existing record in the table
   * @param schema       the Avro schema of the records
   * @param properties   carries the serialized condition / update expressions
   * @return the updated record, `Option.empty` to delete, or `IGNORED_RECORD`
   *         when neither condition matches
   */
  override def combineAndGetUpdateValue(
      targetRecord: IndexedRecord,
      schema: Schema,
      properties: Properties): Option[IndexedRecord] = {
    // The incoming (source) record carried by this payload.
    val sourceRecord = bytesToAvro(recordBytes, schema)

    // First test whether the sourceRecord matches the update condition
    // (e.g. delete_flag % 10 != 0 in the example).
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_UPDATE_CONDITION_EXPRESSION, sourceRecord)) {
      // Get the update expressions (e.g. [source.id, source.name, source.price + target.price])
      // from the properties and convert them to Spark Expressions.
      val updateExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_UPDATE_EXPRESSION))
      // Code-generate an evaluator for the expressions.
      val expressionEvaluator = doCodeGen(updateExpression)
      // Join the targetRecord with the sourceRecord, because the fields
      // referred to by the expressions come from both of them.
      val joinRecord = join(targetRecord, sourceRecord)
      // Execute the expressions to compute the result record.
      val resultRecord = expressionEvaluator.eval(joinRecord)
      Option.of(resultRecord)
    } else if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_DELETE_CONDITION, sourceRecord)) {
      // Secondly test whether the delete condition matches.
      Option.empty // An empty record means delete in HoodieMergeHandle.
    } else {
      // If no condition matched, return an IGNORED_RECORD which will be
      // ignored by the HoodieMergeHandle.
      IGNORED_RECORD
    }
  }

  /**
   * Called for records with no matching key: the INSERT path of the merge.
   *
   * @param schema     the Avro schema of the records
   * @param properties carries the serialized condition / insert expressions
   * @return the record to insert, or `Option.empty` when the insert
   *         condition does not match
   */
  override def getInsertValue(schema: Schema, properties: Properties): Option[IndexedRecord] = {
    val sourceRecord = bytesToAvro(recordBytes, schema)
    // Test whether the sourceRecord matches the insert condition
    // (e.g. delete_flag % 10 != 0 in the example).
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_INSERT_CONDITION_EXPRESSION, sourceRecord)) {
      // Get the insert expressions (e.g. [id, name, price + 1]) from the
      // properties and convert them to Spark Expressions.
      val insertExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_INSERT_EXPRESSION))
      val expressionEvaluator = doCodeGen(insertExpression)
      val resultRecord = expressionEvaluator.eval(sourceRecord)
      Option.of(resultRecord)
    } else {
      Option.empty
    }
  }

  /**
   * Test whether the sourceRecord matches the condition expression stored
   * in the properties under `conditionExpressionKey` (used for the insert,
   * update and delete conditions alike).
   */
  def matchCondition(
      properties: Properties,
      conditionExpressionKey: String,
      sourceRecord: IndexedRecord): Boolean = {
    val conditionExpression = toExpression(properties.get(conditionExpressionKey))
    val conditionExpressionEvaluator = doCodeGen(conditionExpression)
    conditionExpressionEvaluator.eval(sourceRecord)
  }
}
...