If we have two or more services that need to talk to each other but it is allowed to be asynchronous, we can implement a queue system in our system using RabbitMQ. RabbitMQ server will maintain all queues and connections to all services connected to it.
This post will utilize Fastify as a NodeJS framework to build our program. This framework is similar to Express but implements some unique features like a plugin concept and an improved request-respond handler.
First, we need to create two plugins, one is for sending a message, another one is for consuming the sent message. At first, we will make it using a normal queue. There is one mechanism for how a queue works, it is like a queue in the real world. When there are five persons in a queue and three staff to handle the queue, one person is served only by one staff, there is no need for other staff to handle any person that has been served, and there is no need for a person to be handled repeatedly by other staffs. For example, if there are two services that listen to a queue and a message is passed to the queue, RabbitMQ will manage which connected service will handle the message.
Create a plugin to send a message
The plugin will run the following processes.
- Create a channel to the RabbitMQ server
- Assert a queue
- Publish a message to the queue
For instance, the queue will be named all_queue
.
import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import amqp, { Channel } from 'amqplib';
import fastifyPlugin from 'fastify-plugin';
const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'
async function getRabbitChannel(): Promise<Channel> {
try {
const conn = await amqp.connect(rabbitUrl);
return conn.createChannel();
} catch (error: any) {
console.error(error.message || error)
throw error;
}
}
async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions, done: any) {
const channel = await getRabbitChannel();
// insert the queue only if it doesn't exist
await channel.assertQueue(queueName, {});
// create a function to publish a message
function publishMessage(messageObject){
const msg = JSON.stringify(messageObject);
channel.publish('', queueName, Buffer.from(msg));
}
// make the function available in fastify scope
fastify.decorate('publishMessage', publishMessage);
done();
}
export default fastifyPlugin(rabbitPlugin);
In the code above, we declare a function to publish a message named publishMessage
and decorate the fastify instance so that the function can be called in any part of our application. We also utilize fastify-plugin
so that our plugin can be available in any plugin regardless of its location.
Create a plugin to consume the sent messages
The plugin will run the following processes.
- Create a channel to the RabbitMQ server
- Assert a queue
- Listen and consume any messages on the queue
import amqp, { Channel, ConsumeMessage } from 'amqplib';
import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import fastifyPlugin from 'fastify-plugin';
const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'
async function getRabbitChannel(): Promise<Channel> {
try {
const conn = await amqp.connect(rabbitUrl);
return conn.createChannel();
} catch (error: any) {
console.error(error.message || error)
throw error;
}
}
async function consumeRabbitMessage(msg: ConsumeMessage | null, fastify: FastifyInstance, channel: Channel) {
if (!msg) {
return;
}
try {
const data = JSON.parse(msg.content.toString());
// do something else
} catch (error) {
fastify.log.error(error);
}
}
async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions, done: any) {
const channel = await getRabbitChannel();
await channel.assertQueue(queueName, {}).then(() => {
// listen to the queue
return channel.consume(queueName, async (msg) => {
await consumeRabbitMessage(msg, fastify, channel);
});
});
done();
}
export default fastifyPlugin(rabbitPlugin);
Notice that we pass fastify and channel instances to the message handler function (consumeRabbitMessage
). It is because the handler function is outside the plugin declaration scope and we want to allow the function to use those instances.
Registering the plugin
Finally, we can register each plugin to our services, one is for our publisher service, and another is for our consumer service.
import fastify from 'fastify';
import rabbitPlugin from 'your/plugin/location'; // publisher or consumer
const server = fastify({ logger: true });
server.register(rabbitPlugin);
// some codes
Comments
Post a Comment