Best Practices Guide
Learn the recommended patterns and practices for building robust distributed task processing systems with Cleo.
🎯 Best Practices
Task Design
1. Use Queue Classes for Organization
@QueueClass({
defaultOptions: {
priority: TaskPriority.NORMAL,
maxRetries: 3,
retryDelay: 1000,
group: "notifications",
},
queue: "notifications",
})
class NotificationService {
@task({
id: "send-email",
priority: TaskPriority.HIGH,
})
async sendEmail(data: { email: string; template: string }) {
// Email sending logic
}
@task({
id: "send-sms",
priority: TaskPriority.NORMAL,
})
async sendSMS(data: { phone: string; message: string }) {
// SMS sending logic
}
}
2. Implement Proper Error Handling
@QueueClass({
defaultOptions: {
maxRetries: 3,
retryDelay: 1000,
}
})
class RobustService {
@task({
id: "process-order"
})
async processOrder(orderId: string) {
try {
// Processing logic
} catch (error) {
// Log error details
throw error; // Let Cleo handle retries
}
}
}
3. Use Group Processing Strategies
// Configure strategy based on use case
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);
// For ordered processing within groups
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.FIFO);
// For priority-based processing
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.PRIORITY);
await queueManager.setGroupPriority("vip-users", 10);
Monitoring & Observability
1. Implement Event Listeners
const queueManager = cleo.getQueueManager();
// Monitor task lifecycle
queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status, data) => {
console.log(`Task ${taskId} status changed to ${status}`, data);
});
queueManager.onTaskEvent(ObserverEvent.TASK_FAILED, (taskId, status, error) => {
console.error(`Task ${taskId} failed:`, error);
});
2. Track Group Statistics
async function monitorGroupPerformance() {
const group = await queueManager.getGroup("user1");
const stats = await group.getStats();
console.log("Group Performance:", {
completed: stats.completed,
failed: stats.failed,
avgProcessingTime: stats.avgProcessingTime
});
}
Production Deployment
1. Configure Redis Properly
cleo.configure({
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
tls: true,
maxRetriesPerRequest: 3,
},
});
2. Implement Graceful Shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully...');
// Stop accepting new tasks
await cleo.pause();
// Wait for active tasks to complete
await cleo.waitForCompletion();
// Clean up resources
await cleo.close();
process.exit(0);
});
Common Pitfalls to Avoid
1. ❌ Don't Mix Task Responsibilities
// ❌ Bad: Task doing too much
@task()
async function processOrderAndSendEmailAndUpdateInventory() {
// Too many responsibilities
}
// ✅ Good: Separate concerns
@task()
async function processOrder() {
// Order processing only
}
@task()
async function sendOrderConfirmation() {
// Email sending only
}
2. ❌ Don't Ignore Error Handling
// ❌ Bad: No error handling
@task()
async function riskyTask() {
await someRiskyOperation();
}
// ✅ Good: Proper error handling
@task({
maxRetries: 3,
retryDelay: 1000,
})
async function robustTask() {
try {
await someRiskyOperation();
} catch (error) {
// Log error details
throw error; // Let Cleo handle retries
}
}
Best Practices Checklist
-
Task Design
- Use Queue Classes for organization
- Implement proper error handling
- Configure appropriate retry strategies
- Use group processing when needed
-
Monitoring
- Set up event listeners
- Track group statistics
- Monitor queue health
- Implement logging
-
Production
- Configure Redis properly
- Implement graceful shutdown
- Set up monitoring
- Plan for scaling
-
Error Handling
- Configure retries appropriately
- Implement error logging
- Use try-catch blocks
- Handle edge cases