The Checkstack Queue system provides a pluggable, type-safe infrastructure for managing asynchronous tasks and distributed events. It is designed to scale from simple in-memory development environments to multi-node production clusters.
The Queue system is Checkstack’s abstraction for asynchronous task processing. It provides:
A job represents a unit of work to be processed asynchronously. Each job contains:
interface QueueJob<T = unknown> {
id: string; // Unique identifier
data: T; // The payload to process
priority?: number; // Higher values = processed first (default: 0)
timestamp: Date; // When the job was enqueued
attempts?: number; // Current retry count
}
A queue is a named channel for jobs of a specific type. Queues handle:
Consumer groups determine how messages are distributed across multiple instances:
- Work-queue mode: all consumers share the same consumerGroup ID. Each message is delivered to only one consumer in the group (load balancing).
- Broadcast mode: each consumer uses a unique consumerGroup ID (e.g., suffixed with instance ID). Every consumer receives a copy of every message.

Jobs can be assigned a numeric priority (higher = more urgent). The queue processes higher-priority jobs before lower-priority ones, regardless of enqueue order.
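The two consumer-group modes can be illustrated with a small in-memory router. This is a sketch, not Checkstack's implementation: GroupRouter and its methods are hypothetical, handlers are synchronous, and consumers inside a group are picked round-robin.

```typescript
type MessageHandler<T> = (message: T) => void;

// Hypothetical router: one entry per consumer group, consumers picked round-robin.
class GroupRouter<T> {
  private groups = new Map<string, { handlers: MessageHandler<T>[]; next: number }>();

  subscribe(consumerGroup: string, handler: MessageHandler<T>): void {
    const group = this.groups.get(consumerGroup) ?? { handlers: [], next: 0 };
    group.handlers.push(handler);
    this.groups.set(consumerGroup, group);
  }

  publish(message: T): void {
    // Every consumer group receives each message once...
    for (const group of Array.from(this.groups.values())) {
      // ...but inside a group only one consumer handles it.
      const handler = group.handlers[group.next % group.handlers.length];
      group.next++;
      handler(message);
    }
  }
}

const router = new GroupRouter<string>();
const log: string[] = [];

// Work-queue mode: both instances join the shared group "exports".
router.subscribe('exports', (m) => log.push(`instance-a handled ${m}`));
router.subscribe('exports', (m) => log.push(`instance-b handled ${m}`));

// Broadcast mode: each instance gets its own group.
router.subscribe('access-sync:instance-a', (m) => log.push(`instance-a synced ${m}`));
router.subscribe('access-sync:instance-b', (m) => log.push(`instance-b synced ${m}`));

router.publish('msg-1');
router.publish('msg-2');
```

Each message is handled once by the shared exports group (alternating between instances) and once by every per-instance access-sync group.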
Use the Queue system when you need:
The Queue system is built on a factory pattern that allows different backends to be used transparently.
The QueuePlugin interface defines how to create queue instances:
interface QueuePlugin<Config = unknown> {
id: string; // Unique plugin identifier (e.g., "memory", "redis")
displayName: string; // Human-readable name
description?: string; // Optional description
configVersion: number; // Current config schema version
configSchema: z.ZodType<Config>; // Zod schema for validation
migrations?: MigrationChain<Config>; // Optional version migrations
createQueue<T>(name: string, config: Config): Queue<T>;
}
All queue plugins must implement this interface to be compatible with the system.
A central registry where queue plugins are registered at startup:
interface QueuePluginRegistry {
register(plugin: QueuePlugin<unknown>): void;
getPlugin(id: string): QueuePlugin<unknown> | undefined;
getPlugins(): QueuePlugin<unknown>[];
}
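A Map-backed registry is enough to satisfy this contract. The sketch below uses a trimmed-down QueuePluginLike type instead of the full QueuePlugin, and rejecting duplicate ids is an assumption made for illustration, not documented Checkstack behavior.

```typescript
// Trimmed-down stand-in for QueuePlugin (the real interface has more members).
interface QueuePluginLike {
  id: string;
  displayName: string;
}

class MapQueuePluginRegistry<P extends QueuePluginLike> {
  private plugins = new Map<string, P>();

  register(plugin: P): void {
    // Assumption: a second plugin with the same id is treated as a configuration error.
    if (this.plugins.has(plugin.id)) {
      throw new Error(`Queue plugin "${plugin.id}" is already registered`);
    }
    this.plugins.set(plugin.id, plugin);
  }

  getPlugin(id: string): P | undefined {
    return this.plugins.get(id);
  }

  getPlugins(): P[] {
    // Map preserves insertion order, so plugins come back in registration order.
    return Array.from(this.plugins.values());
  }
}

const registry = new MapQueuePluginRegistry<QueuePluginLike>();
registry.register({ id: 'memory', displayName: 'In-Memory Queue' });
registry.register({ id: 'redis', displayName: 'Redis Queue' });
```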
Plugins are typically registered in their backend plugin’s register lifecycle hook.
The QueueManager service is responsible for managing queue instances and backend switching:
interface QueueManager {
// Get or create a queue proxy (synchronous, returns stable reference)
getQueue<T>(name: string): Queue<T>;
// Active backend management
getActivePlugin(): string;
setActiveBackend(pluginId: string, config: unknown): Promise<void>;
// Multi-instance coordination
startPolling(intervalMs?: number): void;
stopPolling(): void;
// Monitoring and status
getInFlightJobCount(): Promise<number>;
listAllRecurringJobs(): Promise<RecurringJobDetails[]>;
// Graceful shutdown
shutdown(): Promise<void>;
}
Applications use QueueManager.getQueue() to obtain queue references without knowing which backend is active. The returned queue is actually a QueueProxy that provides stable references across backend switches.
The QueueProxy class wraps actual queue implementations to provide:
class QueueProxy<T> implements Queue<T> {
// All Queue methods delegate to the underlying implementation
// On backend switch, subscriptions are automatically replayed
async switchDelegate(newQueue: Queue<T>): Promise<void>;
}
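The replay behavior can be illustrated with a stripped-down proxy. MiniQueue, MiniQueueProxy and ArrayQueue are hypothetical names; the interface keeps only enqueue, consume and stop, and the demo backend delivers every job to every handler immediately, ignoring consumer groups. Treat it as a sketch of the pattern rather than the actual QueueProxy.

```typescript
type JobHandler<T> = (data: T) => Promise<void>;
interface ConsumeOpts {
  consumerGroup: string;
}

// Trimmed-down version of the Queue<T> interface.
interface MiniQueue<T> {
  enqueue(data: T): Promise<string>;
  consume(handler: JobHandler<T>, options: ConsumeOpts): Promise<void>;
  stop(): Promise<void>;
}

class MiniQueueProxy<T> implements MiniQueue<T> {
  // Every subscription is remembered so it can be replayed on a new backend.
  private subscriptions: Array<{ handler: JobHandler<T>; options: ConsumeOpts }> = [];

  constructor(private delegate: MiniQueue<T>) {}

  enqueue(data: T): Promise<string> {
    return this.delegate.enqueue(data);
  }

  async consume(handler: JobHandler<T>, options: ConsumeOpts): Promise<void> {
    this.subscriptions.push({ handler, options });
    await this.delegate.consume(handler, options);
  }

  stop(): Promise<void> {
    return this.delegate.stop();
  }

  async switchDelegate(newQueue: MiniQueue<T>): Promise<void> {
    // Replay subscriptions first so no consumer is missing on the new backend...
    for (const { handler, options } of this.subscriptions) {
      await newQueue.consume(handler, options);
    }
    // ...then route new calls to it and shut the old backend down.
    const previous = this.delegate;
    this.delegate = newQueue;
    await previous.stop();
  }
}

// Demo backend: delivers each job to every handler right away (no groups, no retries).
class ArrayQueue<T> implements MiniQueue<T> {
  readonly handlers: JobHandler<T>[] = [];
  stopped = false;
  private counter = 0;

  async enqueue(data: T): Promise<string> {
    for (const handler of this.handlers) await handler(data);
    return `job-${++this.counter}`;
  }

  async consume(handler: JobHandler<T>): Promise<void> {
    this.handlers.push(handler);
  }

  async stop(): Promise<void> {
    this.stopped = true;
  }
}
```

Callers keep the same proxy reference across the switch, which is what allows a manager to hand out stable references while the backend underneath changes.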
The Queue interface is the primary interaction point:
interface Queue<T = unknown> {
enqueue(
data: T,
options?: {
priority?: number;
startDelay?: number; // Delay in seconds before the job becomes available (renamed from delaySeconds)
jobId?: string; // For deduplication
}
): Promise<string>;
consume(
consumer: QueueConsumer<T>,
options: ConsumeOptions
): Promise<void>;
// Recurring job management (native support)
scheduleRecurring(
jobId: string,
data: T,
intervalSeconds: number,
options?: { startDelay?: number }
): Promise<void>;
cancelRecurring(jobId: string): Promise<void>;
listRecurringJobs(): Promise<string[]>;
getRecurringJobDetails(jobId: string): Promise<RecurringJobDetails | undefined>;
// Monitoring
getInFlightCount(): Promise<number>;
stop(): Promise<void>;
getStats(): Promise<QueueStats>;
testConnection(): Promise<void>;
}
The queue maintains state for each consumer group:
interface ConsumerGroupState<T> {
consumers: Array<{
id: string;
handler: QueueConsumer<T>;
maxRetries: number;
}>;
nextConsumerIndex: number; // For round-robin selection
processedJobIds: Set<string>; // Track which jobs this group has seen
}
Routing Rules:
Jobs are stored in priority order (higher priority first). When multiple jobs are pending:
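Keeping the pending list sorted at insert time turns selection into a simple front-to-back scan. The sketch below assumes (the text does not say so) that jobs with equal priority keep their enqueue order; PendingJob and insertByPriority are hypothetical names.

```typescript
interface PendingJob {
  id: string;
  priority: number; // Higher values are processed first
}

// Insert before the first job with a strictly lower priority. Inserting after
// every job of equal priority keeps FIFO order within a priority level.
function insertByPriority(jobs: PendingJob[], job: PendingJob): void {
  const index = jobs.findIndex((existing) => existing.priority < job.priority);
  if (index === -1) {
    jobs.push(job); // Lowest priority so far: goes to the back
  } else {
    jobs.splice(index, 0, job);
  }
}

const pending: PendingJob[] = [];
insertByPriority(pending, { id: 'a', priority: 5 });
insertByPriority(pending, { id: 'b', priority: 10 });
insertByPriority(pending, { id: 'c', priority: 1 });
insertByPriority(pending, { id: 'd', priority: 5 });
```

The resulting order is b, a, d, c: d shares a's priority but was enqueued later, so it stays behind a.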
Jobs can be enqueued with a startDelay option (in seconds):
await queue.enqueue(data, { startDelay: 60 }); // Available in 60 seconds
Implementation:
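- Delayed jobs store an availableAt timestamp (current time + delay)
- Jobs with availableAt > now are skipped during selection

When a consumer throws an error:
- If attempts < maxRetries, the attempts counter is incremented and the job is re-queued
- The retry runs after 2^attempts * 1000ms (exponential backoff)
- If attempts >= maxRetries, the job is marked as failed

Example backoff sequence (maxRetries: 3): the first retry runs after 2s, the second after 4s, and the third after 8s; a fourth failure marks the job as failed.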
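The backoff rule is plain arithmetic. This sketch assumes, as in the InMemoryQueue code later in this guide, that attempts is incremented before the delay is computed, so the first retry uses attempts = 1; delayMultiplier is the test-only scaling factor from the in-memory configuration. The function names are hypothetical.

```typescript
// Delay before retry number `attempts` (1-based), in milliseconds.
function retryDelayMs(attempts: number, delayMultiplier = 1): number {
  return Math.pow(2, attempts) * 1000 * delayMultiplier;
}

// All retry delays a job can go through for a consumer with the given maxRetries.
function backoffSchedule(maxRetries: number, delayMultiplier = 1): number[] {
  const delays: number[] = [];
  for (let attempts = 1; attempts <= maxRetries; attempts++) {
    delays.push(retryDelayMs(attempts, delayMultiplier));
  }
  return delays;
}

const production = backoffSchedule(3); // [2000, 4000, 8000]
const fastTests = backoffSchedule(3, 0.01); // roughly [20, 40, 80]
```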
When enqueuing with a custom jobId:
await queue.enqueue(data, { jobId: 'healthcheck:system-123' });
Behavior:
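Deduplication amounts to a keyed lookup before insertion. The sketch below is hypothetical: it keeps jobs in a Map and assumes that a repeated jobId leaves the existing job untouched and returns the same id, which matches the early return in the RedisQueue example in this guide but may not describe every backend.

```typescript
// Hypothetical in-memory store illustrating jobId-based deduplication.
class DedupingJobStore<T> {
  private jobs = new Map<string, T>();
  private counter = 0;

  enqueue(data: T, options?: { jobId?: string }): string {
    // Without an explicit jobId every call gets a fresh id, so nothing is deduplicated.
    const jobId = options?.jobId ?? `auto-${++this.counter}`;
    // Assumption: the first job with a given id wins; later enqueues are no-ops.
    if (!this.jobs.has(jobId)) {
      this.jobs.set(jobId, data);
    }
    return jobId;
  }

  get size(): number {
    return this.jobs.size;
  }

  get(jobId: string): T | undefined {
    return this.jobs.get(jobId);
  }
}

const store = new DedupingJobStore<{ systemId: string; run: number }>();
const firstId = store.enqueue({ systemId: 'system-123', run: 1 }, { jobId: 'healthcheck:system-123' });
const secondId = store.enqueue({ systemId: 'system-123', run: 2 }, { jobId: 'healthcheck:system-123' });
```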
Queue implementations typically use a semaphore to limit concurrent job processing:
const semaphore = new Semaphore(config.concurrency); // e.g., 10
async function processJob(job) {
await semaphore.acquire();
try {
await handler(job);
} finally {
semaphore.release();
}
}
This prevents resource exhaustion and allows fine-tuning of throughput.
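The snippet above relies on a Semaphore class that it does not define. A minimal promise-based counting semaphore could look like this; it is a generic sketch, not the class Checkstack ships, and it resumes waiters in FIFO order.

```typescript
// Minimal counting semaphore built on promises.
class Semaphore {
  private available: number;
  private waiters: Array<() => void> = [];

  constructor(permits: number) {
    if (permits < 1) throw new Error('permits must be >= 1');
    this.available = permits;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }
    // No permit free: park until release() hands one over.
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) {
      // Hand the permit straight to the oldest waiter; the free count is unchanged.
      next();
    } else {
      this.available++;
    }
  }
}
```

Handing the permit directly to a waiter in release(), instead of incrementing the counter and letting callers race for it, keeps the count exact and stops a newly arriving caller from jumping ahead of jobs that are already waiting.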
The Queue system provides at-least-once delivery:
- Failed jobs are retried according to the consumer's maxRetries settings

When using scheduleRecurring(), jobs run on a fixed wall-clock interval (same as BullMQ):
T=0: Job A starts (intervalSeconds = 1)
T=1: Job B starts (Job A still running)
T=2: Job C starts (Job A still running, Job B running)
T=3: Job A completes
Key implications:
- If a job runs longer than intervalSeconds, multiple jobs may run concurrently
- Calling scheduleRecurring() with an existing jobId cancels the old interval and starts a new one with the updated configuration

Recommendation: Set intervalSeconds to a value greater than the expected maximum execution time to avoid job accumulation. For health checks, consider the network timeout plus processing time.
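The accumulation effect follows directly from the fixed start times. The helpers below are hypothetical and assume every run takes the same durationSeconds; runs start at 0, intervalSeconds, 2 * intervalSeconds, and so on.

```typescript
// Number of runs still active at time t (seconds). A run that starts at `start`
// is active during the half-open window [start, start + durationSeconds).
function runsActiveAt(t: number, intervalSeconds: number, durationSeconds: number): number {
  let active = 0;
  for (let start = 0; start <= t; start += intervalSeconds) {
    if (t < start + durationSeconds) active++;
  }
  return active;
}

// Worst-case overlap once the schedule is running: ceil(duration / interval).
function maxConcurrentRuns(intervalSeconds: number, durationSeconds: number): number {
  return Math.ceil(durationSeconds / intervalSeconds);
}
```

With the timeline above (intervalSeconds = 1, a job that takes 3 seconds), runsActiveAt(2, 1, 3) is 3; raising intervalSeconds to 5 brings maxConcurrentRuns(5, 3) down to 1.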
queue.stop() behavior:
This ensures data consistency during application shutdown.
queue.getStats() returns:
interface QueueStats {
pending: number; // Jobs waiting to be processed
processing: number; // Jobs currently being handled
completed: number; // Total successful jobs (lifetime)
failed: number; // Total failed jobs after all retries (lifetime)
consumerGroups: number; // Number of active consumer groups
}
Use these metrics for monitoring, alerting, and capacity planning.
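As one way to turn these counters into alert conditions, the hypothetical helper below derives a lifetime failure rate and two flags. The QueueHealth shape, the maxPending threshold and the flag names are illustrative choices, not part of the Queue API.

```typescript
interface QueueStats {
  pending: number;
  processing: number;
  completed: number;
  failed: number;
  consumerGroups: number;
}

interface QueueHealth {
  failureRate: number; // failed / (completed + failed), 0 when nothing has finished
  backlogged: boolean; // more pending jobs than the caller tolerates
  unconsumed: boolean; // jobs are waiting but no consumer group is registered
}

function assessQueue(stats: QueueStats, maxPending: number): QueueHealth {
  const finished = stats.completed + stats.failed;
  return {
    failureRate: finished === 0 ? 0 : stats.failed / finished,
    backlogged: stats.pending > maxPending,
    unconsumed: stats.pending > 0 && stats.consumerGroups === 0,
  };
}

const health = assessQueue(
  { pending: 120, processing: 4, completed: 90, failed: 10, consumerGroups: 0 },
  100,
);
```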
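Queue plugins support versioned configurations with automatic migrations.
When loading a config, the system:
- Compares the stored config version with the plugin's current configVersion
- Runs any migrations needed to bring the config up to date
- Validates the result against configSchema

This ensures backward compatibility when queue configuration evolves.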
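The migration step can be pictured as walking a chain of version-to-version functions. Migration and migrateConfig below are hypothetical and need not match the real MigrationChain type; the example migration history is invented, and the final configSchema validation is left as a comment because it needs Zod.

```typescript
type RawConfig = Record<string, unknown>;

// Hypothetical migration step; the real MigrationChain type may differ.
interface Migration {
  fromVersion: number;
  toVersion: number;
  migrate(config: RawConfig): RawConfig;
}

function migrateConfig(
  stored: { version: number; config: RawConfig },
  targetVersion: number, // the plugin's configVersion
  migrations: Migration[],
): RawConfig {
  let { version, config } = stored;
  while (version < targetVersion) {
    const step = migrations.find((m) => m.fromVersion === version);
    if (!step || step.toVersion <= version) {
      throw new Error(`No forward migration from config version ${version}`);
    }
    config = step.migrate(config);
    version = step.toVersion;
  }
  // A real loader would now validate the result, e.g. configSchema.parse(config).
  return config;
}

// Invented history: v2 added maxQueueSize, v3 added heartbeatIntervalMs.
const migrations: Migration[] = [
  { fromVersion: 1, toVersion: 2, migrate: (c) => ({ ...c, maxQueueSize: 10_000 }) },
  { fromVersion: 2, toVersion: 3, migrate: (c) => ({ ...c, heartbeatIntervalMs: 5000 }) },
];

const upgraded = migrateConfig({ version: 1, config: { concurrency: 4 } }, 3, migrations);
```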
This section guides you through creating a custom queue plugin, such as a Redis-based backend.
Use Zod to define the plugin’s configuration:
import { z } from 'zod';
const redisConfigSchema = z.object({
host: z.string().default('localhost').describe('Redis server hostname'),
port: z.number().min(1).max(65535).default(6379).describe('Redis server port'),
password: z.string().optional().describe('Redis password (if required)'),
db: z.number().min(0).default(0).describe('Redis database number'),
maxRetries: z.number().min(0).default(3).describe('Default max retries for jobs'),
});
export type RedisQueueConfig = z.infer<typeof redisConfigSchema>;
Create a class that implements the Queue<T> interface:
import Redis from 'ioredis';
import { randomUUID } from 'node:crypto';
import type { Queue, QueueJob, QueueConsumer, ConsumeOptions, QueueStats } from '@checkstack/queue-api';
// RedisQueueConfig is the type inferred from redisConfigSchema in the previous step.
export class RedisQueue<T> implements Queue<T> {
private redis: Redis;
private stopped = false;
constructor(private name: string, private config: RedisQueueConfig) {
// Initialize Redis connection
this.redis = new Redis({
host: config.host,
port: config.port,
password: config.password,
db: config.db,
});
}
async enqueue(
data: T,
options?: { priority?: number; startDelay?: number; jobId?: string }
): Promise<string> {
const jobId = options?.jobId ?? randomUUID();
const job: QueueJob<T> = {
id: jobId,
data,
priority: options?.priority ?? 0,
timestamp: new Date(),
attempts: 0,
};
// Store job data. NX makes the duplicate check and the write one atomic
// operation, so two concurrent enqueues with the same jobId cannot both succeed.
const created = await this.redis.set(`job:${this.name}:${jobId}`, JSON.stringify(job), 'NX');
if (created === null) return jobId; // Duplicate jobId: keep the existing job
// Add to the delayed set or the priority queue
const score = options?.priority ?? 0;
if (options?.startDelay) {
// startDelay is in seconds; delayed jobs are scored by when they become available
const availableAt = Date.now() + options.startDelay * 1000;
await this.redis.zadd(`delayed:${this.name}`, availableAt, jobId);
} else {
// Sorted sets are ascending, so negate the priority to pop the highest priority first
await this.redis.zadd(`queue:${this.name}`, -score, jobId);
}
return jobId;
}
async consume(consumer: QueueConsumer<T>, options: ConsumeOptions): Promise<void> {
// Register consumer and start polling loop
// Implementation details omitted for brevity
}
async stop(): Promise<void> {
this.stopped = true;
await this.redis.quit();
}
async getStats(): Promise<QueueStats> {
const pending = await this.redis.zcard(`queue:${this.name}`);
const delayed = await this.redis.zcard(`delayed:${this.name}`);
// Implementation details omitted
return { pending: pending + delayed, processing: 0, completed: 0, failed: 0, consumerGroups: 0 };
}
// scheduleRecurring, cancelRecurring, listRecurringJobs, getRecurringJobDetails,
// getInFlightCount, and testConnection are also required by Queue<T>; omitted here.
}
Create the plugin class:
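import type { Queue, QueuePlugin } from '@checkstack/queue-api';
import { RedisQueue } from './redis-queue';
// redisConfigSchema and RedisQueueConfig come from the configuration step above.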
export class RedisQueuePlugin implements QueuePlugin<RedisQueueConfig> {
id = 'redis';
displayName = 'Redis Queue';
description = 'Production-ready queue backed by Redis';
configVersion = 1;
configSchema = redisConfigSchema;
createQueue<T>(name: string, config: RedisQueueConfig): Queue<T> {
return new RedisQueue<T>(name, config);
}
}
In your backend plugin’s register lifecycle:
import { createBackendPlugin, coreServices } from '@checkstack/backend-api';
import { RedisQueuePlugin } from './redis-queue-plugin';
import { pluginMetadata } from './plugin-metadata';
export default createBackendPlugin({
metadata: pluginMetadata,
register(env) {
env.registerInit({
deps: {
logger: coreServices.logger,
queueRegistry: coreServices.queuePluginRegistry,
},
init: async ({ logger, queueRegistry }) => {
logger.debug('🔌 Registering Redis Queue Plugin...');
queueRegistry.register(new RedisQueuePlugin());
},
});
},
});
To support consumer groups properly:
In Redis, you might use:
- A SET for each group’s processed job IDs: processed:{queue}:{group}
- A LIST for consumers in each group: consumers:{queue}:{group}
- A HASH for consumer metadata (max retries, etc.)

Create comprehensive tests covering:
Example test structure:
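import { describe, test, expect } from 'bun:test';
import { RedisQueue } from './redis-queue';
// Assumes a Redis server reachable at localhost:6379 during the test run
const defaultConfig = { host: 'localhost', port: 6379, db: 0, maxRetries: 3 };
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
describe('RedisQueue', () => {
test('should process jobs in priority order', async () => {
const queue = new RedisQueue<number>('test', defaultConfig);
const processed: number[] = [];
// Enqueue before consuming so all three jobs are pending when the consumer starts;
// otherwise the first job could be picked up before the higher-priority ones arrive.
await queue.enqueue(1, { priority: 5 });
await queue.enqueue(2, { priority: 10 });
await queue.enqueue(3, { priority: 1 });
await queue.consume(async (job) => {
processed.push(job.data);
}, { consumerGroup: 'test-group' });
await sleep(100);
expect(processed).toEqual([2, 1, 3]); // Highest priority first
await queue.stop();
});
// More tests...
});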
The InMemoryQueue is a complete, production-ready implementation suitable for single-instance deployments. Let’s examine key implementation details:
const configSchema = z.object({
concurrency: z
.number()
.min(1)
.max(100)
.default(10)
.describe('Maximum number of concurrent jobs to process'),
maxQueueSize: z
.number()
.min(1)
.default(10_000)
.describe('Maximum number of jobs that can be queued'),
delayMultiplier: z
.number()
.min(0)
.max(1)
.default(1)
.describe('Delay multiplier (default: 1). Only change for testing purposes - set to 0.01 for 100x faster test execution.'),
heartbeatIntervalMs: z
.number()
.min(0)
.default(5000)
.describe('Interval for heartbeat checks that recover jobs after system sleep/wake (0 to disable).'),
});
The InMemoryQueue constructor requires a Logger instance for diagnostic output:
import { InMemoryQueue } from '@checkstack/queue-memory-backend';
import type { Logger } from '@checkstack/backend-api';
const queue = new InMemoryQueue<MyJobData>(
'my-queue-name',
{
concurrency: 10,
maxQueueSize: 100,
heartbeatIntervalMs: 5000,
},
logger // Required: Logger instance for debug/error output
);
The delayMultiplier configuration option allows tests to run significantly faster by reducing all time-based delays:
Purpose:
- delayMultiplier defaults to 1 (normal delays)
- Tests can set it to 0.01 for 100x faster execution (2s delay → 20ms)

Affected delays:
- startDelay in enqueue() - Jobs become available sooner
- intervalSeconds in recurring jobs - Jobs repeat more quickly

Implementation guidance for queue plugins:
delayMultiplier: z.number().min(0).max(1).default(1)
.describe('Delay multiplier (default: 1). Only change for testing purposes.')
// In enqueue()
const delayMs = options.startDelay * 1000 * (this.config.delayMultiplier ?? 1);
// In retry logic
const retryDelay = Math.pow(2, job.attempts) * 1000 * (this.config.delayMultiplier ?? 1);
const queue = new InMemoryQueue('test-queue', {
concurrency: 10,
maxQueueSize: 100,
delayMultiplier: 0.01, // 100x faster for testing
}, logger); // The constructor requires a Logger instance
// Wait 50ms instead of 5s
await new Promise(r => setTimeout(r, 50));
Note: Production queue backends like BullMQ may not need delayMultiplier since delays are typically handled by the external system (Redis). The option is most useful for in-process queue implementations used in development and testing.
interface ConsumerGroupState<T> {
consumers: Array<{
id: string;
handler: QueueConsumer<T>;
maxRetries: number;
}>;
nextConsumerIndex: number; // For round-robin
processedJobIds: Set<string>; // Track processed jobs per group
}
private consumerGroups = new Map<string, ConsumerGroupState<T>>();
private async processNext(): Promise<void> {
// No consumer groups yet: leave jobs queued. Without this guard the cleanup
// below would drop every job, because every() on an empty array returns true.
if (this.consumerGroups.size === 0) return;
const now = new Date();
// For each consumer group, find an unprocessed job
for (const [groupId, groupState] of this.consumerGroups.entries()) {
// Skip groups with no consumers left (avoids a modulo by zero below)
if (groupState.consumers.length === 0) continue;
// Find next unprocessed job that is available (not delayed)
const job = this.jobs.find(
(j) => !groupState.processedJobIds.has(j.id) && j.availableAt <= now
);
if (!job) continue;
// Mark as processed by this group
groupState.processedJobIds.add(job.id);
// Select consumer via round-robin
const consumerIndex = groupState.nextConsumerIndex % groupState.consumers.length;
const selectedConsumer = groupState.consumers[consumerIndex];
groupState.nextConsumerIndex++;
// Process asynchronously
void this.processJob(job, selectedConsumer, groupId, groupState);
}
// Remove fully processed jobs (all groups have seen them)
this.jobs = this.jobs.filter((job) => {
return ![...this.consumerGroups.values()].every((group) =>
group.processedJobIds.has(job.id)
);
});
}
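private async processJob(job, consumer, groupId, groupState): Promise<void> {
await this.semaphore.acquire();
this.processing++;
let isRetrying = false;
try {
await consumer.handler(job);
this.stats.completed++;
} catch (error) {
if (job.attempts < consumer.maxRetries) {
job.attempts++;
isRetrying = true;
// Exponential backoff, scaled by delayMultiplier for tests
const delay = Math.pow(2, job.attempts) * 1000 * (this.config.delayMultiplier ?? 1);
// Hide the job from selection until the backoff has elapsed
job.availableAt = new Date(Date.now() + delay);
// Remove from processed set to allow retry
groupState.processedJobIds.delete(job.id);
// Re-add to queue in priority order (unless another group still holds it there)
if (!this.jobs.includes(job)) {
const insertIndex = this.jobs.findIndex((j) => (j.priority ?? 0) < (job.priority ?? 0));
this.jobs.splice(insertIndex === -1 ? this.jobs.length : insertIndex, 0, job);
}
// Trigger processing again once the backoff has elapsed
setTimeout(() => {
if (!this.stopped) void this.processNext();
}, delay);
} else {
this.stats.failed++;
}
} finally {
this.processing--;
this.semaphore.release();
// Continue processing (unless this is a retry)
if (!isRetrying && !this.stopped) {
void this.processNext();
}
}
}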
Key Takeaways:
For distributed deployments, a Redis-based plugin provides:
Conceptual structure:
class RedisQueue<T> implements Queue<T> {
// Redis keys:
// - job:{queue}:{jobId} → Job data (hash)
// - queue:{queue}:pending → Sorted set (score = -priority)
// - queue:{queue}:delayed → Sorted set (score = availableAt timestamp)
// - queue:{queue}:processing:{group} → Set of job IDs currently processing
// - queue:{queue}:processed:{group} → Set of job IDs processed by group
// - queue:{queue}:consumers:{group} → List of consumer IDs
async enqueue(data, options) {
// 1. Check for duplicate jobId
// 2. Store job data as hash
// 3. Add to pending or delayed sorted set
// 4. Publish notification to consumers
}
async consume(consumer, options) {
// 1. Register consumer in group list
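// 2. Start a polling loop or wait for Pub/Sub notifications (BLPOP only works on lists; a blocking pop on the pending sorted set would use BZPOPMIN)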
// 3. Claim job atomically using Lua script
// 4. Process and update state
}
// Use Lua scripts for atomic operations (claim, retry, complete)
}
Advantages:
// In healthcheck-backend plugin
async function scheduleHealthCheck(queueManager: QueueManager, configId: string, systemId: string, intervalSeconds: number) {
const queue = queueManager.getQueue<HealthCheckData>('health-checks');
// Use deterministic job ID for deduplication
const jobId = `healthcheck:${configId}:${systemId}`;
// Schedule as recurring job - the queue handles rescheduling automatically
await queue.scheduleRecurring(
jobId,
{ configId, systemId },
intervalSeconds,
{ startDelay: 0 } // Run immediately the first time
);
}
// Consumer (work-queue mode - only one instance runs each check)
const queue = queueManager.getQueue<HealthCheckData>('health-checks');
await queue.consume(async (job) => {
const { configId, systemId } = job.data;
// Execute health check
const result = await runHealthCheck(configId, systemId);
// Store result
await saveHealthCheckResult(systemId, result);
// No need to manually reschedule - scheduleRecurring handles this!
}, {
consumerGroup: 'health-checks', // Shared group = work queue
maxRetries: 0, // Don't retry (next interval will run anyway)
});
// Broadcast pattern - all instances must sync
const queue = queueManager.getQueue<AccessRuleSyncData>('access-sync');
await queue.consume(async (job) => {
const { userId, roles } = job.data;
// Update local access cache
await accessCache.refresh(userId, roles);
}, {
consumerGroup: `access-sync:${instanceId}`, // Unique per instance = broadcast
maxRetries: 3,
});
// Trigger sync from one instance
await queue.enqueue({ userId: '123', roles: ['admin', 'editor'] });
// All instances receive and process this
// High-priority work queue for user-triggered exports
const queue = queueManager.getQueue<ExportJob>('exports');
await queue.consume(async (job) => {
const { userId, systemIds, format } = job.data;
// Generate export file
const filePath = await generateExport(systemIds, format);
// Notify user
await sendExportReady(userId, filePath);
}, {
consumerGroup: 'exports', // Work queue - any instance can handle
maxRetries: 3,
});
// User triggers export (high priority)
await queue.enqueue(
{ userId, systemIds, format: 'csv' },
{ priority: 100 } // Higher than routine tasks
);
Queue consumers should be defined in the afterPluginsReady lifecycle hook rather than init. This ensures:
- emitHook: The emitHook function for emitting integration events is only available in afterPluginsReady

Recommended pattern:
env.registerInit({
schema,
deps: {
queueManager: coreServices.queueManager,
signalService: coreServices.signalService,
// ... other deps
},
init: async ({ database, rpc }) => {
// Phase 2: Register routers and services
// Do NOT set up queue consumers here
rpc.registerRouter(router, contract);
},
afterPluginsReady: async ({ queueManager, signalService, emitHook }) => {
// Phase 3: Set up queue consumers and schedule recurring jobs
const queue = queueManager.getQueue<MyJobPayload>('my-queue');
// Consumer has access to emitHook for integration events
await queue.consume(
async (job) => {
// Process job...
const result = await processJob(job.data);
// Emit integration event (only available in afterPluginsReady!)
await emitHook(myHooks.jobCompleted, {
jobId: job.id,
result,
});
// Send real-time signal
await signalService.broadcast(MY_JOB_COMPLETED, { jobId: job.id });
},
{
consumerGroup: 'my-worker',
maxRetries: 3,
}
);
// Schedule recurring job: scheduleRecurring(jobId, data, intervalSeconds, options?)
await queue.scheduleRecurring(
'my-recurring-job', // jobId
{}, // data (MyJobPayload)
60 // intervalSeconds
);
},
});
Why not init? The emitHook function is not available during init because the integration event system hasn’t finished initializing. Defining consumers in init means you cannot emit integration events from job handlers.
The Checkstack Queue system provides a powerful, flexible foundation for asynchronous task processing:
By implementing the QueuePlugin interface, you can integrate any backend (Redis, RabbitMQ, AWS SQS, etc.) while maintaining a consistent API for the rest of your application.
For reference implementations, see:
- InMemoryQueue - Complete working example
- Queue API - Core interfaces
- QueuePlugin Interface - Plugin contract