Back to examples

Node.js Examples

Complete, copy-paste examples for every supported Node.js queue backend. Each example includes initialization, job processing, in-job logging, and graceful shutdown.

1. BullMQ

The built-in shorthand — pass queues and redisUrl directly. No provider import needed.

Requires Redis
Peer dependency bullmq 5.0+
Events captured All 7 lifecycle events
src/index.ts
import { initJobviz, jobviz } from 'jobviz-agent';
import { Queue, Worker } from 'bullmq';

// 1. Initialize the Jobviz agent
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  queues: ['emails'],
  redisUrl: process.env.REDIS_URL ?? 'redis://localhost:6379',
});

// 2. Track deployments for change correlation
agent.trackDeployment({
  version: '1.2.0',
  commitHash: 'abc123f',
});

// 3. Create a queue and worker as usual
const emailQueue = new Queue('emails', {
  connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' },
});

const worker = new Worker(
  'emails',
  async (job) => {
    // 4. Attach structured logs to the job timeline
    jobviz.log(job, 'Fetching template');
    const template = await fetchTemplate(job.data.templateId);

    jobviz.log(job, 'Sending email', { to: job.data.to });
    await sendEmail(template, job.data.to);

    return { sent: true };
  },
  { connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' } },
);

// 5. Add jobs
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  templateId: 'welcome',
});

// 6. Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.close();
  await emailQueue.close();
  await agent.stop();
});

2. bee-queue

Use the BeeQueueProvider to monitor bee-queue queues. The provider attaches to Redis pub/sub in watcher mode.

Requires Redis
Peer dependency bee-queue 1.7+
Events captured active, completed, failed
src/index.ts
import { initJobviz, jobviz, BeeQueueProvider } from 'jobviz-agent';
import Queue from 'bee-queue';

// 1. Initialize with the bee-queue provider
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  provider: new BeeQueueProvider({
    queues: ['notifications'],
    redisUrl: process.env.REDIS_URL ?? 'redis://localhost:6379',
  }),
});

agent.trackDeployment({ version: '1.0.0' });

// 2. Create a bee-queue queue
const notifQueue = new Queue('notifications', {
  redis: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' },
  isWorker: true,
});

// 3. Process jobs
notifQueue.process(async (job) => {
  jobviz.log(job, 'Sending notification', { channel: job.data.channel });
  await sendNotification(job.data);
  return { delivered: true };
});

// 4. Add a job
const job = notifQueue.createJob({
  channel: 'push',
  userId: 'u_123',
  message: 'You have a new message',
});
await job.save();

// 5. Graceful shutdown
process.on('SIGTERM', async () => {
  await notifQueue.close();
  await agent.stop();
});

3. Agenda.js

Use the built-in AgendaProvider. Pass your existing Agenda instance — the provider listens to its event emitter without modifying your setup.

Requires MongoDB
Peer dependency agenda 5.0+
Events captured active, completed, failed
src/index.ts
import { initJobviz, jobviz, AgendaProvider } from 'jobviz-agent';
import Agenda from 'agenda';

const MONGO_URL = process.env.MONGO_URL ?? 'mongodb://localhost:27017/myapp';

// 1. Create an Agenda instance
const agenda = new Agenda({
  db: { address: MONGO_URL, collection: 'jobs' },
});

// 2. Initialize Jobviz with the built-in Agenda provider
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  provider: new AgendaProvider({ agenda }),
});

agent.trackDeployment({ version: '1.0.0' });

// 3. Define jobs as usual
agenda.define('send-welcome-email', async (job) => {
  const { to, subject } = job.attrs.data;

  jobviz.log(job, 'Processing email');
  await sendEmail(to, subject);
  jobviz.log(job, 'Email sent', { to });
});

agenda.define('generate-invoice', async (job) => {
  const { orderId } = job.attrs.data;

  jobviz.log(job, 'Generating invoice', { orderId });
  const pdf = await generateInvoice(orderId);
  jobviz.log(job, 'Invoice ready', { size: pdf.length });
});

