PostsScaling Task Processing: From Startup to Enterprise 📈

Scaling Task Processing: From Startup to Enterprise 📈

5 min read
by Fofsinx

Learn how to scale your task processing system from handling thousands to millions of tasks using Cleo. Best practices for horizontal and vertical scaling with Redis and BullMQ.

Scaling Task Processing: From Startup to Enterprise 📈

Scaling a distributed task processing system requires careful consideration of various factors. Let's explore how to scale Cleo effectively using Redis and BullMQ's powerful features.

Understanding the Architecture

Cleo is built on top of BullMQ and Redis, providing a robust foundation for scaling:

import { Cleo } from "@cleotasks/core";
import { TaskPriority, BackoffType } from "@cleotasks/core/types/enums";
 
// Enhanced configuration with latest features
const cleo = Cleo.getInstance();
cleo.configure({
  redis: {
    host: "localhost",
    port: 6379,
    password: "cleosecret",
    keyPrefix: "prod:", // Multi-tenant support
  },
  worker: {
    concurrency: 4,
    maxMemoryPercent: 80,
    queues: [
      {
        name: "high-priority",
        priority: TaskPriority.HIGH,
        concurrency: 2,
      },
    ],
  },
});

Vertical Scaling Strategies

1. Smart Concurrency Management

// Advanced worker configuration
cleo.configure({
  worker: {
    concurrency: Math.max(1, os.cpus().length - 1), // Dynamic based on CPU
    maxMemoryPercent: 80,
    memoryCheckInterval: 1000,
    queues: [
      {
        name: "cpu-intensive",
        concurrency: 2,
        weight: 10, // Higher weight = more resources
      },
      {
        name: "io-bound",
        concurrency: 10,
        weight: 5,
      },
    ],
  },
});

2. Intelligent Resource Management

// Advanced resource management
@QueueClass({
  defaultOptions: {
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
    removeOnComplete: {
      age: 3600, // Remove completed jobs after 1 hour
      count: 1000, // Keep last 1000 completed jobs
    },
    weight: 5, // Resource allocation weight
  },
})
class ResourceOptimizedService {
  @task({
    id: "resource-intensive-task",
    timeout: 30000,
  })
  async process() {
    // Resource-intensive processing
  }
}

Horizontal Scaling

1. Enhanced Redis Cluster

// Advanced Redis Cluster configuration
cleo.configure({
  redis: {
    cluster: [
      { host: "redis-1", port: 6379 },
      { host: "redis-2", port: 6379 },
      { host: "redis-3", port: 6379 },
    ],
    maxRetriesPerRequest: 3,
    retryStrategy: (times: number) => Math.min(times * 50, 2000),
    enableReadyCheck: true,
    keyPrefix: "tenant1:", // Multi-tenant support
  },
});

2. Distributed Workers

// Advanced worker distribution
class WorkerManager {
  async startWorker(config: WorkerConfig) {
    const worker = await cleo.createWorker({
      name: config.name,
      queues: config.queues,
      concurrency: config.concurrency,
      backoff: {
        type: BackoffType.EXPONENTIAL,
        delay: 1000,
      },
      removeOnComplete: true,
    });
 
    // Enhanced worker monitoring
    worker.on('completed', (job) => this.trackMetrics(job));
    worker.on('failed', (job, err) => this.handleFailure(job, err));
    
    return worker;
  }
}

Queue Optimization

1. Advanced Rate Limiting

@QueueClass({
  defaultOptions: {
    rateLimiter: {
      max: 1000,
      duration: 60000,
      groupKey: "ip", // Rate limit by group key
    },
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
  },
})
class EnhancedRateLimitedService {
  @task({
    id: "rate-limited-task",
    weight: 5,
  })
  async process() {
    // Rate-limited processing
  }
}

2. Priority and Weight Management

@QueueClass({
  defaultOptions: {
    priority: TaskPriority.HIGH,
    weight: 10,
    removeOnComplete: true,
  },
})
class WeightedPriorityService {
  @task({
    id: "critical-task",
    timeout: 30000,
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
  })
  async processUrgent() {
    // Critical task processing
  }
}

