Katriel tsepelevish - Software Engineer at Vimeo
How to build a Scalable image uploader with TypeScript? - Part 1

How to build a Scalable image uploader with TypeScript? - Part 1

April 1, 2024·16 min read

Today, we'll delve into implementing a scalable backend service based on the Fastify framework and BullMQ for queues.

Usage of Queues

Queues are essential for managing tasks efficiently in distributed systems, acting as buffers to handle asynchronous job processing. They help prevent overload by allowing tasks to be processed at a controlled rate, ensuring system stability and scalability. BullMQ is a robust, Redis-based queue library for Node.js, offering powerful features for job scheduling, prioritization, and reliable task execution in high-performance applications.

  • Procedure: Actions that adds a job to the queue, like the process of putting a task onto the stack of things to do.

  • Job: Actual task or piece of code that enters the queue, waiting to be processed.

  • Queue: Stack of tasks waiting to be done, it holds all the jobs that need to be processed.

  • Worker: Takes the tasks off the stack (queue) and do them, handles the jobs and carries out the tasks assigned to the worker.

There are different ways of running workers:

  • Same process: Running the workers and Fastify server both on the same process will allow sharing the same enviroment, resources, and manage them together.

  • Separate processes: Running the workers and Fastify server on different processes will allow to operate independently, having separate enviroments, and do not share resources by default.

Running workers on separate processes from the server means they don't have to share the same CPU and memory, preventing one task from hogging resources and slowing down others. This setup also ensures that if a worker encounters an overload or crashes, it won't bring down the entire server, maintaining system stability.

Additionally, it allows for fine-tuning resource allocation for each task, making the system more efficient overall.

In the current tutorial, we'll be demonstrating how to run the workers together with Node.js, integrating them directly into the application for simplicity and ease of management.

Requirements

  • Node.js: Ensure that Node.js is installed on your system. You can download and install Node.js from the official website: https://nodejs.org/. Node.js is required to run JavaScript applications, including our backend server and worker processes.

  • Docker: Make sure Docker is installed on your machine. Docker provides a platform for developing, shipping, and running applications in containers. You can download and install Docker Desktop from the official website: https://www.docker.com/products/docker-desktop. We'll use Docker to set up and manage the Redis server required for our BullMQ queues during the tutorial.

Folders structure

src/
    jobs/
        upload-image/
            upload-image.interfaces.ts
            upload-image.job.ts
            upload-image.worker.ts
        job.interfaces.ts
        job.constants.ts
    queues/
        upload-image/
            upload-image.queue.ts
        queue.config.ts
        queue.constants.ts
        queue.utils.ts
    config.ts
    logger.ts
    server.ts
    index.ts
uploads/
.env
.gitignore
docker-compose.yaml
package.json
tsconfig.json
  • src/ is the directory that holds the source code files, including the jobs and queues related to uploading images tasks

  • uploads/ is the directory for storing uploaded files

  • .env is a configuration file for managing enviroment variables

  • .gitignore is used for specifying files and directories to be ignored by Git

  • docker-compose.yaml is a configuration file for defining Docker containers and their configurations

  • package.json is where project dependencies and scripts are listed

  • tsconfig.json is a configuration file that manages Typescript compiler options, ensuring proper compilation

Settings Up the Foundation

Begin by establishing the project while creating a root directory for your project, and within it, intialize the package.json file:

> mkdir image-uploader && cd image-uploader
> npm init -y

Then, install Typescript and other dependencies:

> npm install @fastify @fastify/cors @fastify/static @fastify/multipart bullmq @bull-board/api @bull-board/fastify jimp pino pino-pretty
> npm install --dev typescript ts-node dotenv @types/node

Now, add the script for running the build command and the project, and update the main file in the package.json file:

package.json
{
  "name": "image-uploader",
  "version": "1.0.0",
  "description": "",
  "main": "src/index.ts",
  "scripts": {
    "dev": "ts-node src/index.ts",
    "start": "tsc && node build/index.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@bull-board/api": "^5.15.3",
    "@bull-board/fastify": "^5.15.3",
    "@fastify/cors": "^9.0.1",
    "@fastify/multipart": "^8.2.0",
    "@fastify/static": "^7.0.1",
    "bullmq": "^5.4.5",
    "fastify": "^4.26.2",
    "jimp": "^0.22.12",
    "pino": "^8.19.0",
    "pino-pretty": "^11.0.0",
    "uuid": "^9.0.1"
  },
  "devDependencies": {
    "@types/node": "^20.11.30",
    "@types/uuid": "^9.0.8",
    "dotenv": "^16.4.5",
    "ts-node": "^10.9.2",
    "typescript": "^5.4.3"
  }
}

