🚀 Scaling Guide
Learn how to scale your Cleo task processing system from development to production.
Understanding Cleo's Architecture
Cleo is built on a distributed architecture using Redis as its backbone, allowing for horizontal scaling and high availability.
Basic Architecture
import { Cleo } from "@cleotasks/core";
import { TaskPriority } from "@cleotasks/core/types/enums";

// Basic single-worker configuration
const cleo = Cleo.getInstance();

cleo.configure({
  redis: {
    host: "localhost",
    port: 6379,
    password: "cleosecret",
  },
  worker: {
    concurrency: 4,
    queues: [
      {
        name: "default",
        priority: TaskPriority.NORMAL,
      },
    ],
  },
});
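Because all coordination happens through Redis, horizontal scaling is mostly a matter of running more copies of this process against the same Redis instance. A minimal sketch, assuming each worker's concurrency is sized from an environment variable (WORKER_CONCURRENCY is an illustrative name, not a Cleo setting):

```typescript
// Every process that runs this configuration becomes an additional worker;
// the workers coordinate through the shared Redis instance, so adding
// capacity means starting more copies (more containers, PM2 instances, etc.).
cleo.configure({
  redis: {
    host: process.env.REDIS_HOST || "localhost",
    port: parseInt(process.env.REDIS_PORT || "6379"),
    password: process.env.REDIS_PASSWORD,
  },
  worker: {
    // WORKER_CONCURRENCY is a hypothetical env var used here for illustration.
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || "4"),
    queues: [{ name: "default", priority: TaskPriority.NORMAL }],
  },
});
```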
🔄 Scaling Strategies
1. Group-Based Scaling
Groups let you partition work (for example, per user or per tenant) so that no single group can monopolize the workers; the processing strategy controls how tasks from different groups are interleaved.
@QueueClass({
  defaultOptions: {
    priority: TaskPriority.NORMAL,
    maxRetries: 3,
    retryDelay: 1000,
    group: "user1", // tasks from this service are grouped per user
  },
  queue: "notifications",
})
class NotificationService {
  // Task methods...
}

// Configure the group processing strategy (round-robin interleaves groups fairly)
const queueManager = cleo.getQueueManager();
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);
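The group value typically maps to something in your domain, such as a user or tenant, so heavy usage by one group cannot starve the others. A small sketch, assuming one service class per tenant (the class and group names are illustrative):

```typescript
// Hypothetical per-tenant services: both write to the same queue but use
// different groups, so round-robin processing gives each tenant a fair share.
@QueueClass({
  queue: "notifications",
  defaultOptions: { priority: TaskPriority.NORMAL, maxRetries: 3, retryDelay: 1000, group: "tenant-a" },
})
class TenantANotifications {
  // Task methods...
}

@QueueClass({
  queue: "notifications",
  defaultOptions: { priority: TaskPriority.NORMAL, maxRetries: 3, retryDelay: 1000, group: "tenant-b" },
})
class TenantBNotifications {
  // Task methods...
}
```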
2. Priority-Based Scaling
// Configure multiple priority queues
cleo.configure({
  worker: {
    queues: [
      { name: "critical", priority: TaskPriority.HIGH },
      { name: "normal", priority: TaskPriority.NORMAL },
      { name: "background", priority: TaskPriority.LOW },
    ],
  },
});
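Priority queues only help if tasks actually land on them. One way to route work, reusing the @QueueClass decorator from the group example (the service names here are illustrative):

```typescript
// Route urgent work to the "critical" queue and bulk work to "background".
@QueueClass({
  queue: "critical",
  defaultOptions: { priority: TaskPriority.HIGH, maxRetries: 3, retryDelay: 1000 },
})
class PaymentService {
  // Time-sensitive task methods...
}

@QueueClass({
  queue: "background",
  defaultOptions: { priority: TaskPriority.LOW, maxRetries: 3, retryDelay: 1000 },
})
class ReportService {
  // Long-running, low-urgency task methods...
}
```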
3. Concurrent Processing
// Scale by increasing concurrency
cleo.configure({
  worker: {
    concurrency: 10, // Increase based on CPU cores
    queues: [
      {
        name: "heavy-processing",
        concurrency: 5, // Queue-specific concurrency
      },
    ],
  },
});
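For CPU-bound work a reasonable starting point is one slot per core, while I/O-bound work (HTTP calls, database writes) can usually run many more slots per core. A minimal sketch of deriving the setting from the host, with an assumed multiplier you would tune from your own measurements:

```typescript
import * as os from "os";

// Start from the number of CPU cores and apply a workload-dependent multiplier.
// IO_MULTIPLIER is an illustrative value, not a Cleo default.
const IO_MULTIPLIER = 4;
const cpuBoundConcurrency = os.cpus().length;
const ioBoundConcurrency = os.cpus().length * IO_MULTIPLIER;

cleo.configure({
  worker: {
    concurrency: ioBoundConcurrency,
    queues: [
      { name: "heavy-processing", concurrency: cpuBoundConcurrency },
    ],
  },
});
```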
📊 Monitoring Scaled Systems
Task Event Monitoring
const queueManager = cleo.getQueueManager();

// Monitor task lifecycle (ObserverEvent is assumed to be exported from the
// same enums module as TaskPriority)
queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status, data) => {
  console.log(`Task ${taskId} status: ${status}`);
});

// Monitor group performance
queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE, (taskId, status, data) => {
  console.log(`Group operation: ${data.operation}`);
});
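Logging every event works for debugging, but at scale it is usually better to aggregate events into counters you can export to a metrics backend. A minimal sketch built only on the onTaskEvent hook shown above:

```typescript
// Keep simple in-memory counters per status; export them to your metrics
// backend (Prometheus, StatsD, etc.) on a timer instead of logging each event.
const statusCounts: Record<string, number> = {};

queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status) => {
  statusCounts[status] = (statusCounts[status] || 0) + 1;
});

setInterval(() => {
  console.log("Task status counts (last interval):", statusCounts);
  for (const key of Object.keys(statusCounts)) statusCounts[key] = 0;
}, 60_000);
```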
Group Statistics
async function monitorGroups() {
  const group = await queueManager.getGroup("user1");
  const stats = await group.getStats();

  console.log("Group Statistics:", {
    completedTasks: stats.completed,
    failedTasks: stats.failed,
    processingTime: stats.avgProcessingTime,
  });
}
🎯 Best Practices for Scaling
1. Resource Management
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
  },
})
class ResourceAwareService {
  async processTask(data: any) {
    // Resource-aware processing: check capacity before doing work.
    // checkResourceAvailability and ResourceUnavailableError are application
    // helpers, sketched below.
    const available = await checkResourceAvailability();
    if (!available) {
      throw new ResourceUnavailableError();
    }
    // Process task...
  }
}
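checkResourceAvailability and ResourceUnavailableError are not part of Cleo; they stand in for whatever resource checks matter in your system. A minimal sketch of what they might look like, using process memory as the example resource:

```typescript
// Hypothetical helpers for the example above.
class ResourceUnavailableError extends Error {
  constructor(message = "Required resources are unavailable") {
    super(message);
    this.name = "ResourceUnavailableError";
  }
}

// Example check: refuse new work when heap usage crosses a threshold.
// The 80% limit is an illustrative value, not a Cleo default.
async function checkResourceAvailability(): Promise<boolean> {
  const { heapUsed, heapTotal } = process.memoryUsage();
  return heapUsed / heapTotal < 0.8;
}
```

Throwing when resources are tight hands the task back to Cleo's retry mechanism (maxRetries and retryDelay above), which effectively acts as backpressure.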
2. Error Handling at Scale
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    group: "critical",
  },
})
class ReliableService {
  async processWithRetry(data: any) {
    try {
      return await this.process(data);
    } catch (error) {
      // Log enough context to debug failures at scale, then rethrow so Cleo's
      // retry logic (maxRetries / retryDelay) takes over.
      console.error("Task failed, will be retried by Cleo:", error);
      throw error;
    }
  }

  private async process(data: any) {
    // Actual task implementation...
  }
}
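At scale it usually pays to distinguish errors worth retrying (timeouts, transient network failures) from those that will never succeed (validation errors), so retries do not amplify load for no benefit. A sketch of that pattern; isRetryable is an application helper, not a Cleo API:

```typescript
// Illustrative classification: only rethrow (and therefore retry) transient errors.
function isRetryable(error: unknown): boolean {
  const code = (error as { code?: string }).code;
  return code === "ETIMEDOUT" || code === "ECONNRESET" || code === "ECONNREFUSED";
}

async function processWithClassification(task: () => Promise<void>) {
  try {
    await task();
  } catch (error) {
    if (isRetryable(error)) {
      throw error; // Cleo retries according to maxRetries / retryDelay
    }
    // Permanent failure: record it and do not rethrow, so it is not retried.
    console.error("Permanent task failure, not retrying:", error);
  }
}
```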
🚀 Production Deployment Tips
1. Redis Configuration
// Production Redis setup
cleo.configure({
  redis: {
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT || "6379"),
    password: process.env.REDIS_PASSWORD,
    tls: true,
    maxRetriesPerRequest: 3,
  },
});
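Missing connection settings tend to surface later as opaque worker errors, so it can help to fail fast at startup. A variant of the configuration above with a small validation helper (requireEnv is illustrative, not part of Cleo):

```typescript
// Fail fast at startup if required Redis settings are missing.
function requireEnv(name: string): string {
  const value = process.env[name];
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

cleo.configure({
  redis: {
    host: requireEnv("REDIS_HOST"),
    port: parseInt(process.env.REDIS_PORT || "6379"),
    password: requireEnv("REDIS_PASSWORD"),
    tls: true,
    maxRetriesPerRequest: 3,
  },
});
```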
2. Worker Distribution
// Configure multiple workers (each entry would typically run in its own process)
const workerConfigs = [
  { name: "critical-worker", queues: ["critical"] },
  { name: "normal-worker", queues: ["normal", "background"] },
];

workerConfigs.forEach(config => {
  cleo.configure({
    worker: {
      name: config.name,
      queues: config.queues.map(q => ({
        name: q,
        // getPriorityForQueue is an application helper; see the sketch below.
        priority: getPriorityForQueue(q),
      })),
    },
  });
});
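getPriorityForQueue is not part of Cleo; it simply maps a queue name to the priorities used elsewhere in this guide. One possible implementation:

```typescript
// Hypothetical helper: map queue names to the priorities used in this guide.
function getPriorityForQueue(queueName: string): TaskPriority {
  switch (queueName) {
    case "critical":
      return TaskPriority.HIGH;
    case "background":
      return TaskPriority.LOW;
    default:
      return TaskPriority.NORMAL;
  }
}
```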
📈 Performance Monitoring
1. Queue Metrics
async function monitorQueueHealth() {
  const queueManager = cleo.getQueueManager();

  // Monitor queue sizes
  const queues = await queueManager.getQueues();
  for (const queue of queues) {
    const stats = await queue.getStats();
    console.log(`Queue ${queue.name}:`, {
      waiting: stats.waiting,
      active: stats.active,
      completed: stats.completed,
      failed: stats.failed,
    });
  }
}
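Raw numbers are most useful when compared against thresholds derived from your baselines. A small sketch on top of the stats shown above; the threshold values are placeholders, not recommendations:

```typescript
// Illustrative alerting thresholds; replace with values from your own baselines.
const MAX_WAITING = 1000;
const MAX_FAILURE_RATE = 0.05;

async function checkQueueThresholds() {
  const queueManager = cleo.getQueueManager();
  const queues = await queueManager.getQueues();

  for (const queue of queues) {
    const stats = await queue.getStats();
    const failureRate = stats.failed / Math.max(1, stats.completed + stats.failed);

    if (stats.waiting > MAX_WAITING) {
      console.warn(`Queue ${queue.name} backlog is high: ${stats.waiting} waiting tasks`);
    }
    if (failureRate > MAX_FAILURE_RATE) {
      console.warn(`Queue ${queue.name} failure rate is ${(failureRate * 100).toFixed(1)}%`);
    }
  }
}
```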
2. Worker Health
async function monitorWorkerHealth() {
  const workers = await cleo.getWorkers();
  for (const worker of workers) {
    const health = await worker.getHealth();
    console.log(`Worker ${worker.id}:`, {
      status: health.status,
      uptime: health.uptime,
      activeJobs: health.activeJobs,
    });
  }
}
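These checks are only useful if they run continuously. A minimal sketch that runs the monitors on a timer, assuming the monitorQueueHealth, monitorWorkerHealth, and monitorGroups functions from this page are in scope; in production you would more likely export the numbers to a metrics system than log them:

```typescript
// Run the health checks defined above on a fixed interval.
const MONITOR_INTERVAL_MS = 30_000; // illustrative value

setInterval(async () => {
  try {
    await monitorQueueHealth();
    await monitorWorkerHealth();
    await monitorGroups();
  } catch (error) {
    console.error("Monitoring run failed:", error);
  }
}, MONITOR_INTERVAL_MS);
```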
🎯 Scaling Checklist
- Pre-scaling Assessment
  - Monitor current resource usage
  - Identify bottlenecks
  - Set performance baselines
- Infrastructure Setup
  - Configure Redis appropriately
  - Set up monitoring
  - Plan worker distribution
- Application Configuration
  - Optimize worker settings
  - Configure group strategies
  - Set up error handling
- Monitoring Setup
  - Implement event listeners
  - Track group statistics
  - Monitor worker health
Remember: Start with proper monitoring before scaling, and scale based on actual metrics rather than assumptions.