...
In the table, order_id is the primary key, which will be enforced on the Hudi table as well. Since a batch of change records can contain multiple changes to the same primary key, we also include updated_at and created_at fields, which are kept up to date as writes happen to the table.
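To see why an ordering field like updated_at matters, here is a minimal sketch, in plain Python rather than Hudi code, of the "pick the latest record per key" behavior an upsert-style writer needs when a single batch carries several changes for the same order_id (the records and helper name are illustrative, not Hudi's actual API):

```python
# Sketch of per-key deduplication: when a batch of change records
# contains several rows for the same primary key (order_id), keep only
# the row with the greatest ordering field (updated_at).
def latest_per_key(batch):
    latest = {}
    for record in batch:
        key = record["order_id"]
        if key not in latest or record["updated_at"] > latest[key]["updated_at"]:
            latest[key] = record
    return list(latest.values())

batch = [
    {"order_id": 1, "order_qty": 5,  "updated_at": "2020-01-20 20:10:00"},
    {"order_id": 1, "order_qty": 10, "updated_at": "2020-01-20 20:12:31"},
    {"order_id": 2, "order_qty": 10, "updated_at": "2020-01-20 20:12:22"},
]
deduped = latest_per_key(batch)
# Two distinct keys survive; order_id 1 keeps its most recent change.
```

Without the ordering field, the writer has no way to tell which of the two changes to order_id 1 should win, so the final table state would depend on arrival order.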
Extracting Change logs from MySQL
Before we can configure DMS, we first need to prepare the MySQL instance for change capture by ensuring that backups are enabled and the binlog is turned on.
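On Amazon RDS, for instance, this amounts to setting a non-zero backup retention period and switching the binlog format to ROW. A hedged sketch using the AWS CLI (the instance identifier and parameter group name below are placeholders, not values from this setup):

```shell
# Enable automated backups, which RDS requires before it retains binlogs;
# the instance identifier is a placeholder.
aws rds modify-db-instance \
  --db-instance-identifier my-mysql-instance \
  --backup-retention-period 1 \
  --apply-immediately

# Switch to row-based binary logging via the instance's parameter group
# (the parameter group name is a placeholder).
aws rds modify-db-parameter-group \
  --db-parameter-group-name my-mysql-params \
  --parameters "ParameterName=binlog_format,ParameterValue=ROW,ApplyMethod=immediate"
```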
...
```
scala> spark.read.parquet("s3://hudi-dms-demo/orders/hudi_dms/orders/*").sort("updated_at").show
+--------+---------+-------------+-------------------+-------------------+
|order_id|order_qty|customer_name|         updated_at|         created_at|
+--------+---------+-------------+-------------------+-------------------+
|       2|       10|        peter|2020-01-20 20:12:22|2020-01-20 20:12:22|
|       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
+--------+---------+-------------+-------------------+-------------------+
```
Applying Change Logs using Hudi DeltaStreamer
Now we are ready to start consuming the change logs. Hudi DeltaStreamer runs as a Spark job on your favorite workflow scheduler (it also supports a continuous mode via the --continuous flag, where it runs as a long-running Spark job). It tails a given path on S3 (or any DFS implementation) for new files and issues an upsert to a target Hudi dataset. The tool checkpoints itself automatically, so to ingest repeatedly, all one needs to do is keep executing DeltaStreamer periodically.
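As a concrete illustration, such a periodic run could look like the sketch below. The bucket paths, table name, and the utilities bundle jar location are assumptions for this walkthrough; the payload and transformer classes shown are the DMS-specific ones Hudi ships for interpreting DMS insert/update/delete records:

```shell
# Sketch of a periodic DeltaStreamer invocation (paths, table name,
# and jar location are placeholders; adjust to your environment).
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --source-ordering-field updated_at \
  --target-base-path s3://hudi-dms-demo/hudi_orders \
  --target-table hudi_orders \
  --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
  --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
  --hoodie-conf hoodie.datasource.write.recordkey.field=order_id \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://hudi-dms-demo/orders/hudi_dms/orders
```

Note that --source-ordering-field is set to updated_at, so that when a batch contains several changes to the same order_id, the latest one wins.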
...