Generate the tsconfig.json file:

> npx tsc --init

And update the tsconfig.json to the following:

tsconfig.json
{
  "compilerOptions": {
    "sourceMap": true,
    "strict": true,
    "esModuleInterop": true,
    "lib": ["esnext"],
    "outDir": "./build",
    "rootDir": "./src"
  }
}

Now, we're gonna use docker-compose to start up a Redis server. We need Redis for the BullMQ library we'll use to handle queues and make things scale better.

Create a docker-compose.yaml file and update the docker-compose.yaml file to the following:

docker-compose.yaml
version: "3.8"

services:
  redis:
    image: redis
    ports:
      - "${REDIS_PORT}:6379"
    networks:
      - uploader
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 10s
      retries: 5

networks:
  uploader:
    driver: bridge

And execute the docker-compose to start up the Redis server:

> docker-compose up -d

Note: The -d flag in the docker-compose tells the Docker to run the containers in the background, detached from the terminal session.

Awesome! With the foundational setup now complete, let's jump into the fun part - building the actual backend service. Get ready for some hands-on work in the next step!

Implementing the HTTP server

Typescript files will be places in src directory, and compiled files will be placed in build directory.

Let's begin by creating a logger instance for logging purposes. We'll utilize the pino library for this task.

src/logger.ts
import pino from "pino";

export const logger = pino({
  transport: {
    target: "pino-pretty",
    options: {
      colorize: true,
    },
  },
});

Next, we'll create the .env file to securely store our environment variables:

.env
PORT=8080
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
CONCURRENT_WORKERS=4

Furthermore, let's generate the config.ts file, which will facilitate the export of environment variables:

src/config.ts
import dotenv from "dotenv";

dotenv.config();

export const config = {
  port: Number(process.env.PORT || 8080),
  redis: {
    host: process.env.REDIS_HOST || "127.0.0.1",
    port: Number(process.env.REDIS_PORT || 6379),
  },
  concurrency: Number(process.env.COCURRENT_WORKERS || 1),
};

We are using dotenv library to load environment variables from the .env file into the Node.js process environment, enabling easy access and configuration throughout the application.

In src directory let's create server.ts file that will be used to setup the server based on the Fastify framework:

src/server.ts
import fastify, {
  FastifyInstance,
  FastifyReply,
  FastifyRequest,
} from "fastify";
import fastifyCors from "@fastify/cors";

import { logger } from "./logger";

export const setupServer = async (): Promise<FastifyInstance> => {
  const server: FastifyInstance = fastify();

  await server.register(fastifyCors, {
    origin: "*",
    exposedHeaders: ["*"],
    methods: ["*"],
  });

  server.get(
    "/healthcheck",
    async (request: FastifyRequest, reply: FastifyReply) => {
      return reply.code(200).send({ message: "OK" });
    }
  );

  // Error handler
  server.setErrorHandler(
    (error: Error, request: FastifyRequest, reply: FastifyReply) => {
      logger.error(`An error occurred: ${error.message}`);
      reply.status(500).send({ error: "Internal Server Error" });
    }
  );

  return server;
};

We're setting up a Fastify server by creating an instance, registering CORS middleware to enable cross-origin resource sharing, defining a route for a health check endpoint, and implementing an error handler to manage server errors.

Next, let's create index.ts file, this file is the entry point of the project:

src/index.ts
import { FastifyInstance } from "fastify";

import { config } from "./config";
import { logger } from "./logger";
import { setupServer } from "./server";

const start = async (): Promise<void> => {
  try {
    const app: FastifyInstance = await setupServer();

    await app.listen({
      host: "0.0.0.0",
      port: config.port,
    });

    logger.info(`🚀 Listening on localhost:${config.port}`);
  } catch (err) {
    logger.error(err);
    process.exit(1);
  }
};

start();

We set up a Fastify server instance by utilizing configuration settings defined in config.ts, starts the server listening on the specified host and port, and logs a message indicating the server is running.

Continuing, we define a health check route (/healthcheck) to verify server functionality.

If an error occurs during the setup or while starting the server, it logs the error and exits the process with a status code of 1.

Implementing the Queues

Let's begin by creating a queue.config.ts configuration file specifically tailored for the queues:

src/queues/queue.config.ts
import { ConnectionOptions } from "bullmq";
import { config } from "../config";

