Core Concepts

Learn the fundamental building blocks and concepts behind Cleo's distributed task processing system. This guide covers task decorators, queue classes, group processing strategies, event handling, and error management.

🧠 Core Concepts - The Building Blocks of Cleo

🎀 Task Decorators

The @task decorator provides fine-grained control over individual task execution:

class EmailService {
  @task({
    id: "send-email",
    priority: TaskPriority.HIGH,
    queue: 'email',
    group: 'notifications',
    timeout: 30000,
    maxRetries: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    weight: 10,
    removeOnComplete: true
  })
  async sendEmail(input: { email: string }): Promise<string> {
    // Implementation
  }
}

Key decorator options:

  • id: Unique task identifier
  • priority: Task priority level
  • queue: Target queue name
  • group: Group for coordinated processing
  • timeout: Maximum execution time, in milliseconds
  • maxRetries: Maximum retry attempts
  • backoff: Retry delay strategy
  • weight: Relative ordering of the task within its group
  • removeOnComplete: Auto-cleanup after completion

🎯 Queue Classes

The @QueueClass decorator organizes related tasks:

@QueueClass({
  defaultOptions: {
    queue: 'notifications',
    group: 'emails',
    maxRetries: 3
  },
  includeInherited: true,
  exclude: ['privateMethod'],
})
class NotificationService {
  // Methods automatically become tasks
}
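
To make includeInherited and exclude concrete, the sketch below shows one way a class-level decorator could decide which methods to register as tasks. It is not Cleo's implementation: listTaskMethods, BaseService, and the audit and sendEmail methods are hypothetical names introduced for illustration, and the selection rules are an assumption inferred from the option names.

```typescript
// Hypothetical helper: lists the method names a class-level decorator
// might register as tasks. Not part of Cleo's API.
function listTaskMethods(
  ctor: Function,
  opts: { includeInherited: boolean; exclude: string[] }
): string[] {
  const names = new Set<string>();
  let proto = ctor.prototype;
  // Walk the prototype chain, stopping before Object.prototype.
  while (proto && proto !== Object.prototype) {
    for (const name of Object.getOwnPropertyNames(proto)) {
      const desc = Object.getOwnPropertyDescriptor(proto, name);
      const isMethod = desc !== undefined && typeof desc.value === "function";
      if (isMethod && name !== "constructor" && opts.exclude.indexOf(name) === -1) {
        names.add(name);
      }
    }
    if (!opts.includeInherited) break; // own methods only
    proto = Object.getPrototypeOf(proto);
  }
  return Array.from(names).sort();
}

class BaseService {
  async audit(): Promise<void> {}
}

class NotificationService extends BaseService {
  async sendEmail(): Promise<void> {}
  async privateMethod(): Promise<void> {}
}

console.log(
  listTaskMethods(NotificationService, {
    includeInherited: true,
    exclude: ["privateMethod"],
  })
); // ["audit", "sendEmail"]
```

With includeInherited set to false, the same call would return only ["sendEmail"], because the walk stops after the class's own prototype.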

📊 Group Processing

Task groups enable coordinated processing with multiple strategies:

const queueManager = cleo.getQueueManager();
const group = await queueManager.getGroup('notifications');
 
// Configure group processing
await group.updateConfig({
  concurrency: 5,
  maxConcurrency: 10,
  rateLimit: {
    max: 100,
    duration: 60000
  },
  strategy: GroupProcessingStrategy.PRIORITY,
  timeout: 30000
});
 
// Monitor group stats
const stats = await group.getStats();
console.log(stats); // { total, active, completed, failed, paused }
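
How a priority-based strategy orders work is not spelled out here. As a rough sketch, assuming that a higher priority runs first and that weight breaks ties within the group, the ordering could look like the following. Both rules are assumptions, and the PendingTask type and orderByPriority helper are hypothetical, not Cleo internals.

```typescript
// Hypothetical shape of a queued task; not Cleo's internal type.
interface PendingTask {
  id: string;
  priority: number; // assumed: larger means more urgent
  weight: number; // assumed: larger runs earlier among equal priorities
}

// Returns a new array sorted by priority, then weight, both descending.
function orderByPriority(tasks: PendingTask[]): PendingTask[] {
  return tasks
    .slice()
    .sort((a, b) => b.priority - a.priority || b.weight - a.weight);
}

const ordered = orderByPriority([
  { id: "digest", priority: 1, weight: 5 },
  { id: "password-reset", priority: 3, weight: 1 },
  { id: "welcome", priority: 1, weight: 10 },
]);

console.log(ordered.map((t) => t.id));
// ["password-reset", "welcome", "digest"]
```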

🎭 Event Handling

Comprehensive event system for task lifecycle management:

const queueManager = cleo.getQueueManager();
 
// Task events
queueManager.onTaskEvent(ObserverEvent.TASK_COMPLETED, 
  (taskId, status, data) => {
    console.log(`Task ${taskId} completed with result:`, data.result);
});
 
queueManager.onTaskEvent(ObserverEvent.TASK_FAILED,
  (taskId, status, data) => {
    console.error(`Task ${taskId} failed:`, data.error);
});
 
// Group events
queueManager.onTaskEvent(ObserverEvent.GROUP_STATE_CHANGE,
  (groupId, status, data) => {
    console.log(`Group ${groupId} state changed to:`, status);
});
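
The handlers above all share the same (id, status, data) callback shape. A minimal sketch of the observer pattern behind that kind of API is shown below. TaskEventBus, its on and emit methods, and the "task_completed" event name are hypothetical stand-ins for illustration; they are not Cleo's classes or event constants.

```typescript
type TaskEventHandler = (
  id: string,
  status: string,
  data: Record<string, unknown>
) => void;

// Hypothetical stand-in for the event side of a queue manager.
class TaskEventBus {
  private handlers = new Map<string, TaskEventHandler[]>();

  // Registers a handler and returns a function that removes it again.
  on(event: string, handler: TaskEventHandler): () => void {
    const list = this.handlers.get(event) || [];
    list.push(handler);
    this.handlers.set(event, list);
    return () => {
      const current = this.handlers.get(event) || [];
      this.handlers.set(event, current.filter((h) => h !== handler));
    };
  }

  // Calls every handler registered for the event, in registration order.
  emit(event: string, id: string, status: string, data: Record<string, unknown>): void {
    const list = this.handlers.get(event) || [];
    list.forEach((handler) => handler(id, status, data));
  }
}

const bus = new TaskEventBus();
const seen: string[] = [];

const off = bus.on("task_completed", (id, status) => {
  seen.push(`${id}:${status}`);
});

bus.emit("task_completed", "send-email", "completed", { result: "ok" });
off(); // unsubscribe; later events are ignored
bus.emit("task_completed", "send-email", "completed", { result: "ok" });

console.log(seen); // ["send-email:completed"]
```

Returning an unsubscribe function from on is a design choice of this sketch; it keeps handler cleanup next to registration.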

🔄 Error Handling

Robust error handling with configurable retry strategies:

// Decorators apply to class methods, not standalone functions
class RetryService {
  @task({
    maxRetries: 3,
    backoff: {
      type: 'exponential',
      delay: 1000 // Doubles each retry: 1s, 2s, 4s
    },
    retryDelay: 5000
  })
  async retryableTask() {
    try {
      // Task implementation
    } catch (error) {
      // Rethrow so the failure triggers a retry with backoff
      throw error;
    }
  }
}
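
The doubling schedule in the comment above (1s, 2s, 4s) can be worked out with a few lines of arithmetic. The sketch below is illustrative only: exponentialDelay and retrySchedule are hypothetical helpers, and it does not model how retryDelay interacts with backoff, which is not specified here.

```typescript
// Delay in milliseconds before retry number `attempt` (1-based),
// doubling from `baseDelayMs` on each subsequent retry.
function exponentialDelay(attempt: number, baseDelayMs: number): number {
  return baseDelayMs * Math.pow(2, attempt - 1);
}

// Full delay schedule for a task that is allowed `maxRetries` retries.
function retrySchedule(maxRetries: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    delays.push(exponentialDelay(attempt, baseDelayMs));
  }
  return delays;
}

console.log(retrySchedule(3, 1000)); // [1000, 2000, 4000]
```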

📈 Monitoring

Built-in monitoring capabilities:

// Worker metrics
const worker = cleo.getWorker('email-queue');
const metrics = await worker.getMetrics();
const history = await worker.getTaskHistory();
 
// Queue metrics
const queueMetrics = await queueManager.getQueueMetrics('email-queue');
 
// Group monitoring
const groupStats = await group.getStats();
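
Raw counters are often easier to act on once reduced to a ratio. As a small sketch, assuming the group stats fields (total, active, completed, failed, paused) are numeric counts, a failure rate could be derived as follows. GroupStats and failureRate are hypothetical names for illustration.

```typescript
// Field names follow the group stats shape; numeric counts are an assumption.
interface GroupStats {
  total: number;
  active: number;
  completed: number;
  failed: number;
  paused: number;
}

// Fraction of finished tasks that failed, or 0 when nothing has finished yet.
function failureRate(stats: GroupStats): number {
  const finished = stats.completed + stats.failed;
  return finished === 0 ? 0 : stats.failed / finished;
}

const sample: GroupStats = {
  total: 120,
  active: 5,
  completed: 90,
  failed: 10,
  paused: 15,
};

console.log(failureRate(sample)); // 0.1
```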

🔐 Security

Redis security and multi-tenant support:

cleo.configure({
  redis: {
    host: 'localhost',
    port: 6379,
    password: 'secure-password',
    tls: true,
    keyPrefix: 'tenant1:', // Multi-tenant support
  }
});
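
The idea behind keyPrefix is that every key a tenant touches is namespaced, so tenants sharing one Redis instance cannot collide. The sketch below imitates that with an in-memory Map in place of Redis. tenantClient and the key name "queue:email:paused" are hypothetical, and how Cleo applies the prefix internally is not shown in this guide.

```typescript
// Stands in for a single shared Redis instance.
const store = new Map<string, string>();

// Hypothetical client that prepends the tenant's prefix to every key.
function tenantClient(keyPrefix: string) {
  return {
    set(key: string, value: string): void {
      store.set(keyPrefix + key, value);
    },
    get(key: string): string | undefined {
      return store.get(keyPrefix + key);
    },
  };
}

const tenant1 = tenantClient("tenant1:");
const tenant2 = tenantClient("tenant2:");

tenant1.set("queue:email:paused", "true");

console.log(tenant1.get("queue:email:paused")); // "true"
console.log(tenant2.get("queue:email:paused")); // undefined
console.log(Array.from(store.keys())); // ["tenant1:queue:email:paused"]
```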

🚦 Rate Limiting

Control processing rates at queue and group levels:

@QueueClass({
  defaultOptions: {
    queue: 'api-calls',
    rateLimit: {
      max: 100,
      duration: 60000 // 100 tasks per minute
    }
  }
})
class ApiService {
  // Methods become tasks on the rate-limited 'api-calls' queue
}
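
A max and duration pair describes a budget of tasks per time window. One simple way to enforce such a budget is a fixed-window counter, sketched below. This is an illustration of the general technique, not a description of Cleo's limiter, which may use a different algorithm such as a sliding window. FixedWindowLimiter and tryAcquire are hypothetical names, and the clock is passed in explicitly so the behavior is easy to follow.

```typescript
// Fixed-window limiter: allows at most `max` acquisitions per `durationMs` window.
class FixedWindowLimiter {
  private max: number;
  private durationMs: number;
  private windowStart = 0;
  private count = 0;

  constructor(max: number, durationMs: number) {
    this.max = max;
    this.durationMs = durationMs;
  }

  // Returns true if a task may start at time `nowMs`, false if it must wait.
  tryAcquire(nowMs: number): boolean {
    if (nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs; // start a fresh window
      this.count = 0;
    }
    if (this.count < this.max) {
      this.count += 1;
      return true;
    }
    return false;
  }
}

// Two tasks per minute keeps the example short; the config above uses 100.
const limiter = new FixedWindowLimiter(2, 60000);

console.log(limiter.tryAcquire(0)); // true
console.log(limiter.tryAcquire(10)); // true
console.log(limiter.tryAcquire(20)); // false, the window is full
console.log(limiter.tryAcquire(60000)); // true, a new window has started
```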

For more detailed information about specific features, check out our other guides: