API Reference

Complete API documentation for Cleo, including configuration options, task definitions, worker settings, and monitoring capabilities.

API Reference

Note: Cleo is built on top of BullMQ. For more detailed information about underlying queue features and options, please refer to the BullMQ documentation.

Core Configuration

Cleo.getInstance()

import { Cleo } from "@cleotasks/core";
 
const cleo = Cleo.getInstance();

cleo.configure()

cleo.configure({
  redis: {
    host: string;
    port: number;
    password?: string;
    tls?: boolean;
    db?: number;
    keyPrefix?: string;
  },
  worker: {
    concurrency?: number;
    maxMemoryUsage?: number;
    prefix?: string;
    queues?: {
      name: string;
      priority: TaskPriority;
      rateLimit?: {
        max: number;
        interval: number;
      };
      concurrency?: number;
    }[];
  }
});

Decorators

@task Decorator

import { task } from "@cleotasks/core/decorators/task";
 
@task({
  id?: string;
  priority?: TaskPriority;
  queue?: string;
  maxRetries?: number;
  retryDelay?: number;
  backoff?: {
    type: "exponential" | "fixed";
    delay: number;
  };
  timeout?: number;
  schedule?: RepeatOptions;
  group?: string;
  removeOnComplete?: boolean;
  weight?: number;
})

@QueueClass Decorator

import { QueueClass } from "@cleotasks/core/decorators/class";
 
@QueueClass({
  defaultOptions?: {
    priority?: TaskPriority;
    maxRetries?: number;
    retryDelay?: number;
    backoff?: {
      type: "exponential" | "fixed";
      delay: number;
    };
    group?: string;
    timeout?: number;
  },
  queue?: string;
  includeInherited?: boolean;
  exclude?: string[];
  include?: string[];
  group?: string;
})

Queue Management

QueueManager

const queueManager = cleo.getQueueManager();
 
// Event Handling
queueManager.onTaskEvent(event: ObserverEvent, callback: TaskEventCallback);
queueManager.offTaskEvent(event: ObserverEvent);
 
// Group Management
queueManager.setGroupProcessingStrategy(strategy: GroupProcessingStrategy);
queueManager.setGroupPriority(groupId: string, priority: number);
queueManager.getGroup(groupId: string): Promise<Group>;
queueManager.getAllGroups(): Promise<string[]>;
 
// Task Management
queueManager.addTask(name: string, data: any, options?: TaskOptions): Promise<Task>;
queueManager.getTask(taskId: string, queueName?: string): Promise<Task | null>;
queueManager.removeTask(taskId: string, queueName?: string): Promise<boolean>;

Task Events

enum ObserverEvent {
  STATUS_CHANGE = "status_change",
  TASK_ADDED = "task_added",
  TASK_COMPLETED = "task_completed",
  TASK_FAILED = "task_failed",
  GROUP_CHANGE = "group_change",
  GROUP_STATE_CHANGE = "group_state_change"
}
 
interface TaskEventCallback {
  (taskId: string, status: TaskStatus, data?: any): void | Promise<void>;
}

Group Processing

enum GroupProcessingStrategy {
  ROUND_ROBIN = "round_robin",
  FIFO = "fifo",
  PRIORITY = "priority"
}
 
interface GroupConfig {
  name: string;
  concurrency?: number;
  maxConcurrency?: number;
  rateLimit?: {
    max: number;
    duration: number;
  };
  priority?: number;
  strategy?: GroupProcessingStrategy;
  retryDelay?: number;
  retryLimit?: number;
  timeout?: number;
}

Worker Management

interface WorkerConfig {
  concurrency?: number;
  maxMemoryUsage?: number;
  prefix?: string;
  queues?: QueueConfig[];
}
 
interface WorkerMetrics {
  tasksProcessed: number;
  tasksSucceeded: number;
  tasksFailed: number;
  averageProcessingTime: number;
  totalProcessingTime?: number;
}
 
// Worker API
const worker = cleo.getWorker("queueName");
await worker.getStatus(): Promise<string>;
await worker.getActiveTasks(): Promise<string[]>;
await worker.getMetrics(): Promise<WorkerMetrics>;
await worker.getTaskHistory(): Promise<TaskHistoryEntry[]>;

Types

TaskOptions

interface TaskOptions {
  id?: string;
  priority?: TaskPriority;
  queue?: string;
  maxRetries?: number;
  retryDelay?: number;
  group?: string;
}

QueueClassOptions

interface QueueClassOptions {
  defaultOptions?: TaskOptions;
  queue?: string;
}

REST API

Queue Routes

// Get all queues
GET /api/queues/get-all
Response: {
  queues: Array<{
    name: string;
    metrics: {
      waiting: number;
      active: number;
      completed: number;
      failed: number;
    };
    tasks: Task[];
  }>;
}
 
// Get queue by name
GET /api/queues/get-by-name/:queueName
Response: {
  tasks: Task[];
  metrics: {
    active: number;
    waiting: number;
    completed: number;
    failed: number;
  };
}
 
// Get task by ID
GET /api/queues/get-task/:taskId
Response: {
  task: Task;
  history: Array<{
    timestamp: string;
    state: string;
    data?: any;
  }>;
}
 
// Add task
POST /api/queues/add-task
Body: {
  name: string;
  data: any;
  options: TaskOptions;
}
Response: Task
 
// Remove task
DELETE /api/queues/remove-task/:taskId
Response: { success: boolean }
 
// Get queue metrics
GET /api/queues/:queueName/metrics
Query: {
  start?: number;
  end?: number;
}
Response: {
  metrics: Array<{
    timestamp: string;
    active: number;
    waiting: number;
    completed: number;
    failed: number;
  }>;
}

Group Routes

// Get all groups
GET /api/groups
Response: {
  groups: Array<{
    name: string;
    metrics: {
      active: number;
      waiting: number;
      completed: number;
      failed: number;
    };
  }>;
}
 
// Get tasks for a specific group
GET /api/groups/:groupName/tasks
Response: {
  tasks: Task[];
}
 
// Add task to group
POST /api/groups/:groupName/tasks
Body: {
  methodName: string;
  data: any;
  options: TaskOptions;
}
Response: { message: string }
 
// Set group priority
PUT /api/groups/:groupName/priority
Body: {
  priority: number;
}
Response: { message: string }
 
// Get group stats
GET /api/groups/:groupName/stats
Response: {
  stats: {
    name: string;
    total: number;
    active: number;
    completed: number;
    failed: number;
    paused: number;
  };
}

Worker Routes

// Get all workers
GET /api/workers
Query: {
  queue?: string;
}
Response: {
  workers: Array<{
    id: string;
    queue: string;
    status: string;
    activeTasks: any[];
    metrics: {
      tasksProcessed: number;
      tasksSucceeded: number;
      tasksFailed: number;
      averageProcessingTime: number;
    };
    lastHeartbeat: string;
    isActive: boolean;
  }>;
}
 
// Get specific worker
GET /api/workers/:workerId
Response: {
  id: string;
  queue: string;
  status: string;
  activeTasks: string[];
  metrics: WorkerMetrics;
  lastHeartbeat: string;
  isActive: boolean;
  history?: TaskHistoryEntry[];
}
 
// Get worker metrics
GET /api/workers/:workerId/metrics
Response: {
  current: WorkerMetrics;
  history: Array<WorkerMetrics & { timestamp: string }>;
}

For more detailed information about underlying queue features, job options, and advanced configurations, please refer to the BullMQ documentation.