Skip to main content

Communicate Through RabbitMQ Using NodeJS and Fastify

If we have two or more services that need to talk to each other but it is allowed to be asynchronous, we can implement a queue system in our system using RabbitMQ. RabbitMQ server will maintain all queues and connections to all services connected to it.

This post will utilize Fastify as a NodeJS framework to build our program. This framework is similar to Express but implements some unique features like a plugin concept and an improved request-respond handler.

First, we need to create two plugins, one is for sending a message, another one is for consuming the sent message. At first, we will make it using a normal queue. There is one mechanism for how a queue works, it is like a queue in the real world. When there are five persons in a queue and three staff to handle the queue, one person is served only by one staff, there is no need for other staff to handle any person that has been served, and there is no need for a person to be handled repeatedly by other staffs. For example, if there are two services that listen to a queue and a message is passed to the queue, RabbitMQ will manage which connected service will handle the message.


Create a plugin to send a message

The plugin will run the following processes.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Publish a message to the queue

For instance, the queue will be named all_queue.

import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import amqp, { Channel } from 'amqplib';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    return conn.createChannel();
  } catch (error: any) {
    console.error(error.message || error)
    throw error; 
  }
}

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions, done: any) {
  const channel = await getRabbitChannel();
  
  // insert the queue only if it doesn't exist
  await channel.assertQueue(queueName, {});

  // create a function to publish a message
  function publishMessage(messageObject){
    const msg = JSON.stringify(messageObject);
    channel.publish('', queueName, Buffer.from(msg));
  }

  // make the function available in fastify scope
  fastify.decorate('publishMessage', publishMessage);

  done();
}

export default fastifyPlugin(rabbitPlugin);

In the code above, we declare a function to publish a message named publishMessage and decorate the fastify instance so that the function can be called in any part of our application. We also utilize fastify-plugin so that our plugin can be available in any plugin regardless of its location.


Create a plugin to consume the sent messages

The plugin will run the following processes.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Listen and consume any messages on the queue
import amqp, { Channel, ConsumeMessage } from 'amqplib';
import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    return conn.createChannel();
  } catch (error: any) {
    console.error(error.message || error)
    throw error; 
  }
}

async function consumeRabbitMessage(msg: ConsumeMessage | null, fastify: FastifyInstance, channel: Channel) {
  if (!msg) {
    return;
  }

  try {
    const data = JSON.parse(msg.content.toString());
    
    // do something else
    
  } catch (error) {
    fastify.log.error(error);
  }
}

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions, done: any) {
  const channel = await getRabbitChannel();

  await channel.assertQueue(queueName, {}).then(() => {
    // listen to the queue
    return channel.consume(queueName, async (msg) => {
      await consumeRabbitMessage(msg, fastify, channel);
    });
  });

  done();
}

export default fastifyPlugin(rabbitPlugin);

Notice that we pass fastify and channel instances to the message handler function (consumeRabbitMessage). It is because the handler function is outside the plugin declaration scope and we want to allow the function to use those instances.


Registering the plugin

Finally, we can register each plugin to our services, one is for our publisher service, and another is for our consumer service.

import fastify from 'fastify';
import rabbitPlugin from 'your/plugin/location'; // publisher or consumer

const server = fastify({ logger: true });
server.register(rabbitPlugin);

// some codes


Comments

Popular posts from this blog

Rangkaian Sensor Infrared dengan Photo Dioda

Keunggulan photodioda dibandingkan LDR adalah photodioda lebih tidak rentan terhadap noise karena hanya menerima sinar infrared, sedangkan LDR menerima seluruh cahaya yang ada termasuk infrared. Rangkaian yang akan kita gunakan adalah seperti gambar di bawah ini. Pada saat intensitas Infrared yang diterima Photodiode besar maka tahanan Photodiode menjadi kecil, sedangkan jika intensitas Infrared yang diterima Photodiode kecil maka tahanan yang dimiliki photodiode besar. Jika  tahanan photodiode kecil  maka tegangan  V- akan kecil . Misal tahanan photodiode mengecil menjadi 10kOhm. Maka dengan teorema pembagi tegangan: V- = Rrx/(Rrx + R2) x Vcc V- = 10 / (10+10) x Vcc V- = (1/2) x 5 Volt V- = 2.5 Volt Sedangkan jika  tahanan photodiode besar  maka tegangan  V- akan besar  (mendekati nilai Vcc). Misal tahanan photodiode menjadi 150kOhm. Maka dengan teorema pembagi tegangan: V- = Rrx/(Rrx + R2) x Vcc V- = 150 / (150+10) x Vcc V- = (150/160) x 5

Rangkaian Sensor Cahaya dengan LDR

