API Reference
Complete API documentation for Cleo, including configuration options, task definitions, worker settings, and monitoring capabilities.
API Reference
Note: Cleo is built on top of BullMQ. For more detailed information about underlying queue features and options, please refer to the BullMQ documentation.
Core Configuration
Cleo.getInstance()
import { Cleo } from "@cleotasks/core";
const cleo = Cleo.getInstance();
cleo.configure()
cleo.configure({
redis: {
host: string;
port: number;
password?: string;
tls?: boolean;
db?: number;
keyPrefix?: string;
},
worker: {
concurrency?: number;
maxMemoryUsage?: number;
prefix?: string;
queues?: {
name: string;
priority: TaskPriority;
rateLimit?: {
max: number;
interval: number;
};
concurrency?: number;
}[];
}
});
Decorators
@task Decorator
import { task } from "@cleotasks/core/decorators/task";
@task({
id?: string;
priority?: TaskPriority;
queue?: string;
maxRetries?: number;
retryDelay?: number;
backoff?: {
type: "exponential" | "fixed";
delay: number;
};
timeout?: number;
schedule?: RepeatOptions;
group?: string;
removeOnComplete?: boolean;
weight?: number;
})
@QueueClass Decorator
import { QueueClass } from "@cleotasks/core/decorators/class";
@QueueClass({
defaultOptions?: {
priority?: TaskPriority;
maxRetries?: number;
retryDelay?: number;
backoff?: {
type: "exponential" | "fixed";
delay: number;
};
group?: string;
timeout?: number;
},
queue?: string;
includeInherited?: boolean;
exclude?: string[];
include?: string[];
group?: string;
})
Queue Management
QueueManager
const queueManager = cleo.getQueueManager();
// Event Handling
queueManager.onTaskEvent(event: ObserverEvent, callback: TaskEventCallback);
queueManager.offTaskEvent(event: ObserverEvent);
// Group Management
queueManager.setGroupProcessingStrategy(strategy: GroupProcessingStrategy);
queueManager.setGroupPriority(groupId: string, priority: number);
queueManager.getGroup(groupId: string): Promise<Group>;
queueManager.getAllGroups(): Promise<string[]>;
// Task Management
queueManager.addTask(name: string, data: any, options?: TaskOptions): Promise<Task>;
queueManager.getTask(taskId: string, queueName?: string): Promise<Task | null>;
queueManager.removeTask(taskId: string, queueName?: string): Promise<boolean>;
Task Events
enum ObserverEvent {
STATUS_CHANGE = "status_change",
TASK_ADDED = "task_added",
TASK_COMPLETED = "task_completed",
TASK_FAILED = "task_failed",
GROUP_CHANGE = "group_change",
GROUP_STATE_CHANGE = "group_state_change"
}
interface TaskEventCallback {
(taskId: string, status: TaskStatus, data?: any): void | Promise<void>;
}
Group Processing
enum GroupProcessingStrategy {
ROUND_ROBIN = "round_robin",
FIFO = "fifo",
PRIORITY = "priority"
}
interface GroupConfig {
name: string;
concurrency?: number;
maxConcurrency?: number;
rateLimit?: {
max: number;
duration: number;
};
priority?: number;
strategy?: GroupProcessingStrategy;
retryDelay?: number;
retryLimit?: number;
timeout?: number;
}
Worker Management
interface WorkerConfig {
concurrency?: number;
maxMemoryUsage?: number;
prefix?: string;
queues?: QueueConfig[];
}
interface WorkerMetrics {
tasksProcessed: number;
tasksSucceeded: number;
tasksFailed: number;
averageProcessingTime: number;
totalProcessingTime?: number;
}
// Worker API
const worker = cleo.getWorker("queueName");
await worker.getStatus(): Promise<string>;
await worker.getActiveTasks(): Promise<string[]>;
await worker.getMetrics(): Promise<WorkerMetrics>;
await worker.getTaskHistory(): Promise<TaskHistoryEntry[]>;
Types
TaskOptions
interface TaskOptions {
id?: string;
priority?: TaskPriority;
queue?: string;
maxRetries?: number;
retryDelay?: number;
group?: string;
}
QueueClassOptions
interface QueueClassOptions {
defaultOptions?: TaskOptions;
queue?: string;
}
REST API
Queue Routes
// Get all queues
GET /api/queues/get-all
Response: {
queues: Array<{
name: string;
metrics: {
waiting: number;
active: number;
completed: number;
failed: number;
};
tasks: Task[];
}>;
}
// Get queue by name
GET /api/queues/get-by-name/:queueName
Response: {
tasks: Task[];
metrics: {
active: number;
waiting: number;
completed: number;
failed: number;
};
}
// Get task by ID
GET /api/queues/get-task/:taskId
Response: {
task: Task;
history: Array<{
timestamp: string;
state: string;
data?: any;
}>;
}
// Add task
POST /api/queues/add-task
Body: {
name: string;
data: any;
options: TaskOptions;
}
Response: Task
// Remove task
DELETE /api/queues/remove-task/:taskId
Response: { success: boolean }
// Get queue metrics
GET /api/queues/:queueName/metrics
Query: {
start?: number;
end?: number;
}
Response: {
metrics: Array<{
timestamp: string;
active: number;
waiting: number;
completed: number;
failed: number;
}>;
}
Group Routes
// Get all groups
GET /api/groups
Response: {
groups: Array<{
name: string;
metrics: {
active: number;
waiting: number;
completed: number;
failed: number;
};
}>;
}
// Get tasks for a specific group
GET /api/groups/:groupName/tasks
Response: {
tasks: Task[];
}
// Add task to group
POST /api/groups/:groupName/tasks
Body: {
methodName: string;
data: any;
options: TaskOptions;
}
Response: { message: string }
// Set group priority
PUT /api/groups/:groupName/priority
Body: {
priority: number;
}
Response: { message: string }
// Get group stats
GET /api/groups/:groupName/stats
Response: {
stats: {
name: string;
total: number;
active: number;
completed: number;
failed: number;
paused: number;
};
}
Worker Routes
// Get all workers
GET /api/workers
Query: {
queue?: string;
}
Response: {
workers: Array<{
id: string;
queue: string;
status: string;
activeTasks: any[];
metrics: {
tasksProcessed: number;
tasksSucceeded: number;
tasksFailed: number;
averageProcessingTime: number;
};
lastHeartbeat: string;
isActive: boolean;
}>;
}
// Get specific worker
GET /api/workers/:workerId
Response: {
id: string;
queue: string;
status: string;
activeTasks: string[];
metrics: WorkerMetrics;
lastHeartbeat: string;
isActive: boolean;
history?: TaskHistoryEntry[];
}
// Get worker metrics
GET /api/workers/:workerId/metrics
Response: {
current: WorkerMetrics;
history: Array<WorkerMetrics & { timestamp: string }>;
}
For more detailed information about underlying queue features, job options, and advanced configurations, please refer to the BullMQ documentation.