Node.js Examples
Complete, copy-paste examples for every supported Node.js queue backend. Each example includes initialization, job processing, in-job logging, and graceful shutdown.
1. BullMQ
The built-in shorthand — pass `queues` and `redisUrl` directly. No provider import is needed.
bullmq 5.0+
import { initJobviz, jobviz } from 'jobviz-agent';
import { Queue, Worker } from 'bullmq';
// Resolve the Redis URL once so the Jobviz agent, the queue, and the worker
// all use the same value instead of repeating the fallback expression.
const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';

// 1. Initialize the Jobviz agent
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  queues: ['emails'],
  redisUrl: REDIS_URL,
});

// 2. Track deployments for change correlation
agent.trackDeployment({
  version: '1.2.0',
  commitHash: 'abc123f',
});

// 3. Create a queue and worker as usual
const emailQueue = new Queue('emails', {
  connection: { url: REDIS_URL },
});

// fetchTemplate() and sendEmail() are application functions that are not
// defined in this example.
const worker = new Worker(
  'emails',
  async (job) => {
    // 4. Attach structured logs to the job timeline
    jobviz.log(job, 'Fetching template');
    const template = await fetchTemplate(job.data.templateId);

    jobviz.log(job, 'Sending email', { to: job.data.to });
    await sendEmail(template, job.data.to);

    return { sent: true };
  },
  { connection: { url: REDIS_URL } },
);

// 5. Add jobs
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  templateId: 'welcome',
});
// 6. Graceful shutdown
process.on('SIGTERM', async () => {
await worker.close();
await emailQueue.close();
await agent.stop();
});
2. bee-queue
Use the `BeeQueueProvider` to monitor bee-queue queues. The provider attaches to Redis pub/sub in watcher mode.
bee-queue 1.7+
import { initJobviz, jobviz, BeeQueueProvider } from 'jobviz-agent';
import Queue from 'bee-queue';
// Resolve the Redis URL once so the Jobviz provider and the bee-queue queue
// both use the same value instead of repeating the fallback expression.
const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';

// 1. Initialize with the bee-queue provider
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  provider: new BeeQueueProvider({
    queues: ['notifications'],
    redisUrl: REDIS_URL,
  }),
});
agent.trackDeployment({ version: '1.0.0' });

// 2. Create a bee-queue queue
const notifQueue = new Queue('notifications', {
  redis: { url: REDIS_URL },
  isWorker: true,
});

// 3. Process jobs
// sendNotification() is an application function that is not defined in this
// example.
notifQueue.process(async (job) => {
  jobviz.log(job, 'Sending notification', { channel: job.data.channel });
  await sendNotification(job.data);
  return { delivered: true };
});

// 4. Add a job
const job = notifQueue.createJob({
  channel: 'push',
  userId: 'u_123',
  message: 'You have a new message',
});
await job.save();
// 5. Graceful shutdown
process.on('SIGTERM', async () => {
await notifQueue.close();
await agent.stop();
});
3. Agenda.js
Use the built-in `AgendaProvider`. Pass your existing Agenda instance — the provider listens to its event emitter without modifying your setup.
agenda 5.0+
import { initJobviz, jobviz, AgendaProvider } from 'jobviz-agent';
import Agenda from 'agenda';
// MongoDB connection string used by Agenda; falls back to a local database.
const MONGO_URL = process.env.MONGO_URL ?? 'mongodb://localhost:27017/myapp';

// 1. Create an Agenda instance
const agenda = new Agenda({ db: { address: MONGO_URL, collection: 'jobs' } });

// 2. Initialize Jobviz with the built-in Agenda provider
const agendaProvider = new AgendaProvider({ agenda });
const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  provider: agendaProvider,
});
agent.trackDeployment({ version: '1.0.0' });

// 3. Define jobs as usual
// Each handler declares exactly one parameter (the job), as in the inline
// form. sendEmail() and generateInvoice() are application functions that are
// not defined in this example.

/** Sends the welcome email described by the job's data and logs both steps. */
async function handleWelcomeEmail(job) {
  const { to, subject } = job.attrs.data;
  jobviz.log(job, 'Processing email');
  await sendEmail(to, subject);
  jobviz.log(job, 'Email sent', { to });
}

/** Generates the invoice for the job's orderId and logs the result size. */
async function handleGenerateInvoice(job) {
  const { orderId } = job.attrs.data;
  jobviz.log(job, 'Generating invoice', { orderId });
  const pdf = await generateInvoice(orderId);
  jobviz.log(job, 'Invoice ready', { size: pdf.length });
}

agenda.define('send-welcome-email', handleWelcomeEmail);
agenda.define('generate-invoice', handleGenerateInvoice);

// 4. Start and schedule
await agenda.start();

const invoiceJobData = { orderId: 'latest' };
await agenda.every('5 minutes', 'generate-invoice', invoiceJobData);

const welcomeJobData = { to: 'user@example.com', subject: 'Welcome!' };
await agenda.schedule('now', 'send-welcome-email', welcomeJobData);
// 5. Graceful shutdown
process.on('SIGTERM', async () => {
await agenda.stop();
await agent.stop();
});
Note: `jobviz.log(job, ...)` works with Agenda jobs automatically — it reads `job.attrs._id` and `job.attrs.name` under the hood.
4. MultiProvider
Use `MultiProvider` to monitor multiple queue backends under a single Jobviz agent. All events appear in the same dashboard.
import {
initJobviz,
jobviz,
MultiProvider,
BullMQProvider,
BeeQueueProvider,
} from 'jobviz-agent';
import { Queue, Worker } from 'bullmq';
import BeeQueue from 'bee-queue';
// Single Redis URL shared by both providers, the BullMQ worker, and bee-queue.
const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';