// 4. Start and schedule
await agenda.start();
await agenda.every('5 minutes', 'generate-invoice', { orderId: 'latest' });
await agenda.schedule('now', 'send-welcome-email', {
  to: 'user@example.com',
  subject: 'Welcome!',
});

// 5. Graceful shutdown
process.on('SIGTERM', async () => {
  await agenda.stop();
  await agent.stop();
});

Note: jobviz.log(job, ...) works with Agenda jobs automatically — it reads job.attrs._id and job.attrs.name under the hood.

4. MultiProvider

Use MultiProvider to monitor multiple queue backends under a single Jobviz agent. All events appear in the same dashboard.

src/index.ts
import {
  initJobviz,
  jobviz,
  MultiProvider,
  BullMQProvider,
  BeeQueueProvider,
} from 'jobviz-agent';
import { Queue, Worker } from 'bullmq';
import BeeQueue from 'bee-queue';

const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';

// 1. Initialize with multiple queue backends
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  provider: new MultiProvider([
    new BullMQProvider({
      queues: ['emails', 'reports'],
      connection: { url: REDIS_URL },
    }),
    new BeeQueueProvider({
      queues: ['notifications'],
      redisUrl: REDIS_URL,
    }),
  ]),
});

agent.trackDeployment({ version: '2.0.0' });

// 2. Set up BullMQ workers
const emailWorker = new Worker(
  'emails',
  async (job) => {
    jobviz.log(job, 'Sending email', { to: job.data.to });
    await sendEmail(job.data);
    return { sent: true };
  },
  { connection: { url: REDIS_URL } },
);

// 3. Set up bee-queue workers
const notifQueue = new BeeQueue('notifications', {
  redis: { url: REDIS_URL },
  isWorker: true,
});

notifQueue.process(async (job) => {
  jobviz.log(job, 'Delivering notification');
  await deliverNotification(job.data);
  return { delivered: true };
});

// 4. Graceful shutdown
process.on('SIGTERM', async () => {
  await emailWorker.close();
  await notifQueue.close();
  await agent.stop();
});

Tip: You can mix built-in providers with custom ones in the same MultiProvider. See the custom provider example below.

5. Custom provider

Implement the QueueProvider interface to integrate any job or task system. Just two methods: connect(push) and disconnect().

src/cron-provider.js
import { initJobviz } from 'jobviz-agent';

// 1. Implement the QueueProvider interface
class CronJobProvider {
  #push;
  #watchers = [];

  constructor(jobs) {
    this.jobs = jobs; // [{ name, cron, handler }]
  }

  connect(push) {
    this.#push = push;

    for (const job of this.jobs) {
      const watcher = scheduleCron(job.cron, async () => {
        const jobId = `${job.name}-${Date.now()}`;

        // Emit lifecycle events
        this.#push({
          jobId,
          jobName: job.name,
          queue: 'cron',
          event: 'active',
          timestamp: Date.now(),
        });

        try {
          const result = await job.handler();
          this.#push({
            jobId,
            jobName: job.name,
            queue: 'cron',
            event: 'completed',
            timestamp: Date.now(),
            data: { returnValue: result },
          });
        } catch (err) {
          this.#push({
            jobId,
            jobName: job.name,
            queue: 'cron',
            event: 'failed',
            timestamp: Date.now(),
            data: {
              failedReason: err.message,
              stack: err.stack,
            },
          });
        }
      });
      this.#watchers.push(watcher);
    }
  }

  async disconnect() {
    this.#watchers.forEach((w) => w.stop());
    this.#watchers = [];
  }
}

// 2. Use it with Jobviz
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  provider: new CronJobProvider([
    { name: 'sync-analytics', cron: '*/5 * * * *', handler: syncAnalytics },
    { name: 'cleanup-sessions', cron: '0 * * * *', handler: cleanupSessions },
  ]),
});

Learn more: See the Custom Integration Guide for the full JobEvent schema, best practices, and the REST API for non-Node runtimes.

Next steps

Ready to start monitoring?

Sign up for free and get your API key in seconds.

Get started free