Checkstack Documentation

Queue System

The Checkstack Queue system provides a pluggable, type-safe infrastructure for managing asynchronous tasks and distributed events. It is designed to scale from simple in-memory development environments to multi-node production clusters.

Table of Contents

  1. Overview and Concepts
  2. Architecture
  3. Behavior and Guarantees
  4. Implementing Custom Queue Plugins
  5. Practical Examples
  6. Best Practices
  7. Summary

Overview and Concepts

What is the Queue System?

The Queue system is Checkstack’s abstraction for asynchronous task processing. It provides:

  - Pluggable backends (in-memory for development, Redis or similar for production)
  - Type-safe, named queues for specific payload types
  - Consumer groups for both work-queue and broadcast distribution
  - Priorities, delayed delivery, retries with backoff, and native recurring jobs

Core Concepts

Jobs

A job represents a unit of work to be processed asynchronously. Each job contains:

interface QueueJob<T = unknown> {
  id: string;              // Unique identifier
  data: T;                 // The payload to process
  priority?: number;       // Higher values = processed first (default: 0)
  timestamp: Date;         // When the job was enqueued
  attempts?: number;       // Current retry count
}

Queues

A queue is a named channel for jobs of a specific type. Queues handle:

  - Ordering jobs by priority, then FIFO among equal priorities
  - Delayed delivery via startDelay
  - Retries with exponential backoff
  - Delivering each job to every registered consumer group

Consumer Groups

Consumer groups determine how messages are distributed across multiple instances:

  - Shared group name across instances: each job is processed by exactly one instance (work-queue mode)
  - Unique group name per instance: every instance receives its own copy of each job (broadcast mode)

Priorities

Jobs can be assigned a numeric priority (higher = more urgent). The queue processes higher-priority jobs before lower-priority ones, regardless of enqueue order.

When to Use the Queue System

Use the Queue system when you need:

  - Background work that should not block request handling
  - Recurring tasks such as periodic health checks
  - Coordination across instances, either as a shared work queue or as a broadcast
  - Automatic retries for operations that may fail transiently


Architecture

The Queue system is built on a factory pattern that allows different backends to be used transparently.

Core Components

QueuePlugin

The QueuePlugin interface defines how to create queue instances:

interface QueuePlugin<Config = unknown> {
  id: string;                      // Unique plugin identifier (e.g., "memory", "redis")
  displayName: string;             // Human-readable name
  description?: string;            // Optional description
  
  configVersion: number;           // Current config schema version
  configSchema: z.ZodType<Config>; // Zod schema for validation
  migrations?: MigrationChain<Config>; // Optional version migrations
  
  createQueue<T>(name: string, config: Config): Queue<T>;
}

All queue plugins must implement this interface to be compatible with the system.

QueuePluginRegistry

A central registry where queue plugins are registered at startup:

interface QueuePluginRegistry {
  register(plugin: QueuePlugin<unknown>): void;
  getPlugin(id: string): QueuePlugin<unknown> | undefined;
  getPlugins(): QueuePlugin<unknown>[];
}

Plugins are typically registered in their backend plugin’s register lifecycle hook.

QueueManager

The QueueManager service is responsible for managing queue instances and backend switching:

interface QueueManager {
  // Get or create a queue proxy (synchronous, returns stable reference)
  getQueue<T>(name: string): Queue<T>;
  
  // Active backend management
  getActivePlugin(): string;
  setActiveBackend(pluginId: string, config: unknown): Promise<void>;
  
  // Multi-instance coordination  
  startPolling(intervalMs?: number): void;
  stopPolling(): void;
  
  // Monitoring and status
  getInFlightJobCount(): Promise<number>;
  listAllRecurringJobs(): Promise<RecurringJobDetails[]>;
  
  // Graceful shutdown
  shutdown(): Promise<void>;
}

Applications use QueueManager.getQueue() to obtain queue references without knowing which backend is active. The returned queue is actually a QueueProxy that provides stable references across backend switches.
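
For example, obtaining a queue and enqueuing work looks the same regardless of the active backend (the payload type and queue name here are illustrative):

interface EmailJob {
  to: string;
  subject: string;
}

// Works identically whether the memory or redis backend is active
const emailQueue = queueManager.getQueue<EmailJob>('emails');
await emailQueue.enqueue({ to: 'user@example.com', subject: 'Welcome!' });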

QueueProxy

The QueueProxy class wraps the actual queue implementation to provide:

  - A stable queue reference that survives backend switches
  - Automatic replay of active subscriptions onto the new backend after a switch

class QueueProxy<T> implements Queue<T> {
  // All Queue methods delegate to the underlying implementation
  // On backend switch, subscriptions are automatically replayed
  async switchDelegate(newQueue: Queue<T>): Promise<void>;
}
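
A minimal sketch of the replay idea (illustrative only, not the actual QueueProxy source):

// Record consume() calls so they can be replayed on a new delegate
class QueueProxySketch<T> {
  private subscriptions: Array<{
    consumer: QueueConsumer<T>;
    options: ConsumeOptions;
  }> = [];

  constructor(private delegate: Queue<T>) {}

  async consume(consumer: QueueConsumer<T>, options: ConsumeOptions): Promise<void> {
    this.subscriptions.push({ consumer, options }); // remember the subscription
    await this.delegate.consume(consumer, options);
  }

  async switchDelegate(newQueue: Queue<T>): Promise<void> {
    await this.delegate.stop(); // drain in-flight work on the old backend
    this.delegate = newQueue;
    for (const sub of this.subscriptions) {
      await newQueue.consume(sub.consumer, sub.options); // replay
    }
  }

  // ...all other Queue methods forward to this.delegate
}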

Queue Interface

The Queue interface is the primary interaction point:

interface Queue<T = unknown> {
  enqueue(
    data: T,
    options?: {
      priority?: number;
      startDelay?: number;  // Delay before job becomes available (renamed from delaySeconds)
      jobId?: string;       // For deduplication
    }
  ): Promise<string>;

  consume(
    consumer: QueueConsumer<T>,
    options: ConsumeOptions
  ): Promise<void>;

  // Recurring job management (native support)
  scheduleRecurring(
    jobId: string,
    data: T,
    intervalSeconds: number,
    options?: { startDelay?: number }
  ): Promise<void>;
  cancelRecurring(jobId: string): Promise<void>;
  listRecurringJobs(): Promise<string[]>;
  getRecurringJobDetails(jobId: string): Promise<RecurringJobDetails | undefined>;
  
  // Monitoring
  getInFlightCount(): Promise<number>;
  stop(): Promise<void>;
  getStats(): Promise<QueueStats>;
  testConnection(): Promise<void>;
}

How It Works Internally

Job Lifecycle

  1. Enqueue: A job is added to the queue with optional priority, delay, or custom ID
  2. Routing: The queue determines which consumer groups should receive the job
  3. Selection: Within each consumer group, a consumer is selected (round-robin)
  4. Processing: The consumer’s handler function is invoked with the job
  5. Completion: On success, the job is marked complete; on failure, it may be retried
  6. Cleanup: Once all consumer groups have processed the job, it’s removed from the queue
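
Putting the steps together, a minimal producer/consumer round trip might look like this (the queue name, payload type, and buildReport helper are illustrative):

const queue = queueManager.getQueue<{ reportId: string }>('reports');

// Steps 2-5: register a consumer; a thrown error feeds the retry path
await queue.consume(async (job) => {
  await buildReport(job.data.reportId); // hypothetical handler work
}, { consumerGroup: 'reports', maxRetries: 3 });

// Step 1: enqueue with a priority and a deduplicating job ID
await queue.enqueue({ reportId: 'r-42' }, { priority: 5, jobId: 'report:r-42' });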

Consumer Group Routing Logic

The queue maintains state for each consumer group:

interface ConsumerGroupState<T> {
  consumers: Array<{
    id: string;
    handler: QueueConsumer<T>;
    maxRetries: number;
  }>;
  nextConsumerIndex: number;      // For round-robin selection
  processedJobIds: Set<string>;   // Track which jobs this group has seen
}

Routing Rules:

  - Every consumer group receives every job once; processed jobs are tracked per group via processedJobIds
  - Within a group, consumers are selected round-robin, so instances share the load
  - A job is removed only after every registered consumer group has processed it

Priority Handling

Jobs are stored in priority order (higher priority first). When multiple jobs are pending:

  1. The queue scans for the highest-priority job that hasn’t been processed by a given consumer group
  2. Jobs with the same priority are processed in FIFO order
  3. Enqueuing a job triggers re-insertion at the correct position based on priority
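
Rule 3 can be implemented with a simple scan for the insertion point; a sketch (not the actual InMemoryQueue code):

// The first job with strictly lower priority marks the insertion point;
// scanning left to right preserves FIFO order among equal priorities.
function findInsertIndex<T>(jobs: QueueJob<T>[], priority: number): number {
  const index = jobs.findIndex((job) => (job.priority ?? 0) < priority);
  return index === -1 ? jobs.length : index;
}

// Inside enqueue():
// this.jobs.splice(findInsertIndex(this.jobs, priority), 0, newJob);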

Delayed Job Execution

Jobs can be enqueued with a startDelay option (in seconds):

await queue.enqueue(data, { startDelay: 60 }); // Available in 60 seconds

Implementation:

  - Each job carries an availableAt timestamp computed at enqueue time
  - The job-selection loop skips jobs whose availableAt lies in the future
  - Once the delay elapses, the job competes with other pending jobs by priority as usual

Retry Mechanism with Exponential Backoff

When a consumer throws an error:

  1. The job’s attempts counter is incremented
  2. If attempts < maxRetries, the job is re-queued
  3. A delay is calculated: 2^attempts * 1000ms (exponential backoff)
  4. After the delay, the job becomes available for the same consumer group again
  5. If attempts >= maxRetries, the job is marked as failed

Example backoff sequence:

  - Attempt 1 fails → retry after 2 seconds (2^1 × 1000ms)
  - Attempt 2 fails → retry after 4 seconds (2^2 × 1000ms)
  - Attempt 3 fails → retry after 8 seconds (2^3 × 1000ms)

Job Deduplication via jobId

When enqueuing with a custom jobId:

await queue.enqueue(data, { jobId: 'healthcheck:system-123' });

Behavior:

  - If no job with that ID is in the queue, the job is enqueued normally
  - If a job with the same ID already exists, enqueue() returns the existing ID without adding a duplicate
  - This makes repeated enqueues safe, e.g. when rescheduling the same health check

Concurrency Control

Queue implementations typically use a semaphore to limit concurrent job processing:

const semaphore = new Semaphore(config.concurrency); // e.g., 10

async function processJob(job) {
  await semaphore.acquire();
  try {
    await handler(job);
  } finally {
    semaphore.release();
  }
}

This prevents resource exhaustion and allows fine-tuning of throughput.
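
The semaphore itself can be as small as a counter plus a waiter queue; a minimal sketch:

class Semaphore {
  private available: number;
  private waiters: Array<() => void> = [];

  constructor(permits: number) {
    this.available = permits;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }
    // No permit free: wait until release() hands one over
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) next();        // transfer the permit directly to a waiter
    else this.available++;
  }
}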


Behavior and Guarantees

Delivery Semantics

The Queue system provides at-least-once delivery:

  - Once enqueued, a job is delivered to every consumer group at least once
  - A job may occasionally be delivered more than once, for example when a retry fires after a crash or timeout mid-processing
  - Handlers should therefore be idempotent, as in the sketch below
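
One common approach is to key side effects on the job ID, so a redelivered job becomes a no-op (processedStore and doSideEffect are hypothetical):

await queue.consume(async (job) => {
  // Hypothetical persistent set of completed job IDs
  if (await processedStore.has(job.id)) return; // duplicate delivery: skip

  await doSideEffect(job.data);     // the actual work
  await processedStore.add(job.id); // record success last
}, { consumerGroup: 'workers', maxRetries: 3 });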

Message Ordering

  - Jobs are processed in priority order; jobs of equal priority run FIFO
  - Ordering is not guaranteed across retries: a retried job re-enters the queue behind newer, higher-priority work

Consumer Group Isolation

  - Each group tracks its own processed-job set, so groups advance independently
  - A slow or failing consumer in one group does not block delivery to other groups

Recurring Job Scheduling

When using scheduleRecurring(), jobs run on a fixed wall-clock interval (same as BullMQ):

T=0:   Job A starts (intervalSeconds = 1)
T=1:   Job B starts (Job A still running)
T=2:   Job C starts (Job A still running, Job B running)
T=3:   Job A completes

Key implications:

  - Executions can overlap: the next run starts on schedule even if the previous run is still in flight
  - If execution time regularly exceeds the interval, runs accumulate and consume concurrency slots

Recommendation: Set intervalSeconds to a value greater than the expected maximum execution time to avoid job accumulation. For health checks, consider the network timeout plus processing time.

Graceful Shutdown

queue.stop() behavior:

  1. Marks the queue as stopped (no new jobs are processed)
  2. Waits for all currently processing jobs to complete
  3. Returns once all in-flight work is finished

This ensures data consistency during application shutdown.
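
In an application, this is typically wired into the process shutdown path; a sketch:

process.on('SIGTERM', async () => {
  // QueueManager.shutdown() performs the graceful stop described above
  await queueManager.shutdown();
  process.exit(0);
});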

Statistics and Monitoring

queue.getStats() returns:

interface QueueStats {
  pending: number;       // Jobs waiting to be processed
  processing: number;    // Jobs currently being handled
  completed: number;     // Total successful jobs (lifetime)
  failed: number;        // Total failed jobs after all retries (lifetime)
  consumerGroups: number; // Number of active consumer groups
}

Use these metrics for monitoring, alerting, and capacity planning.
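
For example, a simple backlog monitor (the threshold and interval are illustrative):

setInterval(async () => {
  const stats = await queue.getStats();
  if (stats.pending > 1_000) {
    logger.warn(`Queue backlog: ${stats.pending} pending, ${stats.processing} processing`);
  }
}, 30_000);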

Configuration Versioning and Migrations

Queue plugins support versioned configurations with automatic migrations.

When loading a config, the system:

  1. Checks the stored configVersion
  2. Applies necessary migrations in sequence
  3. Validates the final config against configSchema

This ensures backward compatibility when queue configuration evolves.
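
As a sketch, a migration chain might map each source version to a transform toward the next (the exact MigrationChain shape is defined by @checkstack/queue-api; the fields below are hypothetical):

// Hypothetical: transform a v1 config into the v2 shape
const migrations = {
  1: (v1: { host: string; port: number }) => ({
    ...v1,
    db: 0, // suppose configVersion 2 added an explicit database number
  }),
};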


Implementing Custom Queue Plugins

This section guides you through creating a custom queue plugin, such as a Redis-based backend.

Step 1: Define Your Configuration Schema

Use Zod to define the plugin’s configuration:

import { z } from 'zod';

const redisConfigSchema = z.object({
  host: z.string().default('localhost').describe('Redis server hostname'),
  port: z.number().min(1).max(65535).default(6379).describe('Redis server port'),
  password: z.string().optional().describe('Redis password (if required)'),
  db: z.number().min(0).default(0).describe('Redis database number'),
  maxRetries: z.number().min(0).default(3).describe('Default max retries for jobs'),
});

export type RedisQueueConfig = z.infer<typeof redisConfigSchema>;

Step 2: Implement the Queue Interface

Create a class that implements the Queue<T> interface:

import { Queue, QueueJob, QueueConsumer, ConsumeOptions, QueueStats } from '@checkstack/queue-api';
import Redis from 'ioredis'; // assumed Redis client; any client with a compatible API works

export class RedisQueue<T> implements Queue<T> {
  private redis: Redis;
  private stopped = false;

  constructor(private name: string, private config: RedisQueueConfig) {
    // Initialize Redis connection
    this.redis = new Redis({
      host: config.host,
      port: config.port,
      password: config.password,
      db: config.db,
    });
  }

  async enqueue(
    data: T,
    options?: { priority?: number; startDelay?: number; jobId?: string }
  ): Promise<string> {
    const jobId = options?.jobId ?? crypto.randomUUID();
    const job: QueueJob<T> = {
      id: jobId,
      data,
      priority: options?.priority ?? 0,
      timestamp: new Date(),
      attempts: 0,
    };

    // Check for duplicates
    if (options?.jobId) {
      const exists = await this.redis.exists(`job:${this.name}:${jobId}`);
      if (exists) return jobId;
    }

    // Store job data
    await this.redis.set(`job:${this.name}:${jobId}`, JSON.stringify(job));

    // Add to priority queue
    const score = options?.priority ?? 0;
    const member = jobId;
    
    if (options?.startDelay) {
      const availableAt = Date.now() + options.startDelay * 1000;
      await this.redis.zadd(`delayed:${this.name}`, availableAt, member);
    } else {
      await this.redis.zadd(`queue:${this.name}`, -score, member); // Negative for descending
    }

    return jobId;
  }

  async consume(consumer: QueueConsumer<T>, options: ConsumeOptions): Promise<void> {
    // Register consumer and start polling loop
    // Implementation details omitted for brevity
  }

  async stop(): Promise<void> {
    this.stopped = true;
    await this.redis.quit();
  }

  async getStats(): Promise<QueueStats> {
    const pending = await this.redis.zcard(`queue:${this.name}`);
    const delayed = await this.redis.zcard(`delayed:${this.name}`);
    // Implementation details omitted
    return { pending: pending + delayed, processing: 0, completed: 0, failed: 0, consumerGroups: 0 };
  }
}

Step 3: Implement the QueuePlugin Interface

Create the plugin class:

import { QueuePlugin } from '@checkstack/queue-api';

export class RedisQueuePlugin implements QueuePlugin<RedisQueueConfig> {
  id = 'redis';
  displayName = 'Redis Queue';
  description = 'Production-ready queue backed by Redis';
  configVersion = 1;
  configSchema = redisConfigSchema;

  createQueue<T>(name: string, config: RedisQueueConfig): Queue<T> {
    return new RedisQueue<T>(name, config);
  }
}

Step 4: Register the Plugin

In your backend plugin’s register lifecycle:

import { createBackendPlugin, coreServices } from '@checkstack/backend-api';
import { RedisQueuePlugin } from './redis-queue-plugin';
import { pluginMetadata } from './plugin-metadata';

export default createBackendPlugin({
  metadata: pluginMetadata,
  register(env) {
    env.registerInit({
      deps: {
        logger: coreServices.logger,
        queueRegistry: coreServices.queuePluginRegistry,
      },
      init: async ({ logger, queueRegistry }) => {
        logger.debug('🔌 Registering Redis Queue Plugin...');
        queueRegistry.register(new RedisQueuePlugin());
      },
    });
  },
});

Step 5: Consumer Group Support

To support consumer groups properly:

  - Track, per group, which job IDs have been processed so each group sees every job exactly once
  - Distribute jobs among a group's consumers (round-robin, or via Redis blocking reads)
  - Delete a job only after every registered group has processed it

In Redis, you might use:

  - A sorted set of pending jobs per queue (score = -priority) plus one for delayed jobs (score = availableAt)
  - A set of processed job IDs per consumer group
  - Lua scripts so claiming a job and marking it processed happen atomically, as sketched below
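
A sketch of the atomic-claim idea as a Lua script (illustrative only; assumes the ioredis eval API and the key layout above):

// Pop the highest-priority job this group has not yet processed.
// KEYS[1] = pending sorted set (scores are negative priorities, so
// ascending rank order = highest priority first)
// KEYS[2] = this group's processed set
const CLAIM_SCRIPT = `
  local ids = redis.call('ZRANGE', KEYS[1], 0, -1)
  for _, id in ipairs(ids) do
    if redis.call('SISMEMBER', KEYS[2], id) == 0 then
      redis.call('SADD', KEYS[2], id)
      return id
    end
  end
  return false
`;

const claimedId = await this.redis.eval(
  CLAIM_SCRIPT,
  2, // number of KEYS
  `queue:${this.name}:pending`,
  `queue:${this.name}:processed:${groupId}`
); // resolves to null when no claimable job exists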

Step 6: Testing Your Plugin

Create comprehensive tests covering:

  - Priority ordering and FIFO behavior among equal priorities
  - Delayed execution via startDelay
  - Retries with exponential backoff and the maxRetries limit
  - Deduplication via custom jobId
  - Work-queue vs. broadcast distribution across consumer groups
  - Graceful shutdown (stop() waits for in-flight jobs)

Example test structure:

import { describe, test, expect } from 'bun:test';
import { RedisQueue } from './redis-queue';

// Test helpers (config values mirror the Step 1 schema defaults)
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
const defaultConfig = { host: 'localhost', port: 6379, db: 0, maxRetries: 3 };

describe('RedisQueue', () => {
  test('should process jobs in priority order', async () => {
    const queue = new RedisQueue('test', defaultConfig);
    const processed: number[] = [];

    await queue.enqueue(1, { priority: 5 });
    await queue.enqueue(2, { priority: 10 });
    await queue.enqueue(3, { priority: 1 });

    // Register the consumer after enqueueing so all three jobs are pending
    await queue.consume(async (job) => {
      processed.push(job.data as number);
    }, { consumerGroup: 'test-group' });

    await sleep(100);
    expect(processed).toEqual([2, 1, 3]); // Highest priority first
  });

  // More tests...
});

Practical Examples

Example 1: InMemoryQueue Reference Implementation

The InMemoryQueue is a complete, production-ready implementation suitable for single-instance deployments. Let’s examine key implementation details:

Configuration

const configSchema = z.object({
  concurrency: z
    .number()
    .min(1)
    .max(100)
    .default(10)
    .describe('Maximum number of concurrent jobs to process'),
  maxQueueSize: z
    .number()
    .min(1)
    .default(10_000)
    .describe('Maximum number of jobs that can be queued'),
  delayMultiplier: z
    .number()
    .min(0)
    .max(1)
    .default(1)
    .describe('Delay multiplier (default: 1). Only change for testing purposes - set to 0.01 for 100x faster test execution.'),
  heartbeatIntervalMs: z
    .number()
    .min(0)
    .default(5000)
    .describe('Interval for heartbeat checks that recover jobs after system sleep/wake (0 to disable).'),
});

Constructor

The InMemoryQueue constructor requires a Logger instance for diagnostic output:

import { InMemoryQueue } from '@checkstack/queue-memory-backend';
import type { Logger } from '@checkstack/backend-api';

const queue = new InMemoryQueue<MyJobData>(
  'my-queue-name',
  {
    concurrency: 10,
    maxQueueSize: 100,
    heartbeatIntervalMs: 5000,
  },
  logger  // Required: Logger instance for debug/error output
);

Testing Configuration: delayMultiplier

The delayMultiplier configuration option allows tests to run significantly faster by reducing all time-based delays:

Purpose:

  - Tests that exercise delays and retries would otherwise wait for real wall-clock time
  - A multiplier of 0.01 turns a 5-second delay into 50ms, making tests roughly 100x faster

Affected delays:

  - startDelay on enqueue() and scheduleRecurring()
  - Retry backoff delays (2^attempts × 1000ms)

Implementation guidance for queue plugins:

  1. Add to config schema (optional but recommended for development/test backends):
    delayMultiplier: z.number().min(0).max(1).default(1)
      .describe('Delay multiplier (default: 1). Only change for testing purposes.')
    
  2. Apply to all delay calculations:
    // In enqueue()
    const delayMs = options.startDelay * 1000 * (this.config.delayMultiplier ?? 1);
       
    // In retry logic
    const retryDelay = Math.pow(2, job.attempts) * 1000 * (this.config.delayMultiplier ?? 1);
    
  3. Update tests to use faster delays:
    const queue = new InMemoryQueue('test-queue', {
      concurrency: 10,
      maxQueueSize: 100,
      delayMultiplier: 0.01, // 100x faster for testing
    });
       
    // Wait 50ms instead of 5s
    await new Promise(r => setTimeout(r, 50));
    

Note: Production queue backends like BullMQ may not need delayMultiplier since delays are typically handled by the external system (Redis). The option is most useful for in-process queue implementations used in development and testing.

Consumer Group State Tracking

interface ConsumerGroupState<T> {
  consumers: Array<{
    id: string;
    handler: QueueConsumer<T>;
    maxRetries: number;
  }>;
  nextConsumerIndex: number;      // For round-robin
  processedJobIds: Set<string>;   // Track processed jobs per group
}

private consumerGroups = new Map<string, ConsumerGroupState<T>>();

Job Selection Logic

private async processNext(): Promise<void> {
  const now = new Date();

  // For each consumer group, find an unprocessed job
  for (const [groupId, groupState] of this.consumerGroups.entries()) {
    // Find next unprocessed job that is available (not delayed)
    const job = this.jobs.find(
      (j) => !groupState.processedJobIds.has(j.id) && j.availableAt <= now
    );

    if (!job) continue;

    // Mark as processed by this group
    groupState.processedJobIds.add(job.id);

    // Select consumer via round-robin
    const consumerIndex = groupState.nextConsumerIndex % groupState.consumers.length;
    const selectedConsumer = groupState.consumers[consumerIndex];
    groupState.nextConsumerIndex++;

    // Process asynchronously
    void this.processJob(job, selectedConsumer, groupId, groupState);
  }

  // Remove fully processed jobs (all groups have seen them)
  this.jobs = this.jobs.filter((job) => {
    return ![...this.consumerGroups.values()].every((group) =>
      group.processedJobIds.has(job.id)
    );
  });
}

Retry with Exponential Backoff

private async processJob(job, consumer, groupId, groupState): Promise<void> {
  await this.semaphore.acquire();
  this.processing++;

  let isRetrying = false;

  try {
    await consumer.handler(job);
    this.stats.completed++;
  } catch (error) {
    if (job.attempts < consumer.maxRetries) {
      job.attempts++;
      isRetrying = true;

      // Remove from processed set to allow retry
      groupState.processedJobIds.delete(job.id);

      // Re-add to the queue; insertIndex is derived from the job's
      // priority (see Priority Handling above)
      this.jobs.splice(insertIndex, 0, job);

      // Schedule retry with exponential backoff
      const delay = Math.pow(2, job.attempts) * 1000;
      setTimeout(() => {
        if (!this.stopped) void this.processNext();
      }, delay);
    } else {
      this.stats.failed++;
    }
  } finally {
    this.processing--;
    this.semaphore.release();

    // Continue processing (unless this is a retry)
    if (!isRetrying && !this.stopped) {
      void this.processNext();
    }
  }
}

Key Takeaways:

  - A single job list is shared; each consumer group tracks what it has already processed
  - Round-robin selection balances work among a group's consumers
  - A semaphore enforces the configured concurrency limit
  - Failed jobs are re-inserted by priority and retried after an exponential backoff

Example 2: Redis-Based Queue Plugin (Conceptual)

For distributed deployments, a Redis-based plugin adds the persistence and multi-node coordination that the in-memory backend cannot offer.

Conceptual structure:

class RedisQueue<T> implements Queue<T> {
  // Redis keys:
  // - job:{queue}:{jobId} → Job data (hash)
  // - queue:{queue}:pending → Sorted set (score = -priority)
  // - queue:{queue}:delayed → Sorted set (score = availableAt timestamp)
  // - queue:{queue}:processing:{group} → Set of job IDs currently processing
  // - queue:{queue}:processed:{group} → Set of job IDs processed by group
  // - queue:{queue}:consumers:{group} → List of consumer IDs

  async enqueue(data, options) {
    // 1. Check for duplicate jobId
    // 2. Store job data as hash
    // 3. Add to pending or delayed sorted set
    // 4. Publish notification to consumers
  }

  async consume(consumer, options) {
    // 1. Register consumer in group list
    // 2. Start polling loop with BLPOP or Pub/Sub
    // 3. Claim job atomically using Lua script
    // 4. Process and update state
  }

  // Use Lua scripts for atomic operations (claim, retry, complete)
}

Advantages:

  - Jobs survive process restarts (state lives in Redis)
  - Multiple nodes can share work queues and receive broadcasts
  - Lua scripts keep claim, retry, and complete transitions atomic under concurrent consumers

Example 3: Common Usage Patterns

Pattern 1: Periodic Health Checks (Using Native Recurring Jobs)

// In healthcheck-backend plugin
async function scheduleHealthCheck(queueManager: QueueManager, configId: string, systemId: string, intervalSeconds: number) {
  const queue = queueManager.getQueue<HealthCheckData>('health-checks');
  
  // Use deterministic job ID for deduplication
  const jobId = `healthcheck:${configId}:${systemId}`;
  
  // Schedule as recurring job - the queue handles rescheduling automatically
  await queue.scheduleRecurring(
    jobId,
    { configId, systemId },
    intervalSeconds,
    { startDelay: 0 } // Run immediately the first time
  );
}

// Consumer (work-queue mode - only one instance runs each check)
const queue = queueManager.getQueue<HealthCheckData>('health-checks');
await queue.consume(async (job) => {
  const { configId, systemId } = job.data;
  
  // Execute health check
  const result = await runHealthCheck(configId, systemId);
  
  // Store result
  await saveHealthCheckResult(systemId, result);
  
  // No need to manually reschedule - scheduleRecurring handles this!
}, {
  consumerGroup: 'health-checks',  // Shared group = work queue
  maxRetries: 0,                    // Don't retry (next interval will run anyway)
});

Pattern 2: Distributed Access Sync

// Broadcast pattern - all instances must sync
const queue = queueManager.getQueue<AccessRuleSyncData>('access-sync');

await queue.consume(async (job) => {
  const { userId, roles } = job.data;
  
  // Update local access cache
  await accessCache.refresh(userId, roles);
}, {
  consumerGroup: `access-sync:${instanceId}`, // Unique per instance = broadcast
  maxRetries: 3,
});

// Trigger sync from one instance
await queue.enqueue({ userId: '123', roles: ['admin', 'editor'] });
// All instances receive and process this

Pattern 3: Background Data Export

// High-priority work queue for user-triggered exports
const queue = queueManager.getQueue<ExportJob>('exports');

await queue.consume(async (job) => {
  const { userId, systemIds, format } = job.data;
  
  // Generate export file
  const filePath = await generateExport(systemIds, format);
  
  // Notify user
  await sendExportReady(userId, filePath);
}, {
  consumerGroup: 'exports',  // Work queue - any instance can handle
  maxRetries: 3,
});

// User triggers export (high priority)
await queue.enqueue(
  { userId, systemIds, format: 'csv' },
  { priority: 100 } // Higher than routine tasks
);

Best Practices

Define Consumers in afterPluginsReady

Queue consumers should be defined in the afterPluginsReady lifecycle hook rather than init. This ensures:

  1. Access to emitHook: The emitHook function for emitting integration events is only available in afterPluginsReady
  2. All plugins initialized: Other plugins’ routers and services are available for RPC calls
  3. Consistent pattern: All queue setup happens in one place, making the code easier to understand

Recommended pattern:

env.registerInit({
  schema,
  deps: {
    queueManager: coreServices.queueManager,
    signalService: coreServices.signalService,
    // ... other deps
  },
  init: async ({ database, rpc }) => {
    // Phase 2: Register routers and services
    // Do NOT set up queue consumers here
    rpc.registerRouter(router, contract);
  },
  afterPluginsReady: async ({ queueManager, signalService, emitHook }) => {
    // Phase 3: Set up queue consumers and schedule recurring jobs
    const queue = queueManager.getQueue<MyJobPayload>('my-queue');

    // Consumer has access to emitHook for integration events
    await queue.consume(
      async (job) => {
        // Process job...
        const result = await processJob(job.data);

        // Emit integration event (only available in afterPluginsReady!)
        await emitHook(myHooks.jobCompleted, {
          jobId: job.id,
          result,
        });

        // Send real-time signal
        await signalService.broadcast(MY_JOB_COMPLETED, { jobId: job.id });
      },
      {
        consumerGroup: 'my-worker',
        maxRetries: 3,
      }
    );

    // Schedule recurring job
    await queue.scheduleRecurring('my-recurring-job', {}, 60);
  },
});

Why not init? The emitHook function is not available during init because the integration event system hasn’t finished initializing. Defining consumers in init means you cannot emit integration events from job handlers.


Summary

The Checkstack Queue system provides a powerful, flexible foundation for asynchronous task processing:

  - Pluggable backends behind a single Queue interface, switchable at runtime via QueueManager
  - Consumer groups that support both work-queue and broadcast patterns
  - Priorities, delayed delivery, deduplication, and retries with exponential backoff
  - Native recurring jobs, statistics, and graceful shutdown

By implementing the QueuePlugin interface, you can integrate any backend (Redis, RabbitMQ, AWS SQS, etc.) while maintaining a consistent API for the rest of your application.

For reference implementations, see the InMemoryQueue in @checkstack/queue-memory-backend (Example 1 above).