Motivation


In a data warehouse, ingesting MongoDB data is often a complex problem. Although Hive provides a MongoStorageHandler to map MongoDB data into a Hive table, it is practically unusable due to certain characteristics of MongoDB:
Firstly, MongoDB is a document-oriented database that uses BSON format to store data. Its data model is highly flexible, resembling JSON data structures, allowing nesting and inclusion of different data types, making it more adaptable to application requirements.
Secondly, unlike traditional relational databases, MongoDB does not have a fixed data schema. This means that different documents can have different fields and structures. This flexibility allows for easy adjustments and expansion of the data model during the development process, without the need for complex migration procedures.
However, it is precisely these advantages that pose significant challenges for data synchronization. In practice, because of synchronization limitations, using MongoDB means sacrificing some flexibility and requiring every document to be structured. In addition, upstream applications may add fields as business requirements evolve, but downstream systems are not aware of these changes, which leads to synchronization problems. Although there are industry solutions such as MongoDB CDC (Change Data Capture), they do not synchronize unstructured JSON data effectively.
Typically, we treat the data in ChangeStream format obtained from MongoDB CDC as a whole and write it into a field in Hive tables. We utilize Hive's Merge functionality to perform updates based on primary keys (_id) and operation timestamps. For business usage, we parse the synchronized JSON data from MongoDB into a JSON table using Hive-JSON-Serde and perform relevant analysis based on that table.
However, while this approach partially addresses the data synchronization issues, it still suffers from high storage costs, long synchronization pipelines, and inconvenient data access. Therefore, to solve the problem of ingesting MongoDB data into the data lake or data warehouse, I propose developing a MongoDB-based SyncAction that helps users complete the ingestion of MongoDB data into the data lake or data warehouse.

Use Case

Synchronizing Tables

By using MongoDBSyncTableAction in a Flink DataStream job or by directly running "flink run", users can synchronize one or more MongoDB collections into a Paimon table.
To use this functionality through "flink run", execute the following shell command:

<FLINK_HOME>/bin/flink run \ 
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \ 
mongodb-sync-table \
--warehouse <warehouse-path> \ 
--database <database-name> \ 
--table <table-name> \ 
--partition-keys <partition-keys> \ 
--primary-keys <primary-keys> \ 
--mongodb-conf hosts=localhost:27017 \ 
--mongodb-conf database=test \ 
--mongodb-conf collection=temp \ 
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \ 
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

Users can perform table queries using Hive's Beeline and can observe that the Hive table has three records in its initial state.

0: jdbc:hive2://localhost:10000/default> select * from temp3;
INFO  : Compiling command(queryId=hive_20230807174040_819914c0-75ea-45ab-9b4a-673e10621cd7): select * from temp3
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:temp3._id, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230807174040_819914c0-75ea-45ab-9b4a-673e10621cd7); Time taken: 0.321 seconds
INFO  : Executing command(queryId=hive_20230807174040_819914c0-75ea-45ab-9b4a-673e10621cd7): select * from temp3
+---------------------------+
|         temp3._id         |
+---------------------------+
| 643e9943fa4dfe102cfcfa61  |
| 643e9943fa4dfe102cfcfa62  |
| 64ce0b9ff91f6689419f8004  |
+---------------------------+

Perform an operation that adds a field to a MongoDB document:

// Add a new column.
db.getCollection('temp').updateOne(
   { _id: ObjectId("64c8966e26ca9abecf6e3b19") },
   { $set: { email: "example@example.com" } }
)

Upon querying the Hive table again, we can see that Schema Change Evolution has taken effect and Hive has created the corresponding column as well.

0: jdbc:hive2://localhost:10000/default> select * from temp3;
INFO  : Compiling command(queryId=hive_20230807174217_99074e53-39c0-4811-bc79-c0b0107f5330): select * from temp3
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:temp3._id, type:string, comment:null), FieldSchema(name:temp3.email, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230807174217_99074e53-39c0-4811-bc79-c0b0107f5330); Time taken: 0.179 seconds
INFO  : Executing command(queryId=hive_20230807174217_99074e53-39c0-4811-bc79-c0b0107f5330): select * from temp3
+---------------------------+----------------------+
|         temp3._id         |     temp3.email      |
+---------------------------+----------------------+
| 643e9943fa4dfe102cfcfa61  | example@example.com  |
| 643e9943fa4dfe102cfcfa62  | NULL                 |
| 64ce0b9ff91f6689419f8004  | NULL                 |
+---------------------------+----------------------+
3 rows selected (0.453 seconds)
INFO  : Completed executing command(queryId=hive_20230807174217_99074e53-39c0-4811-bc79-c0b0107f5330); Time taken: 0.009 seconds
INFO  : OK

We validate this once more by adding a name column.

// Add a new column.
db.getCollection('temp').updateOne(
   { _id: ObjectId("643e9943fa4dfe102cfcfa61") },
   { $set: { name: "zhangsan" } } 
)

Querying again.

0: jdbc:hive2://localhost:10000/default> select * from temp3;
INFO  : Compiling command(queryId=hive_20230807200441_c45eef00-b495-4004-b708-e69490f50be0): select * from temp3
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:temp3._id, type:string, comment:null), FieldSchema(name:temp3.email, type:string, comment:null), FieldSchema(name:temp3.name, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230807200441_c45eef00-b495-4004-b708-e69490f50be0); Time taken: 0.216 seconds
INFO  : Executing command(queryId=hive_20230807200441_c45eef00-b495-4004-b708-e69490f50be0): select * from temp3
+---------------------------+----------------------+-------------+
|         temp3._id         |     temp3.email      | temp3.name  |
+---------------------------+----------------------+-------------+
| 643e9943fa4dfe102cfcfa61  | example@example.com  | zhangsan    |
| 643e9943fa4dfe102cfcfa62  | NULL                 | NULL        |
| 64ce0b9ff91f6689419f8004  | NULL                 | NULL        |
+---------------------------+----------------------+-------------+
3 rows selected (0.544 seconds)
INFO  : Completed executing command(queryId=hive_20230807200441_c45eef00-b495-4004-b708-e69490f50be0); Time taken: 0.013 seconds
INFO  : OK

Public Interfaces

The existing public interfaces remain unchanged.

Proposed Change

The basic logic is as follows: during the full load phase, when creating the table, the system uses the MongoDB client to read the first record from the specified collection. This record is parsed to obtain the schema, i.e. the field names and attributes needed for table creation.
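
For illustration, the schema probing could look roughly like the sketch below. The class and method names (MongoSchemaProbe, inferSchema) are hypothetical, not Paimon's actual implementation, and every field is declared as String (see note 1 below).

// Hypothetical sketch: read the first document of a collection with the
// MongoDB Java driver and derive a field-name -> field-type mapping for
// table creation. All types are String because Change Streams carry no
// type definitions.
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

import java.util.LinkedHashMap;
import java.util.Map;

public class MongoSchemaProbe {
    public static Map<String, String> inferSchema(String hosts, String database, String collection) {
        try (MongoClient client = MongoClients.create("mongodb://" + hosts)) {
            Document first = client.getDatabase(database)
                    .getCollection(collection)
                    .find()
                    .first();
            Map<String, String> schema = new LinkedHashMap<>();
            if (first != null) {
                // Field order is preserved; every field is mapped to STRING.
                first.keySet().forEach(field -> schema.put(field, "STRING"));
            }
            return schema;
        }
    }
}
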
In the full load and incremental processing phase, the RichCdcMultiplexRecordEventParser#parseSchemaChange() method is reused.
For the code paths that produce data directly based on RichEventParser, a LinkedHashMap is maintained in memory, where the key is the field name and the value is the field type. For each incoming record, every field is looked up in the map to check whether it already exists and whether its type matches the current one. If not, the field and its new type are put into the map, and the change is emitted through a side output stream so that an updateSchema operation can be applied downstream.
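
A minimal sketch of that bookkeeping is shown below; SchemaTracker and its method names are hypothetical and only stand in for the logic inside RichEventParser, with the side output represented by a simple callback.

// Hypothetical sketch: keep the last known type of each field in a
// LinkedHashMap and emit a schema change whenever a field is new or
// its type has changed.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class SchemaTracker {
    // field name -> last seen field type
    private final Map<String, String> previousTypes = new LinkedHashMap<>();

    public void track(Map<String, String> recordFieldTypes,
                      BiConsumer<String, String> schemaChangeCollector) {
        recordFieldTypes.forEach((field, type) -> {
            String previous = previousTypes.get(field);
            if (!type.equals(previous)) {
                // New field or changed type: remember it and report it so the
                // job can trigger an updateSchema via the side output stream.
                previousTypes.put(field, type);
                schemaChangeCollector.accept(field, type);
            }
        });
    }
}
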
This system supports two ways of synchronizing MongoDB data into Paimon:
Scenario 1: the fields to be used are known explicitly. In this case, the system supports retrieving data from a single table, including nested data.
Scenario 2: the business requirements may change over time. In this case, the system supports synchronizing a single table as well as the entire database.

Additional information: MongoDB CDC is implemented based on MongoDB's Change Streams, and the format of the real-time stream data obtained is as follows:

{
   _id : { <BSON Object> },        // Identifier for the opened change stream, can be assigned as a value to the resumeAfter parameter to resume this change stream later.
   "operationType" : "<operation>",        // The type of change operation that occurred, e.g., insert, delete, update, etc.
   "fullDocument" : { <document> },      // The complete document data involved in the change operation; not present in delete operations.
   "ns" : {   
      "db" : "<database>",         // The database where the change operation occurred.
      "coll" : "<collection>"     // The collection where the change operation occurred.
   },
   "to" : {   // Displayed only for rename operations.
      "db" : "<database>",         // The new database name after the change.
      "coll" : "<collection>"     // The new collection name after the change.
   },
   "documentKey" : { "_id" : <value> },  // The value of the _id field of the document involved in the change operation.
   "updateDescription" : {    // Description of the modification operation.
      "updatedFields" : { <document> },  // The fields that were modified and their new values.
      "removedFields" : [ "<field>", ... ]     // The fields that were removed and their values in the modification operation.
   }
   "clusterTime" : <Timestamp>,     // The timestamp from the Oplog entry associated with the change operation.
   "txnNumber" : <NumberLong>,    // If the change operation was executed within a multi-document transaction, this field indicates the transaction number.
   "lsid" : {          // Information about the session related to the transaction.
      "id" : <UUID>,  
      "uid" : <BinData> 
   }
}
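
For reference, a change stream of this shape can be opened directly with the MongoDB Java driver. The sketch below is independent of the Flink CDC connector and only prints incoming events; the connection details are taken from the example configuration above.

// Sketch: open a change stream on test.temp and print each event as it arrives.
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;

public class WatchCollection {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("test").getCollection("temp");
            // UPDATE_LOOKUP makes update events carry the full post-image,
            // which is what an upsert-based synchronization needs.
            for (ChangeStreamDocument<Document> event :
                    coll.watch().fullDocument(FullDocument.UPDATE_LOOKUP)) {
                System.out.println(event.getOperationType() + " -> " + event.getFullDocument());
            }
        }
    }
}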

Note that

  1. MongoDB Change Streams are designed to return simple JSON documents without any data type definitions. This is because MongoDB is a document-oriented database, and one of its core features is the dynamic schema, where documents can contain different fields, and the data types of fields can be flexible. Therefore, the absence of data type definitions in Change Streams is to maintain this flexibility and extensibility.
    For this reason, we set all field data types to String when synchronizing MongoDB to Paimon, which addresses the problem of not being able to obtain data types.

  2. MongoDB's change event records do not contain the pre-update image (there is no "update before" message), so we can only convert the stream into Flink's UPSERT changelog stream. An upsert stream requires a unique key, so we must declare _id as the primary key. We cannot declare any other column as the primary key, because a delete event carries nothing besides the _id and the shard key. A minimal sketch of this mapping is shown after these notes.
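
The following sketch illustrates the mapping described in note 2; the class and method names are hypothetical and not Paimon's actual code. Insert, update and replace events become upserts keyed by _id, while a delete event can only be keyed by documentKey._id.

// Hypothetical sketch: route change-stream events as an upsert changelog
// keyed by _id, since delete events carry no fullDocument.
import org.bson.Document;

public final class ChangeEventToUpsert {

    public enum RowKind { UPSERT, DELETE }

    public static RowKind rowKindOf(Document changeEvent) {
        String op = changeEvent.getString("operationType");
        return "delete".equals(op) ? RowKind.DELETE : RowKind.UPSERT;
    }

    public static String primaryKeyOf(Document changeEvent) {
        // A delete event only contains documentKey._id (plus the shard key,
        // if any), which is why _id must be the table's primary key.
        Document documentKey = changeEvent.get("documentKey", Document.class);
        return String.valueOf(documentKey.get("_id"));
    }

    public static Document payloadOf(Document changeEvent) {
        // fullDocument is present for insert/update/replace events
        // (with full document lookup enabled) and absent for deletes.
        return changeEvent.get("fullDocument", Document.class);
    }
}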

Compatibility, Deprecation, and Migration Plan

  • There are no changes to the public interface and no impact on existing users.

  • No compatibility issues are expected.

Test Plan

IT testing verifies the logic, for example validating data synchronization correctness and ensuring the effectiveness of Schema Change Evolution.
