Posts / Effective Task Grouping Strategies with Cleo 📊

Effective Task Grouping Strategies with Cleo 📊

6 min read
by Fofsinx

Learn how to effectively use Cleo's task grouping capabilities to organize and process related tasks efficiently. Discover different processing strategies and their use cases.

Effective Task Grouping Strategies with Cleo 📊

Task grouping is a powerful feature that helps organize and process related tasks efficiently. Let's explore how to use Cleo's advanced grouping capabilities effectively.

Understanding Group Processing Strategies

Round Robin Strategy with Weights

/**
 * Notification tasks registered under the "notifications" queue and group.
 *
 * Class-level defaults: maxRetries of 3, exponential backoff with a delay of
 * 1000 (presumably milliseconds — confirm against the Cleo docs), and a
 * default weight of 5 that the individual tasks below override.
 */
@QueueClass({
  defaultOptions: {
    group: "notifications",
    maxRetries: 3,
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
    weight: 5, // Default weight for all tasks
  },
  queue: "notifications",
})
class NotificationService {
  /**
   * Push-notification task ("send-push"): HIGH priority, weight 8.
   *
   * @param data - Payload carrying the notification `message`.
   */
  @task({
    id: "send-push",
    priority: TaskPriority.HIGH,
    weight: 8, // Higher weight for push notifications
  })
  async sendPushNotification(data: { message: string }) {
    // Push notification logic
  }
 
  /**
   * Email task ("send-email"): NORMAL priority, weight 3.
   *
   * @param data - Payload carrying the email `message`.
   */
  @task({
    id: "send-email",
    priority: TaskPriority.NORMAL,
    weight: 3, // Lower weight for emails
  })
  async sendEmail(data: { message: string }) {
    // Email sending logic
  }
}
 
// Configure Round Robin strategy with weights
// The config selects ROUND_ROBIN, sets concurrency to 5 with a maxConcurrency
// of 10, and a rate limit of max 1000 per duration 60000 (presumably
// milliseconds, i.e. per minute — confirm against the Cleo docs).
const groupConfig: GroupConfig = {
  strategy: GroupProcessingStrategy.ROUND_ROBIN,
  concurrency: 5,
  maxConcurrency: 10,
  rateLimit: {
    max: 1000,
    duration: 60000,
  },
};
 
// Apply the config to the "notifications" group (the group name used by the
// NotificationService defaults).
await queueManager.setGroupConfig("notifications", groupConfig);

FIFO Processing with Backoff

/**
 * Order-processing tasks registered under the "orders" queue and group.
 *
 * Class-level defaults: maxRetries of 3, exponential backoff with a delay of
 * 1000 (presumably milliseconds — confirm against the Cleo docs), and
 * automatic removal of completed orders by age and count.
 */
@QueueClass({
  defaultOptions: {
    group: "orders",
    maxRetries: 3,
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
    removeOnComplete: {
      age: 3600, // Remove completed orders after 1 hour
      count: 1000, // Keep last 1000 completed orders
    },
  },
  queue: "orders",
})
class OrderProcessor {
  /**
   * Processes a single order ("process-order" task, timeout 30000).
   *
   * @param orderId - Identifier of the order to process.
   * @throws Whatever the processing logic throws; errors are intentionally
   *   not caught here so they propagate to the queue.
   */
  @task({
    id: "process-order",
    timeout: 30000,
  })
  async processOrder(orderId: string) {
    // Order processing logic
    //
    // No try/catch is needed just to rethrow: an error thrown here already
    // propagates out of the task and will trigger the backoff retry.
  }
}
 
// Configure FIFO processing with monitoring
// The config selects FIFO, sets concurrency to 3, and a rate limit of max 100
// per duration 60000 (presumably milliseconds, i.e. per minute — confirm
// against the Cleo docs).
const fifoConfig: GroupConfig = {
  strategy: GroupProcessingStrategy.FIFO,
  concurrency: 3,
  rateLimit: {
    max: 100,
    duration: 60000,
  },
};
 
// Apply the config to the "orders" group (the group name used by the
// OrderProcessor defaults).
await queueManager.setGroupConfig("orders", fifoConfig);

Priority-based Processing with Dynamic Weights

/**
 * User-processing tasks registered under the "users" queue and group.
 *
 * Class-level defaults: maxRetries of 3 and exponential backoff with a delay
 * of 1000 (presumably milliseconds — confirm against the Cleo docs).
 */