export const connection: ConnectionOptions = {
  host: config.redis.host,
  port: config.redis.port,
};

export const concurrency: number = config.concurrency;
  • connection: Represents the configuration options needed to establish a connection to the Redis server where the queues will operate.

  • concurrency: Specifies the maximum number of concurrent jobs that the worker will process from the queue.

Let's now declare the queue.constants.ts constants file containing enums and constants used for the queues:

src/queues/queue.constants.ts
export const enum Queues {
  IMAGE_UPLOADER = "IMAGE_UPLOADER",
}

We define an enum Queues that contains the queue names used in our application. Currently, it includes only one constant IMAGE_UPLOADER, which represents the name of a specific queue.

We're now at the final step, where we'll create a Queue instance dedicated to handling image uploads. Subsequently, we'll proceed to craft the interfaces file, which will encompass the necessary structures for the upload image job.

src/queues/upload-image/upload-image.queue.ts
import { Queue } from "bullmq";
import { Queues } from "../queue.constants";
import { connection } from "../queue.config";
import { UploadImageJobDataType } from "../../jobs/upload-image/upload-image.interfaces";

export const uploadImageQueue = new Queue<UploadImageJobDataType>(
  Queues.IMAGE_UPLOADER,
  {
    connection,
  }
);

We create a queue instance named uploadImageQueue using Queue from BullMQ. The queue is configured with the specified queue name from Queues.IMAGE_UPLOADER and the connection settings from connection. Additionally, the generic type UploadImageJobDataType is provided to specify the data structure expected for jobs in this queue.

Implementing the Jobs and Workers

Let's define the job enums inside the that will be utilized within our queues:

job.constants.ts
export const enum Jobs {
  UPLOAD_IMAGE = "UPLOAD_IMAGE",
}

We introduce an enum Jobs to represent the various types of jobs. Currently, it includes only one the UPLOAD_IMAGE job type, which signifies the task of uploading an image.

Additionally, we need to define the interface utilized for handling jobs:

src/jobs/job.interfaces.ts
import { Job } from "bullmq";

export interface JobHandlers<T, R = void> {
  handle: (job: Job<T, R>) => Promise<R>;
  completed: (job: Job) => Promise<void>;
  failed: (job?: Job, error?: Error) => Promise<void>;
}

Next, we'll create a file dedicated to declaring interfaces tailored specifically for the image uploader job:

src/jobs/upload-image/upload-image.interfaces.ts
export interface UploadImageJobDataType {
  filename: string;
  file: Buffer;
}

export interface UploadImageJobReturnType {
  imageFileName: string;
}
  • UploadImageJobDataType specifies the structure of data expected for an upload image job, including the filename and the file data itself.

  • UploadImageJobReturnType outlines the structure of the return data expected from processing an upload image job, containing the file name of the uploaded image.

Now, we'll create the job instance that will be responsible for handling the image upload process:

src/jobs/upload-image/upload-image.job.ts
import { Job } from "bullmq";
import path from "path";
import jimp from "jimp";

import { JobHandlers } from "../job.interfaces";
import { logger } from "../../logger";
import { config } from "../../config";
import {
  UploadImageJobDataType,
  UploadImageJobReturnType,
} from "./upload-image.interfaces";

export class UploadImageJob
  implements JobHandlers<UploadImageJobDataType, UploadImageJobReturnType>
{
  async handle(job: Job<UploadImageJobDataType, UploadImageJobReturnType>) {
    const { file, filename } = job.data;

    const timestamp = new Date().toISOString();
    const fileName = `${timestamp}-${filename}`;
    const filePath = path.join(__dirname, "../../../uploads/", fileName);

    const buffer = Buffer.from(file);

    const image = await jimp.read(buffer);
    await image.quality(20).writeAsync(filePath);

    logger.info(
      `File "${filename}" uploaded as "${fileName}" to uploads directory.`
    );

    return { imageFileName: fileName };
  }

  async completed(job: Job) {
    logger.info(`[Upload Image] Job (id: ${job.id}) has completed.`);
  }

  async failed(job?: Job, error?: Error) {
    logger.error(
      `[Upload Image] Job (id: ${job?.id}) has failed - ${error?.message}.`
    );
  }
}

We define a class UploadImageJob that implements the JobHandlers interface, encapsulating functionalities for handling image upload jobs. Within this class, we implement the handle method, responsible for processing the image upload job. This method includes tasks such as resizing the image, saving it to the uploads directory, and returning the corresponding file name of the uploaded image. Additionally, we implement the completed method to log a message indicating successful completion of the job.

