Core Concepts

Learn the fundamental building blocks behind Cleo's distributed task processing system. This guide covers queue classes, group processing strategies, monitoring, and error handling.

🧠 Core Concepts - The Building Blocks of Cleo

🎯 Queue Classes

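The @QueueClass decorator lets you define a service class whose methods share one queue configuration. In the example below, the default options and the "notifications" queue are declared once for the whole class: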

@QueueClass({
  defaultOptions: {
    priority: TaskPriority.NORMAL,
    maxRetries: 3,
    retryDelay: 1000,
    group: "user1",
  },
  queue: "notifications",
})
class NotificationService {
  async sendPushNotification(data: { message: string }) {
    // Push notification logic
  }
 
  async sendSMS(data: { message: string }) {
    // SMS sending logic
  }
 
  async sendEmail(data: { message: string }) {
    // Email sending logic
  }
}
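As a rough mental model of what "shared queue configuration" means, the sketch below records one configuration for a class and lists the methods that would inherit it. It is not Cleo's implementation of @QueueClass: `withQueueConfig`, `describeTasks`, `registry`, and `DemoService` are hypothetical names introduced only for illustration, and the helper is called as a plain function so the example runs without decorator compiler settings.

```typescript
// Hypothetical mental model; NOT how Cleo's @QueueClass is implemented.
interface QueueConfig {
  queue: string;
  defaultOptions: { maxRetries?: number; retryDelay?: number; group?: string };
}

type Constructor = new (...args: any[]) => object;

// One shared configuration per class
const registry = new Map<Constructor, QueueConfig>();

// Returns a function that attaches the given configuration to a class
function withQueueConfig(config: QueueConfig) {
  return <C extends Constructor>(target: C): C => {
    registry.set(target, config);
    return target;
  };
}

// Lists every method of the class together with the configuration it shares
function describeTasks(target: Constructor): string[] {
  const config = registry.get(target);
  if (!config) return [];
  return Object.getOwnPropertyNames(target.prototype)
    .filter((name) => name !== "constructor")
    .map(
      (name) =>
        `${config.queue}:${name} (maxRetries=${config.defaultOptions.maxRetries ?? 0})`,
    );
}

class DemoService {
  async sendSMS(_data: { message: string }) {}
  async sendEmail(_data: { message: string }) {}
}

withQueueConfig({
  queue: "notifications",
  defaultOptions: { maxRetries: 3, retryDelay: 1000 },
})(DemoService);

console.log(describeTasks(DemoService));
```

The point of the sketch is only that the configuration lives on the class, so adding a method does not require repeating the queue name or retry settings.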

🔄 Group Processing Strategies

Cleo supports three strategies for processing grouped tasks. Select one with queueManager.setGroupProcessingStrategy:

Round Robin Strategy

queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);

FIFO (First In, First Out) Strategy

queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.FIFO);

Priority-based Strategy

queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.PRIORITY);
 
// Set group priorities: a larger number means a higher priority
await queueManager.setGroupPriority("user1", 1);
await queueManager.setGroupPriority("user2", 10); // Higher priority than user1
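To make the difference between the three strategies concrete, here is a minimal, self-contained simulation of the order in which tasks from two groups might be picked. It does not use or reproduce Cleo's internals: `Task`, `pickOrder`, and `demoTasks` are hypothetical names, and the tie-breaking rules (arrival order within a group, larger number means higher priority) are assumptions made for the sketch.

```typescript
// Illustrative simulation only; this is not Cleo's scheduling code.
type Strategy = "ROUND_ROBIN" | "FIFO" | "PRIORITY";

interface Task {
  id: string;
  group: string;
  enqueuedAt: number; // logical arrival order
}

function pickOrder(
  tasks: Task[],
  strategy: Strategy,
  groupPriority: Record<string, number> = {},
): string[] {
  const byArrival = [...tasks].sort((a, b) => a.enqueuedAt - b.enqueuedAt);

  if (strategy === "FIFO") {
    // Strict arrival order, regardless of group
    return byArrival.map((t) => t.id);
  }

  if (strategy === "PRIORITY") {
    // Assumption: larger number = higher priority.
    // Array.prototype.sort is stable, so ties keep arrival order.
    return [...byArrival]
      .sort((a, b) => (groupPriority[b.group] ?? 0) - (groupPriority[a.group] ?? 0))
      .map((t) => t.id);
  }

  // ROUND_ROBIN: take one task from each group in turn
  const queues = new Map<string, Task[]>();
  for (const t of byArrival) {
    const q = queues.get(t.group) ?? [];
    q.push(t);
    queues.set(t.group, q);
  }
  const order: string[] = [];
  while (order.length < tasks.length) {
    for (const q of Array.from(queues.values())) {
      const next = q.shift();
      if (next) order.push(next.id);
    }
  }
  return order;
}

const demoTasks: Task[] = [
  { id: "a1", group: "user1", enqueuedAt: 1 },
  { id: "a2", group: "user1", enqueuedAt: 2 },
  { id: "b1", group: "user2", enqueuedAt: 3 },
  { id: "a3", group: "user1", enqueuedAt: 4 },
  { id: "b2", group: "user2", enqueuedAt: 5 },
];

console.log(pickOrder(demoTasks, "FIFO"));        // [a1, a2, b1, a3, b2]
console.log(pickOrder(demoTasks, "ROUND_ROBIN")); // [a1, b1, a2, b2, a3]
console.log(pickOrder(demoTasks, "PRIORITY", { user1: 1, user2: 10 })); // [b1, b2, a1, a2, a3]
```

In this model, round robin keeps a busy group from starving a quiet one, FIFO preserves global arrival order, and the priority-based strategy drains the higher-priority group first.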

📊 Monitoring and Statistics

// Get group statistics
const user1Stats = await (await queueManager.getGroup("user1")).getStats();
const user2Stats = await (await queueManager.getGroup("user2")).getStats();
 
console.log("User 1 Stats:", user1Stats);
console.log("User 2 Stats:", user2Stats);

🎯 Error Handling

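Tasks can be configured to retry when they fail. In the example below, maxRetries limits how many times a failed task is retried and retryDelay sets the delay between attempts: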

@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
  }
})
class ErrorHandlingService {
  async sendErrorTask(data: { message: string }) {
    // Always fails, so every run exercises the retry settings above
    throw new Error('Simulated error in task');
  }
}
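The retry settings can be pictured with a small stand-alone helper. This is a sketch under stated assumptions, not Cleo's worker logic: it assumes maxRetries counts retries after the first attempt and that retryDelay is a fixed delay in milliseconds. `runWithRetries`, `makeFlakyTask`, and `sleep` are hypothetical names.

```typescript
// Illustrative sketch only; Cleo's worker may implement retries differently.
interface RetryOptions {
  maxRetries: number; // assumed: retries allowed after the first attempt
  retryDelay: number; // assumed: fixed delay in milliseconds between attempts
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runWithRetries<T>(
  task: () => Promise<T>,
  { maxRetries, retryDelay }: RetryOptions,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      // Wait before the next attempt, but not after the final one
      if (attempt < maxRetries) await sleep(retryDelay);
    }
  }
  // Every attempt failed: surface the last error to the caller
  throw lastError;
}

// Builds a task that fails the first `failures` times, then succeeds
function makeFlakyTask(failures: number): () => Promise<string> {
  let calls = 0;
  return async () => {
    calls += 1;
    if (calls <= failures) throw new Error(`Simulated error on attempt ${calls}`);
    return "sent";
  };
}

runWithRetries(makeFlakyTask(2), { maxRetries: 3, retryDelay: 10 })
  .then((result) => console.log(result)); // logs "sent" after two failed attempts
```

A task that always throws, like sendErrorTask above, would exhaust all attempts under this model and end with the last error.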