LDR(Light Depending Resistor) adalah resistor yang nilai hambatannya bergantung dari intensitas cahaya yang ia terima. Jika intensitas cahaya rendah (gelap) maka nilai resistansinya akan menjadi sangat besar (mencapai 1MOhm atau lebih), sedangkan jika intensitas cahaya tinggi (terang) nilai resistansinya menjadi kecil (mencapai 10kOhm atau kurang). Sifat ini dapat kita pergunakan dalam rangkaian sensor cahaya. Misalkan jika kita menginginkan sensor cahaya yang akan menyalakan lampu indikasi ketika ada cahaya dan mematikan lampu indikasi ketika tidak ada cahaya. Kita dapat menggunakan rangkaian seperti gambar di bawah ini. Transistor NPN berfungsi sebagai gate. Arus dari kolektor akan mengalir menuju emitor jika arus dari base besar namun jika arus pada base kecil maka arus dari kolektor tidak akan menuju emitor. Pada rangkaian sensor cahaya dengan LDR, ketika intensitas cahaya tinggi (terang) maka arus dari VCC akan melewati LDR kemudian melewati RESISTOR dan masuk ke

Installing APCu in PHP 7

APCu is one of caching application for PHP. In this case, I use PHP 7.0 on Ubuntu 16.04. In PHP 7.0, this application is provided via PEAR. First, install PEAR. $ sudo apt-get install php-pear Install APCu. If an error occured state that there's no phpize, you need to install PHP 7.0-dev which provide phpize support. $ sudo apt-get install php7.0-dev $ sudo pecl install apcu Create APCu module configuration in PHP modules directory. $ sudo echo "extension = apcu.so" >> /etc/php/7.0/mods-available/apcu.ini Add that configuration to PHP FPM and CLI. $ sudo ln -s /etc/php/7.0/mods-available/apcu.ini /etc/php/7.0/fpm/conf.d/30-apcu.ini $ sudo ln -s /etc/php/7.0/mods-available/apcu.ini /etc/php/7.0/cli/conf.d/30-apcu.ini Restart PHP FPM.

Configuring Swap Memory on Ubuntu Using Ansible

If we maintain a Linux machine with a low memory capacity while we are required to run an application with high memory consumption, enabling swap memory is an option. Ansible can be utilized as a helper tool to automate the creation of swap memory. A swap file can be allocated in the available storage of the machine. The swap file then can be assigned as a swap memory. Firstly, we should prepare the inventory file. The following snippet is an example, you must provide your own configuration. [server] 192.168.1.2 [server:vars] ansible_user=root ansible_ssh_private_key_file=~/.ssh/id_rsa Secondly, we need to prepare the task file that contains not only the tasks but also some variables and connection information. For instance, we set /swapfile  as the name of our swap file. We also set the swap memory size to 2GB and the swappiness level to 60. - hosts: server become: true vars: swap_vars: size: 2G swappiness: 60 For simplicity, we only check the exi

Setting Up Next.js Project With ESLint, Typescript, and AirBnB Configuration

If we initiate a Next.js project using the  create-next-app tool, our project will be included with ESLint configuration that we can apply using yarn run lint . By default, the tool installs eslint-config-next and extends next/core-web-vitals in the ESLint configuration. The Next.js configuration has been integrated with linting rules for React and several other libraries and tools. yarn create next-app --typescript For additional configuration such as AirBnB, it is also possible. First, we need to install the peer dependencies of eslint-config-airbnb . We also add support for Typescript using eslint-config-airbnb-typescript . yarn add --dev eslint-config-airbnb eslint-plugin-import eslint-plugin-jsx-a11y eslint-plugin-react eslint-plugin-react-hooks yarn add --dev eslint-config-airbnb-typescript @typescript-eslint/eslint-plugin @typescript-eslint/parser After that, we can update the .eslintrc.json file for the new configuration. { "extends": [ "airb

Managing MongoDB Records Using NestJS and Mongoose

NestJS is a framework for developing Node.js-based applications. It provides an additional abstraction layer on top of Express or other HTTP handlers and gives developers a stable foundation to build applications with structured procedures. Meanwhile, Mongoose is a schema modeling helper based on Node.js for MongoDB. There are several main steps to be performed for allowing our program to handle MongoDB records. First, we need to add the dependencies which are @nestjs/mongoose , mongoose , and @types/mongoose . Then, we need to define the connection configuration on the application module decorator. import { MongooseModule } from '@nestjs/mongoose'; @Module({ imports: [ MongooseModule.forRoot('mongodb://localhost:27017/mydb'), ], controllers: [AppController], providers: [AppService], }) Next, we create the schema definition using helpers provided by NestJS and Mongoose. The following snippet is an example with a declaration of index setting and an o