Core Concepts
Learn the fundamental building blocks and concepts behind Cleo's distributed task processing system. This guide covers task decorators, queue classes, group processing strategies, event handling, and error management.
🧠 Core Concepts - The Building Blocks of Cleo
🎀 Task Decorators
The @task decorator provides fine-grained control over individual task execution:
class EmailService {
  @task({
    id: 'send-email',
    priority: TaskPriority.HIGH,
    queue: 'email',
    group: 'notifications',
    timeout: 30000,
    maxRetries: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    weight: 10,
    removeOnComplete: true
  })
  async sendEmail(input: { email: string }): Promise<string> {
    // Implementation goes here; the placeholder return satisfies Promise<string>
    return `Email sent to ${input.email}`;
  }
}
Key decorator options:
id: Unique task identifier
priority: Task priority level
queue: Target queue name
group: Group for coordinated processing
timeout: Maximum execution time
maxRetries: Maximum retry attempts
backoff: Retry delay strategy
weight: Task priority within group
removeOnComplete: Auto-cleanup after completion
🎯 Queue Classes
The @QueueClass decorator organizes related tasks:
@QueueClass({
  defaultOptions: {
    queue: 'notifications',
    group: 'emails',
    maxRetries: 3
  },
  includeInherited: true,
  exclude: ['privateMethod'],
})
class NotificationService {
  // Methods automatically become tasks
}
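To make the includeInherited and exclude options concrete, the sketch below shows one way a class-level decorator could decide which methods to register as tasks. It is not Cleo's implementation: collectTaskMethods, SelectionOptions, and the demo classes are hypothetical names, and the selection rules are an assumption based only on the option names shown above.

```typescript
// Hypothetical illustration of method selection; not part of Cleo's API.
interface SelectionOptions {
  includeInherited?: boolean;
  exclude?: string[];
}

type Constructor = new (...args: any[]) => object;

function collectTaskMethods(cls: Constructor, options: SelectionOptions = {}): string[] {
  const excluded = new Set<string>(options.exclude ?? []);
  const names = new Set<string>();
  let proto: object | null = cls.prototype;

  // Walk the prototype chain, stopping before Object.prototype.
  while (proto && proto !== Object.prototype) {
    for (const name of Object.getOwnPropertyNames(proto)) {
      const descriptor = Object.getOwnPropertyDescriptor(proto, name);
      const isMethod = typeof descriptor?.value === "function";
      if (name !== "constructor" && isMethod && !excluded.has(name)) {
        names.add(name);
      }
    }
    // Without includeInherited, only the class's own methods are considered.
    if (!options.includeInherited) break;
    proto = Object.getPrototypeOf(proto);
  }
  return Array.from(names).sort();
}

class DemoBaseService {
  async audit(): Promise<void> {}
}

class DemoNotificationService extends DemoBaseService {
  async sendEmail(): Promise<void> {}
  async privateMethod(): Promise<void> {}
}

const ownOnly = collectTaskMethods(DemoNotificationService, { exclude: ["privateMethod"] });
const withInherited = collectTaskMethods(DemoNotificationService, {
  includeInherited: true,
  exclude: ["privateMethod"],
});
console.log(ownOnly, withInherited);
```

Under these assumptions, excluded names are skipped at every level of the hierarchy, and inherited methods are picked up only when includeInherited is set.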
📊 Group Processing
Task groups enable coordinated processing with multiple strategies:
const group = await queueManager.getGroup('notifications');

// Configure group processing
await group.updateConfig({
  concurrency: 5,
  maxConcurrency: 10,
  rateLimit: {
    max: 100,
    duration: 60000
  },
  strategy: GroupProcessingStrategy.PRIORITY,
  timeout: 30000
});

// Monitor group stats
const stats = await group.getStats();
console.log(stats); // { total, active, completed, failed, paused }
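As a rough illustration of what a priority-based strategy with a concurrency cap involves, the sketch below picks the next tasks to start from a pending list. It is not Cleo's scheduler: PendingTask and nextBatch are hypothetical names, and the ordering (higher priority first, then higher weight, then oldest first) is an assumption.

```typescript
// Hypothetical sketch of priority-based selection; not Cleo's scheduler.
interface PendingTask {
  id: string;
  priority: number;   // assumption: larger values run first
  weight: number;     // assumption: tie-breaker within the group
  enqueuedAt: number; // timestamp in ms, used for first-in-first-out ties
}

function nextBatch(pending: PendingTask[], active: number, concurrency: number): PendingTask[] {
  // Only as many tasks as there are free worker slots may start.
  const freeSlots = Math.max(0, concurrency - active);
  return pending
    .slice() // copy so the caller's array is not reordered
    .sort(
      (a, b) =>
        b.priority - a.priority ||
        b.weight - a.weight ||
        a.enqueuedAt - b.enqueuedAt
    )
    .slice(0, freeSlots);
}

const pendingTasks: PendingTask[] = [
  { id: "a", priority: 1, weight: 10, enqueuedAt: 1 },
  { id: "b", priority: 2, weight: 1, enqueuedAt: 2 },
  { id: "c", priority: 1, weight: 20, enqueuedAt: 3 },
];

// With 3 of 5 slots busy, two tasks can start.
const batch = nextBatch(pendingTasks, 3, 5).map((t) => t.id);
console.log(batch);
```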
🎭 Event Handling
Cleo emits events throughout the task lifecycle. Register handlers through the queue manager:
const queueManager = cleo.getQueueManager();

// Task events
queueManager.onTaskEvent(ObserverEvent.TASK_COMPLETED,
  (taskId, status, data) => {
    console.log(`Task ${taskId} completed with result:`, data.result);
  });

queueManager.onTaskEvent(ObserverEvent.TASK_FAILED,
  (taskId, status, data) => {
    console.error(`Task ${taskId} failed:`, data.error);
  });

// Group events
queueManager.onTaskEvent(ObserverEvent.GROUP_STATE_CHANGE,
  (groupId, status, data) => {
    console.log(`Group ${groupId} state changed to:`, status);
  });
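The handlers above all share the same (id, status, data) callback shape. The sketch below is a minimal observer registry with that shape, meant only to show the pattern. TaskEventBus, its on and emit methods, and the string event names are hypothetical and are not Cleo's API.

```typescript
// Hypothetical observer registry illustrating the callback shape; not Cleo's API.
type TaskEventHandler = (id: string, status: string, data: Record<string, unknown>) => void;

class TaskEventBus {
  private handlers = new Map<string, TaskEventHandler[]>();

  // Registers a handler and returns a function that removes it again.
  on(event: string, handler: TaskEventHandler): () => void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
    return () => {
      const current = this.handlers.get(event) ?? [];
      this.handlers.set(event, current.filter((h) => h !== handler));
    };
  }

  // Calls every handler registered for the event and returns how many ran.
  emit(event: string, id: string, status: string, data: Record<string, unknown> = {}): number {
    const list = this.handlers.get(event) ?? [];
    for (const handler of list) {
      handler(id, status, data);
    }
    return list.length;
  }
}

const bus = new TaskEventBus();
const seen: string[] = [];

const unsubscribe = bus.on("task_completed", (id, status, data) => {
  seen.push(`${id}:${status}:${String(data.result)}`);
});

const notified = bus.emit("task_completed", "task-1", "completed", { result: "ok" });
unsubscribe();
const notifiedAfterUnsubscribe = bus.emit("task_completed", "task-2", "completed");
```

Returning an unsubscribe function from on is a design choice of this sketch; it keeps handler cleanup local to the code that registered the handler.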
🔄 Error Handling
Failed tasks are retried according to a configurable backoff strategy:
// Decorators apply to class members, so the task is declared as a method
class RetryableService {
  @task({
    maxRetries: 3,
    backoff: {
      type: 'exponential',
      delay: 1000 // Doubles each retry: 1s, 2s, 4s
    },
    retryDelay: 5000
  })
  async retryableTask() {
    try {
      // Task implementation
    } catch (error) {
      // Rethrowing the error triggers a retry with backoff
      throw error;
    }
  }
}
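The delay progression noted in the comment above (1s, 2s, 4s) can be worked out with a small helper. This is a sketch of the arithmetic only, not Cleo's retry code: backoffDelay and retrySchedule are hypothetical names, and the 'fixed' type is an assumption added for contrast.

```typescript
// Hypothetical helpers showing the backoff arithmetic; not Cleo's retry code.
type BackoffType = "fixed" | "exponential";

// Delay before retry number `attempt` (1-based), in milliseconds.
function backoffDelay(type: BackoffType, baseDelayMs: number, attempt: number): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError("attempt must be an integer starting at 1");
  }
  return type === "exponential"
    ? baseDelayMs * Math.pow(2, attempt - 1) // base, 2x base, 4x base, ...
    : baseDelayMs;
}

// Full list of delays for a task that exhausts all of its retries.
function retrySchedule(type: BackoffType, baseDelayMs: number, maxRetries: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    delays.push(backoffDelay(type, baseDelayMs, attempt));
  }
  return delays;
}

const exponentialSchedule = retrySchedule("exponential", 1000, 3);
const fixedSchedule = retrySchedule("fixed", 1000, 3);
console.log(exponentialSchedule, fixedSchedule);
```

With maxRetries: 3 and delay: 1000, the exponential schedule adds up to 7 seconds of waiting before the task is finally marked as failed.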
📈 Monitoring
Built-in monitoring capabilities:
// Worker metrics
const worker = cleo.getWorker('email-queue');
const metrics = await worker.getMetrics();
const history = await worker.getTaskHistory();
// Queue metrics
const queueMetrics = await queueManager.getQueueMetrics('email-queue');
// Group monitoring
const groupStats = await group.getStats();
🔐 Security
Secure the Redis connection with a password and TLS, and isolate tenants with a key prefix:
cleo.configure({
  redis: {
    host: 'localhost',
    port: 6379,
    password: 'secure-password',
    tls: true,
    keyPrefix: 'tenant1:', // Multi-tenant support
  }
});
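The sketch below illustrates why a key prefix gives tenant isolation: two tenants can use the same logical key without touching each other's data. It assumes the prefix is prepended verbatim to every key, which this guide does not confirm. makeKeyspace and the key names are hypothetical.

```typescript
// Hypothetical illustration of key namespacing; assumes the prefix is
// prepended verbatim to every Redis key.
function makeKeyspace(keyPrefix: string): (key: string) => string {
  return (key: string): string => `${keyPrefix}${key}`;
}

const tenant1Key = makeKeyspace("tenant1:");
const tenant2Key = makeKeyspace("tenant2:");

// The same logical key maps to distinct physical keys per tenant.
const tenant1Queue = tenant1Key("queue:email");
const tenant2Queue = tenant2Key("queue:email");
console.log(tenant1Queue, tenant2Queue);
```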
🚦 Rate Limiting
Control processing rates at queue and group levels:
@QueueClass({
  defaultOptions: {
    queue: 'api-calls',
    rateLimit: {
      max: 100,
      duration: 60000 // 100 tasks per minute
    }
  }
})
class ApiCallService {
  // Tasks defined here share the queue's rate limit
}
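To show what a max and duration pair means in practice, here is a minimal fixed-window limiter. It is a sketch of one common technique and not necessarily how Cleo enforces its limits. FixedWindowLimiter and tryAcquire are hypothetical names, and the clock is passed in explicitly so the behavior is easy to follow.

```typescript
// Hypothetical fixed-window rate limiter; Cleo's actual algorithm may differ.
class FixedWindowLimiter {
  private readonly max: number;
  private readonly durationMs: number;
  private windowStart = Number.NEGATIVE_INFINITY;
  private count = 0;

  constructor(max: number, durationMs: number) {
    this.max = max;
    this.durationMs = durationMs;
  }

  // Returns true if a task may start at time `nowMs`, false if the limit is hit.
  tryAcquire(nowMs: number): boolean {
    // Open a fresh window once the current one has elapsed.
    if (nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs;
      this.count = 0;
    }
    if (this.count >= this.max) {
      return false;
    }
    this.count += 1;
    return true;
  }
}

// Two tasks per second, for a compact demonstration.
const limiter = new FixedWindowLimiter(2, 1000);
const decisions = [
  limiter.tryAcquire(0),
  limiter.tryAcquire(10),
  limiter.tryAcquire(20),   // rejected: window already holds two tasks
  limiter.tryAcquire(1000), // accepted: a new window has started
];
console.log(decisions);
```

A fixed window is simple but allows short bursts at window boundaries; a sliding window or token bucket smooths that out at the cost of more bookkeeping.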
For more detailed information about specific features, check out our other guides: