Best Practices Guide
Learn the recommended patterns and practices for building robust distributed task processing systems with Cleo, including advanced task grouping, event handling, and monitoring strategies.
🎯 Best Practices
Task Design
1. Use Task Decorators Effectively
class EmailService {
  @task({
    id: "send-email",
    priority: TaskPriority.HIGH,
    queue: "notifications",
    group: "emails",
    // Configure timeouts based on operation
    timeout: 30000,
    // Use exponential backoff for retries
    maxRetries: 3,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
  })
  async sendEmail(data: { email: string, template: string }) {
    // Email sending logic
  }
}
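To reason about how long a failing task can stay in retry, it helps to write out the delay schedule. The sketch below assumes the common formula `delay * 2^(attempt - 1)`; Cleo's actual backoff calculation is not documented here and may add jitter or a cap. `backoffDelays` is a hypothetical helper, not part of Cleo.

```typescript
// Hypothetical helper (not a Cleo API): lists the wait before each retry,
// assuming the delay doubles on every attempt.
function backoffDelays(baseDelayMs: number, maxRetries: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    delays.push(baseDelayMs * 2 ** (attempt - 1));
  }
  return delays;
}

// Values from the decorator above: delay 2000, maxRetries 3.
const delays = backoffDelays(2000, 3);
// Total time spent waiting between attempts if every retry is used.
const worstCaseWaitMs = delays.reduce((sum, d) => sum + d, 0);
```

Under this assumption the total wait, plus up to one `timeout` per attempt, gives a rough upper bound on how long a task can occupy its group before it is marked failed.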
2. Organize with Queue Classes
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
    group: "notifications",
    timeout: 300000,
  },
  queue: "notifications",
})
class NotificationService {
  @task({
    id: "send-push",
    priority: TaskPriority.HIGH,
  })
  async sendPushNotification(data: { userId: string, message: string }) {
    // Push notification logic
  }

  @task({
    id: "send-sms",
    priority: TaskPriority.NORMAL,
  })
  async sendSMS(data: { phone: string, message: string }) {
    // SMS sending logic
  }
}
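The queue class above relies on class-level defaults being combined with each task's own options. A minimal sketch of one plausible resolution rule follows, assuming a shallow merge in which task-level values win. How Cleo actually merges `defaultOptions` is an assumption here, and `TaskOptions` and `resolveTaskOptions` are hypothetical names used only for illustration.

```typescript
// Hypothetical types and helper (not Cleo APIs), for illustration only.
interface BackoffOptions {
  type: "fixed" | "exponential";
  delay: number;
}

interface TaskOptions {
  id?: string;
  priority?: number;
  group?: string;
  timeout?: number;
  maxRetries?: number;
  backoff?: BackoffOptions;
}

// Shallow merge: any key set on the task replaces the class default.
function resolveTaskOptions(classDefaults: TaskOptions, taskOptions: TaskOptions): TaskOptions {
  return { ...classDefaults, ...taskOptions };
}

const classDefaults: TaskOptions = {
  maxRetries: 3,
  backoff: { type: "exponential", delay: 2000 },
  group: "notifications",
  timeout: 300000,
};

// A task that only overrides the timeout keeps every other default.
const resolved = resolveTaskOptions(classDefaults, { id: "send-push", timeout: 10000 });
```

With a shallow merge, a task that supplies its own `backoff` object replaces the default object entirely, so partial overrides of nested options would need to restate every field.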
3. Implement Smart Group Processing
// Choose ONE strategy based on requirements; each call below is an
// alternative, and a later call replaces the strategy set by an earlier one.
const queueManager = cleo.getQueueManager();

// For fair distribution with time tracking
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);

// For strict ordering with history
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.FIFO);

// For dynamic priority adjustment
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.PRIORITY);
await queueManager.setGroupPriority("premium-users", 10);
await queueManager.setGroupPriority("free-users", 1);

// Monitor group operations
queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE, (taskId, status, data) => {
  console.log(`Group operation for ${taskId}:`, {
    operation: data?.operation,
    group: data?.group,
    history: data?.history
  });
});
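The difference between the strategies is easiest to see on a small example. The sketch below is a simplified in-memory model, not Cleo's implementation: `GroupState`, `pickRoundRobin`, `pickByPriority`, and `drain` are hypothetical names, and the priorities mirror the `premium-users` and `free-users` values above.

```typescript
// Simplified model of group selection (hypothetical, not Cleo internals).
interface GroupState {
  name: string;
  priority: number;
  pending: string[];
}

// Round robin: take the next non-empty group after the one served last.
function pickRoundRobin(groups: GroupState[], lastIndex: number): number {
  for (let step = 1; step <= groups.length; step++) {
    const i = (lastIndex + step) % groups.length;
    if (groups[i].pending.length > 0) return i;
  }
  return -1; // nothing left to process
}

// Priority: always take the non-empty group with the highest priority.
function pickByPriority(groups: GroupState[]): number {
  let best = -1;
  groups.forEach((group, i) => {
    if (group.pending.length === 0) return;
    if (best === -1 || group.priority > groups[best].priority) best = i;
  });
  return best;
}

// Processes every pending task and returns the order in which tasks ran.
function drain(groups: GroupState[], pick: (g: GroupState[], last: number) => number): string[] {
  const order: string[] = [];
  let last = -1;
  for (;;) {
    const i = pick(groups, last);
    if (i === -1) break;
    order.push(groups[i].pending.shift() as string);
    last = i;
  }
  return order;
}

const makeGroups = (): GroupState[] => [
  { name: "premium-users", priority: 10, pending: ["p1", "p2"] },
  { name: "free-users", priority: 1, pending: ["f1"] },
];

const roundRobinOrder = drain(makeGroups(), pickRoundRobin);
const priorityOrder = drain(makeGroups(), (groups) => pickByPriority(groups));
```

In this model round robin interleaves the two groups, while the priority strategy finishes all premium work first, which can starve low-priority groups under sustained load.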
Monitoring & Observability
1. Implement Comprehensive Event Handling
const queueManager = cleo.getQueueManager();

// Monitor task lifecycle with filtering
queueManager.onTaskEvent(ObserverEvent.TASK_COMPLETED, (taskId, status, data) => {
  if (data?.group === "premium-users") {
    console.log(`Premium task ${taskId} completed:`, {
      result: data?.result,
      duration: data?.duration
    });
  }
});

// Track failures with retry information
queueManager.onTaskEvent(ObserverEvent.TASK_FAILED, (taskId, status, data) => {
  console.error(`Task ${taskId} failed:`, {
    error: data?.error,
    retryCount: data?.retryCount,
    nextRetry: data?.nextRetry,
    group: data?.group
  });
});

// Monitor group state changes
queueManager.onTaskEvent(ObserverEvent.GROUP_STATE_CHANGE, (taskId, status, data) => {
  console.log(`Group ${data?.group} state changed:`, {
    previousState: data?.previousState,
    newState: data?.newState,
    reason: data?.reason
  });
});
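Logging each failure is a start, but alerting usually needs a rate rather than individual events. The following sketch counts failures in a sliding time window and reports when a threshold is reached. `FailureWindow` is a hypothetical helper, not part of Cleo, and the window and threshold values are illustrative.

```typescript
// Hypothetical helper (not a Cleo API): sliding-window failure counter.
class FailureWindow {
  private timestamps: number[] = [];

  constructor(
    private readonly windowMs: number,
    private readonly threshold: number,
  ) {}

  // Records one failure at `nowMs` and returns true when the number of
  // failures inside the window has reached the threshold.
  record(nowMs: number): boolean {
    this.timestamps.push(nowMs);
    const cutoff = nowMs - this.windowMs;
    // Drop failures that have aged out of the window.
    this.timestamps = this.timestamps.filter((t) => t > cutoff);
    return this.timestamps.length >= this.threshold;
  }
}

// Three failures within one minute trigger an alert.
const burst = new FailureWindow(60_000, 3);
const burstResults = [burst.record(0), burst.record(1_000), burst.record(2_000)];

// The same number of failures spread over time does not.
const spread = new FailureWindow(60_000, 3);
const spreadResults = [spread.record(0), spread.record(1_000), spread.record(70_000)];
```

One way to wire this in would be to call `record(Date.now())` from the `TASK_FAILED` handler and notify an on-call channel when it returns `true`, possibly with one window per group.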
2. Track Comprehensive Metrics
async function monitorSystemHealth() {
  const queueManager = cleo.getQueueManager();

  // Get group statistics
  const group = await queueManager.getGroup("premium-users");
  const stats = await group.getStats();
  console.log("Group Performance:", {
    total: stats.total,
    active: stats.active,
    completed: stats.completed,
    failed: stats.failed,
    paused: stats.paused
  });

  // Get worker metrics
  const worker = cleo.getWorker("notifications");
  const metrics = await worker.getMetrics();
  console.log("Worker Performance:", {
    tasksProcessed: metrics.tasksProcessed,
    tasksSucceeded: metrics.tasksSucceeded,
    tasksFailed: metrics.tasksFailed,
    averageProcessingTime: metrics.averageProcessingTime
  });

  // Get task history
  const taskHistory = await worker.getTaskHistory();
  console.log("Recent Task History:", taskHistory.map(entry => ({
    taskId: entry.taskId,
    status: entry.status,
    duration: entry.duration,
    timestamp: entry.timestamp,
    group: entry.group
  })));
}
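Raw counters become more useful once they are turned into a health signal. The sketch below derives a failure rate from a metrics object shaped like the one logged above. The `WorkerMetrics` interface is an assumption based on the fields used in this guide, `averageProcessingTime` is assumed to be in milliseconds, and `failureRate` and `isUnhealthy` are hypothetical helpers with illustrative thresholds.

```typescript
// Shape assumed from the fields logged in this guide; not a confirmed Cleo type.
interface WorkerMetrics {
  tasksProcessed: number;
  tasksSucceeded: number;
  tasksFailed: number;
  averageProcessingTime: number; // assumed to be milliseconds
}

// Fraction of processed tasks that failed; 0 when nothing has run yet.
function failureRate(metrics: WorkerMetrics): number {
  return metrics.tasksProcessed === 0 ? 0 : metrics.tasksFailed / metrics.tasksProcessed;
}

// Flags a worker whose failure rate or average latency exceeds a limit.
function isUnhealthy(metrics: WorkerMetrics, maxFailureRate: number, maxAverageMs: number): boolean {
  return failureRate(metrics) > maxFailureRate || metrics.averageProcessingTime > maxAverageMs;
}

const sample: WorkerMetrics = {
  tasksProcessed: 200,
  tasksSucceeded: 190,
  tasksFailed: 10,
  averageProcessingTime: 120,
};

const sampleRate = failureRate(sample);
const sampleUnhealthy = isUnhealthy(sample, 0.1, 500);
```

Guarding the division matters in practice: a freshly started worker reports zero processed tasks, and an unguarded ratio would produce `NaN` and silently defeat threshold comparisons.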
Production Deployment
1. Configure Redis with Security
cleo.configure({
  redis: {
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT || "6379", 10),
    password: process.env.REDIS_PASSWORD,
    maxRetriesPerRequest: 3,
    // Enable key prefix for multi-tenant setups
    keyPrefix: process.env.REDIS_KEY_PREFIX,
    // Enable TLS for secure connections
    tls: {
      rejectUnauthorized: true,
      ca: process.env.REDIS_CA_CERT
    }
  },
});
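Reading connection settings straight from the environment can hide mistakes such as a missing host or a non-numeric port until the first connection attempt. A minimal validation sketch follows; `readRedisSettings` is a hypothetical helper, not part of Cleo, and it reuses the environment variable names from the configuration above.

```typescript
// Hypothetical helper (not a Cleo API): validates Redis settings up front.
type Env = Record<string, string | undefined>;

interface RedisSettings {
  host: string;
  port: number;
  password?: string;
  keyPrefix?: string;
}

function readRedisSettings(env: Env): RedisSettings {
  const host = env.REDIS_HOST;
  if (!host) {
    throw new Error("REDIS_HOST is required");
  }

  // Accept digits only, so values like "6379abc" are rejected outright.
  const rawPort = env.REDIS_PORT ?? "6379";
  if (!/^\d+$/.test(rawPort)) {
    throw new Error(`REDIS_PORT must be a number, got "${rawPort}"`);
  }
  const port = Number(rawPort);
  if (port < 1 || port > 65535) {
    throw new Error(`REDIS_PORT out of range: ${port}`);
  }

  return {
    host,
    port,
    password: env.REDIS_PASSWORD,
    keyPrefix: env.REDIS_KEY_PREFIX,
  };
}

const settings = readRedisSettings({ REDIS_HOST: "redis.internal" });
```

In a Node.js service this would typically be called once at startup as `readRedisSettings(process.env)`, so that a bad deployment fails immediately with a clear message.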
2. Implement Graceful Shutdown
async function gracefulShutdown() {
  console.log('Initiating graceful shutdown...');

  // Stop accepting new tasks
  await cleo.pause();

  // Get active tasks
  const workers = await cleo.getQueueManager().getAllWorkers();
  for (const workerId of workers) {
    const worker = await cleo.getWorkerManager().getWorker(workerId);
    const activeTasks = await worker.getActiveTasks();
    console.log(`Waiting for ${activeTasks.length} tasks to complete on worker ${workerId}`);
  }

  // Wait for active tasks with timeout
  await Promise.race([
    cleo.waitForCompletion(),
    new Promise(resolve => setTimeout(resolve, 30000))
  ]);

  // Clean up resources
  await cleo.getQueueManager().close();
  process.exit(0);
}

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
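Because the same handler is registered for both signals, a second `SIGINT` or `SIGTERM` that arrives mid-shutdown would start the sequence again. One way to guard against that is to make the handler idempotent. The sketch below is generic TypeScript, not a Cleo feature, and `once` is a hypothetical helper name.

```typescript
// Hypothetical helper (not a Cleo API): runs an async function at most once.
// Later calls return the promise created by the first call.
function once<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (pending === undefined) {
      pending = fn();
    }
    return pending;
  };
}

// Stand-in for a real shutdown routine, used here only to count invocations.
let shutdownCalls = 0;
const guardedShutdown = once(async () => {
  shutdownCalls += 1;
});

// Simulate two signals arriving back to back.
const firstCall = guardedShutdown();
const secondCall = guardedShutdown();
```

To apply this, the wrapped function would be created once and shared by both registrations, for example `const shutdown = once(gracefulShutdown)` followed by `process.on('SIGTERM', shutdown)` and `process.on('SIGINT', shutdown)`.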
Best Practices Checklist
- Task Design
  - Use task decorators with appropriate options
  - Implement queue classes for organization
  - Configure smart group processing strategies
  - Set up proper event filtering
- Monitoring
  - Implement comprehensive event handling
  - Track group and worker metrics
  - Monitor task history
  - Set up alerting for failures
- Production
  - Configure Redis securely
  - Implement robust graceful shutdown
  - Set up monitoring and logging
  - Plan for scaling and failover
- Error Handling
  - Configure smart retry strategies
  - Implement comprehensive error logging
  - Use try-catch blocks effectively
  - Handle edge cases and timeouts
- Group Processing
  - Choose appropriate processing strategies
  - Monitor group operations
  - Track group statistics
  - Handle group state changes
- Performance
  - Configure appropriate timeouts
  - Use efficient batch processing
  - Monitor processing times
  - Optimize resource usage