

Streaming Data – Aggregate

with MongoDB, NodeJS, Robomongo

Do you want to learn more about NoSQL databases? The following is Part II of my series on building fast apps with MongoDB, NodeJS and Robomongo (great MongoDB tools).  Part I – Installation covers getting and installing the tools, along with a short primer on building a simple MongoDB database.  In this post, we’ll build a Twitter stream and send tweets into a local MongoDB database. Finally, we’ll cover some basic analytics for MongoDB using Map-Reduce and JavaScript.

Once again, you can use SQL-style tools to analyze JSON data. In later posts I’ll show you how to integrate your data with Cassandra, Hadoop, Hive and Spark. If you have a well-architected, properly configured MongoDB cluster, you should get solid performance. Also, Hadoop (Hive – Spark) plays well with JSON data for large-scale, fast analytics.

Twitter Stream

The Twitter stream is great for getting a lot of data into a database quickly. The Streaming APIs give developers low-latency access to Twitter’s global stream of Tweet data, and Twitter’s own write-up goes into detail on the stream.  So, if you’re ready – go ahead and sign up for a developer account, create an app, and get the OAuth credentials needed for this (or your own) dev project. Your OAuth credentials will be going into your own mongostream app (below).

Git the Mongostream Framework App

I built a simple framework: Mongostream by Gus Segura [ on GitHub ].  It is built on Node.js, so you will need Node installed before you begin. See Part I – Installation for instructions on MongoDB and Node.js.

  • Create a Development Directory on your local machine.
mkdir mongo-test [ then change directory to this new directory...]
cd mongo-test
  • Download the kyanyoga/mongostream Git Repository.
 git clone https://github.com/kyanyoga/mongostream.git

Edit the credentials.js file – place your Twitter credentials (obtained when you signed up as a dev) into this file.

var credentials = {
  consumer_key: '',
  consumer_secret: '',
  access_token_key: '',
  access_token_secret: ''
};

module.exports = credentials;
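Before starting the stream, it can save a restart to sanity-check that all four OAuth fields are actually filled in. Here is a small hypothetical helper (not part of server.js) that reports any missing fields; the placeholder values stand in for real keys:

```javascript
// Hypothetical sanity check for credentials.js: verify all four OAuth
// fields are non-empty before the streamer starts.
// The 'xxxx' values below are placeholders, not real keys.
var credentials = {
  consumer_key: 'xxxx',
  consumer_secret: 'xxxx',
  access_token_key: 'xxxx',
  access_token_secret: 'xxxx'
};

function missingKeys(creds) {
  var required = ['consumer_key', 'consumer_secret',
                  'access_token_key', 'access_token_secret'];
  // keep only the fields that are absent or empty
  return required.filter(function (k) { return !creds[k]; });
}

console.log(missingKeys(credentials)); // [] when everything is filled in
```

If the array is non-empty, fix credentials.js before running the streamer – the Twitter API will reject the connection otherwise.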

Edit server.js – point the MongoClient connection at your local or remote MongoDB database.

// point this at your local or remote MongoDB instance
MongoClient.connect('mongodb://localhost:27017/streamworld', function(err, db) {
    if (err) { console.log(err); }
  • Next, install the required Node modules with npm, then run the app. Example below.
HOST:mongostream kyanyoga$ npm install 
[ This will install the node modules ]. 
[ see previous post to understand how npm knows what to install. Hint: package.json ]
[ you can edit server.js to point to another mongodb or make sure your local is started ]
[ Now. Run the server.js ]
HOST:mongostream kyanyoga$ node server.js
2016-06-16 12:06:29|pie|RT @i_dentitytheft: I remember coco and pie was trending topic and I never knew who pie was still don't till this day 
2016-06-16 12:06:43|summer|RT @boysandgals: summer 2k16
2016-06-16 12:06:61|summer|Inc: The best 25 apps to enhance your life in the summer of 2016.
2016-06-16 12:06:65|summer|RT @SadeOsoCute: making money all summer 16 
Press CTRL-C to stop streaming. Note: Don't leave this running too long – it will FILL your drive!
FOR A SIMPLE DEV DEMO: 5-10 minutes tops. You can stop server.js and DROP the collection from MongoDB, but
you won't get the space back until you repair the database or clear it out from the command line.

You can edit the arrayOfTopics in server.js to add your own interesting topics that you would like to follow.

// Twitter Stream Topics
var arrayOfTopics = ['pie', 'nfl', 'summer', 'mongodb','bigdata']; // topics
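To make the plumbing concrete, here is a plain-JavaScript sketch of what the streamer conceptually does with each incoming tweet: match it against the tracked topics and shape it into the dt | topic | tweet document you see landing in the tsentstrm collection. The helper names are illustrative, not taken from server.js:

```javascript
// Sketch of the per-tweet logic: find the first tracked topic that
// appears in the tweet text, then build the document shape stored in
// the tsentstrm collection. Helper names are hypothetical.
var arrayOfTopics = ['pie', 'nfl', 'summer', 'mongodb', 'bigdata'];

function matchTopic(text, topics) {
  var lower = text.toLowerCase();
  for (var i = 0; i < topics.length; i++) {
    if (lower.indexOf(topics[i]) !== -1) { return topics[i]; }
  }
  return null; // no tracked topic found
}

function toDocument(text, topics, when) {
  return { dt: when, topic: matchTopic(text, topics), tweet: text };
}

var doc = toDocument('making money all summer 16',
                     arrayOfTopics, '2016-06-16 12:06:43');
console.log(doc.topic); // 'summer'
```

The real work – the Twitter connection and the MongoDB insert – happens in server.js; this just shows the transformation in between.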

There you go! You’re streaming Tweets live in real time from Twitter into a MongoDB database.  This is the basic plumbing of dozens of online analytical apps, and it is where we started years ago when we began developing our own text sentiment analytics toolkits.  Believe me, the sentiment and machine learning work is far harder than getting the data.  The challenge becomes storing the data very quickly.

Pop over to the Robomongo GUI, open it up, and enter the commands from the screenshot below.  Note: on Windows or Unix, you can run the same commands using the Mongo client.  Example below.

HOST:MapReduceCourse User$ mongo --host localhost
MongoDB shell version: 3.2.8
connecting to: localhost:27017/test
Server has startup warnings: 
2016-08-07T11:34:18.028-0500 I CONTROL  [initandlisten] 
2016-08-07T11:34:18.028-0500 I CONTROL  [initandlisten] ** WARNING: soft rlimits too low. Number of files is 256, should be at least 1000
> show databases
btstream     0.078GB
local        0.078GB
streamworld  0.078GB
> use streamworld
switched to db streamworld
> db.getCollection('tsentstrm').find({}).count()
> db.getCollection('tsentstrm').find({})

Highlight the commands, then press the green triangle [ on the toolbar next to the floppy-disk icon ] or CMD-Enter to execute. I’ve included a few sample commands below. Explore the tool. Right-click on a collection. Have fun!

db.getCollection('tsentstrm').find({ topic: "summer"}).count()

Note: The server.js Node program created the streamworld database if it did not exist, and streams into the tsentstrm collection.  [ Look for the db drum icon, then follow the tree down to the tsentstrm document collection; click around and explore the UI. ] You can edit all of this in server.js if you want to make changes. However, my aggregate and code samples use the tsentstrm collection name, so you will have to edit your code if you make changes.
Ok, so while the streamer is running, try issuing db.getCollection('tsentstrm').find({}).count() a few times and note that your count should be increasing. You can also issue db.stats() to see the object count and size increasing.  Just let it run long enough to see that the DB is getting BIGGER and the data is definitely streaming.  Make sure to shut it down after a few minutes – you should have plenty of data.

Aggregate – Analytics [Code Samples]

// aggregate: count tweets per topic
db.getCollection('tsentstrm').aggregate([
  {$project : {new_topic : {$substr : ["$topic", 0, 100]}}},
  {$group : {_id : "$new_topic", "count" : {$sum : 1}}}
])

/* 1 */
{
    "_id" : "nfl",
    "count" : 22.0
}

/* 2 */
{
    "_id" : "pie",
    "count" : 119.0
}

/* 3 */
{
    "_id" : "bigdata",
    "count" : 12.0
}

/* 4 */
{
    "_id" : "summer",
    "count" : 1006.0
}
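If the $project/$group pipeline feels opaque, the same group-by-count can be written in a few lines of plain JavaScript. This is just an in-memory sketch with made-up sample documents, not real collection data:

```javascript
// In-memory equivalent of the aggregate above: truncate each topic to
// 100 chars ($project + $substr), then count per topic ($group + $sum).
// The sample docs are made up for illustration.
var docs = [
  { topic: 'pie' }, { topic: 'pie' }, { topic: 'summer' },
  { topic: 'nfl' }, { topic: 'summer' }, { topic: 'summer' }
];

var counts = {};
docs.forEach(function (d) {
  var key = d.topic.substr(0, 100);     // $project: {$substr: ["$topic", 0, 100]}
  counts[key] = (counts[key] || 0) + 1; // $group:   {count: {$sum: 1}}
});

console.log(counts); // { pie: 2, summer: 3, nfl: 1 }
```

The difference, of course, is that MongoDB runs the pipeline server-side over the whole collection instead of pulling every document into your app.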
// find max date (latest tweet by dt)
db.getCollection('tsentstrm').find({}).sort({ dt : -1 }).limit(1)

/* 1 */
{
    "_id" : ObjectId("5762e9b6baf9d0d142e2fabc"),
    "dt" : "2016-06-16 13:06:97",
    "topic" : "summer",
    "tweet" : "RT @UPOinDC: Good news! A former P.O.W.E.R. student is a summer intern for the Smithsonian Environmental Research Center!…"
}
// find min value by topic (e.g. earliest dt per topic)
db.getCollection('tsentstrm').aggregate([
  {$group : {_id : "$topic", "min_dt" : {$min : "$dt"}}}
])

// Build some indexes. Then re-run a find and see if it's faster.
db.tsentstrm.ensureIndex({"dt" : 1, "tweet" : 1})

// map - reduce: simple count of tweets per topic
var mapFun1 = function() {
  emit(this.topic, 1);
};

var redFun1 = function(topic, counts) {
  return Array.sum(counts);
};

db.tsentstrm.mapReduce(mapFun1, redFun1, { out: "map_reduce_example" });
// go look at your new map_reduce_example collection.
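To see the MapReduce paradigm itself without a database, here is a plain-JavaScript simulation of an emit/reduce count per topic. It's a sketch with made-up data; in MongoDB the map and reduce functions run server-side over the collection:

```javascript
// Plain-JS simulation of MapReduce: the map step emits a (key, value)
// pair per document, the reduce step folds all values sharing a key.
// Sample data is illustrative.
var docs = [
  { topic: 'pie' }, { topic: 'summer' }, { topic: 'summer' }
];

var emitted = {};
function emit(key, value) {
  (emitted[key] = emitted[key] || []).push(value);
}

// map phase: one emit per document
docs.forEach(function (doc) { emit(doc.topic, 1); });

// reduce phase: sum the emitted values for each key
var out = {};
Object.keys(emitted).forEach(function (key) {
  out[key] = emitted[key].reduce(function (a, b) { return a + b; }, 0);
});

console.log(out); // { pie: 1, summer: 2 }
```

That grouping of emitted pairs by key, followed by a per-key reduce, is exactly what MongoDB (and Hadoop) do at scale – just distributed across shards or nodes.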

Whew, that’s a lot… In future posts, I will reference back to these two parts as a foundation.

Also, I’ll spend a bit more time on the MongoDB implementation of MapReduce.  It’s a great way to explore the MapReduce paradigm without having to spin up a Hadoop cluster when you’re starting out.  A next step would be exporting this JSON data and using PySpark on EMR to perform large-scale analytics.

In the meantime, dig deeper. Learn how to filter data. Come up with your own new aggregations. Have fun!

Please Contact Us or Subscribe to our Blog if you found this interesting or would like more information.

Subscribe : Blueskymetrics Blog