In case of failure, the failed method is implemented to log an error message, including the job ID and error details, providing valuable insights for debugging and troubleshooting purposes. This structured approach ensures robust handling of image upload jobs within our application, enhancing reliability and maintainability.

Next, we'll create the worker that will be used to process the image upload jobs:

src/jobs/image-upload/image-upload.worker.ts
import { Worker } from "bullmq";

import { UploadImageJob } from "./upload-image.job";
import { Queues } from "../../queues/queue.constants";
import { concurrency, connection } from "../../queues/queue.config";

const uploadImageJob = new UploadImageJob();

export const uploadImageWorker = new Worker(
  Queues.IMAGE_UPLOADER,
  uploadImageJob.handle,
  {
    connection,
    concurrency,
  }
);

uploadImageWorker.on("completed", uploadImageJob.completed);

uploadImageWorker.on("failed", uploadImageJob.failed);

We initialize a worker responsible for processing image upload jobs using BullMQ, defining it to handle jobs from the designated IMAGE_UPLOADER queue, with configurations including connection settings and concurrency levels. Additionally, event listeners are attached to the worker to manage job completion and failure events, invoking corresponding methods from the UploadImageJob class.

Finally, we need to develop a utility to initialize the workers responsible for processing incoming jobs from the queues:

src/queues/queue.utils.ts
import { Worker } from "bullmq";

import { uploadImageWorker } from "../jobs/upload-image/upload-image.worker";
import { logger } from "../logger";

const WorkerMap = new Map([["UploadImage", uploadImageWorker]]);

export const initJobs = () => {
  WorkerMap.forEach((worker: Worker) => {
    worker.on("error", (err) => {
      logger.error(err);
    });
  });
};

A WorkerMap is created to store workers and their associated job types. The initJobs function initializes the workers and attaches error event listeners to handle any errors that occur during job processing.

Let's now incorporate the invocation of initJobs within the start function to ensure that our workers are initialized before the server starts:

src/index.ts
import { FastifyInstance } from "fastify";
import { config } from "./config";
import { logger } from "./logger";
import { setupServer } from "./server";
import { initJobs } from "./queues/queue.utils";

const start = async (): Promise<void> => {
  try {
    const app: FastifyInstance = await setupServer();

    initJobs();

    await app.listen({
      host: "0.0.0.0",
      port: config.port,
    });

    logger.info(`🚀 Listening on localhost:${config.port}`);
  } catch (err) {
    logger.error(err);
    process.exit(1);
  }
};

// Execute the start function to begin the server initialization process
start();

Implementing the image routes

Let's proceed with implementing the routes responsible for uploading an image and retrieving the status of the image upload process, alongside integrating a BullMQ dashboard for tracking the jobs:

src/server.ts
import fastify, {
  FastifyInstance,
  FastifyReply,
  FastifyRequest,
} from "fastify";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { FastifyAdapter } from "@bull-board/fastify";
import fastifyCors from "@fastify/cors";
import fastifyMultipart, { MultipartFile } from "@fastify/multipart";
import fastifyStatic from "@fastify/static";
import path from "path";
import fs from "fs";

import { uploadImageQueue } from "./queues/upload-image/upload-image.queue";
import { logger } from "./logger";
import { Jobs } from "./jobs/job.constants";

const BULL_DASHBOARD_ROUTE = "/dashboard";

