Introduction

This proposal implements MongoDBArtifactStore, which can replace CouchDB; it can also work alongside ElasticSearch as the activation store backend.


Design

Data Schema

The data schema in MongoDB is almost the same as in CouchDB, with four differences:

  1. The annotations field is stored in MongoDB as a string instead of the array of objects used in CouchDB. Annotations may contain arbitrary structures, and MongoDB does not support "$" as the first character of a field name, so this field is converted into a raw JSON string before it is stored in MongoDB and converted back to an array of objects when it is fetched.
  2. The parameters field is handled the same way as annotations.
  3. The _rev field is not generated automatically by MongoDB, so it is calculated and inserted explicitly in code.
  4. There is an extra _computed field, which stores additional fields that help with queries.

Below is an example:

CouchDB:

{
  "_id": "whisk.system/invokerHealthTestAction0",
  "_rev": "68-e72440f911c64ab11441c09e730e5ab8",
  "name": "invokerHealthTestAction0",
  "publish": false,
  "annotations": [],
  "version": "0.0.1",
  "updated": 1524476933182,
  "entityType": "action",
  "exec": {
    "kind": "nodejs:6",
    "code": "function main(params) { return params; }",
    "binary": false
  },
  "parameters": [],
  "limits": {
    "timeout": 60000,
    "memory": 256,
    "logs": 10
  },
  "namespace": "whisk.system"
}

MongoDB:

{
  "_id" : "whisk.system/invokerHealthTestAction0",
  "name" : "invokerHealthTestAction0",
  "_computed" : {
    "rootns" : "whisk.system"
  },
  "publish" : false,
  "annotations" : "[ ]",
  "version" : "0.0.1",
  "updated" : NumberLong("1524473794826"),
  "entityType" : "action",
  "exec" : {
    "kind" : "nodejs:6",
    "code" : "function main(params) { return params; }",
    "binary" : false
  },
  "parameters" : "[ ]",
  "limits" : {
    "timeout" : 60000,
    "memory" : 256,
    "logs" : 10
  },
  "namespace" : "whisk.system"
}
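The reason annotations and parameters are stored as strings can be illustrated with a small check. This is a minimal sketch, not code from this PR: it models JSON as plain Scala `Map`/`Seq` values (an assumption made to keep it dependency-free) and reports whether any object key, at any depth, starts with "$", which is the case the field-name restriction above rules out for a native sub-document.

```scala
object DollarKeyCheck {
  // JSON is modelled as nested Map[String, Any] (objects), Seq[Any] (arrays) and scalars.
  // Returns true if any object key, at any depth, starts with '$'.
  def hasDollarKey(value: Any): Boolean = value match {
    case obj: Map[_, _] =>
      obj.exists { case (k, v) => k.toString.startsWith("$") || hasDollarKey(v) }
    case arr: Seq[_] => arr.exists(hasDollarKey)
    case _           => false
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical annotation whose value is an arbitrary object with a "$"-prefixed key
    val annotations = Seq(Map("key" -> "schema", "value" -> Map("$ref" -> "#/definitions/x")))
    println(hasDollarKey(annotations)) // true
  }
}
```

Because such keys can appear anywhere inside user-supplied values, converting the whole field to a JSON string is simpler than validating or escaping every key.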

Attachment

MongoDB uses GridFS to store and retrieve files that exceed the BSON document size limit of 16 MB.

In MongoDB, an attachment is stored in separate GridFS collections with its own _id. This PR uses the document's _id and the attachment name, joined as "<doc._id>/<attachment name>", as the attachment's _id, so the attachment belonging to a document can be found easily.
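A minimal sketch of that id scheme, matching the `s"$id/$name"` string used in the attach and readAttachmentFromMongo code below. `AttachmentIds`, `ownerDocId` and the attachment name "codefile" are hypothetical; recovering the document id assumes attachment names never contain "/", which this PR does not state.

```scala
object AttachmentIds {
  // GridFS file id for an attachment: "<docId>/<attachmentName>"
  def attachmentId(docId: String, attachmentName: String): String = s"$docId/$attachmentName"

  // Document ids already contain '/', so split on the LAST '/'.
  // Assumption: attachment names never contain '/'.
  def ownerDocId(attachmentId: String): String = {
    val i = attachmentId.lastIndexOf('/')
    require(i > 0, s"not an attachment id: $attachmentId")
    attachmentId.substring(0, i)
  }

  def main(args: Array[String]): Unit = {
    val id = attachmentId("whisk.system/invokerHealthTestAction0", "codefile")
    println(id)             // whisk.system/invokerHealthTestAction0/codefile
    println(ownerDocId(id)) // whisk.system/invokerHealthTestAction0
  }
}
```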


Implementation

Apart from Ansible scripts and test files, there are 5 new files:

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala

Works just like `CouchDbStoreProvider`.

It creates a singleton MongoDB client so that WhiskAuthStore, WhiskEntityStore and WhiskActivationStore share one client.
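The sharing pattern can be sketched in plain Scala with a `lazy val` inside an `object`, which is initialized once on first access. `DbClient`, `Store`, `SharedClientProvider`, the URI and the collection names are hypothetical stand-ins, not the MongoDB driver API or this PR's code.

```scala
// Hypothetical stand-in for the real MongoDB client
final class DbClient(val uri: String)

// Hypothetical store handle: each store has its own collection but reuses the client
final case class Store(collection: String, client: DbClient)

object SharedClientProvider {
  @volatile private var created = 0

  // Initialized once, on first access, and then shared by every store
  lazy val client: DbClient = { created += 1; new DbClient("mongodb://localhost:27017") }

  def makeStore(collection: String): Store = Store(collection, client)
  def creations: Int = created

  def main(args: Array[String]): Unit = {
    val stores = Seq("subjects", "whisks", "activations").map(makeStore)
    println(stores.map(_.client).distinct.size) // 1
    println(creations)                          // 1
  }
}
```

Sharing one client matters because a MongoDB client manages its own connection pool; one client per store would multiply connections.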

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala

An implementation of `trait ArtifactStore[DocumentAbstraction]`.

Some private methods are worth mentioning:

  • attach(d: DocumentAbstraction, name: String, contentType: ContentType, docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult]:

          This saves an action's attachment to MongoDB's GridFS bucket. Since the attachment arrives as a Source, a Sink is needed to consume it; this is what MongoDBAsyncStreamSink is used for:

    // The GridFS file id is "<docId>/<attachmentName>", so the attachment can be found from its document
    val uploadStream = gridFSBucket.openUploadStream(BsonString(s"$id/$name"), name, option)
    val sink = MongoDBAsyncStreamSink(uploadStream)

    // combinedSink writes the bytes through `sink` and also yields the digest and length for AttachResult
    val f = docStream
      .runWith(combinedSink(sink))
      .map { r =>
        transid
          .finished(this, start, s"[ATT_PUT] '$collName' completed uploading attachment '$name' of document '$id'")
        AttachResult(r.digest, r.length)
      }
      .recover {
        case t: MongoException =>
          transid.failed(
            this,
            start,
            s"[ATT_PUT] '$collName' failed to upload attachment '$name' of document '$id'; error code '${t.getCode}'",
            ErrorLevel)
          throw new Exception("Unexpected mongodb server error: " + t.getMessage)
      }

  • readAttachmentFromMongo[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(implicit transid: TransactionId): Future[T]:

          The counterpart of attach: it reads an attachment from MongoDB. Since the result is consumed by a Sink, a Source must be constructed for the MongoDB attachment; this is what MongoDBAsyncStreamSource is for:

    // Same "<docId>/<attachmentName>" id as used by attach
    val downloadStream = gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName"))

    // Stream the attachment bytes into the caller's sink
    def readStream(file: GridFSFile) = {
      val source = MongoDBAsyncStreamSource(downloadStream)
      source
        .runWith(sink)
        .map { result =>
          transid
            .finished(
              this,
              start,
              s"[ATT_GET] '$collName' completed: found attachment '$attachmentName' of document '$doc'")
          result
        }
    }

    // Fetch the file metadata first; map "File not found" to NoDocumentException
    def getGridFSFile = {
      downloadStream
        .gridFSFile()
        .head()
        .transform(
          identity, {
            case ex: MongoGridFSException if ex.getMessage.contains("File not found") =>
              transid.finished(
                this,
                start,
                s"[ATT_GET] '$collName', retrieving attachment '$attachmentName' of document '$doc'; not found.")
              NoDocumentException("Not found on 'readAttachment'.")
            case ex: MongoGridFSException =>
              transid.failed(
                this,
                start,
                s"[ATT_GET] '$collName' failed to get attachment '$attachmentName' of document '$doc'; error code: '${ex.getCode}'",
                ErrorLevel)
              // Return (rather than throw) the exception; transform fails the future with it either way
              new Exception("Unexpected mongodb server error: " + ex.getMessage)
            case t => t
          })
    }

    // Only start streaming once the file is known to exist
    val f = for {
      file <- getGridFSFile
      result <- readStream(file)
    } yield result

  • revisionCalculate(doc: JsObject): (String, String):

          Calculates the revision from the entity content; returns the old revision (empty if none exists) and the new revision.

  • getCollectionAndCreateIndexes(): MongoCollection[Document]:

          Creates the indexes for the collection if they do not exist yet.

  • encodeFields(fields: Seq[String], jsValue: JsObject): JsObject:

          Encodes the given fields, whose values may have an arbitrary, complex structure, as JsString; used for the annotations and parameters fields.

  • decodeFields(fields: Seq[String], jsValue: JsObject): JsObject:

          Decodes the given fields from JsString back to their original JSON values.
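A minimal sketch of a revision calculation in the CouchDB "<generation>-<hash>" format seen in the example above ("68-e72440f911c64ab11441c09e730e5ab8"). Using MD5 of the serialized content and incrementing the generation are assumptions modelled on that format, not necessarily what this PR does; `RevisionSketch` is hypothetical, and a `content: String` parameter stands in for the `JsObject` the real method takes.

```scala
import java.security.MessageDigest
import scala.util.Try

object RevisionSketch {
  private def md5Hex(s: String): String =
    MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")).map(b => f"${b & 0xff}%02x").mkString

  // Returns (oldRev, newRev); oldRev is "" when the document has no revision yet.
  // `content` stands in for the serialized entity (assumption).
  def revisionCalculate(oldRev: Option[String], content: String): (String, String) = {
    // Take the generation number before the '-' and bump it; a missing/invalid revision counts as 0
    val generation = oldRev.flatMap(r => Try(r.takeWhile(_ != '-').toInt).toOption).getOrElse(0)
    (oldRev.getOrElse(""), s"${generation + 1}-${md5Hex(content)}")
  }

  def main(args: Array[String]): Unit = {
    println(revisionCalculate(None, "")) // (,1-d41d8cd98f00b204e9800998ecf8427e)
    println(revisionCalculate(Some("68-e72440f911c64ab11441c09e730e5ab8"), "{}")._2.takeWhile(_ != '-')) // 69
  }
}
```

Deriving the hash from the content means two writes of identical content produce the same hash part, while the generation prefix still distinguishes successive revisions.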

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala

This file defines a Sink that consumes an attachment Source and saves it to a MongoDB GridFSBucket.

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala

This file defines a Source that reads an attachment from a MongoDB GridFSBucket as ByteString elements, which are then processed by a Sink.
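GridFS stores a file as an ordered sequence of fixed-size chunks (255 KB by default), so the Sink side writes bytes in order and the Source side reads them back in order. The sketch below illustrates that round trip in plain Scala. `InMemoryBucket`, `upload` and `download` are hypothetical names, `Iterator` stands in for Akka Streams, and nothing here is the MongoDB driver API or the code of these two files.

```scala
import java.io.ByteArrayOutputStream
import scala.collection.mutable

// Hypothetical in-memory stand-in for a GridFS bucket: each file is an ordered list of chunks
final class InMemoryBucket(chunkSize: Int) {
  require(chunkSize > 0)
  private val files = mutable.Map.empty[String, Vector[Array[Byte]]]

  // "Sink" side: consume pieces of any size and re-cut them into fixed-size chunks
  def upload(id: String, pieces: Iterator[Array[Byte]]): Long = {
    val chunks = mutable.ArrayBuffer.empty[Array[Byte]]
    val pending = new ByteArrayOutputStream()
    var total = 0L
    pieces.foreach { p =>
      pending.write(p)
      total += p.length
      while (pending.size() >= chunkSize) {
        val bytes = pending.toByteArray()
        chunks += bytes.take(chunkSize)
        pending.reset()
        pending.write(bytes.drop(chunkSize))
      }
    }
    if (pending.size() > 0) chunks += pending.toByteArray() // last, possibly shorter chunk
    files(id) = chunks.toVector
    total
  }

  // "Source" side: emit the stored chunks in order for a downstream consumer
  def download(id: String): Iterator[Array[Byte]] =
    files.getOrElse(id, throw new NoSuchElementException(s"File not found: $id")).iterator
}

object InMemoryBucketDemo {
  def main(args: Array[String]): Unit = {
    val bucket = new InMemoryBucket(chunkSize = 4)
    val pieces = Iterator("hello ", "world").map(_.getBytes("UTF-8"))
    println(bucket.upload("whisk.system/hello/codefile", pieces)) // 11
    val chunks = bucket.download("whisk.system/hello/codefile").toList
    println(chunks.map(_.length))                                 // List(4, 4, 3)
    println(new String(Array.concat(chunks: _*), "UTF-8"))        // hello world
  }
}
```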

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala

To simulate the design doc features of CouchDB/Cloudant, the community has defined CosmosDBViewMapper for CosmosDB and MemoryViewMapper for the memory-based store (for testing and development only); this file provides the equivalent for MongoDB.

CouchDB's design docs can be used to query against a computed key. For example, given a document

{
  "name": "Jack",
  "nation": "Eng"
}

we can create a design doc with this map function:

function (doc) {
  emit(doc.name + "/" + doc.nation, 1);
}

and then query by the computed key doc.name + "/" + doc.nation.
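The map function and a key-range query over it can be sketched in plain Scala. This emulates the CouchDB behaviour described above (emit a computed key, then select an inclusive start/end key range in key order); `ComputedKeyView`, `Person` and `query` are hypothetical names for illustration.

```scala
object ComputedKeyView {
  final case class Person(name: String, nation: String)

  // Equivalent of: emit(doc.name + "/" + doc.nation, 1)
  def key(p: Person): String = p.name + "/" + p.nation

  // Inclusive [startKey, endKey] range over the computed key, returned in key order
  def query(docs: Seq[Person], startKey: String, endKey: String): Seq[(String, Person)] =
    docs
      .map(d => key(d) -> d)
      .filter { case (k, _) => k.compareTo(startKey) >= 0 && k.compareTo(endKey) <= 0 }
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val docs = Seq(Person("Jack", "Eng"), Person("Jack", "Fra"), Person("Anna", "Eng"))
    println(query(docs, "Jack/Eng", "Jack/Eng").map(_._2)) // List(Person(Jack,Eng))
  }
}
```

MongoDB has no map functions evaluated at query time, which is why the computed values are stored up front in the document instead.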

To simulate this, some extra fields are saved under _computed in MongoDB. For activations they are:

    "_computed" : {
        "deleteLogs" : true,
        "nspath" : "whisk.system/hello"
    },

and for other entities:

    "_computed" : {
        "rootns" : "whisk.system"
    },

With these extra computed fields, we can do similar design doc queries on MongoDB.
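A minimal sketch of deriving these _computed values, based only on the examples above. The derivations are assumptions: rootns is taken as the namespace segment before the first "/", and nspath as namespace + "/" + entity name (matching "whisk.system/hello"). deleteLogs is left out because its derivation is not described here. `ComputedFields` is a hypothetical name.

```scala
object ComputedFields {
  // Assumed: root namespace is the part before the first '/', e.g. "whisk.system/pkg" -> "whisk.system"
  def rootns(namespace: String): String = namespace.takeWhile(_ != '/')

  // Assumed: namespace + "/" + entity name, as in the example "whisk.system/hello"
  def nspath(namespace: String, name: String): String = s"$namespace/$name"

  // _computed sub-document; deleteLogs is omitted because its derivation is not described
  def computed(entityType: String, namespace: String, name: String): Map[String, String] =
    if (entityType == "activation") Map("nspath" -> nspath(namespace, name))
    else Map("rootns" -> rootns(namespace))

  def main(args: Array[String]): Unit = {
    println(computed("action", "whisk.system", "invokerHealthTestAction0")) // Map(rootns -> whisk.system)
    println(computed("activation", "whisk.system", "hello"))                // Map(nspath -> whisk.system/hello)
  }
}
```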

Trait MongoDBViewMapper has one value and two methods that can be overridden:

  1. val indexes: List[Document]: defines the indexes required for the related collection, so the system can create them to speed up queries
  2. def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson: generates a MongoDB query that is equivalent to the given CouchDB design doc query
  3. def sort(ddoc: String, view: String, descending: Boolean): Option[Bson]: generates the MongoDB sort specification for the view
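The shape of such a mapper can be sketched with plain Scala maps standing in for `Document`/`Bson` (an assumption made to keep the sketch dependency-free). The "actions" view keyed by [namespace, updated], the "whisks" ddoc name and the chosen index are hypothetical, chosen only to show how a CouchDB start/end key range becomes a MongoDB range query with `$gte`/`$lte`.

```scala
object ViewMapperSketch {
  type Doc = Map[String, Any]            // stand-in for a BSON query/sort document
  type IndexSpec = Seq[(String, Int)]    // ordered keys, like a compound index

  trait ViewMapper {
    val indexes: List[IndexSpec]
    def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Doc
    def sort(ddoc: String, view: String, descending: Boolean): Option[Doc]
  }

  // Hypothetical mapper for a view keyed by [namespace, updated]
  object ActionsViewMapper extends ViewMapper {
    val indexes: List[IndexSpec] = List(Seq("namespace" -> 1, "updated" -> -1))

    def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Doc = {
      require(view == "actions", s"unsupported view $ddoc/$view")
      (startKey, endKey) match {
        // Both keys carry an `updated` bound: translate into an inclusive range
        case (List(ns, since), List(_, upto)) =>
          Map("entityType" -> "action", "namespace" -> ns, "updated" -> Map("$gte" -> since, "$lte" -> upto))
        // Only the namespace is constrained
        case (List(ns), _) =>
          Map("entityType" -> "action", "namespace" -> ns)
        case _ =>
          throw new IllegalArgumentException(s"unexpected keys $startKey / $endKey")
      }
    }

    def sort(ddoc: String, view: String, descending: Boolean): Option[Doc] =
      Some(Map("updated" -> (if (descending) -1 else 1)))
  }

  def main(args: Array[String]): Unit = {
    println(ActionsViewMapper.filter("whisks", "actions", List("guest", 0L), List("guest", 100L)))
    println(ActionsViewMapper.sort("whisks", "actions", descending = true))
  }
}
```

Keeping the index definition next to filter and sort lets each mapper declare exactly the indexes its own queries need.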