Core Concepts
Learn the fundamental building blocks of Cleo's distributed task processing system. This guide covers queue classes, group processing strategies, monitoring, and error handling.
🧠 Core Concepts - The Building Blocks of Cleo
🎯 Queue Classes
The @QueueClass decorator lets you define a service class whose methods share one queue configuration:
@QueueClass({
  // Options applied to every task defined in this class
  defaultOptions: {
    priority: TaskPriority.NORMAL,
    maxRetries: 3,
    retryDelay: 1000,
    group: "user1",
  },
  // Queue that all methods of this class submit to
  queue: "notifications",
})
class NotificationService {
  async sendPushNotification(data: { message: string }) {
    // Push notification logic
  }

  async sendSMS(data: { message: string }) {
    // SMS sending logic
  }

  async sendEmail(data: { message: string }) {
    // Email sending logic
  }
}
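To make the idea of shared configuration concrete, here is a minimal sketch of how a class-level wrapper could tag every method call with the same queue name and default options. It is not Cleo's implementation: `shareQueueConfig`, `SketchOptions`, `RecordedTask`, `recorded`, and `DemoNotifications` are hypothetical names, and an in-memory array stands in for a real queue backend.

```typescript
// Hypothetical stand-in types; these are not Cleo's real definitions.
interface SketchOptions {
  priority?: number;
  maxRetries?: number;
  retryDelay?: number;
  group?: string;
}

interface RecordedTask {
  queue: string;
  method: string;
  options: SketchOptions;
  data: unknown;
}

// In-memory list standing in for a real queue backend (assumption).
const recorded: RecordedTask[] = [];

// Wraps every prototype method so that calling it first records a task
// tagged with the class-level configuration, then runs the original method.
function shareQueueConfig(config: { queue: string; defaultOptions: SketchOptions }) {
  return function <T extends new (...args: any[]) => object>(target: T): T {
    for (const name of Object.getOwnPropertyNames(target.prototype)) {
      if (name === "constructor") continue;
      const original = target.prototype[name];
      if (typeof original !== "function") continue;
      target.prototype[name] = function (this: unknown, data: unknown) {
        recorded.push({
          queue: config.queue,
          method: name,
          options: { ...config.defaultOptions },
          data,
        });
        return original.call(this, data);
      };
    }
    return target;
  };
}

class DemoNotifications {
  async sendSMS(data: { message: string }) {
    return `sms:${data.message}`;
  }
  async sendEmail(data: { message: string }) {
    return `email:${data.message}`;
  }
}

// Applied as a plain function call so the sketch runs without decorator compiler flags.
shareQueueConfig({
  queue: "notifications",
  defaultOptions: { maxRetries: 3, retryDelay: 1000, group: "user1" },
})(DemoNotifications);

const demoService = new DemoNotifications();
void demoService.sendSMS({ message: "hello" });
console.log(recorded[0].queue, recorded[0].method, recorded[0].options.group);
```

Every method on the class picks up the same queue and options without repeating them, which is the convenience a class-level decorator is meant to provide.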
🔄 Group Processing Strategies
Cleo can process grouped tasks with different strategies, selected through setGroupProcessingStrategy:
Round Robin Strategy
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);
FIFO (First In, First Out) Strategy
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.FIFO);
Priority-based Strategy
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.PRIORITY);
// Set group priorities
await queueManager.setGroupPriority("user1", 1);
await queueManager.setGroupPriority("user2", 10); // Higher priority
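The difference between the strategies is easiest to see on a small backlog. The sketch below is a generic interpretation of round robin, FIFO, and priority ordering, not Cleo's scheduler: `drainOrder`, `PendingTask`, and `SketchStrategy` are hypothetical names, and it assumes that a larger priority number means the group is served first, as the comment on "user2" above suggests.

```typescript
type SketchStrategy = "ROUND_ROBIN" | "FIFO" | "PRIORITY";

interface PendingTask {
  id: string;
  group: string;
  enqueuedAt: number; // arrival order; smaller means earlier
}

// Returns the order in which task ids would be processed under each strategy.
function drainOrder(
  tasks: PendingTask[],
  strategy: SketchStrategy,
  priorities: Record<string, number> = {},
): string[] {
  const pending = [...tasks].sort((a, b) => a.enqueuedAt - b.enqueuedAt);

  // FIFO: global arrival order, groups are ignored.
  if (strategy === "FIFO") return pending.map((t) => t.id);

  // Bucket tasks per group, keeping arrival order inside each bucket.
  const byGroup = new Map<string, PendingTask[]>();
  for (const task of pending) {
    const bucket = byGroup.get(task.group) ?? [];
    bucket.push(task);
    byGroup.set(task.group, bucket);
  }

  const order: string[] = [];

  if (strategy === "PRIORITY") {
    // Assumption: higher number = served first; unknown groups default to 0.
    const ranked = [...byGroup.keys()].sort(
      (a, b) => (priorities[b] ?? 0) - (priorities[a] ?? 0),
    );
    for (const group of ranked) {
      for (const task of byGroup.get(group)!) order.push(task.id);
    }
    return order;
  }

  // ROUND_ROBIN: take one task from each group in turn until all are drained.
  const rotation = [...byGroup.keys()];
  let remaining = pending.length;
  while (remaining > 0) {
    for (const group of rotation) {
      const next = byGroup.get(group)!.shift();
      if (next) {
        order.push(next.id);
        remaining--;
      }
    }
  }
  return order;
}

const sampleTasks: PendingTask[] = [
  { id: "a1", group: "user1", enqueuedAt: 1 },
  { id: "a2", group: "user1", enqueuedAt: 2 },
  { id: "a3", group: "user1", enqueuedAt: 3 },
  { id: "b1", group: "user2", enqueuedAt: 4 },
];

console.log(drainOrder(sampleTasks, "FIFO"));        // a1, a2, a3, b1
console.log(drainOrder(sampleTasks, "ROUND_ROBIN")); // a1, b1, a2, a3
console.log(drainOrder(sampleTasks, "PRIORITY", { user1: 1, user2: 10 })); // b1, a1, a2, a3
```

In this model, round robin keeps a busy group from starving a quiet one, FIFO preserves arrival order, and priority lets one group jump ahead of the rest.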
📊 Monitoring and Statistics
// Get group statistics
const user1Stats = await (await queueManager.getGroup("user1")).getStats();
const user2Stats = await (await queueManager.getGroup("user2")).getStats();
console.log("User 1 Stats:", user1Stats);
console.log("User 2 Stats:", user2Stats);
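When several groups need to be inspected at once, the lookups can run in parallel instead of one after another. The helper below is a small sketch with a hypothetical name, `collectGroupStats`. It makes no assumption about the shape of the stats object and takes the fetching function as a parameter, so the getGroup and getStats calls shown above could be passed in as `(g) => queueManager.getGroup(g).then((grp) => grp.getStats())`.

```typescript
// Fetches stats for many groups concurrently and returns them keyed by group name.
// `fetchStats` is supplied by the caller; its result type S is left generic
// because the shape of the stats object is not specified here.
async function collectGroupStats<S>(
  groups: string[],
  fetchStats: (group: string) => Promise<S>,
): Promise<Record<string, S>> {
  const values = await Promise.all(groups.map((g) => fetchStats(g)));
  const result: Record<string, S> = {};
  groups.forEach((g, i) => {
    result[g] = values[i];
  });
  return result;
}

// Stand-in fetcher for demonstration only; it returns made-up numbers.
const fakeFetch = async (group: string) => ({ pending: group.length });

collectGroupStats(["user1", "user2"], fakeFetch).then((stats) => {
  console.log("Stats by group:", stats);
});
```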
🎯 Error Handling
Retry behavior for failing tasks is configured through maxRetries and retryDelay in defaultOptions:
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
  },
})
class ErrorHandlingService {
  async sendErrorTask(data: { message: string }) {
    // Always fails, so the retry settings above come into play
    throw new Error("Simulated error in task");
  }
}
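As an illustration of what these two settings typically mean, here is a minimal retry loop. It is a sketch under stated assumptions rather than Cleo's retry logic: it assumes `maxRetries` counts the attempts made after the first failure and that `retryDelay` is a fixed wait in milliseconds. `runWithRetries`, `RetryOptions`, and `sleep` are hypothetical names.

```typescript
interface RetryOptions {
  maxRetries: number; // assumption: retries after the initial attempt
  retryDelay: number; // assumption: fixed delay in milliseconds
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs `task`, retrying on failure up to `maxRetries` times with a fixed delay.
// Rethrows the last error once all attempts are exhausted.
async function runWithRetries<T>(
  task: () => Promise<T>,
  { maxRetries, retryDelay }: RetryOptions,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task();
    } catch (err) {
      lastError = err;
      // Wait only if another attempt will follow.
      if (attempt < maxRetries) await sleep(retryDelay);
    }
  }
  throw lastError;
}

// A task that always throws, like sendErrorTask above.
let attempts = 0;
runWithRetries(
  async () => {
    attempts++;
    throw new Error("Simulated error in task");
  },
  { maxRetries: 3, retryDelay: 10 },
).catch((err: Error) => {
  console.log(`Gave up after ${attempts} attempts: ${err.message}`);
});
```

Under these assumptions, a task that always throws runs four times in total: the initial attempt plus three retries.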