export const setupServer = async (): Promise<FastifyInstance> => {
  const server: FastifyInstance = fastify();

  const serverAdapter = new FastifyAdapter();

  createBullBoard({
    queues: [new BullMQAdapter(uploadImageQueue)],
    serverAdapter,
  });

  serverAdapter.setBasePath(BULL_DASHBOARD_ROUTE);

  await server.register(serverAdapter.registerPlugin(), {
    prefix: BULL_DASHBOARD_ROUTE,
    basePath: BULL_DASHBOARD_ROUTE,
  });

  await server.register(fastifyCors, {
    origin: "*",
    exposedHeaders: ["*"],
    methods: ["*"],
  });

  await server.register(fastifyMultipart, {
    attachFieldsToBody: true,
    limits: {
      fileSize: 1048576, // 1MB
      files: 1,
    },
  });

  const uploadsDirectory = path.join(__dirname, "..", "uploads");

  // Check if uploads directory exists, create it if not
  if (!fs.existsSync(uploadsDirectory)) {
    fs.mkdirSync(uploadsDirectory);
  }

  await server.register(fastifyStatic, {
    root: uploadsDirectory,
    prefix: "/uploads/",
  });

  server.get(
    "/healthcheck",
    async (request: FastifyRequest, reply: FastifyReply) => {
      return reply.code(200).send({ message: "OK" });
    }
  );

  server.register(
    async (instance: FastifyInstance) => {
      /**
       * Add a job to upload an image to queue
       */
      instance.post(
        "/upload-image",
        async (
          request: FastifyRequest<{ Body: { file: MultipartFile } }>,
          reply: FastifyReply
        ) => {
          const file = request?.body?.file;

          if (!file) {
            return reply.code(404).send({ error: "File not found." });
          }

          const mimeType = file.mimetype;

          if (!mimeType.startsWith("image/")) {
            return reply
              .code(400)
              .send({ error: "Only image files are allowed." });
          }

          const bufferFile: Buffer = await file.toBuffer();

          const uploadImageJob = await uploadImageQueue.add(Jobs.UPLOAD_IMAGE, {
            filename: file.filename,
            file: bufferFile,
          });

         const url = `${request.protocol}://${request.hostname}/api/v1/image/${uploadImageJob.id}`;

          return reply
            .code(201)
            .send({ id: uploadImageJob.id, pollingUrl: url });
        }
      );

      /**
       * Get the image data by job id
       */
      instance.get(
        "/image/:jobId",
        async (
          request: FastifyRequest<{ Params: { jobId: string } }>,
          reply: FastifyReply
        ) => {
          const jobId = request.params?.jobId;

          // Finds the job by the uuid that was passed for the name
          const job = await uploadImageQueue.getJob(jobId);

          if (!job) {
            return reply
                .code(404)
                .send({ error: "Uploading image job not found" });
          }

          const imageName = job.returnvalue?.imageFileName;

          return reply.code(200).send({
            status: await job.getState(),
            imageUrl: imageName
              ? `${request.protocol}://${request.hostname}/uploads/${imageName}`
              : null,
          });
        }
      );
    },
    { prefix: "/api/v1" }
  );

  // Error handler
  server.setErrorHandler(
    (error: Error, request: FastifyRequest, reply: FastifyReply) => {
      logger.error(`An error occurred: ${error.message}`);
      return reply
        .code(500)
        .send({ error: error.message ?? "Internal Server Error" });
    }
  );

  return server;
};

We start by importing various modules necessary for setting up our Fastify server, including utilities for handling multipart form data, serving static files, and interfacing with BullMQ for job queue management. Additionally, we import configurations such as the BullMQ dashboard components and the file system module for directory manipulation. Continuing, we define a constant BULL_DASHBOARD_ROUTE to specify the route for accessing the BullMQ dashboard within our application.

Next, we instantiate a Fastify adapter to integrate BullMQ with our Fastify server. We configure the BullMQ dashboard using createBullBoard, specifying the uploadImageQueue for monitoring and associating it with the Fastify adapter. The base path for the BullMQ dashboard route is then set using serverAdapter.setBasePath, ensuring proper routing. Subsequently, we register the BullMQ dashboard with Fastify using server.register, incorporating it into our server setup.

Moving forward, we handle multipart form data by registering the fastifyMultipart middleware. We define the directory path for uploading files and ensure its existence by creating it if it doesn't already exist using the file system module. Then, we register the fastifyStatic middleware to serve static files from the uploads directory.

Continuing, we register API routes under the /api/v1 prefix to facilitate image uploading and retrieval operations. The upload image route accepts POST requests, processes uploaded images, adds corresponding jobs to the uploadImageQueue, and provides clients with the job ID and polling URL for status tracking. On the other hand, the get image data route handles GET requests with a job ID parameter, retrieves the status and file name of the uploaded image job, and responds accordingly with the job status and URL.

Conclusion

In conclusion, we have successfully implemented a server capable of handling image uploads. Leveraging the power of the BullMQ library, which is based on Redis, our server architecture is highly scalable and efficient.

The incorporation of BullMQ allows for the management of job queues, ensuring smooth processing of image uploads and other asynchronous tasks. Additionally, our server setup includes features such as a BullMQ dashboard for real-time monitoring of job queues, as well as routes for health checks, image uploads, and retrieval.

This robust server architecture provides a reliable foundation for handling image upload functionalities in web applications, while also offering scalability to accommodate increased demand and workload.

Hope you enjoyed and took away something useful from this article. The source code used in this article is published on Github.

Thanks for reading!

See All Articles