// 1. Initialize with multiple queue backends
const bullmqProvider = new BullMQProvider({
  queues: ['emails', 'reports'],
  connection: { url: REDIS_URL },
});
const beeQueueProvider = new BeeQueueProvider({
  queues: ['notifications'],
  redisUrl: REDIS_URL,
});

const agent = await initJobviz({
  apiKey: process.env.JOBVIZ_API_KEY,
  provider: new MultiProvider([bullmqProvider, beeQueueProvider]),
});
agent.trackDeployment({ version: '2.0.0' });

// 2. Set up BullMQ workers
// sendEmail() is an application function that is not defined in this example.

/** BullMQ processor for the 'emails' queue. */
async function handleEmailJob(job) {
  jobviz.log(job, 'Sending email', { to: job.data.to });
  await sendEmail(job.data);
  return { sent: true };
}

const emailWorker = new Worker('emails', handleEmailJob, {
  connection: { url: REDIS_URL },
});

// 3. Set up bee-queue workers
// deliverNotification() is an application function that is not defined in
// this example.

/** bee-queue handler for the 'notifications' queue. */
async function handleNotificationJob(job) {
  jobviz.log(job, 'Delivering notification');
  await deliverNotification(job.data);
  return { delivered: true };
}

const notifQueue = new BeeQueue('notifications', {
  redis: { url: REDIS_URL },
  isWorker: true,
});
notifQueue.process(handleNotificationJob);
// 4. Graceful shutdown
process.on('SIGTERM', async () => {
await emailWorker.close();
await notifQueue.close();
await agent.stop();
});
Tip: You can mix built-in providers with custom ones in the same `MultiProvider`. See the custom provider example below.
5. Custom provider
Implement the `QueueProvider` interface to integrate any job or task system. It requires just two methods: `connect(push)` and `disconnect()`.
import { initJobviz } from 'jobviz-agent';
// 1. Implement the QueueProvider interface
/**
 * Example provider that reports cron-style jobs to Jobviz.
 *
 * It exposes the two methods used by the provider interface shown in this
 * guide: `connect(push)` and `disconnect()`. For every configured job it
 * registers a callback with `scheduleCron` and, on each run, pushes an
 * `active` event followed by either a `completed` or a `failed` event.
 *
 * NOTE: `scheduleCron` is not defined in this example. It is expected to take
 * `(cronExpression, callback)` and return an object with a `stop()` method.
 */
class CronJobProvider {
  // Callback received in connect(); every lifecycle event is sent through it.
  #push;
  // Handles returned by scheduleCron(), kept so disconnect() can stop them.
  #watchers = [];

  /**
   * @param {Array<{name: string, cron: string, handler: Function}>} jobs
   *   Jobs to schedule; `handler` may be sync or async.
   */
  constructor(jobs) {
    this.jobs = jobs; // [{ name, cron, handler }]
  }

  /**
   * Stores the push callback and schedules every configured job.
   *
   * @param {Function} push - Called with one event object per lifecycle event.
   */
  connect(push) {
    this.#push = push;
    for (const job of this.jobs) {
      const watcher = scheduleCron(job.cron, () => this.#run(job));
      this.#watchers.push(watcher);
    }
  }

  /**
   * Stops every scheduled watcher and forgets them.
   *
   * @returns {Promise<void>}
   */
  async disconnect() {
    for (const watcher of this.#watchers) {
      watcher.stop();
    }
    this.#watchers = [];
  }

  /**
   * Executes one run of `job` and emits its lifecycle events.
   * All events of a run share the same jobId.
   */
  async #run(job) {
    const jobId = `${job.name}-${Date.now()}`;
    this.#emit(job, jobId, 'active');

    try {
      const result = await job.handler();
      this.#emit(job, jobId, 'completed', { returnValue: result });
    } catch (err) {
      // A handler may reject with anything, including null, undefined, or a
      // plain string. Reading err.message directly would throw on
      // null/undefined and lose the failure, so normalize defensively.
      this.#emit(job, jobId, 'failed', {
        failedReason: err?.message ?? String(err),
        stack: err?.stack,
      });
    }
  }

  /** Builds one event object and hands it to the push callback. */
  #emit(job, jobId, event, data) {
    const payload = {
      jobId,
      jobName: job.name,
      queue: 'cron',
      event,
      timestamp: Date.now(),
    };
    // Only attach `data` when there is some, so 'active' events stay minimal.
    if (data !== undefined) {
      payload.data = data;
    }
    this.#push(payload);
  }
}
// 2. Use it with Jobviz
const agent = await initJobviz({
apiKey: process.env.JOBVIZ_API_KEY,
provider: new CronJobProvider([
{ name: 'sync-analytics', cron: '*/5 * * * *', handler: syncAnalytics },
{ name: 'cleanup-sessions', cron: '0 * * * *', handler: cleanupSessions },
]),
});
Learn more: See the Custom Integration Guide for the full `JobEvent` schema, best practices, and the REST API for non-Node runtimes.