Some examples of Mongo Scala Driver usage

Sometimes in #consultantLife you find yourself in the middle of an obscure, sparsely-blogged-about problem–say, your Scala API suddenly needs to talk to a Mongo database. And you ask yourself, “How did I get here? This is not my beautiful SQL…” But the only way out is through, so you install the Mongo Scala Driver and forge ahead. But it would be just, like, SUPER helpful if you had a few more examples of how to use it.

I’m here for you, friend.

The key thing to remember is that querying Mongo happens asynchronously. Each query is an “observable”; an “observer” “subscribes” to it and gets called back when the response arrives. The query doesn’t actually execute until the moment of subscription. (I’m using a bunch of scare quotes because these words don’t quite feel like the right metaphor and as such don’t make a ton of sense to me.)
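
If the "nothing runs until you subscribe" part feels slippery, here's a toy model of it. To be clear, ColdQuery and demo are names I made up for illustration; they are not part of the Mongo Scala Driver, and unlike the real thing this toy runs synchronously. It only shows the laziness.

```scala
// Toy model only: ColdQuery is a made-up stand-in, not a driver class.
object ColdQueryDemo {
  // Holds the work of a "query" without running it.
  final class ColdQuery[T](run: () => T) {
    // The work happens here, at subscription time, and the result goes to the callback.
    def subscribe(onNext: T => Unit): Unit = onNext(run())
  }

  // Returns (executed before subscribe?, executed after subscribe?, value received).
  def demo(): (Boolean, Boolean, String) = {
    var executed = false
    var received = ""
    val query = new ColdQuery(() => { executed = true; "Tom Servo" })
    val before = executed // building the query ran nothing
    query.subscribe { name => received = name }
    (before, executed, received)
  }
}
```

Calling ColdQueryDemo.demo() shows the flag flipping only once subscribe is called, which is the same shape as building a find() and then subscribing to it.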

The following examples assume you’ve already sorted out how to connect to your database. Also “logMessage” and “logErrorMessage” are custom methods beyond the scope of these examples. (My journey through Mongo-Scalaland was part of an open source project, so if you want to see all the ins and outs of the working code, check out the repo.)

Say you have a student that you want to add to the students collection. First you gather the information as a Document:

import org.mongodb.scala._

val dataSource = new MongoDataSource //you've defined this elsewhere; it handles making the connection
val students: MongoCollection[Document] = dataSource.getCollection("students")

val doc: Document = Document(
    "schoolStudentID" -> "9999",
    "fullName" -> "Tom Servo",
    "gradeLevel" -> 8
)

Then insert it immediately using insertOne(doc).subscribe(). Now subscribe() takes an Observer, in which you override the onNext, onError, and onComplete callbacks. The type of result passed to onNext depends on the operation you’re subscribing to: an insert (result: Completed), a find (result: Document), an update (result: UpdateResult), or a delete (result: DeleteResult). You can, of course, break this anonymous new Observer out and give it a name if that feels better, but the docs seem to prefer the anonymous inline style.

students.insertOne(doc).subscribe(new Observer[Completed] {
    override def onNext(result: Completed): Unit = {
        logMessage("student successfully inserted")
    }
    override def onError(e: Throwable): Unit = {
        logErrorMessage("error: " + e.getMessage)
        //onComplete doesn't get called when an error is thrown, so we'd better close the connection here
        dataSource.close
    }
    override def onComplete(): Unit = {
        dataSource.close
    }
})

Say you want to find one student with a particular name and student ID number (assigned by the school, different from the Mongo ObjectID). There’s not, far as I can tell, a findOne() method, so we’ll do a .find().first() chain. We’ll use methods in the Filters class to utilize some Mongo query selectors.

And, hey, let’s make it synchronous, because we need that student data before we can proceed. This is easily accomplished by making the query a Future and awaiting the result.


import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.mongodb.scala.model.Filters

val targetName = "Student McGradeschoolface"
val targetID = "123456"

val oneStudent = students
    .find(Filters.and(Filters.equal("fullName", targetName), Filters.equal("schoolStudentID", targetID)))
    .first()

//The query is written but hasn't been executed. The next line executes it and waits up to ten seconds for the response.

val response = Await.result(oneStudent.toFuture(), Duration(10, TimeUnit.SECONDS))

if (response != null && response.nonEmpty && response.head.nonEmpty) {
    //do stuff
}

dataSource.close
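
One thing to watch with Await.result: if the ten seconds run out, it throws a TimeoutException rather than handing you an empty result. Here's a minimal sketch of one way to deal with that, using only the Scala standard library. The helper name awaitOrNone is my own invention, and the futures in demo are stand-ins; in the real code you'd pass in something like oneStudent.toFuture().

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object AwaitDemo {
  // Hypothetical helper: a timeout becomes None, any other failure is rethrown.
  def awaitOrNone[T](future: Future[T], timeout: FiniteDuration): Option[T] =
    Try(Await.result(future, timeout)) match {
      case Success(value)               => Some(value)
      case Failure(_: TimeoutException) => None
      case Failure(other)               => throw other
    }

  // Stand-in futures: one already answered, one that never completes.
  def demo(): (Option[Seq[String]], Option[Seq[String]]) = {
    val answered = Future.successful(Seq("Tom Servo"))
    val neverAnswers = Promise[Seq[String]]().future
    (awaitOrNone(answered, 1.second), awaitOrNone(neverAnswers, 50.millis))
  }
}
```

Whether a timeout should be treated as "not found" or as a hard error depends on your app; this sketch just makes the choice explicit instead of letting the exception escape.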

Here’s a little more complicated example. Let’s say users for this website each have a collection of permissions of areas of the site (“organizations”) they can access; on the website, we have a place in each area to add a user. When that form gets submitted, we check the database for the user, and if it’s there, we add a permission to the user’s collection; otherwise, we want to insert a new user with that one permission.

def insertPossiblyNewUser(fullName: String, organizationId: BsonValue): Unit = {
    val newUser = new User(fullName, organizationId) //organizationId is a Mongo ObjectID
    val collection = retrieveCollection("users")

    val filter: Bson = equal("email", newUser.email)

    val userQuery = collection.find(filter).first()

    //Email addresses are unique, so it should find 0-1 users.
    //'Course if there are 2 or more it'll just return the first one it finds without error.

    //Now let's execute the query and wait...
    val user = Await.result(userQuery.toFuture(), Duration(10, TimeUnit.SECONDS))

    val onCompleteInsert = new Observer[Completed] {
        override def onNext(result: Completed): Unit = logMessage("Inserted new user. Result: " + result.toString)
        override def onError(e: Throwable): Unit = {
            logErrorMessage("ERROR: " + e.toString)
            dataSource.close
        }
        override def onComplete(): Unit = {
            dataSource.close
        }
    }

    val onCompleteUpdate = new Observer[UpdateResult] {
        override def onNext(result: UpdateResult): Unit = logMessage("Updated Admin User. Result: " + result.toString)
        override def onError(e: Throwable): Unit = {
            logErrorMessage("ERROR: " + e.toString)
            dataSource.close
        }
        override def onComplete(): Unit = {
            dataSource.close
        }
    }

    //We don't need to wait on the next operation, so we'll fire it off async
    if (user.isEmpty) {
        collection.insertOne(newUser.toDocument).subscribe(onCompleteInsert)
    } else {
        //here's the method for updating a sub-collection!
        collection.updateOne(filter, Updates.addToSet("permissions", newUser.userPermissions.head)).subscribe(onCompleteUpdate)
    }
}

You can also, of course, use callbacks instead of toFuture to make operations run in order. The following block inserts a new organization, then on completion queries the collection for that organization, and then calls insertPossiblyNewUser (defined above) for each organization it finds.

def insertOrganization(someCriterion: String, someOtherCriterion: String): Unit = {
    val organization = new Organization(someCriterion, someOtherCriterion) //we don't care about this object; just know it has a Name field
    val collection = retrieveCollection("organizations")

    val observable: Observable[Completed] = collection.insertOne(organization)

    observable.subscribe(new Observer[Completed] {
        override def onNext(result: Completed): Unit = logMessage("Inserted Organization")
        override def onError(e: Throwable): Unit = logErrorMessage(e.toString)
        override def onComplete(): Unit = {
            addUserToOrganization(collection, someCriterion, organization.Name)
        }
    })
}

def addUserToOrganization(collection: MongoCollection[Document], someCriterion: String, organizationName: String): Unit = {

    //Query for the organization we just inserted. Note this could return many results.
    //It doesn't really make sense that it would, but... just go with it.
    val observable: Observable[Document] = collection.find(equal("name", organizationName))

    observable.subscribe(new Observer[Document] {
        override def onNext(result: Document): Unit = {
            //This should asynchronously insert the new user into each result.
            val organizationId = result.get("_id").get
            insertPossiblyNewUser(someCriterion, organizationId)
        }
        override def onError(e: Throwable): Unit = logErrorMessage(e.toString)
        override def onComplete(): Unit = logMessage("Done with querying organizations")
    })
}
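
For comparison, here's a rough sketch of the same insert-then-find-then-add-user ordering done by chaining Futures instead of nesting callbacks, and without blocking on Await. It uses only the Scala standard library. The three small functions are hypothetical stand-ins I wrote for this example (imagine them wrapping insertOne(...).toFuture() and find(...).toFuture()); they are not driver calls.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ChainDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for the real database calls.
  def insertOrganization(name: String): Future[Unit] = Future(())
  def findOrganizationIds(name: String): Future[Seq[String]] = Future(Seq(s"id-of-$name"))
  def addUser(fullName: String, organizationId: String): Future[String] =
    Future(s"$fullName added to $organizationId")

  // The find starts only after the insert succeeds, and the add-user calls
  // start only after the find succeeds. A failure at any step fails the result.
  def insertThenAddUser(orgName: String, fullName: String): Future[Seq[String]] =
    for {
      _       <- insertOrganization(orgName)
      ids     <- findOrganizationIds(orgName)
      results <- Future.sequence(ids.map(id => addUser(fullName, id)))
    } yield results
}
```

The caller gets back a single Future and can decide whether to Await it or attach its own callbacks, so error handling lives in one place rather than in every Observer.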

So there ya go! I hope this was helpful. And if it was, that means you too are using Scala and Mongo so… Godspeed, my friend.
