Live Updates With Queues, WebSockets, and Push Notifications. Part 1: RabbitMQ Queues and Workers

Josh Justice

Dec 3, 2019

Traditional web and native applications load data when the user requests it, but there are now a variety of well-established technologies to deliver live updates to your users. Live updates can get your users information they need sooner and prevent them from operating on outdated information. This can be a major advantage over competitors’ apps that do not use live updates.

To explore the topic of live-updating applications, let’s create an app called Notifier that allows us to receive notifications about things that happen in various services, like GitHub pull requests or deployments on Netlify or Heroku. We’ll receive these notifications via webhooks, then send them back to a mobile application using WebSockets and push notifications. This architecture could work for any web or mobile application, but in our case we’ll use React Native so we can get push notifications in a really straightforward way.

For our first post, let’s create the Node backend that will store and route our messages. Rather than a typical monolithic backend, we’ll use RabbitMQ to communicate between separate components. In this part, the backend will only return messages when a client requests them. In future parts we’ll add the live-update functionality.

If you like, you can download the completed server project for part 1.

Setting Up the Project

Before we get fancy, we’ll create a simple Express app that connects directly to a database. This is much simpler than where our project will end up: the web app handles each request by reading from or writing to the database itself.

There are good reasons to be cautious about using MongoDB in production. One of the benefits of MongoDB, and many other NoSQL databases, is that they offer better support for horizontal scaling than traditional SQL databases. This could be useful for a live-updating system like ours if the traffic ever got extremely high. But be careful to weigh the pros and cons of SQL and NoSQL databases before making a choice for your production database.

Install the following on your development machine: Node.js, Yarn, MongoDB, and RabbitMQ.

Be sure to start Mongo and RabbitMQ using the appropriate mechanism for the way you installed them.

Create a new folder and initialize it as a Node project:

$ mkdir notifier-server
$ cd notifier-server
$ yarn init -y

Add a few runtime dependencies:

$ yarn add express body-parser mongoose

Here’s what they’re for:

  • express is a lightweight web server.
  • body-parser provides a middleware to handle plain-text request bodies to complement Express’s built-in JSON-handling middleware.
  • mongoose is a database client for MongoDB.

Connecting to MongoDB

Rather than interspersing database code throughout our app we’ll use what’s called the “repository pattern”: we’ll create a module that hides the database implementation and just allows the rest of the app to read and write data. Create a lib folder, and a lib/repo.js file inside it.

In that file, first, connect to the Mongo database:

const mongoose = require('mongoose');
 
const dbUrl = 'mongodb://localhost:27017/notifier';
 
mongoose.connect(dbUrl, {
  useNewUrlParser: true,
  useUnifiedTopology: true,
});
 
mongoose.connection.on('error', console.error);

Next, we need to define the core data type of our application. Let’s call it a “message”: something we receive from an external service letting us know that something happened.

Let’s define a Mongoose schema and model for it:

const messageSchema = new mongoose.Schema({
  text: String,
  url: String,
});
 
const Message = mongoose.model('Message', messageSchema);

Finally, we’ll create the functions we want to expose to the rest of the app:

const create = attrs => new Message(attrs).save();
 
const list = () => Message.find().then(messages => messages.slice().reverse());
 
module.exports = { create, list };

  • create saves a new message record with the attributes we pass it.
  • list returns all the message records in the database, newest first (the reverse of the order in which they were created).
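
One payoff of hiding the database behind create and list is that the rest of the app does not care what sits behind them. As a rough illustration, here is an in-memory stand-in with the same two functions. The createMemoryRepo name and the sequential _id values are made up for this sketch and are not part of the project. Something like this could be handy in tests, but it is not how the article’s repo works.

```javascript
// Hypothetical in-memory stand-in for lib/repo.js (not part of the project).
// It exposes the same create/list functions, each returning a promise.
const createMemoryRepo = () => {
  const records = [];
  let nextId = 1;

  // Store a copy of the attributes along with a made-up sequential _id.
  const create = attrs => {
    const record = { _id: String(nextId++), ...attrs };
    records.push(record);
    return Promise.resolve(record);
  };

  // Return the records newest-first, like the list() described above.
  const list = () => Promise.resolve(records.slice().reverse());

  return { create, list };
};

// Usage: callers cannot tell this apart from the Mongoose-backed module.
const memoryRepo = createMemoryRepo();
memoryRepo
  .create({ text: 'hello' })
  .then(() => memoryRepo.list())
  .then(messages => console.log(messages.length)); // 1
```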

Connecting to the Web

With our repository defined we can create our web service to provide access to it.

Create a web folder and an index.js file inside it. In that file create an Express app and listen on the configured port:

const http = require('http');
const express = require('express');
 
const app = express();
 
const server = http.createServer(app);
 
const { PORT = 3000 } = process.env;
server.listen(PORT);
console.log(`listening on port ${PORT}`);

Note that instead of using Express’s app.listen() shorthand, we are using Node’s built-in http module to create a server based on the Express app and then calling server.listen(). Setting up the HTTP server explicitly like this will allow us to add WebSockets in a future blog post.
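
To see why the explicit server is useful, here is a small sketch that uses only Node’s http module. A plain function stands in for the Express app (an Express app is itself a request-handler function), and the 'upgrade' listener is a placeholder for whatever a WebSocket library would attach. That listener body is an assumption for illustration, not the code this series ends up with.

```javascript
const http = require('http');

// Stand-in for the Express app: any (req, res) function can be passed to
// http.createServer as the request listener.
const app = (req, res) => {
  res.end('ok');
};

const server = http.createServer(app);

// Because we hold a reference to the server itself, other listeners can be
// attached to it. WebSocket handshakes arrive as 'upgrade' events.
// Placeholder only: a real WebSocket library would complete the handshake.
server.on('upgrade', (req, socket) => {
  socket.destroy();
});

// server.listen(PORT) would follow here, as in web/index.js.
```

With app.listen(), Express creates the server for us, so we would have to capture its return value to do the same thing.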

Next, import two routers that we will define shortly and add them to the app:

 const http = require('http');
 const express = require('express');
+const webhookRouter = require('./webhook');
+const listRouter = require('./list');
 
 const app = express();
 
+app.use('/webhook', webhookRouter);
+app.use('/list', listRouter);
 
 const server = http.createServer(app);

Now let’s define those routers.

First, the webhook. Webhooks are a mechanism for one web application to inform another of an event via an HTTP request. Many services offer webhook integrations. Over the course of this series we’ll integrate with GitHub, Netlify, and Heroku. To start out, we’ll create the simplest possible webhook that allows us to POST any text content we like. This will allow us to easily test out the architecture that we’ll build future webhooks on.

In the web folder create a webhook.js file and add the following:

const express = require('express');
const bodyParser = require('body-parser');
const repo = require('../lib/repo');
 
const webhookRoute = (req, res) => {
  const message = {
    text: req.body,
  };
  repo
    .create(message)
    .then(record => {
      res.end('Saved ' + JSON.stringify(record));
    })
    .catch(e => {
      console.error(e);
      res.status(500);
      res.end(e.message);
    });
};
 
const router = express.Router();
router.post('/', bodyParser.text({ type: '*/*' }), webhookRoute);
 
module.exports = router;

This will receive any text posted to it, save it to our repo as the text attribute of a message, and return an affirmative response.
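
One assumption worth checking in your own setup: when a request arrives with no body at all, body-parser may leave req.body as something other than a string (an empty object, as far as we know), and that would be passed straight into the message. Here is a minimal sketch of a guard. The toMessage helper is hypothetical and not part of the project.

```javascript
// Hypothetical helper (not in the original project): turn a parsed request
// body into a message, returning null for anything that is not non-empty text.
const toMessage = body => {
  if (typeof body !== 'string' || body.trim() === '') {
    return null;
  }
  return { text: body };
};

console.log(toMessage('this is a message')); // { text: 'this is a message' }
console.log(toMessage({})); // null
```

In the route, a null result could be answered with a 400 status instead of being saved.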

Next, let’s provide a way to read the messages in the database. In web create list.js and add the following:

const express = require('express');
const repo = require('../lib/repo');
 
const listRoute = (req, res) => {
  repo
    .list()
    .then(messages => {
      res.setHeader('content-type', 'application/json');
      res.end(JSON.stringify(messages));
    })
    .catch(e => {
      console.error(e);
      res.status(500);
      res.setHeader('content-type', 'application/json');
      res.end(JSON.stringify({ error: e.message }));
    });
};
 
const router = express.Router();
router.get('/', listRoute);
 
module.exports = router;

With this, we are ready to try our app. Start the app:

$ node web

In another terminal, POST a few messages to the app using curl:

$ curl http://localhost:3000/webhook -d "this is a message"
$ curl http://localhost:3000/webhook -d "this is another message"

The -d flag allows us to provide an HTTP body to the request. Setting the -d flag will make our request a POST request by default, which is what we want here.

Now, request the list of data:

$ curl http://localhost:3000/list

You should receive back the messages you posted in JSON format (formatting added here for clarity):

[
  {
    "_id":"5dae1d549a0ade04888a1ac6",
    "text":"this is another message",
    "__v":0
  },
  {
    "_id":"5dae1d499a0ade04888a1ac5",
    "text":"this is a message",
    "__v":0
  }
]

Your _id values, which MongoDB assigns automatically, will be different from these.

Our app runs fine so far, but what if saving the data to the database were slow? This could cause the requests from the third-party service to time out. We don’t need to address this pre-emptively, but say we decided it was important for our app. How could we avoid this timeout?

Connecting to RabbitMQ

We can decouple the processing of the data from receiving it. When we receive it, we just insert it into a queue. A separate worker process will pick up data added to the queue and do whatever we like with it. We’ll use RabbitMQ to handle the queueing, and the amqplib client library to communicate with it.

To start, add one more runtime dependency: amqplib is a client for RabbitMQ.

$ yarn add amqplib

As with our database, instead of accessing it directly from the rest of the app we’ll wrap it in a module to hide the implementation. Create a lib/queue.js file and add the following:

const amqp = require('amqplib');
 
const queueUrl = 'amqp://localhost';
 
const channel = () => {
  return amqp.connect(queueUrl).then(connection => connection.createChannel());
};

First, we provide a private helper function channel that will connect to the queue and create a channel for communication.
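
One thing to be aware of: as written, every call to channel() opens a new connection to RabbitMQ, and nothing closes it. That keeps the tutorial code short, but a long-running process might prefer to open one channel and reuse it. Here is a minimal sketch of that idea. The memoize helper and its name are our own invention, not part of amqplib; it wraps any function that returns a promise so that the work happens only once.

```javascript
// Hypothetical helper, not part of amqplib: call `open` at most once and
// hand every caller the same promise.
const memoize = open => {
  let pending = null;
  return () => {
    if (pending === null) {
      pending = open().catch(e => {
        pending = null; // forget the failure so the next call retries
        throw e;
      });
    }
    return pending;
  };
};

// Possible usage in lib/queue.js (an assumption, not the article's code):
//   const sharedChannel = memoize(channel);
//   sharedChannel().then(ch => { /* use ch as before */ });
```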

Next, let’s define a send function to allow us to send a message on a given queue:

const send = (queue, message) =>
  channel().then(channel => {
    const encodedMessage = JSON.stringify(message);
    channel.assertQueue(queue, { durable: false });
    channel.sendToQueue(queue, Buffer.from(encodedMessage));
    console.log('Sent to "%s" message %s', queue, encodedMessage);
  });

Note that we serialize the message to JSON, so we can handle any object structure that’s serializable.
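
The encoding step, and the matching decoding a consumer needs to do, can be sketched on their own without RabbitMQ. The encode and decode names are just for this illustration. Note that anything JSON cannot represent, such as an undefined property, does not survive the trip.

```javascript
// Encode a message the way send does: JSON text wrapped in a Buffer.
const encode = message => Buffer.from(JSON.stringify(message));

// Decode the way a consumer would: Buffer -> string -> object.
const decode = content => JSON.parse(content.toString());

const original = { text: 'this is a message', url: undefined };
const roundTripped = decode(encode(original));

console.log(roundTripped); // { text: 'this is a message' }
```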

Next, let’s create a receive function allowing us to listen for messages on a given queue, calling a passed-in handler when a message arrives:

const receive = (queue, handler) =>
  channel().then(channel => {
    channel.assertQueue(queue, { durable: false });
    console.log('Listening for messages on queue "%s"', queue);
    channel.consume(queue, msg => handler(JSON.parse(msg.content.toString())), {
      noAck: true,
    });
  });
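
A note on the noAck: true option: it tells RabbitMQ to treat a message as handled the moment it is delivered, so if the handler fails, the message is gone. That is fine for this tutorial. If losing messages mattered, one alternative would be to acknowledge manually. The sketch below is a variation under our own assumptions, not the approach this series uses: consumeWithAck is a hypothetical name, channel is assumed to be an amqplib channel, and handler is assumed to return a promise.

```javascript
// Sketch of a consumer that acknowledges manually instead of using noAck.
const consumeWithAck = (channel, queue, handler) =>
  channel.consume(
    queue,
    msg => {
      // amqplib passes null if the broker cancels the consumer.
      if (msg === null) return Promise.resolve();
      return Promise.resolve()
        .then(() => handler(JSON.parse(msg.content.toString())))
        // Only tell RabbitMQ the message is done once the handler succeeds.
        .then(() => channel.ack(msg))
        .catch(e => {
          console.error(e);
          // Reject without requeueing so a bad message is not retried forever.
          channel.nack(msg, false, false);
        });
    },
    { noAck: false },
  );
```

Whether a failed message should be dropped, requeued, or routed somewhere else is a design choice that depends on the app.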

Finally, we export these two functions:

module.exports = { send, receive };

Using Queueing in Our App

Let’s see these in use. We’ll change our webhook route to enqueue the data instead of saving it to the database. Then we’ll define a separate worker process to save that message to the database.

Open web/webhook.js and make the following changes:

 const bodyParser = require('body-parser');
-const repo = require('../lib/repo');
+const queue = require('../lib/queue');
 
 const webhookRoute = (req, res) => {
   const message = {
     text: req.body,
   };
-  repo
-    .create(message)
-    .then(record => {
-      res.end('Saved ' + JSON.stringify(record));
-    })
+  queue
+    .send('incoming', message)
+    .then(() => {
+      res.end('Received ' + JSON.stringify(message));
+    })
     .catch(e => {
       console.error(e);
       res.status(500);
       res.end(e.message);
     });
 };

Instead of saving our message to the repo, now we enqueue it.

Note one other significant difference: we can no longer confirm in the response that the record has been “saved” to the database, because that hasn’t happened yet; we can only say that the message was received. Also, we will not have the complete record, including its database ID, to return; we can only echo back the message as we received it.

Now our data is being sent to a queue named incoming. How can we listen for messages as they arrive and save them to the database? We can create a worker to do so.

Create a folder workers at the root of your project, then a file index.js inside it. Add the following contents to it:

const queue = require('../lib/queue');
const repo = require('../lib/repo');
 
const handleIncoming = message =>
  repo
    .create(message)
    .then(record => {
      console.log('Saved ' + JSON.stringify(record));
    })
    // Log failed saves instead of leaving the rejection unhandled.
    .catch(console.error);
  
queue
  .receive('incoming', handleIncoming)
  .catch(console.error);

This is pretty simple; we define a function to handle an incoming message. It saves it to the database, then logs it out.

We will need to run these as two separate processes. Quit and restart the existing web process:

$ node web

Then, in another terminal, start the worker:

$ node workers

Send a message:

$ curl http://localhost:3000/webhook -d "this is a message to be enqueued"

Both terminals will appear to update instantly. In the web process you’ll see:

Sent to "incoming" message {"text":"this is a message to be enqueued"}

And in the worker process you’ll see:

Saved {"_id":"5dae2346fc89d91e09576e70","text":"this is a message to be enqueued","__v":0}

So our two processes are now working together. Our webhook process receives the webhook, puts it on a queue, and returns an HTTP response as quickly as possible. Our worker process receives the message from the queue and saves it to the database.

What’s Next?

In this post we’ve built a good foundation for our Node backend, on which we will build live-update functionality in future posts. In our next post we’ll create a React Native client app using Expo so we’ll have a frontend that’s ready for a great live-update experience as well.

Josh Justice

Author Big Nerd Ranch

Josh Justice has worked as a developer since 2004 across backend, frontend, and native mobile platforms. Josh values creating maintainable systems via testing, refactoring, and evolutionary design, and mentoring others to do the same. He currently serves as the Web Platform Lead at Big Nerd Ranch.
