DocsAdvanced FeaturesBest Practices

Best Practices Guide

Learn the recommended patterns and practices for building robust distributed task processing systems with Cleo, including advanced task grouping, event handling, and monitoring strategies.

🎯 Best Practices

Task Design

1. Use Task Decorators Effectively

class EmailService {
  @task({
    id: "send-email",
    priority: TaskPriority.HIGH,
    queue: 'notifications',
    group: 'emails',
    // Configure timeouts based on operation
    timeout: 30000,
    // Use exponential backoff for retries
    maxRetries: 3,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
  })
  async sendEmail(data: { email: string, template: string }) {
    // Email sending logic
  }
}

2. Organize with Queue Classes

@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
    group: "notifications",
    timeout: 300000,
  },
  queue: "notifications",
})
class NotificationService {
  @task({
    id: "send-push",
    priority: TaskPriority.HIGH,
  })
  async sendPushNotification(data: { userId: string, message: string }) {
    // Push notification logic
  }
 
  @task({
    id: "send-sms",
    priority: TaskPriority.NORMAL,
  })
  async sendSMS(data: { phone: string, message: string }) {
    // SMS sending logic
  }
}

3. Implement Smart Group Processing

// Choose strategy based on requirements
const queueManager = cleo.getQueueManager();
 
// For fair distribution with time tracking
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);
 
// For strict ordering with history
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.FIFO);
 
// For dynamic priority adjustment
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.PRIORITY);
await queueManager.setGroupPriority("premium-users", 10);
await queueManager.setGroupPriority("free-users", 1);
 
// Monitor group operations
queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE, (taskId, status, data) => {
  console.log(`Group operation for ${taskId}:`, {
    operation: data.operation,
    group: data.group,
    history: data.history
  });
});

Monitoring & Observability

1. Implement Comprehensive Event Handling

const queueManager = cleo.getQueueManager();
 
// Monitor task lifecycle with filtering
queueManager.onTaskEvent(ObserverEvent.TASK_COMPLETED, (taskId, status, data) => {
  if (data?.group === 'premium-users') {
    console.log(`Premium task ${taskId} completed:`, {
      result: data?.result,
      duration: data?.duration
    });
  }
});
 
// Track failures with retry information
queueManager.onTaskEvent(ObserverEvent.TASK_FAILED, (taskId, status, data) => {
  console.error(`Task ${taskId} failed:`, {
    error: data?.error,
    retryCount: data?.retryCount,
    nextRetry: data?.nextRetry,
    group: data?.group
  });
});
 
// Monitor group state changes
queueManager.onTaskEvent(ObserverEvent.GROUP_STATE_CHANGE, (taskId, status, data) => {
  console.log(`Group ${data.group} state changed:`, {
    previousState: data.previousState,
    newState: data.newState,
    reason: data.reason
  });
});

2. Track Comprehensive Metrics

async function monitorSystemHealth() {
  // Get group statistics
  const group = await queueManager.getGroup("premium-users");
  const stats = await group.getStats();
  
  console.log("Group Performance:", {
    total: stats.total,
    active: stats.active,
    completed: stats.completed,
    failed: stats.failed,
    paused: stats.paused
  });
 
  // Get worker metrics
  const worker = cleo.getWorker("notifications");
  const metrics = await worker.getMetrics();
  console.log("Worker Performance:", {
    tasksProcessed: metrics.tasksProcessed,
    tasksSucceeded: metrics.tasksSucceeded,
    tasksFailed: metrics.tasksFailed,
    averageProcessingTime: metrics.averageProcessingTime
  });
 
  // Get task history
  const taskHistory = await worker.getTaskHistory();
  console.log("Recent Task History:", taskHistory.map(entry => ({
    taskId: entry.taskId,
    status: entry.status,
    duration: entry.duration,
    timestamp: entry.timestamp,
    group: entry.group
  })));
}

Production Deployment

1. Configure Redis with Security

cleo.configure({
  redis: {
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT || "6379"),
    password: process.env.REDIS_PASSWORD,
    tls: true,
    maxRetriesPerRequest: 3,
    // Enable key prefix for multi-tenant setups
    keyPrefix: process.env.REDIS_KEY_PREFIX,
    // Enable TLS for secure connections
    tls: {
      rejectUnauthorized: true,
      ca: process.env.REDIS_CA_CERT
    }
  },
});

2. Implement Graceful Shutdown

async function gracefulShutdown() {
  console.log('Initiating graceful shutdown...');
  
  // Stop accepting new tasks
  await cleo.pause();
  
  // Get active tasks
  const workers = await cleo.getQueueManager().getAllWorkers();
  for (const workerId of workers) {
    const worker = await cleo.getWorkerManager().getWorker(workerId);
    const activeTasks = await worker.getActiveTasks();
    console.log(`Waiting for ${activeTasks.length} tasks to complete on worker ${workerId}`);
  }
  
  // Wait for active tasks with timeout
  await Promise.race([
    cleo.waitForCompletion(),
    new Promise(resolve => setTimeout(resolve, 30000))
  ]);
  
  // Clean up resources
  await cleo.getQueueManager().close();
  
  process.exit(0);
}
 
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);

Best Practices Checklist

  1. Task Design

    • Use task decorators with appropriate options
    • Implement queue classes for organization
    • Configure smart group processing strategies
    • Set up proper event filtering
  2. Monitoring

    • Implement comprehensive event handling
    • Track group and worker metrics
    • Monitor task history
    • Set up alerting for failures
  3. Production

    • Configure Redis securely
    • Implement robust graceful shutdown
    • Set up monitoring and logging
    • Plan for scaling and failover
  4. Error Handling

    • Configure smart retry strategies
    • Implement comprehensive error logging
    • Use try-catch blocks effectively
    • Handle edge cases and timeouts
  5. Group Processing

    • Choose appropriate processing strategies
    • Monitor group operations
    • Track group statistics
    • Handle group state changes
  6. Performance

    • Configure appropriate timeouts
    • Use efficient batch processing
    • Monitor processing times
    • Optimize resource usage