Client-Server Integration
Complete guide to implementing Cleo's client-server architecture with NextJS clients and Bun workers. Learn how to separate task scheduling from processing for scalable distributed systems.
🏗️ Client-Server Integration Guide
Cleo v1.0.9 introduces a robust client-server architecture that separates task scheduling from task processing. This guide shows you how to implement distributed task processing using NextJS for clients and Bun for workers.
🎯 Architecture Overview
flowchart LR
Client[NextJS Client] -->|Schedule Tasks| Redis[(Redis Queue)]
Redis -->|Process Tasks| Worker[Bun Worker]
Worker -->|Results| Redis
Redis -->|Status Updates| Client
subgraph "Client Layer"
Client
API[API Routes]
Services[Service Classes]
end
subgraph "Processing Layer"
Worker
Handlers[Task Handlers]
Jobs[Background Jobs]
end
🔄 Key Concepts
Client Mode
- Purpose: Schedule and queue tasks
- Environment: NextJS applications, API routes, web services
- Responsibilities: Task creation, status monitoring, result retrieval
- No Processing: Tasks are queued but not executed locally
Worker Mode
- Purpose: Process tasks from queues
- Environment: Bun runtime, background services, separate processes
- Responsibilities: Task execution, error handling, result publishing
- Scalable: Multiple workers can process the same queues
📱 NextJS Client Implementation
Basic Client Setup
// lib/cleo-client.ts
import { CleoClient } from "@cleotasks/core";
const cleo = CleoClient.getInstance();
export async function initializeCleo() {
await cleo.configure({
redis: {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
},
});
return cleo;
}
export { cleo };
Service Classes with Task Decorators
// services/email.service.ts
import { task } from "@cleotasks/core";
import { TaskPriority } from "@cleotasks/core";
export class EmailService {
@task({
priority: TaskPriority.HIGH,
queue: "email",
group: "notifications",
timeout: 30000,
maxRetries: 3,
})
async sendWelcomeEmail(userId: string, email: string): Promise<void> {
// This method will be queued, not executed immediately
console.log(`Queuing welcome email for ${email}`);
// The actual email sending happens in the worker
// This just schedules the task
}
@task({
priority: TaskPriority.MEDIUM,
queue: "email",
group: "notifications",
timeout: 60000,
})
async sendPasswordResetEmail(email: string, resetToken: string): Promise<void> {
console.log(`Queuing password reset email for ${email}`);
}
}
API Route Implementation
// app/api/emails/route.ts
import { EmailService } from "@/services/email.service";
import { initializeCleo } from "@/lib/cleo-client";
// Initialize Cleo client
await initializeCleo();
export async function POST(request: Request) {
try {
const { action, userId, email, resetToken } = await request.json();
const emailService = new EmailService();
switch (action) {
case "send-welcome":
// This queues the task - returns immediately
await emailService.sendWelcomeEmail(userId, email);
return Response.json({
success: true,
message: "Welcome email queued successfully"
});
case "send-password-reset":
await emailService.sendPasswordResetEmail(email, resetToken);
return Response.json({
success: true,
message: "Password reset email queued successfully"
});
default:
return Response.json({
success: false,
message: "Invalid action"
}, { status: 400 });
}
} catch (error) {
console.error("Error in API route:", error);
return Response.json({
success: false,
message: "Internal server error"
}, { status: 500 });
}
}
Direct Queue Management
// lib/task-manager.ts
import { cleo } from "@/lib/cleo-client";
import { TaskPriority } from "@cleotasks/core";
export async function scheduleCustomTask() {
const queueManager = cleo.getQueueManager();
// Schedule a custom task directly
const task = await queueManager.addTask(
"process-payment",
{
userId: "123",
amount: 99.99,
currency: "USD",
},
{
priority: TaskPriority.HIGH,
queue: "payments",
group: "financial",
timeout: 120000,
maxRetries: 5,
}
);
console.log("Custom task scheduled:", task.id);
return task;
}
export async function scheduleGroupTasks() {
const queueManager = cleo.getQueueManager();
// Schedule multiple related tasks
await queueManager.addTaskToGroup(
"send-welcome-email",
{
group: "user-onboarding",
priority: TaskPriority.HIGH,
},
{
userId: "123",
email: "user@example.com",
}
);
await queueManager.addTaskToGroup(
"create-user-profile",
{
group: "user-onboarding",
priority: TaskPriority.NORMAL,
},
{
userId: "123",
profileData: { name: "John Doe" },
}
);
console.log("Group tasks scheduled");
}
🔧 Bun Worker Implementation
Worker Setup and Configuration
// worker.ts
import { CleoWorker } from "@cleotasks/core";
import { logger } from "@cleotasks/core/utils";
const cleo = CleoWorker.getInstance();
async function main() {
try {
logger.info("🚀 Starting Cleo Worker");
// Configure the worker
await cleo.configure({
redis: {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
},
worker: {
concurrency: 5, // Process up to 5 jobs concurrently
},
});
// Create and configure queues
const queueManager = cleo.getQueueManager();
await setupQueues(queueManager);
// Register all task handlers
registerTaskHandlers();
// Start processing
await cleo.startProcessing();
setupHealthChecks();
setupGracefulShutdown();
logger.info("✅ Cleo Worker started successfully");
} catch (error) {
logger.error("❌ Failed to start Cleo Worker", { error });
process.exit(1);
}
}
Task Handler Implementation
// handlers/email.handlers.ts
import { logger } from "@cleotasks/core/utils";
export class EmailTaskHandlers {
async sendWelcomeEmail(userId: string, email: string): Promise<void> {
logger.info("📧 Processing welcome email", {
userId,
email,
});
// Simulate email service integration
await this.sendEmailViaProvider({
to: email,
template: "welcome",
variables: { userId }
});
logger.info(`✅ Welcome email sent to ${email}`);
}
async sendPasswordResetEmail(email: string, resetToken: string): Promise<void> {
logger.info("🔐 Processing password reset email", { email });
await this.sendEmailViaProvider({
to: email,
template: "password-reset",
variables: { resetToken }
});
logger.info(`✅ Password reset email sent to ${email}`);
}
private async sendEmailViaProvider(emailData: any): Promise<void> {
// Integration with actual email service (SendGrid, AWS SES, etc.)
await new Promise(resolve => setTimeout(resolve, 1000));
// In production, replace with actual email service call:
// await emailProvider.send(emailData);
}
}
// handlers/payment.handlers.ts
export class PaymentTaskHandlers {
async processPayment(userId: string, amount: number, currency: string): Promise<void> {
logger.info("💳 Processing payment", {
userId,
amount,
currency,
});
// Payment processing logic
await this.chargePaymentMethod(userId, amount, currency);
await this.updateUserAccount(userId, amount);
await this.sendPaymentConfirmation(userId);
logger.info(`✅ Payment processed: ${amount} ${currency} for user ${userId}`);
}
private async chargePaymentMethod(userId: string, amount: number, currency: string): Promise<void> {
// Integration with payment processor (Stripe, PayPal, etc.)
await new Promise(resolve => setTimeout(resolve, 2000));
}
private async updateUserAccount(userId: string, amount: number): Promise<void> {
// Update user account balance or purchase history
await new Promise(resolve => setTimeout(resolve, 500));
}
private async sendPaymentConfirmation(userId: string): Promise<void> {
// Send confirmation email or notification
await new Promise(resolve => setTimeout(resolve, 300));
}
}
Handler Registration
// worker.ts (continued)
function registerTaskHandlers() {
const emailHandlers = new EmailTaskHandlers();
const paymentHandlers = new PaymentTaskHandlers();
// Register email task handlers
cleo.registerTaskHandler(
"send-welcome-email",
emailHandlers.sendWelcomeEmail.bind(emailHandlers),
"email"
);
cleo.registerTaskHandler(
"send-password-reset-email",
emailHandlers.sendPasswordResetEmail.bind(emailHandlers),
"email"
);
// Register payment task handlers
cleo.registerTaskHandler(
"process-payment",
paymentHandlers.processPayment.bind(paymentHandlers),
"payments"
);
logger.info("🎯 All task handlers registered");
}
async function setupQueues(queueManager: any) {
// Create queues with specific configurations
await queueManager.createQueue("email", {
connection: {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
},
});
await queueManager.createQueue("payments", {
connection: {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
},
});
}
Health Monitoring and Graceful Shutdown
// worker.ts (continued)
async function healthCheck() {
try {
const workers = await cleo.getAllWorkersStatus();
logger.info("🏥 Worker health check", {
activeWorkers: workers.length,
workers: workers.map(w => ({
id: w.id,
queue: w.queue,
status: w.status,
activeTasks: w.activeTasks.length,
})),
});
return true;
} catch (error) {
logger.error("❌ Health check failed", { error });
return false;
}
}
function setupHealthChecks() {
// Regular health checks every 30 seconds
setInterval(healthCheck, 30000);
}
function setupGracefulShutdown() {
async function gracefulShutdown(signal: string) {
logger.info(`🛑 Received ${signal}, shutting down gracefully...`);
try {
await cleo.stopProcessing();
logger.info("✅ Worker shutdown complete");
process.exit(0);
} catch (error) {
logger.error("❌ Error during shutdown", { error });
process.exit(1);
}
}
process.on("SIGINT", () => gracefulShutdown("SIGINT"));
process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
}
// Start the worker if this file is run directly
if (import.meta.main) {
main();
}
🚀 Deployment Setup
Package.json Scripts
{
"scripts": {
"start:worker": "bun run worker.ts",
"start:client": "next dev",
"build:worker": "bun build worker.ts --outdir dist",
"build:client": "next build",
"dev:worker": "bun --watch worker.ts",
"dev:client": "next dev"
}
}
Environment Configuration
# .env
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-redis-password
# For production
NODE_ENV=production
Docker Compose Example
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: redis-server --requirepass yourpassword
nextjs-client:
build:
context: .
dockerfile: Dockerfile.client
ports:
- "3000:3000"
environment:
- REDIS_HOST=redis
- REDIS_PASSWORD=yourpassword
depends_on:
- redis
bun-worker:
build:
context: .
dockerfile: Dockerfile.worker
environment:
- REDIS_HOST=redis
- REDIS_PASSWORD=yourpassword
depends_on:
- redis
deploy:
replicas: 2 # Scale workers as needed
📊 Monitoring and Observability
Task Status Monitoring
// lib/task-monitor.ts
import { cleo } from "@/lib/cleo-client";
export async function getTaskStatus(taskId: string) {
const queueManager = cleo.getQueueManager();
const taskStatus = await queueManager.getTaskStatus(taskId);
return {
id: taskId,
status: taskStatus.status,
progress: taskStatus.progress,
result: taskStatus.result,
error: taskStatus.error,
createdAt: taskStatus.createdAt,
completedAt: taskStatus.completedAt,
};
}
export async function getQueueMetrics(queueName: string) {
const queueManager = cleo.getQueueManager();
const metrics = await queueManager.getQueueMetrics(queueName);
return {
queueName,
waiting: metrics.waiting,
active: metrics.active,
completed: metrics.completed,
failed: metrics.failed,
throughput: metrics.throughput,
};
}
Real-time Updates
// hooks/useTaskStatus.ts
import { useEffect, useState } from 'react';
import { cleo } from '@/lib/cleo-client';
export function useTaskStatus(taskId: string) {
const [status, setStatus] = useState(null);
useEffect(() => {
const queueManager = cleo.getQueueManager();
// Subscribe to task updates
const unsubscribe = queueManager.onTaskEvent(
'TASK_STATUS_CHANGE',
(id, newStatus, data) => {
if (id === taskId) {
setStatus({ status: newStatus, data });
}
}
);
return unsubscribe;
}, [taskId]);
return status;
}
🔧 Best Practices
1. Task Design
- Keep tasks idempotent - safe to retry
- Use small payloads - avoid large data in task arguments
- Implement proper error handling in task handlers
- Use timeout values appropriate for task complexity
2. Queue Organization
- Separate queues by priority and type
- Use groups for related tasks
- Configure appropriate concurrency levels
- Monitor queue depth and processing rates
3. Error Handling
- Implement exponential backoff for retries
- Log detailed error information for debugging
- Use dead letter queues for failed tasks
- Set up alerting for high failure rates
4. Scaling
- Horizontal scaling: Add more worker processes
- Vertical scaling: Increase worker concurrency
- Queue partitioning: Distribute load across multiple Redis instances
- Geographic distribution: Deploy workers closer to data sources
5. Security
- Use Redis AUTH and TLS for production
- Implement task validation in handlers
- Sanitize inputs before processing
- Use environment variables for secrets
🚀 Next Steps
Now that you understand the client-server architecture:
- Set up your development environment with Redis
- Create your first client-worker pair using the examples
- Implement monitoring for production readiness
- Scale your workers based on load requirements
- Explore advanced features like task groups and priorities
For more advanced topics, check out:
- Best Practices - Production deployment guidelines
- Scaling Guide - Performance optimization strategies
- API Reference - Complete API documentation