Today, we'll walk through implementing a scalable backend service built on the Fastify framework, with BullMQ handling the queues.
Usage of Queues
Queues are essential for managing tasks efficiently in distributed systems, acting as buffers to handle asynchronous job processing. They help prevent overload by allowing tasks to be processed at a controlled rate, ensuring system stability and scalability. BullMQ is a robust, Redis-based queue library for Node.js, offering powerful features for job scheduling, prioritization, and reliable task execution in high-performance applications.
- Producer: the action that adds a job to the queue - the act of putting a new task onto the pile of things to do.
- Job: the actual task or piece of data that enters the queue, waiting to be processed.
- Queue: the line of jobs waiting to be done; it holds all the jobs that need to be processed, in order.
- Worker: takes jobs off the queue and executes them; it handles the jobs and carries out the tasks assigned to it (all four concepts are illustrated in the sketch below).
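A minimal, self-contained sketch wiring these together - the EMAILS queue, the SEND_WELCOME job name, and the payload are invented for illustration, and a local Redis on the default port is assumed:
import { Queue, Worker } from "bullmq";
const connection = { host: "127.0.0.1", port: 6379 };
// Queue: holds the jobs waiting to be processed.
const emailQueue = new Queue("EMAILS", { connection });
// Worker: takes jobs off the queue and executes them.
new Worker(
  "EMAILS",
  async (job) => {
    // Job: the payload added by the producer, available as job.data.
    console.log(`Sending welcome email to ${job.data.to}`);
  },
  { connection }
);
// Producer: adds a job to the queue.
const produce = async () => {
  await emailQueue.add("SEND_WELCOME", { to: "user@example.com" });
};
produce();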
There are different ways of running workers:
- Same process: running the workers and the Fastify server in the same process lets them share the same environment and resources, and be managed together.
- Separate processes: running the workers and the Fastify server in different processes lets them operate independently, with separate environments, sharing no resources by default.
Running workers on separate processes from the server means they don't have to share the same CPU and memory, preventing one task from hogging resources and slowing down others. This setup also ensures that if a worker encounters an overload or crashes, it won't bring down the entire server, maintaining system stability.
Additionally, it allows for fine-tuning resource allocation for each task, making the system more efficient overall.
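For comparison, here is a rough sketch of what a standalone worker entry point could look like - a hypothetical src/worker.ts that is not part of this tutorial, started in its own process with a command like ts-node src/worker.ts:
import { Worker } from "bullmq";
// Runs in its own process, independent of the HTTP server.
const worker = new Worker(
  "IMAGE_UPLOADER",
  async (job) => {
    console.log(`Processing job ${job.id}`);
  },
  { connection: { host: "127.0.0.1", port: 6379 } }
);
worker.on("error", (err) => console.error(err));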
In this tutorial, we'll demonstrate how to run the workers in the same Node.js process as the Fastify server, integrating them directly into the application for simplicity and ease of management.
Requirements
- Node.js: Ensure that Node.js is installed on your system. You can download and install it from the official website: https://nodejs.org/. Node.js is required to run JavaScript applications, including our backend server and worker processes.
- Docker: Make sure Docker is installed on your machine. Docker provides a platform for developing, shipping, and running applications in containers. You can download and install Docker Desktop from the official website: https://www.docker.com/products/docker-desktop. We'll use Docker to set up and manage the Redis server required by our BullMQ queues. You can verify both installations with the commands below.
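Both tools print their version from the command line, which is a quick way to confirm the installations:
> node -v
> docker -v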
Folder structure
src/
  jobs/
    upload-image/
      upload-image.interfaces.ts
      upload-image.job.ts
      upload-image.worker.ts
    job.interfaces.ts
    job.constants.ts
  queues/
    upload-image/
      upload-image.queue.ts
    queue.config.ts
    queue.constants.ts
    queue.utils.ts
  config.ts
  logger.ts
  server.ts
  index.ts
uploads/
.env
.gitignore
docker-compose.yaml
package.json
tsconfig.json
- src/ is the directory that holds the source code files, including the jobs and queues related to the image upload tasks
- uploads/ is the directory for storing uploaded files
- .env is a configuration file for managing environment variables
- .gitignore specifies files and directories to be ignored by Git
- docker-compose.yaml is a configuration file defining the Docker containers and their configuration
- package.json is where project dependencies and scripts are listed
- tsconfig.json is a configuration file that manages TypeScript compiler options, ensuring proper compilation
Setting Up the Foundation
Begin by creating a root directory for your project, and within it, initialize the package.json file:
> mkdir image-uploader && cd image-uploader
> npm init -y
Then, install TypeScript and the other dependencies:
> npm install fastify @fastify/cors @fastify/static @fastify/multipart bullmq @bull-board/api @bull-board/fastify jimp pino pino-pretty uuid
> npm install --save-dev typescript ts-node dotenv @types/node @types/uuid
Now, add the scripts for building and running the project, and update the main field in the package.json file:
{
"name": "image-uploader",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"scripts": {
"dev": "ts-node src/index.ts",
"start": "tsc && node build/index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@bull-board/api": "^5.15.3",
"@bull-board/fastify": "^5.15.3",
"@fastify/cors": "^9.0.1",
"@fastify/multipart": "^8.2.0",
"@fastify/static": "^7.0.1",
"bullmq": "^5.4.5",
"fastify": "^4.26.2",
"jimp": "^0.22.12",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0",
"uuid": "^9.0.1"
},
"devDependencies": {
"@types/node": "^20.11.30",
"@types/uuid": "^9.0.8",
"dotenv": "^16.4.5",
"ts-node": "^10.9.2",
"typescript": "^5.4.3"
}
}
Generate the tsconfig.json file:
> npx tsc --init
And update the tsconfig.json to the following:
{
"compilerOptions": {
"sourceMap": true,
"strict": true,
"esModuleInterop": true,
"lib": ["esnext"],
"outDir": "./build",
"rootDir": "./src"
}
}
Now we're going to use docker-compose to start up a Redis server. BullMQ stores its queues in Redis, which is what lets us process jobs reliably and scale the service.
Create a docker-compose.yaml file with the following content:
version: "3.8"
services:
  redis:
    image: redis
    ports:
      - "${REDIS_PORT}:6379"
    networks:
      - uploader
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 10s
      retries: 5
networks:
  uploader:
    driver: bridge
Then run docker-compose to start up the Redis server. Note that docker-compose substitutes ${REDIS_PORT} from the .env file in the project root, which we create in the next section, so make sure it defines REDIS_PORT=6379 before starting the container:
> docker-compose up -d
Note: the -d flag tells Docker to run the containers in the background, detached from the terminal session.
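To confirm Redis is actually up, you can list the running containers and ping Redis through the service defined in the compose file (the PONG line is the expected reply):
> docker-compose ps
> docker-compose exec redis redis-cli ping
PONG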
Awesome! With the foundational setup now complete, let's jump into the fun part - building the actual backend service. Get ready for some hands-on work in the next step!
Implementing the HTTP server
TypeScript files will be placed in the src directory, and compiled files will be placed in the build directory.
Let's begin by creating a logger instance for logging purposes, in src/logger.ts. We'll utilize the pino library for this task:
import pino from "pino";
export const logger = pino({
transport: {
target: "pino-pretty",
options: {
colorize: true,
},
},
});
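As a quick sanity check, a call to the logger prints a colorized, human-readable line; the exact layout depends on the pino-pretty version, but it looks roughly like this:
import { logger } from "./logger";
logger.info("Hello");
// Prints something like (format may vary by pino-pretty version):
// [12:01:33.417] INFO (4821): Hello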
Next, we'll create the .env file to store our environment variables:
PORT=8080
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
CONCURRENT_WORKERS=4
Next, let's create the config.ts file, which exports the environment variables:
import dotenv from "dotenv";
dotenv.config();
export const config = {
port: Number(process.env.PORT || 8080),
redis: {
host: process.env.REDIS_HOST || "127.0.0.1",
port: Number(process.env.REDIS_PORT || 6379),
},
concurrency: Number(process.env.CONCURRENT_WORKERS || 1),
};
We use the dotenv library to load environment variables from the .env file into the Node.js process environment, enabling easy access and configuration throughout the application.
In the src directory, let's create the server.ts file, which sets up the server based on the Fastify framework:
import fastify, {
FastifyInstance,
FastifyReply,
FastifyRequest,
} from "fastify";
import fastifyCors from "@fastify/cors";
import { logger } from "./logger";
export const setupServer = async (): Promise<FastifyInstance> => {
const server: FastifyInstance = fastify();
await server.register(fastifyCors, {
origin: "*",
exposedHeaders: ["*"],
methods: ["*"],
});
server.get(
"/healthcheck",
async (request: FastifyRequest, reply: FastifyReply) => {
return reply.code(200).send({ message: "OK" });
}
);
// Error handler
server.setErrorHandler(
(error: Error, request: FastifyRequest, reply: FastifyReply) => {
logger.error(`An error occurred: ${error.message}`);
reply.status(500).send({ error: "Internal Server Error" });
}
);
return server;
};
We're setting up a Fastify server by creating an instance, registering CORS middleware to enable cross-origin resource sharing, defining a route for a health check endpoint, and implementing an error handler to manage server errors.
Next, let's create the index.ts file; this file is the entry point of the project:
import { FastifyInstance } from "fastify";
import { config } from "./config";
import { logger } from "./logger";
import { setupServer } from "./server";
const start = async (): Promise<void> => {
try {
const app: FastifyInstance = await setupServer();
await app.listen({
host: "0.0.0.0",
port: config.port,
});
logger.info(`🚀 Listening on localhost:${config.port}`);
} catch (err) {
logger.error(err);
process.exit(1);
}
};
start();
We set up a Fastify server instance using the configuration defined in config.ts, start it listening on the specified host and port, and log a message indicating the server is running. The /healthcheck route defined in server.ts lets us verify that the server is functioning. If an error occurs during setup or while starting the server, we log the error and exit the process with a status code of 1.
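At this point you can already start the server with npm run dev and exercise the health check, for example with curl:
> curl http://localhost:8080/healthcheck
{"message":"OK"}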
Implementing the Queues
Let's begin by creating a queue.config.ts configuration file tailored specifically for the queues:
import { ConnectionOptions } from "bullmq";
import { config } from "../config";
export const connection: ConnectionOptions = {
host: config.redis.host,
port: config.redis.port,
};
export const concurrency: number = config.concurrency;
- connection: the configuration options needed to establish a connection to the Redis server where the queues will operate.
- concurrency: the maximum number of jobs that the worker will process concurrently from the queue.
Let's now declare the queue.constants.ts constants file containing the enums used for the queues:
export const enum Queues {
IMAGE_UPLOADER = "IMAGE_UPLOADER",
}
We define an enum Queues that contains the queue names used in our application. Currently it includes only one member, IMAGE_UPLOADER, which is the name of our image upload queue.
We're now at the final step of the queue setup, where we'll create a Queue instance dedicated to handling image uploads, in upload-image.queue.ts. Subsequently, we'll craft the interfaces file containing the structures needed for the upload image job.
import { Queue } from "bullmq";
import { Queues } from "../queue.constants";
import { connection } from "../queue.config";
import { UploadImageJobDataType } from "../../jobs/upload-image/upload-image.interfaces";
export const uploadImageQueue = new Queue<UploadImageJobDataType>(
Queues.IMAGE_UPLOADER,
{
connection,
}
);
We create a queue instance named uploadImageQueue using Queue from BullMQ. The queue is configured with the name from Queues.IMAGE_UPLOADER and the connection settings from connection. Additionally, the generic type UploadImageJobDataType is provided to specify the data structure expected for jobs in this queue.
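To preview how this queue will be used later in the upload route, enqueueing a job is a single add call; the file name and empty buffer below are placeholders for illustration:
import { uploadImageQueue } from "./queues/upload-image/upload-image.queue";
const enqueueExample = async () => {
  // Adds a job named "UPLOAD_IMAGE" with its payload to the queue.
  const job = await uploadImageQueue.add("UPLOAD_IMAGE", {
    filename: "cat.png",
    file: Buffer.alloc(0), // placeholder buffer
  });
  console.log(`Queued job ${job.id}`);
};
enqueueExample();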
Implementing the Jobs and Workers
Let's define the job enums in the job.constants.ts file; these will be utilized within our queues:
export const enum Jobs {
UPLOAD_IMAGE = "UPLOAD_IMAGE",
}
We introduce an enum Jobs to represent the various job types. Currently, it includes only the UPLOAD_IMAGE job type, which signifies the task of uploading an image.
Additionally, we need to define the interface used for handling jobs, in job.interfaces.ts:
import { Job } from "bullmq";
export interface JobHandlers<T, R = void> {
handle: (job: Job<T, R>) => Promise<R>;
completed: (job: Job) => Promise<void>;
failed: (job?: Job, error?: Error) => Promise<void>;
}
Here, T is the type of the job's input data and R the type of its return value; handle processes the job itself, while completed and failed react to the corresponding worker events. Next, we'll create upload-image.interfaces.ts, a file dedicated to the interfaces tailored specifically for the image uploader job:
export interface UploadImageJobDataType {
filename: string;
file: Buffer;
}
export interface UploadImageJobReturnType {
imageFileName: string;
}
- UploadImageJobDataType specifies the structure of data expected for an upload image job, including the filename and the file data itself.
- UploadImageJobReturnType outlines the structure of the return data expected from processing an upload image job, containing the file name of the uploaded image.
Now, in upload-image.job.ts, we'll create the job handler class that will be responsible for the image upload process:
import { Job } from "bullmq";
import path from "path";
import jimp from "jimp";
import { JobHandlers } from "../job.interfaces";
import { logger } from "../../logger";
import { config } from "../../config";
import {
UploadImageJobDataType,
UploadImageJobReturnType,
} from "./upload-image.interfaces";
export class UploadImageJob
implements JobHandlers<UploadImageJobDataType, UploadImageJobReturnType>
{
async handle(job: Job<UploadImageJobDataType, UploadImageJobReturnType>) {
const { file, filename } = job.data;
const timestamp = new Date().toISOString();
const fileName = `${timestamp}-${filename}`;
const filePath = path.join(__dirname, "../../../uploads/", fileName);
const buffer = Buffer.from(file);
const image = await jimp.read(buffer);
await image.quality(20).writeAsync(filePath);
logger.info(
`File "${filename}" uploaded as "${fileName}" to uploads directory.`
);
return { imageFileName: fileName };
}
async completed(job: Job) {
logger.info(`[Upload Image] Job (id: ${job.id}) has completed.`);
}
async failed(job?: Job, error?: Error) {
logger.error(
`[Upload Image] Job (id: ${job?.id}) has failed - ${error?.message}.`
);
}
}
We define a class UploadImageJob that implements the JobHandlers interface, encapsulating the logic for handling image upload jobs. The handle method processes the job: it prefixes the original file name with a timestamp, reads the image, re-encodes it at reduced quality, writes it to the uploads directory, and returns the file name of the uploaded image. Note that BullMQ serializes job data to JSON for storage in Redis, so the Buffer arrives at the worker in its serialized form; the Buffer.from(file) call reconstructs a usable Buffer from it. The completed method logs a message indicating successful completion of the job. In case of failure, the failed method logs an error message including the job ID and error details, providing valuable insight for debugging and troubleshooting. This structured approach ensures robust, maintainable handling of image upload jobs within our application.
Next, in upload-image.worker.ts, we'll create the worker that will be used to process the image upload jobs:
import { Worker } from "bullmq";
import { UploadImageJob } from "./upload-image.job";
import { Queues } from "../../queues/queue.constants";
import { concurrency, connection } from "../../queues/queue.config";
const uploadImageJob = new UploadImageJob();
export const uploadImageWorker = new Worker(
Queues.IMAGE_UPLOADER,
uploadImageJob.handle,
{
connection,
concurrency,
}
);
uploadImageWorker.on("completed", uploadImageJob.completed);
uploadImageWorker.on("failed", uploadImageJob.failed);
We initialize a BullMQ worker responsible for processing image upload jobs, pointing it at the IMAGE_UPLOADER queue and configuring it with our connection settings and concurrency level. Event listeners are attached to the worker to handle job completion and failure, invoking the corresponding methods of the UploadImageJob class. (Passing uploadImageJob.handle directly as the processor is safe here because none of the handler methods rely on this.)
Finally, in queue.utils.ts, we need a utility to initialize the workers responsible for processing incoming jobs from the queues:
import { Worker } from "bullmq";
import { uploadImageWorker } from "../jobs/upload-image/upload-image.worker";
import { logger } from "../logger";
const WorkerMap = new Map([["UploadImage", uploadImageWorker]]);
export const initJobs = () => {
WorkerMap.forEach((worker: Worker) => {
worker.on("error", (err) => {
logger.error(err);
});
});
};
A WorkerMap is created to store the workers keyed by their job type. The initJobs function attaches an error event listener to each worker to handle any errors that occur during job processing; note that importing the worker modules is what actually starts the workers.
Let's now invoke initJobs within the start function in index.ts, ensuring that our workers are initialized before the server starts:
import { FastifyInstance } from "fastify";
import { config } from "./config";
import { logger } from "./logger";
import { setupServer } from "./server";
import { initJobs } from "./queues/queue.utils";
const start = async (): Promise<void> => {
try {
const app: FastifyInstance = await setupServer();
initJobs();
await app.listen({
host: "0.0.0.0",
port: config.port,
});
logger.info(`🚀 Listening on localhost:${config.port}`);
} catch (err) {
logger.error(err);
process.exit(1);
}
};
// Execute the start function to begin the server initialization process
start();
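One thing worth adding on top of this setup - a sketch, not covered further in the tutorial - is a graceful shutdown: BullMQ workers expose a close() method that waits for in-flight jobs to finish, so a stopJobs counterpart to initJobs in queue.utils.ts could look like this:
export const stopJobs = async (): Promise<void> => {
  // Let each worker finish its current jobs and disconnect from Redis.
  await Promise.all(
    Array.from(WorkerMap.values()).map((worker: Worker) => worker.close())
  );
};
It could then be called from a SIGTERM handler in index.ts before exiting the process.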
Implementing the image routes
Let's proceed with implementing the routes responsible for uploading an image and retrieving the status of the image upload process, alongside integrating a BullMQ dashboard for tracking the jobs:
import fastify, {
FastifyInstance,
FastifyReply,
FastifyRequest,
} from "fastify";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { FastifyAdapter } from "@bull-board/fastify";
import fastifyCors from "@fastify/cors";
import fastifyMultipart, { MultipartFile } from "@fastify/multipart";
import fastifyStatic from "@fastify/static";
import path from "path";
import fs from "fs";
import { uploadImageQueue } from "./queues/upload-image/upload-image.queue";
import { logger } from "./logger";
import { Jobs } from "./jobs/job.constants";
const BULL_DASHBOARD_ROUTE = "/dashboard";
export const setupServer = async (): Promise<FastifyInstance> => {
const server: FastifyInstance = fastify();
const serverAdapter = new FastifyAdapter();
createBullBoard({
queues: [new BullMQAdapter(uploadImageQueue)],
serverAdapter,
});
serverAdapter.setBasePath(BULL_DASHBOARD_ROUTE);
await server.register(serverAdapter.registerPlugin(), {
prefix: BULL_DASHBOARD_ROUTE,
basePath: BULL_DASHBOARD_ROUTE,
});
await server.register(fastifyCors, {
origin: "*",
exposedHeaders: ["*"],
methods: ["*"],
});
await server.register(fastifyMultipart, {
attachFieldsToBody: true,
limits: {
fileSize: 1048576, // 1MB
files: 1,
},
});
const uploadsDirectory = path.join(__dirname, "..", "uploads");
// Check if uploads directory exists, create it if not
if (!fs.existsSync(uploadsDirectory)) {
fs.mkdirSync(uploadsDirectory);
}
await server.register(fastifyStatic, {
root: uploadsDirectory,
prefix: "/uploads/",
});
server.get(
"/healthcheck",
async (request: FastifyRequest, reply: FastifyReply) => {
return reply.code(200).send({ message: "OK" });
}
);
server.register(
async (instance: FastifyInstance) => {
/**
* Add a job to upload an image to queue
*/
instance.post(
"/upload-image",
async (
request: FastifyRequest<{ Body: { file: MultipartFile } }>,
reply: FastifyReply
) => {
const file = request?.body?.file;
if (!file) {
return reply.code(400).send({ error: "File not provided." });
}
const mimeType = file.mimetype;
if (!mimeType.startsWith("image/")) {
return reply
.code(400)
.send({ error: "Only image files are allowed." });
}
const bufferFile: Buffer = await file.toBuffer();
const uploadImageJob = await uploadImageQueue.add(Jobs.UPLOAD_IMAGE, {
filename: file.filename,
file: bufferFile,
});
const url = `${request.protocol}://${request.hostname}/api/v1/image/${uploadImageJob.id}`;
return reply
.code(201)
.send({ id: uploadImageJob.id, pollingUrl: url });
}
);
/**
* Get the image data by job id
*/
instance.get(
"/image/:jobId",
async (
request: FastifyRequest<{ Params: { jobId: string } }>,
reply: FastifyReply
) => {
const jobId = request.params?.jobId;
// Find the job in the queue by the id returned when it was added
const job = await uploadImageQueue.getJob(jobId);
if (!job) {
return reply
.code(404)
.send({ error: "Uploading image job not found" });
}
const imageName = job.returnvalue?.imageFileName;
return reply.code(200).send({
status: await job.getState(),
imageUrl: imageName
? `${request.protocol}://${request.hostname}/uploads/${imageName}`
: null,
});
}
);
},
{ prefix: "/api/v1" }
);
// Error handler
server.setErrorHandler(
(error: Error, request: FastifyRequest, reply: FastifyReply) => {
logger.error(`An error occurred: ${error.message}`);
return reply
.code(500)
.send({ error: error.message ?? "Internal Server Error" });
}
);
return server;
};
We start by importing the modules needed to set up our Fastify server, including utilities for handling multipart form data, serving static files, and interfacing with BullMQ for job queue management, along with the Bull Board dashboard components and the file system module for directory manipulation. We then define a constant BULL_DASHBOARD_ROUTE to specify the route for accessing the BullMQ dashboard within our application.
Next, we instantiate a Fastify adapter to integrate Bull Board with our Fastify server. We configure the dashboard using createBullBoard, registering the uploadImageQueue for monitoring and associating it with the Fastify adapter. The base path for the dashboard route is set with serverAdapter.setBasePath to ensure proper routing, and the dashboard plugin is then registered on the server via server.register, incorporating it into our setup.
Moving forward, we handle multipart form data by registering the fastifyMultipart plugin, limiting uploads to a single file of at most 1MB. We define the directory path for uploaded files and ensure it exists by creating it if necessary using the file system module. Then, we register the fastifyStatic plugin to serve static files from the uploads directory.
Continuing, we register the API routes under the /api/v1 prefix to facilitate image upload and retrieval. The upload image route accepts POST requests, validates that the uploaded file is an image, adds a corresponding job to the uploadImageQueue, and provides the client with the job ID and a polling URL for status tracking. The get image route handles GET requests with a job ID parameter, retrieves the job's state and, once processing has finished, the uploaded image's file name, and responds with the status and image URL.
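Putting it all together, a typical interaction with the running service looks like this (IDs and file names will differ on your machine):
> curl -F "file=@cat.png" http://localhost:8080/api/v1/upload-image
{"id":"1","pollingUrl":"http://localhost:8080/api/v1/image/1"}
> curl http://localhost:8080/api/v1/image/1
{"status":"completed","imageUrl":"http://localhost:8080/uploads/2024-03-28T12:00:00.000Z-cat.png"}
You can also watch the job move through the queue in the Bull Board dashboard at http://localhost:8080/dashboard.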
Conclusion
In conclusion, we have successfully implemented a server capable of handling image uploads. Leveraging the power of the BullMQ library, which is based on Redis, our server architecture is highly scalable and efficient.
The incorporation of BullMQ allows for the management of job queues, ensuring smooth processing of image uploads and other asynchronous tasks. Additionally, our server setup includes features such as a BullMQ dashboard for real-time monitoring of job queues, as well as routes for health checks, image uploads, and retrieval.
This robust server architecture provides a reliable foundation for handling image upload functionalities in web applications, while also offering scalability to accommodate increased demand and workload.
Hope you enjoyed and took away something useful from this article. The source code used in this article is published on GitHub.
Thanks for reading!