@QueueClass({
  defaultOptions: {
    group: "users",
    maxRetries: 3,
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
  },
  queue: "users",
})
class UserService {
  /**
   * VIP user task ("process-vip"): HIGH priority, weight 10.
   *
   * @param userId - Identifier of the VIP user to process.
   */
  @task({
    id: "process-vip",
    priority: TaskPriority.HIGH,
    weight: 10,
  })
  async processVIPUser(userId: string) {
    // VIP user processing
  }
 
  /**
   * Standard user task ("process-standard"): NORMAL priority, weight 5.
   *
   * @param userId - Identifier of the standard user to process.
   */
  @task({
    id: "process-standard",
    priority: TaskPriority.NORMAL,
    weight: 5,
  })
  async processStandardUser(userId: string) {
    // Standard user processing
  }
}
 
// Dynamic priority and weight adjustment
/**
 * Recomputes a group's priority, weight and concurrency from its current
 * stats and the system load, then writes them back via the queue manager.
 */
class GroupPriorityManager {
  /**
   * Refreshes the settings of one group.
   *
   * @param groupId - Group whose stats are read and whose config is updated.
   */
  async adjustGroupSettings(groupId: string) {
    // Inputs for the calculations: group stats first, then system load.
    const groupStats = await queueManager.getGroupStats(groupId);
    const systemLoad = await this.getSystemLoad();

    // Derive each setting in the same order the config lists them.
    const priority = this.calculatePriority(groupStats, systemLoad);
    const weight = this.calculateWeight(groupStats, systemLoad);
    const concurrency = this.calculateConcurrency(groupStats, systemLoad);

    await queueManager.setGroupConfig(groupId, { priority, weight, concurrency });
  }
}

Enhanced Group Monitoring

Comprehensive Metrics Tracking

/**
 * Builds a metrics snapshot (current counters, derived performance ratios and
 * history) for a single task group.
 */
class GroupMetricsCollector {
  /**
   * Collects the metrics of one group.
   *
   * The derived ratios (`throughput`, `successRate`,
   * `concurrencyUtilization`) are reported as 0 when their denominator is 0
   * or otherwise yields a non-finite result, e.g. for a group that has not
   * finished any task yet. This keeps NaN/Infinity out of dashboards and
   * alert thresholds.
   *
   * @param groupId - Identifier passed to `queueManager.getGroup`.
   * @returns An object with `current` counters, `performance` ratios and a
   *   `history` array of `{ timestamp, metrics }` entries.
   */
  async collectGroupMetrics(groupId: string) {
    const group = await queueManager.getGroup(groupId);
    const stats = await group.getStats();
    const history = await group.getHistory();

    return {
      current: {
        total: stats.total,
        active: stats.active,
        completed: stats.completed,
        failed: stats.failed,
        delayed: stats.delayed,
        waiting: stats.waiting,
      },
      performance: {
        avgProcessingTime: stats.avgProcessingTime,
        throughput: this.safeRatio(stats.completed, stats.uptime),
        successRate: this.safeRatio(
          stats.completed,
          stats.completed + stats.failed,
        ),
        concurrencyUtilization: this.safeRatio(
          stats.active,
          stats.maxConcurrency,
        ),
      },
      history: history.map(entry => ({
        timestamp: entry.timestamp,
        metrics: {
          completed: entry.completed,
          failed: entry.failed,
          processing: entry.processing,
        },
      })),
    };
  }

  /** Returns `numerator / denominator`, or 0 when the result is not finite. */
  private safeRatio(numerator: number, denominator: number): number {
    const ratio = numerator / denominator;
    return Number.isFinite(ratio) ? ratio : 0;
  }
}

Advanced Event Handling

/**
 * Subscribes to group-level events on the queue manager at construction time
 * and routes each event to a handler method.
 */
class GroupEventHandler {
  constructor() {
    // Monitor group state changes
    queueManager.onTaskEvent(ObserverEvent.GROUP_STATE_CHANGE,
      (groupId, status, data) => {
        // handleStateChange is async and nothing awaits the callback's
        // result here, so its rejection must be handled explicitly;
        // otherwise a failing notification/dashboard/alert step surfaces as
        // an unhandled promise rejection.
        this.handleStateChange(groupId, status, data).catch((error: unknown) => {
          console.error(
            `Failed to handle state change for group ${groupId}:`,
            error,
          );
        });
    });
 
    // Track group operations
    queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE,
      (groupId, status, data) => {
        this.handleGroupOperation(groupId, status, data);
    });
 
