Scaling Guide

Learn how to scale your Cleo task processing system from development to production.

Understanding Cleo's Architecture

Cleo is built on a distributed architecture using Redis as its backbone, allowing for horizontal scaling and high availability.

Basic Architecture

import { Cleo } from "@cleotasks/core";
import { TaskPriority } from "@cleotasks/core/types/enums";
 
// Basic single-worker configuration
const cleo = Cleo.getInstance();
cleo.configure({
  redis: {
    host: "localhost",
    port: 6379,
    password: "cleosecret",
  },
  worker: {
    concurrency: 4,
    queues: [
      {
        name: "default",
        priority: TaskPriority.NORMAL,
      },
    ],
  },
});

🔄 Scaling Strategies

1. Group-Based Scaling

@QueueClass({
  defaultOptions: {
    priority: TaskPriority.NORMAL,
    maxRetries: 3,
    retryDelay: 1000,
    group: "user1",
  },
  queue: "notifications",
})
class NotificationService {
  // Task methods...
}
 
// Configure group processing strategy
const queueManager = cleo.getQueueManager();
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);
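To make the round-robin idea concrete, here is a minimal, self-contained sketch of how tasks from several groups could be interleaved so that no single group monopolizes a worker. It is an illustration only, not Cleo's implementation: `GroupedTask` and `roundRobinOrder` are hypothetical names, and the real strategy may differ in its details.

```typescript
// Hypothetical stand-in type; not part of the Cleo API.
type GroupedTask = { id: string; group: string };

/**
 * Orders tasks by cycling through groups, taking one task per group per pass.
 * Groups are visited in the order in which they first appear in the input.
 */
function roundRobinOrder(tasks: GroupedTask[]): GroupedTask[] {
  // Prototype-free object so any group name is a safe key.
  const buckets: Record<string, GroupedTask[]> = Object.create(null);
  const groupOrder: string[] = [];

  // Bucket tasks by group, preserving arrival order inside each group.
  for (const task of tasks) {
    if (!buckets[task.group]) {
      buckets[task.group] = [];
      groupOrder.push(task.group);
    }
    buckets[task.group].push(task);
  }

  const ordered: GroupedTask[] = [];
  while (ordered.length < tasks.length) {
    for (const group of groupOrder) {
      const next = buckets[group].shift();
      if (next) ordered.push(next);
    }
  }
  return ordered;
}

const order = roundRobinOrder([
  { id: "a1", group: "user1" },
  { id: "a2", group: "user1" },
  { id: "a3", group: "user1" },
  { id: "b1", group: "user2" },
]).map((task) => task.id);
// order is ["a1", "b1", "a2", "a3"]
```

Even though "user1" submitted three tasks first, the single "user2" task runs second instead of waiting behind all of them.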

2. Priority-Based Scaling

// Configure multiple priority queues
cleo.configure({
  worker: {
    queues: [
      { name: "critical", priority: TaskPriority.HIGH },
      { name: "normal", priority: TaskPriority.NORMAL },
      { name: "background", priority: TaskPriority.LOW },
    ],
  },
});
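The effect of priority queues can be pictured as a selection rule: when a worker is free, it takes work from the highest-priority queue that has something waiting. The sketch below shows that rule in isolation. The numeric ranks and the `pickNextQueue` helper are assumptions made for illustration; the actual values of `TaskPriority` and Cleo's scheduling logic are not described here.

```typescript
// Hypothetical numeric ranks; the real TaskPriority values may differ.
const PRIORITY_RANK = { LOW: 1, NORMAL: 2, HIGH: 3 };
type PriorityName = keyof typeof PRIORITY_RANK;

interface QueueState {
  name: string;
  priority: PriorityName;
  waiting: number; // number of tasks waiting in this queue
}

/**
 * Returns the name of the highest-priority queue that has waiting tasks,
 * or null when every queue is empty. On ties, the first queue listed wins.
 */
function pickNextQueue(queues: QueueState[]): string | null {
  let best: QueueState | null = null;
  for (const queue of queues) {
    if (queue.waiting === 0) continue;
    if (best === null || PRIORITY_RANK[queue.priority] > PRIORITY_RANK[best.priority]) {
      best = queue;
    }
  }
  return best === null ? null : best.name;
}

const nextQueue = pickNextQueue([
  { name: "critical", priority: "HIGH", waiting: 0 },
  { name: "normal", priority: "NORMAL", waiting: 5 },
  { name: "background", priority: "LOW", waiting: 9 },
]);
// nextQueue is "normal": "critical" is empty, and "normal" outranks "background"
```

A strict rule like this can starve low-priority queues under sustained load, which is one reason to watch the waiting count of the "background" queue when you rely on priorities.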

3. Concurrent Processing

// Scale by increasing concurrency
cleo.configure({
  worker: {
    concurrency: 10, // Worker-wide limit; raise it in line with available CPU cores
    queues: [
      {
        name: "heavy-processing",
        concurrency: 5, // Queue-specific concurrency
      },
    ],
  },
});
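A concurrency setting is, in effect, a cap on how many tasks are in flight at once. The following sketch shows that idea with a small promise pool that never runs more than `limit` jobs at the same time. It is a generic illustration under that assumption, not how Cleo schedules work internally, and `runWithConcurrency` is a hypothetical helper.

```typescript
/**
 * Runs async jobs with at most `limit` in flight at once and
 * resolves with their results in input order.
 */
async function runWithConcurrency<T>(
  jobs: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  const results: T[] = new Array(jobs.length);
  let nextIndex = 0;

  // Each runner claims the next unstarted job until none remain.
  async function runner(): Promise<void> {
    while (nextIndex < jobs.length) {
      const index = nextIndex++;
      results[index] = await jobs[index]();
    }
  }

  // Start at most `limit` runners (and at least one).
  const runnerCount = Math.max(1, Math.min(limit, jobs.length));
  const runners: Promise<void>[] = [];
  for (let i = 0; i < runnerCount; i++) {
    runners.push(runner());
  }
  await Promise.all(runners);
  return results;
}
```

Raising the limit only helps while the tasks are waiting on I/O or while spare CPU is available; past that point, extra concurrency mostly adds contention.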

📊 Monitoring Scaled Systems

Task Event Monitoring

const queueManager = cleo.getQueueManager();
 
// Monitor task lifecycle
queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status, data) => {
  console.log(`Task ${taskId} status: ${status}`);
});
 
// Monitor group performance
queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE, (taskId, status, data) => {
  console.log(`Group operation: ${data.operation}`);
});
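Logging every event becomes noisy once several workers are running, so it can help to aggregate status changes into counters and report those periodically. Below is a minimal sketch of such a tally. `createStatusTally` is a hypothetical helper, and the status strings used in the example are placeholders for whatever statuses your Cleo version emits.

```typescript
type StatusCounts = Record<string, number>;

/** Creates a tally that a STATUS_CHANGE handler can feed. */
function createStatusTally() {
  // Prototype-free object so any status string is a safe key.
  const counts: StatusCounts = Object.create(null);
  return {
    /** Adds one occurrence of the given status. */
    record(status: string): void {
      counts[status] = (counts[status] ?? 0) + 1;
    },
    /** Returns a copy so callers cannot mutate the internal state. */
    snapshot(): StatusCounts {
      return { ...counts };
    },
  };
}

const tally = createStatusTally();
// Inside the STATUS_CHANGE handler you would call: tally.record(String(status));
tally.record("completed");
tally.record("completed");
tally.record("failed");
const snapshot = tally.snapshot();
// snapshot is { completed: 2, failed: 1 }
```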

Group Statistics

async function monitorGroups() {
  const group = await queueManager.getGroup("user1");
  const stats = await group.getStats();
  
  console.log("Group Statistics:", {
    completedTasks: stats.completed,
    failedTasks: stats.failed,
    processingTime: stats.avgProcessingTime,
  });
}
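Raw counts are easier to act on once they are turned into a rate and compared against a limit. The sketch below derives a failure rate from the same three fields read above and reports which limits a group exceeds. The thresholds, the assumption that `avgProcessingTime` is in milliseconds, and the helper names are all illustrative, not values defined by Cleo.

```typescript
// Mirrors the fields read from group.getStats() above.
interface GroupStats {
  completed: number;
  failed: number;
  avgProcessingTime: number; // assumed to be milliseconds
}

/** Fraction of finished tasks that failed; 0 when nothing has finished yet. */
function failureRate(stats: GroupStats): number {
  const finished = stats.completed + stats.failed;
  return finished === 0 ? 0 : stats.failed / finished;
}

/** Returns a human-readable list of exceeded limits (empty when healthy). */
function findGroupProblems(
  stats: GroupStats,
  limits = { maxFailureRate: 0.05, maxAvgProcessingTime: 2000 }, // hypothetical defaults
): string[] {
  const problems: string[] = [];
  const rate = failureRate(stats);
  if (rate > limits.maxFailureRate) {
    problems.push(`failure rate ${(rate * 100).toFixed(1)}% exceeds limit`);
  }
  if (stats.avgProcessingTime > limits.maxAvgProcessingTime) {
    problems.push(`average processing time ${stats.avgProcessingTime} ms exceeds limit`);
  }
  return problems;
}
```

Calling `findGroupProblems(stats)` inside `monitorGroups()` would turn the log line into an alerting hook.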

🎯 Best Practices for Scaling

1. Resource Management

@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
  },
})
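class ResourceAwareService {
  async processTask(data: any) {
    // checkResourceAvailability() and ResourceUnavailableError are
    // application-defined; Cleo does not provide them.
    const available = await checkResourceAvailability();
    if (!available) {
      // Throwing marks the task as failed so it can be retried later
      throw new ResourceUnavailableError();
    }
    // Process task...
  }
}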
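One way to back a resource check like the one above is a small gate that tracks how many slots of a limited resource (database connections, GPU sessions, API quota) are in use. The sketch below is a hypothetical, in-process version: `ResourceGate` and `withResource` are illustrative names, and a gate shared across several worker processes would need external state (for example in Redis), which is not shown.

```typescript
class ResourceUnavailableError extends Error {
  constructor(message = "Resource unavailable") {
    super(message);
    this.name = "ResourceUnavailableError";
  }
}

/** Tracks usage of a resource with a fixed number of slots. */
class ResourceGate {
  private inUse = 0;
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  /** Claims one slot, or throws if every slot is taken. */
  acquire(): void {
    if (this.inUse >= this.capacity) {
      throw new ResourceUnavailableError(`All ${this.capacity} slots are in use`);
    }
    this.inUse++;
  }

  /** Frees one slot; extra calls are ignored. */
  release(): void {
    if (this.inUse > 0) this.inUse--;
  }
}

/** Runs `work` while holding a slot, releasing it even if `work` throws. */
async function withResource<T>(gate: ResourceGate, work: () => Promise<T>): Promise<T> {
  gate.acquire();
  try {
    return await work();
  } finally {
    gate.release();
  }
}
```

Because a full gate throws instead of waiting, the task fails fast and the configured retry settings decide when it is attempted again.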

2. Error Handling at Scale

@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    group: "critical",
  },
})
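class ReliableService {
  async processWithRetry(data: any) {
    try {
      // this.process() stands for your own task logic
      return await this.process(data);
    } catch (error) {
      // Log the failure, then rethrow so Cleo can apply maxRetries and retryDelay
      console.error("Task failed:", error);
      throw error;
    }
  }
}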
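When many tasks fail at once, retrying them all after the same fixed delay can hit a struggling dependency in synchronized waves. A common mitigation is exponential backoff with a cap, sketched below. This guide does not state how Cleo spaces retries beyond the `retryDelay` option, so treat `backoffDelay` as a hypothetical helper for your own scheduling logic, not as a description of Cleo's behavior.

```typescript
/**
 * Delay before a given retry attempt, doubling each time up to a cap.
 * `attempt` is 1-based: attempt 1 waits baseDelayMs, attempt 2 waits twice that.
 */
function backoffDelay(attempt: number, baseDelayMs = 1000, maxDelayMs = 30000): number {
  const delay = baseDelayMs * Math.pow(2, attempt - 1);
  return Math.min(delay, maxDelayMs);
}

const delays = [1, 2, 3].map((attempt) => backoffDelay(attempt));
// delays is [1000, 2000, 4000]
```

The cap keeps late attempts from waiting unreasonably long: with these defaults, attempt 10 waits 30000 ms, not 512000 ms.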

🚀 Production Deployment Tips

1. Redis Configuration

// Production Redis setup
cleo.configure({
  redis: {
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT || "6379", 10),
    password: process.env.REDIS_PASSWORD,
    tls: true,
    maxRetriesPerRequest: 3,
  },
});
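A missing or malformed environment variable is easier to diagnose at startup than as a connection error later. The sketch below validates the same three variables before they reach `cleo.configure`. `loadRedisSettings` and the `RedisSettings` shape are hypothetical; only the variable names come from the snippet above.

```typescript
interface RedisSettings {
  host: string;
  port: number;
  password?: string;
  tls: boolean;
}

/** Builds Redis settings from environment-style input, failing fast on bad values. */
function loadRedisSettings(env: Record<string, string | undefined>): RedisSettings {
  const host = env.REDIS_HOST;
  if (!host) {
    throw new Error("REDIS_HOST is required");
  }

  // Number() rejects trailing garbage such as "6379abc", which parseInt would accept.
  const port = Number(env.REDIS_PORT || "6379");
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }

  return { host, port, password: env.REDIS_PASSWORD, tls: true };
}

// In an application: const redis = loadRedisSettings(process.env);
const settings = loadRedisSettings({ REDIS_HOST: "redis.internal", REDIS_PORT: "6380" });
// settings.port is 6380
```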

2. Worker Distribution

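// Configure multiple workers
// getPriorityForQueue is an application-defined helper that maps a queue name to a TaskPriority.
const workerConfigs = [
  { name: "critical-worker", queues: ["critical"] },
  { name: "normal-worker", queues: ["normal", "background"] },
];

// Note: if `cleo` is a single shared instance, each configure() call may replace
// the previous worker settings; in that case, apply one config per worker process.
workerConfigs.forEach(config => {
  cleo.configure({
    worker: {
      name: config.name,
      queues: config.queues.map(q => ({
        name: q,
        priority: getPriorityForQueue(q),
      })),
    },
  });
});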
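The snippet above relies on a `getPriorityForQueue` helper that this guide does not define. One plausible shape is a lookup table keyed by queue name, sketched below. The mapping mirrors the queue names and priorities from the Priority-Based Scaling example; the fallback to "NORMAL" for unknown queues is an assumption. The sketch returns priority names so it runs on its own; in an application you would return the matching `TaskPriority` member instead.

```typescript
type PriorityName = "HIGH" | "NORMAL" | "LOW";

// Queue-to-priority table, following the Priority-Based Scaling example.
const QUEUE_PRIORITIES: Record<string, PriorityName> = {
  critical: "HIGH",
  normal: "NORMAL",
  background: "LOW",
};

/** Looks up a queue's priority, falling back to "NORMAL" for unknown queues (an assumption). */
function getPriorityForQueue(queue: string): PriorityName {
  // hasOwnProperty avoids matching inherited keys such as "toString".
  return Object.prototype.hasOwnProperty.call(QUEUE_PRIORITIES, queue)
    ? QUEUE_PRIORITIES[queue]
    : "NORMAL";
}

const priorities = ["critical", "background", "reports"].map(getPriorityForQueue);
// priorities is ["HIGH", "LOW", "NORMAL"]
```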

📈 Performance Monitoring

1. Queue Metrics

async function monitorQueueHealth() {
  const queueManager = cleo.getQueueManager();
  
  // Monitor queue sizes
  const queues = await queueManager.getQueues();
  for (const queue of queues) {
    const stats = await queue.getStats();
    console.log(`Queue ${queue.name}:`, {
      waiting: stats.waiting,
      active: stats.active,
      completed: stats.completed,
      failed: stats.failed,
    });
  }
}
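A single snapshot of queue sizes says little about whether workers are keeping up; comparing two snapshots does. The sketch below estimates how long the current backlog would take to drain at the throughput observed between two readings. It assumes that `completed` and `failed` are cumulative counters, which this guide does not confirm, and `drainEstimateSeconds` is a hypothetical helper.

```typescript
// Mirrors the fields read from queue.getStats() above.
interface QueueStats {
  waiting: number;
  active: number;
  completed: number; // assumed cumulative
  failed: number; // assumed cumulative
}

/**
 * Estimates the seconds needed to drain `curr.waiting` at the rate observed
 * between two snapshots taken `intervalSeconds` apart.
 * Returns Infinity when tasks are waiting but none finished in the interval.
 */
function drainEstimateSeconds(
  prev: QueueStats,
  curr: QueueStats,
  intervalSeconds: number,
): number {
  const finished = curr.completed + curr.failed - (prev.completed + prev.failed);
  const throughput = finished / intervalSeconds; // tasks per second
  if (throughput <= 0) {
    return curr.waiting === 0 ? 0 : Infinity;
  }
  return curr.waiting / throughput;
}

const estimate = drainEstimateSeconds(
  { waiting: 100, active: 4, completed: 100, failed: 0 },
  { waiting: 120, active: 4, completed: 160, failed: 0 },
  60,
);
// estimate is 120: 60 tasks finished in 60 s (1 task/s), with 120 still waiting
```

An estimate that keeps growing between readings is a signal to add workers or raise concurrency.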

2. Worker Health

async function monitorWorkerHealth() {
  const workers = await cleo.getWorkers();
  for (const worker of workers) {
    const health = await worker.getHealth();
    console.log(`Worker ${worker.id}:`, {
      status: health.status,
      uptime: health.uptime,
      activeJobs: health.activeJobs,
    });
  }
}

🎯 Scaling Checklist

1. Pre-scaling Assessment

  • Monitor current resource usage
  • Identify bottlenecks
  • Set performance baselines

2. Infrastructure Setup

  • Configure Redis appropriately
  • Set up monitoring
  • Plan worker distribution

3. Application Configuration

  • Optimize worker settings
  • Configure group strategies
  • Set up error handling

4. Monitoring Setup

  • Implement event listeners
  • Track group statistics
  • Monitor worker health

Remember: Start with proper monitoring before scaling, and scale based on actual metrics rather than assumptions.