Custom Integration Guide

Integrate Jobviz with any Node.js queue system. Implement a simple two-method interface, and the agent handles batching, retries, and transport automatically.

1. How it works

The Jobviz agent separates event collection from event delivery. A provider listens to your queue system and pushes normalized events into the agent. The agent handles batching (default: 100 events or 3s), HTTP transport with retries, and error isolation — your application never blocks or crashes because of monitoring.

Step 1: Your provider listens to queue events
Step 2: Your provider calls push(event) for each lifecycle change
Step 3: The agent batches and sends events to Jobviz automatically
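
To make the size-or-time batching described above concrete, here is a minimal sketch of how such a batcher could work. It is not the agent's actual implementation: the EventBatcher class, its constructor parameters, and the trimmed-down JobEvent type are hypothetical, and only the defaults (100 events, 3 seconds) come from the text above.

```typescript
// Trimmed local stand-in for the real JobEvent type (illustration only).
type JobEvent = { jobId: string; event: string; timestamp: number };

// Hypothetical batcher: flushes when `maxSize` events are buffered, or when
// `maxWaitMs` has passed since the first buffered event, whichever is first.
class EventBatcher {
  private buffer: JobEvent[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;

  constructor(
    private readonly send: (batch: JobEvent[]) => void,
    private readonly maxSize = 100,
    private readonly maxWaitMs = 3000,
  ) {}

  push(event: JobEvent): void {
    this.buffer.push(event);
    if (this.buffer.length >= this.maxSize) {
      this.flush();
    } else if (this.timer === undefined) {
      // Start the countdown when the first event of a new batch arrives.
      this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    try {
      this.send(batch);
    } catch {
      // Swallow transport errors so monitoring never crashes the host app.
    }
  }
}
```

Because push() only appends to an in-memory buffer, a provider can call it from any queue event handler without waiting on the network.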

2. The QueueProvider interface

Implement two methods: connect() to start listening, and disconnect() to clean up. That's it.

my-provider.ts
import type { QueueProvider, JobEvent } from 'jobviz-agent';

class MyQueueProvider implements QueueProvider {
  private push!: (event: JobEvent) => void;

  connect(push: (event: JobEvent) => void): void {
    this.push = push;
    // Subscribe to your queue system here
    // Call this.push() for every job lifecycle event
  }

  async disconnect(): Promise<void> {
    // Clean up connections, timers, listeners
  }
}

3. The JobEvent schema

Every event you push must conform to this shape. Only jobId, jobName, queue, event, and timestamp are required — everything else is optional.

types.ts
interface JobEvent {
  jobId: string;        // Unique job identifier
  jobName: string;      // Job type / name
  queue: string;        // Queue name
  event:                // Lifecycle event type
    | 'waiting'         // Job added to queue
    | 'active'          // Job started processing
    | 'completed'       // Job finished successfully
    | 'failed'          // Job threw an error
    | 'delayed'         // Job scheduled for later
    | 'stalled'         // Job exceeded lock timeout
    | 'progress';       // Job reported progress
  timestamp: number;    // Unix ms (Date.now())
  env?: string;         // e.g. 'production', 'staging'
  traceId?: string;     // Correlation / trace ID
  parentJobId?: string; // Parent job ID (for flows)
  parentQueue?: string; // Parent queue name
  data?: {
    returnValue?: unknown;             // Completion result
    failedReason?: string;             // Error message
    stack?: string;                    // Error stack trace
    progress?: unknown;                // Progress payload
    delay?: number;                    // Delay in ms
    attemptsMade?: number;             // Retry count
    input?: Record<string, unknown>;   // Job input data
    log?: {                            // Structured log
      message: string;
      meta?: Record<string, unknown>;
    };
  };
}

Event      When to emit                   Key data fields
waiting    Job added to queue             input
active     Job picked up by a worker      -
completed  Job finished successfully      returnValue
failed     Job threw an error             failedReason, stack, attemptsMade
delayed    Job scheduled for the future   delay
stalled    Job exceeded lock timeout      -
progress   Job reported progress          progress, log

Tip: You don't need to emit all event types. Even just completed and failed is enough for Jobviz to provide failure tracking, alerting, and AI root cause analysis.
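
When events come from loosely typed queue callbacks, it can help to check the five required fields before calling push(). The guard below is a sketch under that assumption. The names EVENT_TYPES, RequiredJobEventFields, and hasRequiredFields are hypothetical and not part of jobviz-agent; the field names and event values are copied from the schema above.

```typescript
// The seven lifecycle event values from the JobEvent schema.
const EVENT_TYPES = [
  'waiting', 'active', 'completed', 'failed', 'delayed', 'stalled', 'progress',
] as const;
type EventType = (typeof EVENT_TYPES)[number];

// Only the required part of JobEvent; optional fields are not checked here.
interface RequiredJobEventFields {
  jobId: string;
  jobName: string;
  queue: string;
  event: EventType;
  timestamp: number;
}

// Hypothetical runtime guard for the required fields.
function hasRequiredFields(value: unknown): value is RequiredJobEventFields {
  if (typeof value !== 'object' || value === null) return false;
  const { jobId, jobName, queue, event, timestamp } = value as Record<string, unknown>;
  return (
    typeof jobId === 'string' &&
    typeof jobName === 'string' &&
    typeof queue === 'string' &&
    EVENT_TYPES.some((t) => t === event) &&
    typeof timestamp === 'number' &&
    Number.isFinite(timestamp)
  );
}
```

A provider could skip and log any object that fails the guard instead of pushing it.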

4. Full example — generic queue

Here's a complete provider for a hypothetical queue library. Adapt the event names and job properties to match your queue system.

my-queue-provider.ts
import { initJobviz } from 'jobviz-agent';
import type { QueueProvider, JobEvent } from 'jobviz-agent';
import { MyQueue } from './my-queue'; // your queue library

class MyQueueProvider implements QueueProvider {
  private push!: (event: JobEvent) => void;
  private queues: MyQueue[] = [];

  constructor(private queueNames: string[]) {}

  connect(push: (event: JobEvent) => void): void {
    this.push = push;

    for (const name of this.queueNames) {
      const queue = new MyQueue(name);

      queue.on('job:added', (job) => {
        this.push({
          jobId: String(job.id),
          jobName: job.type ?? name,
          queue: name,
          event: 'waiting',
          timestamp: Date.now(),
          data: { input: job.data },
        });
      });

      queue.on('job:started', (job) => {
        this.push({
          jobId: String(job.id),
          jobName: job.type ?? name,
          queue: name,
          event: 'active',
          timestamp: Date.now(),
        });
      });

      queue.on('job:completed', (job, result) => {
        this.push({
          jobId: String(job.id),
          jobName: job.type ?? name,
          queue: name,
          event: 'completed',
          timestamp: Date.now(),
          data: { returnValue: result },
        });
      });

      queue.on('job:failed', (job, err) => {
        this.push({
          jobId: String(job.id),
          jobName: job.type ?? name,
          queue: name,
          event: 'failed',
          timestamp: Date.now(),
          data: {
            failedReason: err.message,
            stack: err.stack,
            attemptsMade: job.attempts ?? 0,
          },
        });
      });

      this.queues.push(queue);
    }
  }

  async disconnect(): Promise<void> {
    for (const queue of this.queues) {
      await queue.close();
    }
    this.queues = [];
  }
}

// Initialize Jobviz with your custom provider
initJobviz({
  apiKey: 'your-api-key',
  provider: new MyQueueProvider(['emails', 'reports']),
});

5. Real-world example — Agenda.js

A concrete example using Agenda, a MongoDB-backed job scheduler.

agenda-provider.ts
import { initJobviz } from 'jobviz-agent';
import type { QueueProvider, JobEvent } from 'jobviz-agent';
import Agenda from 'agenda';

class AgendaProvider implements QueueProvider {
  private push!: (event: JobEvent) => void;
  private agenda: Agenda;

  constructor(private mongoUrl: string) {
    this.agenda = new Agenda({ db: { address: mongoUrl } });
  }

  connect(push: (event: JobEvent) => void): void {
    this.push = push;

    this.agenda.on('start', (job) => {
      this.push({
        jobId: String(job.attrs._id),
        jobName: job.attrs.name,
        queue: 'agenda',
        event: 'active',
        timestamp: Date.now(),
        data: { input: job.attrs.data },
      });
    });

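    // Agenda emits 'complete' after both successes and failures;
    // 'success' fires only when the job finished without an error.
    this.agenda.on('success', (job) => {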
      this.push({
        jobId: String(job.attrs._id),
        jobName: job.attrs.name,
        queue: 'agenda',
        event: 'completed',
        timestamp: Date.now(),
      });
    });

    this.agenda.on('fail', (err, job) => {
      this.push({
        jobId: String(job.attrs._id),
        jobName: job.attrs.name,
        queue: 'agenda',
        event: 'failed',
        timestamp: Date.now(),
        data: {
          failedReason: err.message,
          stack: err.stack,
          attemptsMade: job.attrs.failCount ?? 0,
        },
      });
    });

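    // start() is async: handle a failed start here instead of leaving
    // an unhandled promise rejection.
    this.agenda.start().catch((err) => {
      console.error('Agenda failed to start', err);
    });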
  }

  async disconnect(): Promise<void> {
    await this.agenda.stop();
  }
}

initJobviz({
  apiKey: 'your-api-key',
  provider: new AgendaProvider('mongodb://localhost/agenda'),
});

6. Structured logs

Use jobviz.log() inside your job workers to attach structured log entries. These appear in the Jobviz dashboard as a timeline for each job execution.

worker.ts
import { initJobviz, jobviz } from 'jobviz-agent';
// MyQueueProvider is the custom provider class defined in section 4.

// After initializing with your custom provider...
initJobviz({
  apiKey: 'your-api-key',
  provider: new MyQueueProvider(['emails']),
});

// Inside your job worker, attach structured logs:
async function processJob(job) {
  jobviz.log(job, 'Starting email send');

  const template = await fetchTemplate(job.data.templateId);
  jobviz.log(job, 'Template fetched', { templateId: job.data.templateId });

  await sendEmail(template, job.data.recipients);
  jobviz.log(job, 'Email sent', { recipients: job.data.recipients.length });
}

Note: The job object only needs id, name, and queueName properties. It works with BullMQ jobs, Bee-Queue jobs, or any object with those fields.
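
If your queue library's job objects use different property names, a small adapter can produce the shape described in the note above. This is a sketch: the LoggableJob and MyQueueJob interfaces and the toLoggableJob helper are hypothetical, and it assumes string values are acceptable for id, name, and queueName.

```typescript
// The three properties named in the note; the interface name is hypothetical.
interface LoggableJob {
  id: string;
  name: string;
  queueName: string;
}

// Hypothetical job shape from a custom queue library.
interface MyQueueJob {
  id: number | string;
  type?: string;
  data: unknown;
}

// Map a custom job to the id/name/queueName shape, falling back to the
// queue name when the job has no type (as in the section 4 example).
function toLoggableJob(job: MyQueueJob, queueName: string): LoggableJob {
  return {
    id: String(job.id),
    name: job.type ?? queueName,
    queueName,
  };
}
```

The adapted object can then be passed as the first argument wherever the worker example passes job to jobviz.log().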

7. Mixing providers

Use MultiProvider to monitor multiple queue systems under a single Jobviz instance.

multi-setup.ts
import { initJobviz, BullMQProvider, MultiProvider } from 'jobviz-agent';

// Combine built-in providers with custom ones
initJobviz({
  apiKey: 'your-api-key',
  provider: new MultiProvider([
    new BullMQProvider({
      queues: ['emails'],
      connection: { url: 'redis://localhost:6379' },
    }),
    new MyQueueProvider(['custom-queue-1', 'custom-queue-2']),
  ]),
});
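
Conceptually, combining providers means handing the same push callback to each one. The sketch below shows that idea with a hypothetical FanOutProvider. It is not MultiProvider's actual implementation, and the JobEvent and QueueProvider types are local stand-ins so the snippet is self-contained.

```typescript
// Local stand-ins for the jobviz-agent types (illustration only).
type JobEvent = { jobId: string; jobName: string; queue: string; event: string; timestamp: number };
interface QueueProvider {
  connect(push: (event: JobEvent) => void): void;
  disconnect(): Promise<void>;
}

// Hypothetical fan-out: every child provider pushes into the same callback.
class FanOutProvider implements QueueProvider {
  constructor(private readonly providers: QueueProvider[]) {}

  connect(push: (event: JobEvent) => void): void {
    for (const provider of this.providers) {
      try {
        provider.connect(push);
      } catch (err) {
        // One failing provider should not prevent the others from connecting.
        console.error('Provider failed to connect', err);
      }
    }
  }

  async disconnect(): Promise<void> {
    for (const provider of this.providers) {
      try {
        await provider.disconnect();
      } catch (err) {
        console.error('Provider failed to disconnect', err);
      }
    }
  }
}
```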

8. REST API (non-Node)

If your queue system isn't running in Node.js, you can send events directly to the Jobviz REST API from any language or runtime.

rest-example.js
// If you can't use the Node.js SDK (e.g. non-Node runtime),
// send events directly to the REST API.

const JOBVIZ_ENDPOINT = 'https://app.jobviz.dev/api/v1/events';
const API_KEY = 'your-api-key';

async function sendJobEvents(events) {
  const response = await fetch(JOBVIZ_ENDPOINT, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_KEY,
    },
    body: JSON.stringify({ events }),
  });

  if (!response.ok) {
    throw new Error('Jobviz API error: ' + response.status);
  }
}

// Example: send a "completed" event
await sendJobEvents([
  {
    jobId: 'job-123',
    jobName: 'send-email',
    queue: 'emails',
    event: 'completed',
    timestamp: Date.now(),
    data: { returnValue: { sent: true } },
  },
]);

Detail        Value
Endpoint      POST https://app.jobviz.dev/api/v1/events
Auth          Authorization: Bearer <api-key>
Content-Type  application/json
Body          { "events": JobEvent[] }
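
When sending events over REST you do your own batching. The helper below is a sketch that splits a list of events into request bodies in the { "events": [...] } shape shown above. The buildRequestBodies name is hypothetical, and the default of 100 events per request simply mirrors the agent's default batch size. Whether the API enforces a per-request limit is not stated here.

```typescript
// Split events into JSON request bodies of at most `maxPerRequest` events.
function buildRequestBodies<T>(events: readonly T[], maxPerRequest = 100): string[] {
  if (!Number.isInteger(maxPerRequest) || maxPerRequest < 1) {
    throw new RangeError('maxPerRequest must be a positive integer');
  }
  const bodies: string[] = [];
  for (let i = 0; i < events.length; i += maxPerRequest) {
    // Each body matches the documented shape: { "events": JobEvent[] }.
    bodies.push(JSON.stringify({ events: events.slice(i, i + maxPerRequest) }));
  }
  return bodies;
}
```

Each returned string can be used directly as the body of one POST request to the events endpoint.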

9. Best practices

Always include timestamps

Use Date.now() at the moment the event actually occurs, not when you push it. This ensures accurate duration calculations in the dashboard.
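
If your queue library records when a job actually finished, prefer that value over the time your handler runs. The sketch below assumes a hypothetical finishedAt field (Unix ms) on the job. The FinishedJob and CompletedEvent interfaces and the toCompletedEvent helper are illustrative names only.

```typescript
// Hypothetical job shape; `finishedAt` is an assumed field, in Unix ms.
interface FinishedJob {
  id: string | number;
  name: string;
  finishedAt?: number;
}

// The required JobEvent fields for a 'completed' event.
interface CompletedEvent {
  jobId: string;
  jobName: string;
  queue: string;
  event: 'completed';
  timestamp: number;
}

// Use the queue's own completion time when available, else the current time.
function toCompletedEvent(
  job: FinishedJob,
  queue: string,
  now: () => number = Date.now,
): CompletedEvent {
  return {
    jobId: String(job.id),
    jobName: job.name,
    queue,
    event: 'completed',
    timestamp: job.finishedAt ?? now(),
  };
}
```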

Include error stacks on failure

The stack field powers Jobviz's AI root cause analysis. Always include it when emitting failed events.

Use consistent job names

Jobviz groups metrics by jobName. Use stable, descriptive names like "send-welcome-email" rather than dynamic values.

Add trace IDs for distributed systems

If your jobs are triggered by HTTP requests or other jobs, pass the traceId field to enable cross-service correlation in the dashboard.
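
One way to carry a trace ID is to store it in the job's input data when enqueuing and copy it onto each event. The sketch below assumes that convention. The data.traceId location, the BaseEvent interface, and both helper names are hypothetical.

```typescript
// Required JobEvent fields plus the optional traceId from the schema.
interface BaseEvent {
  jobId: string;
  jobName: string;
  queue: string;
  event: string;
  timestamp: number;
  traceId?: string;
}

// Assumed convention: the producer stored the trace ID under data.traceId.
function extractTraceId(data: unknown): string | undefined {
  if (typeof data !== 'object' || data === null) return undefined;
  const { traceId } = data as { traceId?: unknown };
  return typeof traceId === 'string' && traceId.length > 0 ? traceId : undefined;
}

// Return a copy of the event with traceId set when one is found in the data.
function withTraceId(event: BaseEvent, data: unknown): BaseEvent {
  const traceId = extractTraceId(data);
  return traceId === undefined ? event : { ...event, traceId };
}
```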

Handle provider errors gracefully

Your connect() method should never throw. Wrap queue subscriptions in try/catch and log errors internally — the agent won't catch provider exceptions.
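
The same care applies to the event handlers you register inside connect(). A small wrapper can keep a bug in your event-mapping code from propagating into the queue library's emitter. This is a sketch, and the safeHandler name and log prefix are hypothetical.

```typescript
// Wrap a handler so that any exception is logged instead of rethrown.
function safeHandler<Args extends unknown[]>(
  label: string,
  handler: (...args: Args) => void,
): (...args: Args) => void {
  return (...args: Args): void => {
    try {
      handler(...args);
    } catch (err) {
      // Log and continue: monitoring code must not break job processing.
      console.error(`[jobviz-provider] ${label} handler failed:`, err);
    }
  };
}
```

For example, a provider could register queue.on('job:failed', safeHandler('job:failed', (job, err) => { ... })) so that a malformed job object produces a log line rather than an exception.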

Use debug mode during development

Set debug: true in your config to see all events logged to console and access the health endpoint at http://127.0.0.1:9888/agent/health.

Need help with your integration?

Join us on Discord or open an issue on GitHub — we're happy to help you get set up.