    // Monitor group performance
    queueManager.onTaskEvent(ObserverEvent.GROUP_METRICS,
      (groupId, metrics) => {
        this.updateMetrics(groupId, metrics);
    });
  }
 
  /**
   * Reacts to a group state change by running, in order: the state-change
   * notification, the dashboard update and the alert check. Each step is
   * awaited, so a failing step prevents the following ones from running.
   *
   * @param groupId - Group whose state changed.
   * @param status - New status as reported by the event.
   * @param data - Event payload, forwarded to the alert check.
   */
  private async handleStateChange(groupId: string, status: string, data: any) {
    await this.notifyStateChange(groupId, status);
    await this.updateDashboard(groupId);
    await this.checkAlerts(groupId, status, data);
  }
}

Best Practices

1. Group Organization

/**
 * Sets up the "critical" and "background" task groups with their processing
 * strategy, concurrency limits and priority.
 */
class TaskGroupManager {
  /**
   * Configures the groups one after the other: "critical" first, then
   * "background".
   */
  async organizeGroups() {
    // Configure groups with appropriate settings
    const criticalConfig: GroupConfig = {
      strategy: GroupProcessingStrategy.PRIORITY,
      concurrency: 10,
      maxConcurrency: 20,
      priority: 1,
    };
    await this.setupGroup("critical", criticalConfig);

    const backgroundConfig: GroupConfig = {
      strategy: GroupProcessingStrategy.ROUND_ROBIN,
      concurrency: 5,
      maxConcurrency: 10,
      priority: 5,
    };
    await this.setupGroup("background", backgroundConfig);
  }

  /**
   * Applies a config to a named group, adding a per-group rate limit and
   * backoff strategy. Both are written after the spread, so they replace any
   * `rateLimit`/`backoff` already present in `config`.
   *
   * @param name - Group name.
   * @param config - Base configuration for the group.
   */
  private async setupGroup(name: string, config: GroupConfig) {
    const mergedConfig = {
      ...config,
      rateLimit: this.calculateRateLimit(name),
      backoff: this.getBackoffStrategy(name),
    };
    await queueManager.setGroupConfig(name, mergedConfig);
  }
}

2. Error Handling with Recovery

/**
 * Tasks in the "critical" group: maxRetries of 3, exponential backoff with a
 * delay of 1000, and `removeOnComplete` enabled.
 */
@QueueClass({
  defaultOptions: {
    group: "critical",
    maxRetries: 3,
    backoff: { type: BackoffType.EXPONENTIAL, delay: 1000 },
    removeOnComplete: true,
  },
})
class ReliableService {
  /**
   * Runs the critical operation ("critical-operation" task, timeout 30000).
   *
   * On failure the error is handled and reported first. It is then either
   * rethrown (when `shouldRetry` approves) or passed to the fatal-error
   * handler, in which case the method completes without throwing.
   *
   * @param data - Payload forwarded to `process`.
   */
  @task({ id: "critical-operation", timeout: 30000 })
  async processCriticalTask(data: any) {
    try {
      await this.process(data);
    } catch (failure) {
      // Enhanced error handling: record and report before deciding on retry.
      await this.handleError(failure);
      await this.notifyFailure(failure);

      // Not retryable: finish through the fatal-error path and stop here.
      if (!this.shouldRetry(failure)) {
        await this.handleFatalError(failure);
        return;
      }

      // Retryable: rethrow to trigger the backoff retry.
      throw failure;
    }
  }
}

3. Resource Management

/**
 * Walks all groups known to the queue manager and re-tunes each group's
 * concurrency, weight and priority from its stats and metrics.
 */
class GroupResourceManager {
  /**
   * Processes the groups sequentially, one adjustment at a time.
   */
  async optimizeResources() {
    const allGroups = await queueManager.getAllGroups();

    for (const group of allGroups) {
      const groupStats = await group.getStats();
      const groupName = group.name;
      const groupMetrics = await this.getGroupMetrics(groupName);

      // Compute the three settings in the order the adjustment lists them.
      const concurrency = this.calculateOptimalConcurrency(groupStats, groupMetrics);
      const weight = this.calculateOptimalWeight(groupStats, groupMetrics);
      const priority = this.calculateOptimalPriority(groupStats, groupMetrics);

      await this.adjustResources(groupName, { concurrency, weight, priority });
    }
  }
}

