...

In this blog, we will build an end-to-end solution for capturing changes from a MySQL instance running on AWS RDS to a Hudi table on S3, using capabilities in the Hudi 0.5.1 release.

We can break up the problem into two pieces. 

  1. Extracting change logs from MySQL : Surprisingly, this is still a pretty tricky problem to solve, and Hudi users often get stuck here. Thankfully, at least for AWS users, there is the Database Migration Service (DMS for short), which performs this change capture and uploads the change logs as parquet files to S3.
  2. Applying these change logs to your data lake table : Once the change logs are available in some form, the next step is to apply them incrementally to your table. This mundane task can be fully automated using the Hudi DeltaStreamer tool (see the sketch after this list).
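
At its core, step 2 boils down to reading a batch of change log files and upserting them into a Hudi table. The following is a minimal spark-shell sketch of that operation using Hudi's Spark datasource; the paths and table name are placeholders, and this only illustrates what DeltaStreamer automates for you (including checkpointing).

Code Block
import org.apache.spark.sql.SaveMode

// Read a batch of DMS change log files (placeholder path).
val changes = spark.read.parquet("s3://hudi-dms-demo/orders/hudi_dms/orders/*")

// Upsert them into a Hudi table keyed on order_id; among records with the same
// key, the one with the larger updated_at (the precombine field) wins.
changes.write.format("org.apache.hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.recordkey.field", "order_id").
  option("hoodie.datasource.write.partitionpath.field", "customer_name").
  option("hoodie.datasource.write.precombine.field", "updated_at").
  option("hoodie.table.name", "hudi_orders").
  mode(SaveMode.Append).
  save("s3://hudi-dms-demo/hudi_orders")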

...

In the table, order_id is the primary key, which will be enforced on the Hudi table as well. Since a batch of change records can contain multiple changes to the same primary key, we also include updated_at and created_at fields, which are kept up to date as writes happen to the table.
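
To make the role of updated_at concrete, here is a sketch of how the latest change per key can be picked out of such a batch; changes is a hypothetical DataFrame of change records, and this is essentially what Hudi's precombine step does for you during an upsert.

Code Block
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Among multiple change records for the same order_id, keep only the one
// with the latest updated_at.
val w = Window.partitionBy("order_id").orderBy(col("updated_at").desc)
val latest = changes.withColumn("rn", row_number().over(w)).
  where(col("rn") === 1).
  drop("rn")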

Extracting Change Logs from MySQL

Before we can configure DMS, we first need to prepare the MySQL instance for change capture by ensuring backups are enabled and the binlog is turned on.
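
As a quick sanity check, the binlog setting can be verified from any MySQL client. Below is a hypothetical JDBC snippet; the endpoint and credentials are placeholders, and the MySQL Connector/J driver is assumed to be on the classpath.

Code Block
import java.sql.DriverManager

// Connect to the RDS MySQL instance (placeholder endpoint/credentials) and
// confirm that the binary log is enabled, i.e. log_bin = ON.
val conn = DriverManager.getConnection(
  "jdbc:mysql://hudi-source-db.xxxxx.us-west-2.rds.amazonaws.com:3306/hudi_dms",
  "admin", "<password>")
val rs = conn.createStatement().executeQuery("SHOW VARIABLES LIKE 'log_bin'")
while (rs.next()) println(s"${rs.getString("Variable_name")} = ${rs.getString("Value")}")
conn.close()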

...

Now, proceed to create endpoints in DMS that capture MySQL data and store it in S3 as parquet files (a programmatic sketch follows the list):

  • A source endpoint, hudi-source-db, that points to the DB server and provides basic authentication details.
  • A target endpoint, parquet-s3, that points to the bucket and folder on S3 where the change log records will be stored as parquet files.
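
Both endpoints can be created from the console or the AWS CLI. As one illustration, here is a rough sketch using the AWS SDK for Java (v1) from Scala; all identifiers, ARNs and credentials are placeholders, and the exact setting names may vary across SDK versions.

Code Block
import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder
import com.amazonaws.services.databasemigrationservice.model.{CreateEndpointRequest, S3Settings}

val dms = AWSDatabaseMigrationServiceClientBuilder.defaultClient()

// Source endpoint: the RDS MySQL instance holding the orders table.
dms.createEndpoint(new CreateEndpointRequest()
  .withEndpointIdentifier("hudi-source-db")
  .withEndpointType("source")
  .withEngineName("mysql")
  .withServerName("hudi-source-db.xxxxx.us-west-2.rds.amazonaws.com")
  .withPort(3306)
  .withUsername("admin")
  .withPassword("<password>")
  .withDatabaseName("hudi_dms"))

// Target endpoint: the S3 bucket/folder where change logs land as parquet files.
dms.createEndpoint(new CreateEndpointRequest()
  .withEndpointIdentifier("parquet-s3")
  .withEndpointType("target")
  .withEngineName("s3")
  .withS3Settings(new S3Settings()
    .withServiceAccessRoleArn("arn:aws:iam::<account-id>:role/dms-s3-access")
    .withBucketName("hudi-dms-demo")
    .withBucketFolder("orders")
    .withDataFormat("parquet")))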

...

Code Block
scala> spark.read.parquet("s3://hudi-dms-demo/orders/hudi_dms/orders/*").sort("updated_at").show

+--------+---------+-------------+-------------------+-------------------+
|order_id|order_qty|customer_name|         updated_at|         created_at|
+--------+---------+-------------+-------------------+-------------------+
|       2|       10|        peter|2020-01-20 20:12:22|2020-01-20 20:12:22|
|       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
+--------+---------+-------------+-------------------+-------------------+

Applying Change Logs using Hudi DeltaStreamer

Now, we are ready to start consuming the change logs. Hudi DeltaStreamer runs as a Spark job on your favorite workflow scheduler (it also supports a continuous mode using the --continuous flag, where it runs as a long-running Spark job). It tails a given path on S3 (or any DFS implementation) for new files and issues an upsert to a target Hudi table. The tool checkpoints itself automatically, so to ingest repeatedly, all one needs to do is keep executing DeltaStreamer periodically.
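
In practice, DeltaStreamer is launched with spark-submit using the hudi-utilities bundle, with the class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer. The snippet below sketches the argument list it would be given for this demo; the paths and table names are placeholders (class and flag names as of the 0.5.1 utilities), and the AWSDmsAvroPayload/AWSDmsTransformer classes teach Hudi how to interpret the DMS Op column.

Code Block
// Sketch of DeltaStreamer arguments for this pipeline (placeholder paths).
// These are passed as flags to:
//   spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer <hudi-utilities-bundle jar>
val deltaStreamerArgs = Array(
  "--table-type", "COPY_ON_WRITE",
  "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
  "--source-ordering-field", "updated_at",
  "--target-base-path", "s3://hudi-dms-demo/hudi_orders",
  "--target-table", "hudi_orders",
  "--payload-class", "org.apache.hudi.payload.AWSDmsAvroPayload",
  "--transformer-class", "org.apache.hudi.utilities.transform.AWSDmsTransformer",
  "--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_id",
  "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=customer_name",
  "--hoodie-conf", "hoodie.deltastreamer.source.dfs.root=s3://hudi-dms-demo/orders/hudi_dms/orders")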

...

So, querying the Hudi table now would yield 3 rows, and the _hoodie_commit_time accurately reflects when these writes happened. Notice that order_qty for order_id=2 is now updated from 10 to 20!
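
For reference, the snapshot below can be obtained by reading the table's files back in spark-shell, along the lines of the earlier query; the path is a placeholder assuming a target base path of s3://hudi-dms-demo/hudi_orders, partitioned by customer_name.

Code Block
scala> spark.read.parquet("s3://hudi-dms-demo/hudi_orders/*/*.parquet").show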

Code Block
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| Op|order_id|order_qty|customer_name|         updated_at|         created_at|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+
|     20200120211526|  20200120211526_0_1|                 2|                 peter|af9a2525-a486-40e...|  U|       2|       20|        peter|2020-01-20 21:11:47|2020-01-20 20:12:22|
|     20200120211526|  20200120211526_1_1|                 3|                 sandy|566eb34a-e2c5-44b...|  I|       3|       30|        sandy|2020-01-20 21:11:24|2020-01-20 21:11:24|
|     20200120205028|  20200120205028_1_1|                 1|                victor|8e431ece-d51c-4c7...|   |       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+

...