
This second article about queueing without a queue focuses on an implementation based on PostgreSQL. This solution moves the queue from the Node.js process to the database, which gives the queues dedicated storage for their state. The leading actor of this post is pg-boss, an npm library that implements a queue system on top of pg and PostgreSQL.
pg-boss
pg-boss is a job queue built in Node.js on top of PostgreSQL in order to provide background processing and reliable asynchronous execution to Node.js applications.
It has a straightforward API and exposes many great features to handle queues in our system.
To work with pg-boss, you must allow it to create its own database schema for managing your queues. As we will see soon, a simple npm installation and a little knowledge are all you need to use it without any problem.
Get started with pg-boss
To start with pg-boss, install it on your Node.js project.
I have already created a simple project for you to follow along with the post; you can find it here.
So, after the git clone and the npm install, the first step is to run the commands `npm install pg pg-boss` and `npm install -D @types/pg`. Those commands install the dependencies in your project.
With those, you are ready to use pg-boss in your project.
As you can see, this package’s setup is smooth and easy.
Before you start coding, you need to set up a PostgreSQL database. I’ve already prepared it for you. If you have Docker installed on your laptop, type `docker compose up -d`, and your database is ready to use.
Create the queue
First, you need to create a connection to the database so that pg-boss can use it. pg-boss keeps this step simple: you pass the connection options when you create the instance.
So, create a file at the path `src/pgBoss.ts` and add this content.
    import PgBoss from 'pg-boss';

    // Connection options come from environment variables.
    const pgBossInstance = new PgBoss({
      host: process.env.POSTGRES_HOST!,
      port: Number(process.env.POSTGRES_PORT!),
      user: process.env.POSTGRES_USER!,
      password: process.env.POSTGRES_PASSWORD!,
      database: process.env.POSTGRES_DB!,
    });

    // Log pg-boss errors to the console.
    pgBossInstance.on('error', console.error);

    // Start pg-boss before exporting the instance.
    await pgBossInstance.start();

    export default pgBossInstance;
This code creates a pg-boss instance, logs its errors to the console, and starts it, which connects it to the database. As you can see, creating a pg-boss instance is easy.
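One detail worth noting: the `!` assertions only silence the compiler, so a missing variable still reaches pg-boss as `undefined` (or `NaN` for the port). Here is a minimal sketch of one way to fail fast instead, assuming the same five variable names. `readPostgresOptions` and `requireVar` are hypothetical helpers written for this example; they are not part of pg-boss or of the sample project.

```typescript
// Hypothetical helpers (not part of pg-boss): build the connection options
// from an environment-like object and fail fast when a value is missing.
interface PostgresOptions {
  host: string;
  port: number;
  user: string;
  password: string;
  database: string;
}

type Env = Record<string, string | undefined>;

// Returns the variable's value or throws a descriptive error.
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === "") {
    throw new Error(`Missing environment variable: ${name}`);
  }
  return value;
}

function readPostgresOptions(env: Env): PostgresOptions {
  const port = Number(requireVar(env, "POSTGRES_PORT"));
  // Reject NaN, Infinity, fractions, zero, and negative numbers.
  if (!isFinite(port) || port <= 0 || Math.floor(port) !== port) {
    throw new Error("POSTGRES_PORT must be a positive integer");
  }
  return {
    host: requireVar(env, "POSTGRES_HOST"),
    port,
    user: requireVar(env, "POSTGRES_USER"),
    password: requireVar(env, "POSTGRES_PASSWORD"),
    database: requireVar(env, "POSTGRES_DB"),
  };
}
```

With a helper like this, the instance could be created with `new PgBoss(readPostgresOptions(process.env))`, and a misconfigured environment would stop the application at startup with a readable message.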
But now it’s time to create the queue.
Let’s start by adding the queue name to the `src/common.ts` file.
    export const QUEUE_NAME = 'user-creation-queue';
Next, create a new file at the path `src/queue.ts` and add this code.
    import pgBossInstance from "./pgBoss.js";
    import { QUEUE_NAME, UserCreatedTask } from "./common.js";

    // Create the queue; failed jobs are retried up to two times.
    await pgBossInstance.createQueue(QUEUE_NAME, {
      name: QUEUE_NAME,
      retryLimit: 2
    });

    // Push a task into the queue; resolves with the id of the new job.
    export function enqueueJob(job: UserCreatedTask) {
      return pgBossInstance.send(QUEUE_NAME, job);
    }
As you can see, this code is also easy to understand.
First, you import the queue name. Then, using the `createQueue` method, you create a pg-boss queue with that name and a `retryLimit` of two; this means that a failed job is retried up to two times, and if it fails again after that, pg-boss marks it as failed. If you want, you can also define a dead letter queue; you can find more info here.
Last but not least, you export an `enqueueJob` function to push tasks into the queue.
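Retry counting is easy to misread, so here is a minimal in-memory sketch of it. It does not use pg-boss at all: `runWithRetries` is a hypothetical helper, the handlers are synchronous for brevity, and the sketch assumes that `retryLimit` counts retries after the first attempt, which is consistent with the sample output later in this post, where some jobs succeed after 2 retries.

```typescript
// In-memory model of retry counting; it does NOT call pg-boss.
// Assumption: retryLimit counts retries that follow the first attempt.
type Outcome = { state: "completed" | "failed"; attempts: number };

function runWithRetries(
  handler: (attempt: number) => void,
  retryLimit: number
): Outcome {
  const maxAttempts = retryLimit + 1; // first attempt + allowed retries
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(attempt);
      return { state: "completed", attempts: attempt };
    } catch (_err) {
      // A thrown error fails this attempt; the loop tries again if allowed.
    }
  }
  return { state: "failed", attempts: maxAttempts };
}

// Fails twice, then succeeds on the third attempt (after two retries).
const flaky = runWithRetries((attempt) => {
  if (attempt < 3) throw new Error("simulated failure");
}, 2);

// Never succeeds: the model gives up after 1 + 2 attempts.
const broken = runWithRetries(() => {
  throw new Error("always fails");
}, 2);

console.log(flaky.state, flaky.attempts);
console.log(broken.state, broken.attempts);
```

Under that assumption, a job with a `retryLimit` of two gets three chances in total before it ends up as failed.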
The last step is to push data into the queue, so move to the file `src/index.ts` and add these two imports after the `randomUUID` import.
    import logger from './logger.js';
    import { enqueueJob } from './queue.js';
and then replace the TODO with the following code.
    const idTask = await enqueueJob(task);
    logger.info(task, `Task with id ${idTask} has been pushed`);
This code pushes each task into the queue and logs the job id that pg-boss returns.
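The shape of the task is defined in the sample project and is not shown in this post. Since the payload leaves your process and is stored in the database, a runtime check before enqueueing can catch malformed tasks early. Here is a minimal sketch, assuming a task carries at least a string `id` (as the log output in this post suggests). The `UserCreatedTask` shape below, `isUserCreatedTask`, and `assertUserCreatedTask` are hypothetical and written only for this example.

```typescript
// Hypothetical task shape: the real UserCreatedTask lives in the sample
// project and may contain more fields than this.
interface UserCreatedTask {
  id: string;
}

// Runtime type guard: narrows an unknown value to UserCreatedTask.
function isUserCreatedTask(value: unknown): value is UserCreatedTask {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as { id?: unknown };
  return typeof candidate.id === "string" && candidate.id.length > 0;
}

// Throws on malformed input so that bad tasks never reach the queue.
function assertUserCreatedTask(value: unknown): UserCreatedTask {
  if (!isUserCreatedTask(value)) {
    throw new Error("Invalid UserCreatedTask payload");
  }
  return value;
}
```

A call such as `enqueueJob(assertUserCreatedTask(task))` would then reject a bad payload before it is stored. The same guard could be reused in the worker, because the generic passed to `work` is only a compile-time hint.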
Let’s test it out.
Open the terminal and type `npm run start`; the result should look something like this:
    [19:07:34.036] INFO (23215): Task with id a5a01088-25b9-424c-8a1e-4ec1e678ae96 has been pushed
        id: "8cd828c2-4028-45bb-bea4-6cdaf3627496"
    [19:07:34.037] INFO (23215): Task with id 92071cf5-d278-4654-8fda-4076763e62d1 has been pushed
        id: "13e6b091-b85e-48ba-9d2c-a376fc5ada8d"
    [19:07:34.038] INFO (23215): Task with id 1567a32b-daa2-43f7-a016-2729f5511f86 has been pushed
        id: "0589cb0e-9423-48dc-ab05-85ac560aa886"
    [19:07:34.039] INFO (23215): Task with id 5aecbd45-a6d5-4913-816f-8bb495792569 has been pushed
        id: "8b0218d1-22eb-4869-9bf8-7d9bc42b84a0"
    [19:07:34.040] INFO (23215): Task with id adc92a04-ff85-49a6-9539-ea993b77a31c has been pushed
        id: "1269f0d5-bbcd-49fd-89d5-7102be8237be"
Great! You’ve just completed the first part, and now you know how to push data inside a pg-boss queue. Let’s move on to the processing.
Processing messages with pg-boss
To process the messages, I’ve already prepared a file, `src/worker.ts`, where you will write the code.
Open the `src/worker.ts` file and add the following imports:
    import { setTimeout } from "timers/promises";
    import { QUEUE_NAME, UserCreatedTask } from "./common.js";
    import pgBossInstance from "./pgBoss.js";
and replace the TODO with the following code.
    pgBossInstance.work<UserCreatedTask>(QUEUE_NAME, async ([job]) => {
      // Track every job: the first delivery creates the entry, later ones are retries.
      if (!messagesHandled[job.id]) {
        messagesHandled[job.id] = { retries: 0, result: 'created' };
      } else {
        messagesHandled[job.id].retries++;
      }

      // Randomly decide the outcome: success about 40% of the time.
      const resultType = Math.random() > 0.6;
      const fakeImplementation = resultType ? 'success' : 'error'

      // Simulate the work: two seconds for a success, one for a failure.
      const timeout = fakeImplementation === 'success' ? 2000 : 1000;
      await setTimeout(timeout);

      if (fakeImplementation === 'error') {
        messagesHandled[job.id].result = 'error';
        printMessagesHandled();
        const message = `User created task got error with id: ${job.id}`
        // Throwing tells pg-boss that this attempt failed.
        throw new Error(message);
      } else {
        messagesHandled[job.id].result = 'success';
        printMessagesHandled();
      }
    });
If you look at this code, you are simulating message processing. First, you keep a map of all the processed messages so that you can print the results in the console. Then, using `Math.random()`, you decide whether the message fails or succeeds, and you wait two seconds for a success or one second for a failure. Last, if the message must fail, you throw an error; otherwise, you print the result in the console (only to show the outcome in this example).
If the function throws an error, pg-boss marks the attempt as failed and retries the job while retries remain; otherwise, it marks the job as completed.
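The worker relies on `messagesHandled` and `printMessagesHandled`, which come from the prepared file and are not shown in this post. Here is a minimal sketch of what such bookkeeping could look like, with the summary format modeled on the sample output in this post. Everything in it is hypothetical, including the `trackDelivery` and `formatMessagesHandled` helpers, and the versions in the sample project may differ.

```typescript
// Hypothetical versions of the helpers that the worker snippet relies on;
// the sample project ships its own, which may differ.
type HandledResult = "created" | "success" | "error";

interface HandledMessage {
  retries: number;
  result: HandledResult;
}

const messagesHandled: Record<string, HandledMessage> = {};

// Records one delivery of a job: the first delivery creates the entry,
// every later delivery of the same id counts as a retry.
function trackDelivery(jobId: string): HandledMessage {
  const existing = messagesHandled[jobId];
  if (existing) {
    existing.retries++;
    return existing;
  }
  const created: HandledMessage = { retries: 0, result: "created" };
  messagesHandled[jobId] = created;
  return created;
}

// Builds the summary text: a header plus one line per job id.
function formatMessagesHandled(): string {
  const ids = Object.keys(messagesHandled);
  const lines = ids.map((id) => {
    const entry = messagesHandled[id];
    return `${id}: ${entry.result} (${entry.retries} retries)`;
  });
  return [`Unique messages handled: ${ids.length}`].concat(lines).join("\n");
}

function printMessagesHandled(): void {
  console.log(formatMessagesHandled());
}
```

Keeping the formatting separate from the printing makes the summary easy to check in a test. Note that the map lives in memory, so the counters reset every time the worker restarts.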
Let’s run this code by typing `npm run start` again.
The final result should look like this:
    Unique messages handled: 5
    940c9d32-3b83-4034-ba81-3b2d88035305: success (2 retries)
    a19a224f-20c1-4481-b54b-81d73b81c691: success (1 retries)
    701f1a33-bde2-4013-9bd3-d4a4e0bedcf7: success (2 retries)
    d00b4a03-2e11-4f05-b2c3-8e60f2efe4f6: success (1 retries)
    6d660781-0fdf-4414-8bc6-d791be0588dc: success (0 retries)
As you can see, the system has handled five messages in this case. Some of them needed one or two retries, and one succeeded on the first attempt.
Perfect! Now you have also learned how to process messages with pg-boss.
Conclusion
It’s time to wrap up this post!
As you can see, working with pg-boss is straightforward: the API is simple, and the code stays simple. This approach can be an excellent solution if you don’t have a dedicated queue system in your architecture, but you have to pay attention to the load that handling the queues puts on your database. This is why, in the next article, we will see another solution: the one that my team chose to strike the right balance between performance and simplicity.
Okay, that’s it for pg-boss. I hope you enjoyed the article, and I look forward to seeing you in the next one.
Bye-bye 👋
p.s. You can find the result code of the article here.