Advanced Patterns

Batch Processing with Progress Tracking

/**
 * Batch-processing tasks in the "batch-jobs" group (default weight 8).
 */
@QueueClass({
  defaultOptions: {
    group: "batch-jobs",
    weight: 8,
  },
})
class BatchProcessor {
  /**
   * Processes `items` in chunks of 100. Chunks run one after another; the
   * items inside a chunk run concurrently.
   *
   * Every item of a chunk is allowed to settle before a failure is
   * propagated. With a plain `Promise.all` the task would reject on the first
   * failing item while its siblings are still running, so they could keep
   * updating progress after the task has already failed (and possibly been
   * scheduled for retry).
   *
   * @param items - Items to process.
   * @returns The result of `progress.getResults()` when all chunks succeed.
   * @throws The rejection reason of the first failed item (in chunk order)
   *   once its chunk has settled. Later chunks are not started.
   */
  @task({
    id: "process-batch",
    backoff: {
      type: BackoffType.EXPONENTIAL,
      delay: 1000,
    },
  })
  async processBatch(items: any[]) {
    const progress = new ProgressTracker(items.length);
    const chunks = this.chunkArray(items, 100);

    for (const chunk of chunks) {
      const outcomes = await Promise.allSettled(
        chunk.map(async item => {
          try {
            await this.processItem(item);
            progress.increment();
            await this.updateProgress(progress);
          } catch (error) {
            progress.recordError(error);
            throw error;
          }
        }),
      );

      // Propagate the first failure only after the whole chunk has finished.
      const firstFailure = outcomes.find(
        (outcome): outcome is PromiseRejectedResult =>
          outcome.status === "rejected",
      );
      if (firstFailure) {
        throw firstFailure.reason;
      }
    }

    return progress.getResults();
  }
}

Sequential Processing with Dependencies

/**
 * Three-step pipeline registered in the "sequential" group with maxRetries
 * of 3. Steps 2 and 3 each declare the previous step's task id in
 * `dependencies` (presumably the library holds a task back until its
 * dependencies have completed — confirm against the Cleo docs).
 */
@QueueClass({
  defaultOptions: {
    group: "sequential",
    maxRetries: 3,
  },
})
class SequentialProcessor {
  /** First step ("step-1", weight 5); reports its completion when done. */
  @task({
    id: "step-1",
    weight: 5,
  })
  async stepOne() {
    // First step implementation
    await this.notifyStepCompletion("step-1");
  }
 
  /** Second step ("step-2", weight 5); declares a dependency on "step-1". */
  @task({
    id: "step-2",
    weight: 5,
    dependencies: ["step-1"],
  })
  async stepTwo() {
    // Second step implementation
    await this.notifyStepCompletion("step-2");
  }
 
  /**
   * Final step ("step-3", weight 5); declares a dependency on "step-2" and
   * signals that the whole process is complete.
   */
  @task({
    id: "step-3",
    weight: 5,
    dependencies: ["step-2"],
  })
  async stepThree() {
    // Final step implementation
    await this.notifyProcessComplete();
  }
}

Best Practices Summary

  1. Group Configuration

    • Choose appropriate processing strategies
    • Configure concurrency and rate limits
    • Implement backoff strategies
    • Set up proper monitoring
  2. Resource Management

    • Use weights for resource allocation
    • Monitor group performance
    • Adjust resources dynamically
    • Implement proper cleanup
  3. Error Handling

    • Configure appropriate retry strategies
    • Implement comprehensive error tracking
    • Set up alerting for critical failures
    • Maintain error history
  4. Monitoring

    • Track group metrics
    • Monitor resource usage
    • Set up performance alerts
    • Maintain historical data
  5. Optimization

    • Regular performance review
    • Dynamic resource adjustment
    • Cleanup of completed tasks
    • Regular maintenance

Remember to:

  • Choose appropriate group strategies
  • Monitor group performance
  • Handle errors gracefully
  • Manage resources effectively
  • Maintain proper documentation

Happy task processing! 🚀

Fofsinx

Fofsinx

Software Engineer | Writer | Designer