Motivation


Ingesting MongoDB data into a data warehouse is often a complex problem. Although Hive provides a MongoStorageHandler to map MongoDB data into a Hive table, it is practically unusable because of certain characteristics of MongoDB:
First, MongoDB is a document-oriented database that stores data in BSON format. Its data model is highly flexible: documents resemble JSON structures and can nest and mix different data types, which makes it adaptable to application requirements.
Second, unlike traditional relational databases, MongoDB has no fixed schema, so different documents can have different fields and structures. This flexibility makes it easy to adjust and extend the data model during development without complex migration procedures.
However, precisely these advantages pose significant challenges for data synchronization. In practice, synchronization constraints force users to sacrifice some of this flexibility and keep every document structured. Moreover, upstream services may add fields as business requirements evolve, while downstream systems have no way to learn of these changes, which leads to synchronization problems. Industry solutions such as MongoDB CDC (Change Data Capture) exist, but they do not synchronize unstructured JSON data effectively.
Typically, we treat each ChangeStream record obtained from MongoDB CDC as an opaque whole and write it into a single field of a Hive table, then use Hive's Merge functionality to apply updates based on the primary key (_id) and the operation timestamp. For business use, we parse the synchronized MongoDB JSON into a JSON table with Hive-JSON-Serde and run analyses on that table.
However, while this approach addresses some synchronization issues, it still suffers from high storage costs, a long synchronization pipeline, and inconvenient data access. Therefore, to solve the problem of ingesting MongoDB data into the data lake or data warehouse, I propose developing a MongoDB-based SyncAction that helps users complete this ingestion.

...

Code Block
languagejs
{
   "_id" : { <BSON Object> },        // Identifier for the opened change stream; it can be passed as the resumeAfter parameter to resume this change stream later.
   "operationType" : "<operation>",        // The type of change operation that occurred, e.g., insert, delete, update, etc.
   "fullDocument" : { <document> },      // The complete document data involved in the change operation; not present in delete operations.
   "ns" : {   
      "db" : "<database>",         // The database where the change operation occurred.
      "coll" : "<collection>"     // The collection where the change operation occurred.
   },
   "to" : {   // Displayed only for rename operations.
      "db" : "<database>",         // The new database name after the change.
      "coll" : "<collection>"     // The new collection name after the change.
   },
   "documentKey" : { "_id" : <value> },  // The value of the _id field of the document involved in the change operation.
   "updateDescription" : {    // Description of the modification operation.
      "updatedFields" : { <document> },  // The fields that were modified and their new values.
      "removedFields" : [ "<field>", ... ]     // The fields that were removed and their values in the modification operation.
   }
   "clusterTime" : <Timestamp>,     // The timestamp from the Oplog entry associated with the change operation.
   "txnNumber" : <NumberLong>,    // If the change operation was executed within a multi-document transaction, this field indicates the transaction number.
   "lsid" : {          // Information about the session related to the transaction.
      "id" : <UUID>,  
      "uid" : <BinData> 
   }
}
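
For reference, below is a minimal sketch of consuming such change events with the MongoDB Java driver (mongodb-driver-sync). It is illustrative only: the connection string and the database/collection names ("mydb", "orders") are placeholders, and change streams require a replica set or sharded cluster.

Code Block
languagejava
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;

public class ChangeStreamTail {
    public static void main(String[] args) {
        // Placeholder connection string and namespace.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("mydb").getCollection("orders");
            // UPDATE_LOOKUP asks the server to attach the current full document
            // to update events, which otherwise carry only the delta.
            try (MongoCursor<ChangeStreamDocument<Document>> cursor =
                         coll.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator()) {
                while (cursor.hasNext()) {
                    ChangeStreamDocument<Document> event = cursor.next();
                    System.out.printf("%s %s -> %s%n",
                            event.getOperationType(),  // insert / update / delete ...
                            event.getDocumentKey(),    // { "_id" : <value> }
                            event.getFullDocument());  // null for delete events
                }
            }
        }
    }
}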

Note that

  1. MongoDB Change Streams are designed to return simple JSON documents without any data type definitions. This is because MongoDB is a document-oriented database whose core feature is a dynamic schema: documents may contain different fields, and a field's data type can vary between documents. Omitting data type definitions from Change Streams preserves this flexibility and extensibility.
    For this reason, we set the data type of every field synchronized from MongoDB to Paimon to String, which works around the inability to obtain data types (see the first sketch below).

  2. MongoDB's change event records do not carry a pre-update ("before") image, so we can only convert them into Flink's UPSERT changelog stream. An upsert stream requires a unique key, so we must declare _id as the primary key. We cannot declare any other column as the primary key, because a delete event carries nothing besides _id and the shard key (see the second sketch below).
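
To illustrate note 1, here is a hypothetical helper that renders every top-level field of a BSON document as a String, matching the all-String schema used when syncing to Paimon; the class and method names are placeholders.

Code Block
languagejava
import org.bson.Document;

import java.util.LinkedHashMap;
import java.util.Map;

public class StringifyFields {
    // Flattens the top-level fields of a BSON document to Strings, since
    // change stream documents carry no data type definitions.
    public static Map<String, String> toStringRow(Document doc) {
        Map<String, String> row = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : doc.entrySet()) {
            Object value = entry.getValue();
            // Nested documents are serialized as JSON text; other values fall
            // back to their string representation.
            String text = (value == null) ? null
                    : (value instanceof Document) ? ((Document) value).toJson()
                    : String.valueOf(value);
            row.put(entry.getKey(), text);
        }
        return row;
    }
}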

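For note 2, a hypothetical mapping from a change event's operationType to Flink's RowKind shows why the result is an upsert stream keyed on _id: with no pre-update image, updates and replaces can only be emitted as UPDATE_AFTER.

Code Block
languagejava
import org.apache.flink.types.RowKind;

public class OperationTypeToRowKind {
    public static RowKind toRowKind(String operationType) {
        switch (operationType) {
            case "insert":
                return RowKind.INSERT;
            case "update":      // no "before" image is available,
            case "replace":     // so both map to UPDATE_AFTER (upsert)
                return RowKind.UPDATE_AFTER;
            case "delete":      // carries only _id (and the shard key)
                return RowKind.DELETE;
            default:
                throw new IllegalArgumentException(
                        "Unsupported operationType: " + operationType);
        }
    }
}
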
Compatibility, Deprecation, and Migration Plan

...