Performance Optimization

1. Smart Batch Processing

@QueueClass({
  defaultOptions: {
    group: "batch-jobs",
    weight: 8,
  },
})
class SmartBatchProcessor {
  @task({
    id: "process-batch",
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
  })
  async processBatch(items: any[]) {
    const chunks = this.chunkArray(items, 100);
    const results = [];
    
    for (const chunk of chunks) {
      const chunkResults = await Promise.all(
        chunk.map(item => this.processWithRetry(item))
      );
      results.push(...chunkResults);
    }
    
    return results;
  }
 
  private async processWithRetry(item: any) {
    try {
      return await this.processItem(item);
    } catch (error) {
      // Intelligent error handling with backoff
      throw error;
    }
  }
}

2. Enhanced Connection Management

// Advanced connection pool configuration
cleo.configure({
  redis: {
    enableReadyCheck: true,
    maxRetriesPerRequest: 3,
    connectionNamespace: "cleo",
    connectTimeout: 5000,
    disconnectTimeout: 5000,
    retryStrategy: (times: number) => {
      if (times > 10) return null; // Stop retrying after 10 attempts
      return Math.min(times * 100, 3000); // Progressive delay
    },
  },
});

Comprehensive Monitoring

1. Enhanced Metrics Collection

class MetricsCollector {
  async collectMetrics() {
    const queues = await queueManager.getQueues();
    const metrics = await Promise.all(
      queues.map(async queue => {
        const queueMetrics = await queue.getMetrics();
        const workerMetrics = await this.getWorkerMetrics(queue.name);
        
        return {
          queue: queue.name,
          metrics: {
            processed: queueMetrics.processed,
            failed: queueMetrics.failed,
            delayed: queueMetrics.delayed,
            active: queueMetrics.active,
            waitTime: queueMetrics.waitTime,
            processingTime: queueMetrics.processingTime,
          },
          workers: workerMetrics,
        };
      })
    );
 
    return this.aggregateMetrics(metrics);
  }
}

2. Advanced Health Monitoring

class HealthMonitor {
  async checkSystemHealth() {
    const health = await cleo.getHealth();
    const redisInfo = await health.getRedisInfo();
    const workerStats = await this.getWorkerStats();
    
    return {
      redis: {
        connectedClients: redisInfo.connected_clients,
        usedMemory: redisInfo.used_memory_human,
        commandsProcessed: redisInfo.total_commands_processed,
        keyspace: redisInfo.keyspace_hits,
      },
      queues: await this.getQueueHealth(),
      workers: workerStats,
      system: {
        memory: process.memoryUsage(),
        cpu: process.cpuUsage(),
        uptime: process.uptime(),
      },
    };
  }
}

Best Practices for Scaling

  1. Redis Configuration

    • Enable persistence with appropriate fsync settings
    • Configure maxmemory-policy based on workload
    • Use Redis Cluster for high availability
    • Implement key prefixing for multi-tenant setups
  2. Worker Management

    • Scale workers dynamically based on system metrics
    • Implement graceful shutdown with task completion
    • Use worker weights for resource allocation
    • Configure appropriate backoff strategies
  3. Queue Design

    • Implement priority and weight-based processing
    • Use rate limiting with group keys
    • Design efficient batch processing with retries
    • Configure automatic job cleanup
  4. Monitoring and Maintenance

    • Track comprehensive metrics
    • Implement proactive health checks
    • Set up alerts for critical thresholds
    • Monitor Redis cluster health
    • Track worker performance metrics
  5. Security and Multi-tenancy

    • Use key prefixing for tenant isolation
    • Implement rate limiting per tenant
    • Configure secure Redis connections
    • Monitor tenant resource usage

Remember to:

  • Scale gradually based on metrics
  • Monitor system performance
  • Implement proper error handling
  • Plan for disaster recovery
  • Regular maintenance and optimization

Happy scaling! 🚀

Fofsinx

Fofsinx

Software Engineer | Writer | Designer