Communicate Through RabbitMQ Using NodeJS and Fastify

If two or more services need to talk to each other and the communication can be asynchronous, we can add a queue to our system using RabbitMQ. The RabbitMQ server maintains all queues and the connections to every service connected to it.

This post uses Fastify as the Node.js framework for our program. Fastify is similar to Express but adds features of its own, such as a plugin system and improved request-response handling.

First, we need to create two plugins: one for sending a message and another for consuming the sent message. We will start with a plain queue, which works like a queue in the real world. When five people are waiting and three staff members are serving, each person is served by exactly one staff member: no other staff member needs to handle someone who has already been served, and nobody is served twice. Likewise, if two services listen to the same queue and a message is passed to that queue, RabbitMQ decides which connected service handles the message, and only that service receives it.
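The staff analogy can be sketched as a toy model in plain TypeScript. This is only an illustration and does not talk to RabbitMQ: `dispatchRoundRobin` and the consumer names are hypothetical, and the model assumes consumers are picked in turn (RabbitMQ's default dispatch order) while ignoring prefetch limits and acknowledgements.

```typescript
// Toy model of a work queue: every message is handed to exactly one consumer.
// Assumption: consumers are chosen in round-robin order and are always free.
interface ConsumerBucket<T> {
  name: string;
  messages: T[];
}

function dispatchRoundRobin<T>(messages: T[], consumerNames: string[]): ConsumerBucket<T>[] {
  const buckets: ConsumerBucket<T>[] = consumerNames.map((name) => ({ name, messages: [] }));

  messages.forEach((message, index) => {
    // pick the next consumer in turn; with no consumers nothing is delivered
    const bucket = buckets[index % buckets.length];
    if (bucket) {
      bucket.messages.push(message);
    }
  });

  return buckets;
}

// five people in the queue, three staff members
const served = dispatchRoundRobin(
  ['person-1', 'person-2', 'person-3', 'person-4', 'person-5'],
  ['staff-a', 'staff-b', 'staff-c'],
);

for (const bucket of served) {
  console.log(bucket.name, bucket.messages.join(', '));
}
```

Each person appears in exactly one bucket, which is the property the queue gives us.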


Create a plugin to send a message

The plugin performs the following steps.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Publish a message to the queue

In this example, the queue is named all_queue.

import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import amqp, { Channel } from 'amqplib';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    return await conn.createChannel(); // await here so the catch block also sees channel errors
  } catch (error: any) {
    console.error(error.message || error)
    throw error; 
  }
}

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions) {
  const channel = await getRabbitChannel();

  // create the queue only if it doesn't exist yet
  await channel.assertQueue(queueName, {});

  // create a function to publish a message
  function publishMessage(messageObject: unknown) {
    const msg = JSON.stringify(messageObject);
    // publish to the default exchange (''), which routes by queue name
    channel.publish('', queueName, Buffer.from(msg));
  }

  // make the function available in fastify scope
  fastify.decorate('publishMessage', publishMessage);

  // no done() call: an async plugin signals completion when its promise resolves
}

export default fastifyPlugin(rabbitPlugin);

In the code above, we declare a function named publishMessage to publish a message and decorate the fastify instance with it so that the function can be called from any part of our application. We also wrap the plugin with fastify-plugin so that the decorator is not confined to the plugin's own scope and is available to other plugins regardless of where they are registered.
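If we want to check the publishing logic without a running broker, one option is to build publishMessage from a small factory that depends only on the channel's publish method. The sketch below is an assumption about how this could be arranged, not part of the plugin above: `createPublisher`, `PublishChannel`, and the fake channel are hypothetical names. It also returns the boolean that publish gives back, which is false when the channel's write buffer is full.

```typescript
import { Buffer } from 'node:buffer';

// Hypothetical minimal view of a channel: only the publish method is needed here.
interface PublishChannel {
  publish(exchange: string, routingKey: string, content: Buffer): boolean;
}

// Build a publish function bound to one queue on the default exchange.
function createPublisher(channel: PublishChannel, queueName: string) {
  return function publishMessage(messageObject: unknown): boolean {
    const msg = JSON.stringify(messageObject);
    // false means the write buffer is full and the caller may want to slow down
    return channel.publish('', queueName, Buffer.from(msg));
  };
}

// A fake channel that records what would have been sent to the broker.
const sent: { routingKey: string; body: string }[] = [];
const fakeChannel: PublishChannel = {
  publish(_exchange, routingKey, content) {
    sent.push({ routingKey, body: content.toString() });
    return true;
  },
};

const publishToAllQueue = createPublisher(fakeChannel, 'all_queue');
const accepted = publishToAllQueue({ hello: 'world' });
console.log(accepted, sent);
```

Inside the plugin, the same factory could be called with the real channel and its result passed to fastify.decorate.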


Create a plugin to consume the sent messages

The plugin performs the following steps.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Listen and consume any messages on the queue

import amqp, { Channel, ConsumeMessage } from 'amqplib';
import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    return await conn.createChannel(); // await here so the catch block also sees channel errors
  } catch (error: any) {
    console.error(error.message || error)
    throw error; 
  }
}

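async function consumeRabbitMessage(msg: ConsumeMessage | null, fastify: FastifyInstance, channel: Channel) {
  // msg is null when the consumer has been cancelled by RabbitMQ
  if (!msg) {
    return;
  }

  try {
    const data = JSON.parse(msg.content.toString());

    // do something else

    // acknowledge the message so RabbitMQ can remove it from the queue
    channel.ack(msg);
  } catch (error) {
    fastify.log.error(error);
    // one option: reject without requeueing so a bad message is not redelivered forever
    channel.nack(msg, false, false);
  }
}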

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions) {
  const channel = await getRabbitChannel();

  // create the queue only if it doesn't exist yet
  await channel.assertQueue(queueName, {});

  // listen to the queue
  await channel.consume(queueName, async (msg) => {
    await consumeRabbitMessage(msg, fastify, channel);
  });

  // no done() call: an async plugin signals completion when its promise resolves
}

export default fastifyPlugin(rabbitPlugin);

Notice that we pass the fastify and channel instances to the message handler function (consumeRabbitMessage). The handler is declared outside the plugin function's scope, so it receives those instances as arguments in order to use them, for example to write to the Fastify logger.
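Inside the handler, JSON.parse returns an untyped value, so it can help to validate the payload before doing anything with it. The following is a minimal sketch under the assumption that messages look like `{ id, text }`; `GreetingMessage`, `isGreetingMessage`, and `parseGreeting` are hypothetical names chosen for illustration, and the real shape depends on what the publisher sends.

```typescript
// Hypothetical payload shape; replace it with whatever your publisher sends.
interface GreetingMessage {
  id: number;
  text: string;
}

// Type guard: checks the fields at runtime so the rest of the code can rely on them.
function isGreetingMessage(value: unknown): value is GreetingMessage {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  return typeof record['id'] === 'number' && typeof record['text'] === 'string';
}

// Returns the typed payload, or null when the content is not valid JSON
// or does not have the expected shape.
function parseGreeting(raw: string): GreetingMessage | null {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isGreetingMessage(parsed) ? parsed : null;
  } catch {
    return null;
  }
}

const good = parseGreeting('{"id":1,"text":"hello"}');
const malformed = parseGreeting('not json');
const wrongShape = parseGreeting('{"id":"1"}');
console.log(good, malformed, wrongShape);
```

In the handler, this would be called as parseGreeting(msg.content.toString()), with a null result treated as a message that cannot be processed.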


Registering the plugin

Finally, we register each plugin in its own service: the publisher plugin in the publisher service and the consumer plugin in the consumer service.

import fastify from 'fastify';
import rabbitPlugin from 'your/plugin/location'; // publisher or consumer

const server = fastify({ logger: true });
server.register(rabbitPlugin);

// the rest of the server setup goes here

