Aviary
Dispatchers

BullMQ

Out-of-process delivery on a BullMQ worker, reusing your app's existing @nestjs/bullmq and Redis connection. Jobs serialize, enqueue with retry and backoff, and rehydrate on the worker.

If your app already runs BullMQ, this dispatcher is the natural fit. It enqueues notifications onto a BullMQ queue for out-of-process delivery, reusing your existing @nestjs/bullmq setup and Redis connection — no new infrastructure.

The dispatcher serializes the job and adds it to the queue; a processor on the worker side pops it, rehydrates the notification and recipient, and runs the channels. See Async dispatch for the round trip.

Install

pnpm add @dudousxd/nestjs-notifications-bullmq @nestjs/bullmq bullmq
npm install @dudousxd/nestjs-notifications-bullmq @nestjs/bullmq bullmq

Wire it up

Use the bullmqDispatcher() helper and spread it into forRoot. It contributes the dispatcher, registers the queue, and registers the processor for you. Pair it with BullModule.forRoot() for the connection, plus the notifications registry and resolveNotifiable the worker needs to rebuild jobs.

app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { NotificationsModule } from '@dudousxd/nestjs-notifications-core';
import { bullmqDispatcher } from '@dudousxd/nestjs-notifications-bullmq';
import { InvoicePaid } from './invoice-paid.notification';

@Module({
  imports: [
    BullModule.forRoot({ connection: { host: 'localhost', port: 6379 } }),
    NotificationsModule.forRoot({
      ...bullmqDispatcher(), 
      notifications: [InvoicePaid],
      resolveNotifiable: (ref) => users.findById(ref.id), // reload the recipient
    }),
  ],
})
export class AppModule {}

resolveNotifiable usually needs an injected repository, so reach for forRootAsync in real apps — see Async dispatch.

What the helper does

bullmqDispatcher() returns a slice of forRoot options:

  • dispatcherBullmqNotificationDispatcher
  • importsBullModule.registerQueue({ name: NOTIFICATIONS_QUEUE })
  • providersBullmqNotificationProcessor

The queue name is the fixed constant NOTIFICATIONS_QUEUE ('nestjs-notifications'), exported from the package. It registers the queue against the connection you configured with BullModule.forRoot(), so the dispatcher and the app share one Redis connection.

Retry, backoff & dead-letter

Jobs are added with retry built in — by default 3 attempts with exponential backoff starting at one second, so a transient failure (SMTP hiccup, network blip) is retried rather than dropped. Pass BullmqDispatcherOptions to bullmqDispatcher() to tune it:

app.module.ts
NotificationsModule.forRoot({
  ...bullmqDispatcher({
    attempts: 5,                                   // default 3
    backoff: { type: 'exponential', delay: 2000 }, // or { type: 'fixed', delay: 1000 }
    removeOnComplete: { age: 3600, count: 1000 },  // keep 1h / last 1000; default: keep all
    removeOnFail: false,                           // keep failed jobs for inspection
    onFailed: (job, reason) => {
      // dead-letter hook: fires once a job exhausts all attempts
      deadLetters.record(job?.id, reason);
    },
  }),
});
OptionDefaultDescription
attempts3Delivery attempts before a job fails terminally.
backoff{ type: 'exponential', delay: 1000 }Retry strategy — exponential, fixed, or a custom strategy registered on the worker.
removeOnComplete / removeOnFailkeepJob retention — true (drop), false (keep), a count, or { age, count? }. Passes straight to BullMQ.
onFailedDead-letter hook — invoked with (job, reason) once a job exhausts all attempts. Route it to a DLQ, alert, or persist. Errors thrown here are swallowed so they can't crash the worker.

Omit bullmqDispatcher's options entirely and you get the original hardcoded behavior.

On the worker

The BullmqNotificationProcessor consumes jobs from the queue. For each job it rehydrates the notification (by name, from the notifications registry) and the notifiable (via resolveNotifiable), then runs the same channels — on the worker.

The processor only runs where a BullMQ worker is active. In a split deployment (API process

  • worker process), the worker process must import NotificationsModule with ...bullmqDispatcher() too, so the processor and channel modules are registered there. If nothing consumes the queue, jobs pile up undelivered.

On this page