Queue Patterns
Common queue patterns expressed as runnable snippets. Each section is independent - pick the one that matches the job shape you need.
Pattern 1: Work queue (load-balanced)
Use a shared consumer group when each job should be processed by exactly one instance.
const queue = queueManager.getQueue<EmailJob>('emails');

await queue.consume(async (job) => {
  const { to, subject, body } = job.data;
  await sendEmail({ to, subject, body });
}, {
  consumerGroup: 'emails', // Shared across all instances
  maxRetries: 3,
});

// Any instance may pick this up
await queue.enqueue({ to: 'user@example.com', subject: 'Welcome', body: '...' });

Key points:
- All instances share the same consumerGroup
- Each message goes to exactly one instance
- Use for background work that should not be duplicated
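To make the "exactly one instance" guarantee concrete, here is a minimal in-memory sketch of a single consumer group. It does not use the queue library at all: `simulateWorkQueue` and the instance names are hypothetical, and the turn-taking order is an assumption made only so the example is deterministic.

```typescript
// Hypothetical in-memory model of one consumer group; it is not the queue library's API.
interface EmailJob {
  to: string;
  subject: string;
}

// Hands each pending job to exactly one worker, taking turns between workers.
function simulateWorkQueue(jobs: EmailJob[], workers: string[]): Record<string, string[]> {
  if (workers.length === 0) {
    throw new Error('at least one worker is required');
  }
  const handled: Record<string, string[]> = {};
  for (const worker of workers) {
    handled[worker] = [];
  }
  const pending = jobs.slice(); // copy so the caller's array is left untouched
  let turn = 0;
  while (pending.length > 0) {
    // shift() removes the job from the shared list, so no other worker can take it.
    const job = pending.shift() as EmailJob;
    handled[workers[turn % workers.length]].push(job.to);
    turn += 1;
  }
  return handled;
}

const handled = simulateWorkQueue(
  [
    { to: 'a@example.com', subject: 'Welcome' },
    { to: 'b@example.com', subject: 'Welcome' },
    { to: 'c@example.com', subject: 'Welcome' },
  ],
  ['instance-1', 'instance-2'],
);
console.log(handled);
```

Every address appears under exactly one instance. A real broker may distribute jobs in a different order, but the property to rely on is the same: one delivery per job within the group.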
Pattern 2: Distributed cache sync (broadcast)
Use the broadcast pattern when every instance must process each message.
// Unique consumer group per instance = broadcast
const queue = queueManager.getQueue<AccessRuleSyncData>('access-rule-sync');

await queue.consume(async (job) => {
  const { userId, roles } = job.data;
  await accessCache.refresh(userId, roles);
}, {
  consumerGroup: `access-rule-sync:${instanceId}`, // Unique!
  maxRetries: 3,
});

// Trigger sync (all instances receive)
await queue.enqueue({ userId: '123', roles: ['admin'] });

Key points:
- Unique consumerGroup per instance enables broadcast
- Every instance receives every message
- Use for cache invalidation, config updates
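The difference between a shared and a per-instance group can be sketched as a routing rule: a message is delivered once per distinct consumer group. The model below is a hypothetical illustration (`Subscription` and `recipientsFor` are not part of the queue library), and picking the first member of a group stands in for "any one member".

```typescript
// Hypothetical model of consumer-group routing; it is not the queue library's API.
interface Subscription {
  instanceId: string;
  consumerGroup: string;
}

// Returns the instances that receive one message: one recipient per distinct group.
function recipientsFor(subscriptions: Subscription[]): string[] {
  const seenGroups: string[] = [];
  const recipients: string[] = [];
  for (const sub of subscriptions) {
    if (seenGroups.indexOf(sub.consumerGroup) === -1) {
      seenGroups.push(sub.consumerGroup);
      recipients.push(sub.instanceId); // first member stands in for "any one member"
    }
  }
  return recipients;
}

const instanceIds = ['a', 'b', 'c'];

// Shared group: a single instance receives the message.
const sharedRecipients = recipientsFor(
  instanceIds.map((id) => ({ instanceId: id, consumerGroup: 'access-rule-sync' })),
);

// Unique group per instance: every instance receives the message.
const broadcastRecipients = recipientsFor(
  instanceIds.map((id) => ({ instanceId: id, consumerGroup: `access-rule-sync:${id}` })),
);

console.log(sharedRecipients.length, broadcastRecipients.length);
```

Under this model, broadcast only works if the suffix really is unique per running instance; two instances that accidentally share an ID would fall back to work-queue behavior between themselves.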
Pattern 3: Background jobs with priority
Use priority for user-triggered tasks that should jump the queue.
const queue = queueManager.getQueue<ExportJob>('exports');

await queue.consume(async (job) => {
  const { userId, systemIds, format } = job.data;
  const filePath = await generateExport(systemIds, format);
  await sendExportReady(userId, filePath);
}, {
  consumerGroup: 'exports',
  maxRetries: 3,
});

// User-triggered = high priority
await queue.enqueue(
  { userId, systemIds, format: 'csv' },
  { priority: 100 }
);

// Background task = low priority
await queue.enqueue(
  { userId: 'system', systemIds: allIds, format: 'csv' },
  { priority: 1 }
);

Key points:
- Higher priority number = processed first
- Combine with the work-queue pattern (a shared consumerGroup) for load balancing
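The ordering rule "higher priority number = processed first" can be sketched with a small in-memory queue. `PriorityJobQueue` is hypothetical, and two of its behaviors are assumptions rather than documented facts: a default priority of 0 and first-in, first-out order among jobs with equal priority.

```typescript
// Hypothetical in-memory priority queue; the real queue's ordering rules may differ.
interface QueuedJob<T> {
  data: T;
  priority: number;
  seq: number; // insertion order, used to break ties
}

class PriorityJobQueue<T> {
  private jobs: QueuedJob<T>[] = [];
  private nextSeq = 0;

  enqueue(data: T, options: { priority?: number } = {}): void {
    // Assumption: jobs without an explicit priority default to 0.
    this.jobs.push({ data, priority: options.priority ?? 0, seq: this.nextSeq });
    this.nextSeq += 1;
  }

  // Removes and returns the highest-priority job, or undefined when empty.
  dequeue(): T | undefined {
    if (this.jobs.length === 0) {
      return undefined;
    }
    // Higher priority first; equal priorities keep insertion order via seq.
    this.jobs.sort((a, b) => b.priority - a.priority || a.seq - b.seq);
    const next = this.jobs.shift() as QueuedJob<T>;
    return next.data;
  }
}

const exportsQueue = new PriorityJobQueue<string>();
exportsQueue.enqueue('nightly-export', { priority: 1 });
exportsQueue.enqueue('user-export-1', { priority: 100 });
exportsQueue.enqueue('user-export-2', { priority: 100 });

const order = [exportsQueue.dequeue(), exportsQueue.dequeue(), exportsQueue.dequeue()];
console.log(order); // ['user-export-1', 'user-export-2', 'nightly-export']
```

Both user-triggered exports run before the background export even though the background job was enqueued first.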
Pattern 4: Delayed execution
Schedule jobs to run after a delay.
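The snippets in this section pass `startDelay` in seconds, so literal values such as 86400 are easy to get wrong. One option is a small conversion helper. `DelayParts`, `toDelaySeconds`, and `earliestRunAt` are hypothetical names, and the assumption that the delay is counted from enqueue time is not confirmed by this page.

```typescript
// Hypothetical helpers; only the `startDelay` option and its unit (seconds) come from this page.
interface DelayParts {
  days?: number;
  hours?: number;
  minutes?: number;
  seconds?: number;
}

// Converts a human-readable delay into whole seconds.
function toDelaySeconds(parts: DelayParts): number {
  const total =
    (parts.days ?? 0) * 86400 +
    (parts.hours ?? 0) * 3600 +
    (parts.minutes ?? 0) * 60 +
    (parts.seconds ?? 0);
  if (total < 0) {
    throw new Error('delay must not be negative');
  }
  return total;
}

// Assumption: the delay is measured from the moment the job is enqueued.
function earliestRunAt(enqueuedAt: Date, startDelaySeconds: number): Date {
  return new Date(enqueuedAt.getTime() + startDelaySeconds * 1000);
}

const reminderDelay = toDelaySeconds({ hours: 24 }); // 86400
const retryDelay = toDelaySeconds({ minutes: 1 }); // 60
console.log(reminderDelay, retryDelay);
```

The result can be passed wherever a `startDelay` value is expected, for example `{ startDelay: toDelaySeconds({ hours: 24 }) }`.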
// Send reminder email in 24 hours
await queue.enqueue(
  { userId, type: 'reminder' },
  { startDelay: 86400 } // seconds
);

// Rate-limited retries
await queue.enqueue(data, {
  startDelay: 60, // Wait 1 minute
  priority: 5
});

Pattern 5: Job deduplication
Prevent duplicate jobs using custom job IDs.
// Only one sync per user at a time
await queue.enqueue(
  { userId: '123', data: newData },
  { jobId: `sync:user:123` }
);

// Second call returns existing job ID (no duplicate)
await queue.enqueue(
  { userId: '123', data: newerData },
  { jobId: `sync:user:123` }
);
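The deduplication behavior described above can be modeled in a few lines. `DedupingQueue` and `EnqueueResult` are hypothetical. In particular, this sketch assumes the payload of the first job is kept and the second payload is discarded, which this page does not state.

```typescript
// Hypothetical in-memory model of jobId deduplication; not the queue library's implementation.
interface EnqueueResult {
  jobId: string;
  created: boolean; // false when a pending job with the same ID already existed
}

class DedupingQueue<T> {
  private pendingJobs: { jobId: string; data: T }[] = [];
  private autoId = 0;

  enqueue(data: T, options: { jobId?: string } = {}): EnqueueResult {
    if (options.jobId !== undefined) {
      for (const job of this.pendingJobs) {
        if (job.jobId === options.jobId) {
          // Assumption: the first payload is kept and the existing ID is returned.
          return { jobId: job.jobId, created: false };
        }
      }
    }
    this.autoId += 1;
    const jobId = options.jobId ?? `auto:${this.autoId}`;
    this.pendingJobs.push({ jobId, data });
    return { jobId, created: true };
  }

  size(): number {
    return this.pendingJobs.length;
  }
}

const syncQueue = new DedupingQueue<{ userId: string; version: number }>();
const first = syncQueue.enqueue({ userId: '123', version: 1 }, { jobId: 'sync:user:123' });
const second = syncQueue.enqueue({ userId: '123', version: 2 }, { jobId: 'sync:user:123' });
console.log(first.created, second.created, syncQueue.size()); // true false 1
```

If the newer payload must win, check how the real queue treats the second call before relying on a fixed job ID, since under the assumption used here `newerData` would never be processed.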