Skip to main content

Communicate Through RabbitMQ Using NodeJS and Fastify

If we have two or more services that need to talk to each other but it is allowed to be asynchronous, we can implement a queue system in our system using RabbitMQ. RabbitMQ server will maintain all queues and connections to all services connected to it.

This post will utilize Fastify as a NodeJS framework to build our program. This framework is similar to Express but implements some unique features like a plugin concept and an improved request-respond handler.

First, we need to create two plugins, one is for sending a message, another one is for consuming the sent message. At first, we will make it using a normal queue. There is one mechanism for how a queue works, it is like a queue in the real world. When there are five persons in a queue and three staff to handle the queue, one person is served only by one staff, there is no need for other staff to handle any person that has been served, and there is no need for a person to be handled repeatedly by other staffs. For example, if there are two services that listen to a queue and a message is passed to the queue, RabbitMQ will manage which connected service will handle the message.


Create a plugin to send a message

The plugin will run the following processes.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Publish a message to the queue

For instance, the queue will be named all_queue.

import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import amqp, { Channel } from 'amqplib';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    return conn.createChannel();
  } catch (error: any) {
    console.error(error.message || error)
    throw error; 
  }
}

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions, done: any) {
  const channel = await getRabbitChannel();
  
  // insert the queue only if it doesn't exist
  await channel.assertQueue(queueName, {});

  // create a function to publish a message
  function publishMessage(messageObject){
    const msg = JSON.stringify(messageObject);
    channel.publish('', queueName, Buffer.from(msg));
  }

  // make the function available in fastify scope
  fastify.decorate('publishMessage', publishMessage);

  done();
}

export default fastifyPlugin(rabbitPlugin);

In the code above, we declare a function to publish a message named publishMessage and decorate the fastify instance so that the function can be called in any part of our application. We also utilize fastify-plugin so that our plugin can be available in any plugin regardless of its location.


Create a plugin to consume the sent messages

The plugin will run the following processes.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Listen and consume any messages on the queue
import amqp, { Channel, ConsumeMessage } from 'amqplib';
import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    return conn.createChannel();
  } catch (error: any) {
    console.error(error.message || error)
    throw error; 
  }
}

async function consumeRabbitMessage(msg: ConsumeMessage | null, fastify: FastifyInstance, channel: Channel) {
  if (!msg) {
    return;
  }

  try {
    const data = JSON.parse(msg.content.toString());
    
    // do something else
    
  } catch (error) {
    fastify.log.error(error);
  }
}

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions, done: any) {
  const channel = await getRabbitChannel();

  await channel.assertQueue(queueName, {}).then(() => {
    // listen to the queue
    return channel.consume(queueName, async (msg) => {
      await consumeRabbitMessage(msg, fastify, channel);
    });
  });

  done();
}

export default fastifyPlugin(rabbitPlugin);

Notice that we pass fastify and channel instances to the message handler function (consumeRabbitMessage). It is because the handler function is outside the plugin declaration scope and we want to allow the function to use those instances.


Registering the plugin

Finally, we can register each plugin to our services, one is for our publisher service, and another is for our consumer service.

import fastify from 'fastify';
import rabbitPlugin from 'your/plugin/location'; // publisher or consumer

const server = fastify({ logger: true });
server.register(rabbitPlugin);

// some codes


Comments

Popular posts from this blog

Increase of Malicious Activities and Implementation of reCaptcha

In recent time, I've seen the increase of malicious activities such as login attempts or phishing emails to some accounts I manage. Let me list some of them and the actions taken. SSH Access Attempts This happened on a server that host a Gitlab server. Because of this case, I started to limit the incoming traffic to the server using internal and cloud firewall provided by the cloud provider. I limit the exposed ports, connected network interfaces, and allowed protocols. Phishing Attempts This typically happened through email and messaging platform such as Whatsapp and Facebook Page messaging. The malicious actors tried to share a suspicious link lured as invoice, support ticket, or something else. Malicious links shared Spammy Bot The actors leverage one of public endpoint on my website to send emails. Actually, the emails won't be forwarded anywhere except to my own email so this just full my inbox. This bot is quite active, but I'm still not sure what...

Configuring Swap Memory on Ubuntu Using Ansible

If we maintain a Linux machine with a low memory capacity while we are required to run an application with high memory consumption, enabling swap memory is an option. Ansible can be utilized as a helper tool to automate the creation of swap memory. A swap file can be allocated in the available storage of the machine. The swap file then can be assigned as a swap memory. Firstly, we should prepare the inventory file. The following snippet is an example, you must provide your own configuration. [server] 192.168.1.2 [server:vars] ansible_user=root ansible_ssh_private_key_file=~/.ssh/id_rsa Secondly, we need to prepare the task file that contains not only the tasks but also some variables and connection information. For instance, we set /swapfile  as the name of our swap file. We also set the swap memory size to 2GB and the swappiness level to 60. - hosts: server become: true vars: swap_vars: size: 2G swappiness: 60 For simplicity, we only check the...

Deliver SaaS According Twelve-Factor App

If you haven't heard of  the twelve-factor app , it gives us a recommendation or a methodology for developing SaaS or web apps structured into twelve items. The recommendation has some connections with microservice architecture and cloud-native environments which become more popular today. We can learn the details on its website . In this post, we will do a quick review of the twelve points. One Codebase Multiple Deployment We should maintain only one codebase for our application even though the application may be deployed into multiple environments like development, staging, and production. Having multiple codebases will lead to any kinds of complicated issues. Explicitly State Dependencies All the dependencies for running our application should be stated in the project itself. Many programming languages have a kind of file that maintains a list of the dependencies like package.json in Node.js. We should also be aware of the dependencies related to the pla...

Kenshin VS The Assassin

It is an assassin versus assassin.

Handling PDF Generation in Web Service

If we are building a website that requires a PDF generation feature, there are several options for implementing it based on the use cases or user requirements. First, we can generate the PDF on the client side using any available client library. It is suitable if the use case is to print out some data that is already available inside certain website components, and we want to maintain the styles of the components in the document. Second, we can do it fully in the back-end using any library available, such as PDF-lib, jsPDF, and so on. This approach is suitable if we want to keep the data processing or any related business functions in the back-end server. This second approach might have disadvantages, such as the difficulty of maintaining the design assets and styles which are already on our website. Third, it is using a hybrid approach, where certain processes are handled on the client side, and some are handled on the back-end. In this post, I want to discuss more about the...

Free Cloud Services from UpCloud

Although I typically deploy my development environment or experimental services on UpCloud , I do not always stay updated on its announcements. Recently, I discovered that UpCloud has introduced a new plan called the Essentials plan, which enables certain cloud services to be deployed at no cost. The complimentary services are generally associated with network components or serve as the foundation for other cloud services. This feature is particularly useful when retaining foundational services, such as a load balancer, is necessary, while tearing down all services and reconfiguring the DNS and other application settings each time we temporarily clean up infrastructure to reduce costs is undesirable.  When reviewing the service specifications of the cloud services in the Essentials plan, they appear to be very similar to those in the Development plan. The difference in service levels is unclear, but it could be related to hardware or resource allocation. For instance, the loa...