Advanced

实现Job Queues and Async Processing:Claude Code 实战指南

了解implementing job queues and async processing:Claude Code 实战. 包含实用代码示例。

作业队列の役割

Web应用では、メール发送、图片処理、レポート生成など时间のかかる処理をバックグラウンドで実行需要。作业队列を使えば、这らの処理を可靠性高く异步実行可以。

BullMQ のセットアップ

import { Queue, Worker, Job } from "bullmq";
import { Redis } from "ioredis";

const connection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
});

// 队列の定義
const emailQueue = new Queue("email", { connection });
const imageQueue = new Queue("image-processing", { connection });
const reportQueue = new Queue("report-generation", { connection });

作业の投入

// メール发送作业
interface EmailJob {
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

async function sendEmail(payload: EmailJob) {
  await emailQueue.add("send", payload, {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  });
}

// 優先度付き作业
async function processImage(imageUrl: string, priority: number = 0) {
  await imageQueue.add(
    "resize",
    {
      url: imageUrl,
      sizes: [
        { width: 320, suffix: "sm" },
        { width: 768, suffix: "md" },
        { width: 1280, suffix: "lg" },
      ],
    },
    {
      priority,
      attempts: 2,
      timeout: 60000,
    }
  );
}

// 遅延作业(スケジュール実行)
async function scheduleReport(userId: string) {
  await reportQueue.add(
    "weekly",
    { userId },
    {
      delay: 60 * 60 * 1000, // 1时间後に実行
      repeat: {
        pattern: "0 9 * * 1", // 毎週月曜9:00
      },
    }
  );
}

ワーカーの实现

// メール发送ワーカー
const emailWorker = new Worker<EmailJob>(
  "email",
  async (job: Job<EmailJob>) => {
    const { to, subject, template, data } = job.data;

    console.log(`Processing email job ${job.id}: ${subject} -> ${to}`);

    // 模板を渲染
    const html = await renderTemplate(template, data);

    // メール发送
    await transporter.sendMail({
      from: "[email protected]",
      to,
      subject,
      html,
    });

    return { sent: true, to };
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 10,
      duration: 1000, // 1秒に10件まで
    },
  }
);

// 事件ハンドリング
emailWorker.on("completed", (job) => {
  console.log(`Email sent: ${job.id}`);
});

emailWorker.on("failed", (job, error) => {
  console.error(`Email failed: ${job?.id}`, error.message);
});

错误处理とリトライ

const imageWorker = new Worker(
  "image-processing",
  async (job) => {
    try {
      const { url, sizes } = job.data;

      // 進捗報告
      await job.updateProgress(0);

      const results = [];
      for (let i = 0; i < sizes.length; i++) {
        const resized = await resizeImage(url, sizes[i]);
        results.push(resized);
        await job.updateProgress(((i + 1) / sizes.length) * 100);
      }

      return { results };
    } catch (error) {
      // リトライ可能な错误かどうか判定
      if (error instanceof NetworkError) {
        throw error; // BullMQがリトライ
      }

      // リトライ不要な错误
      throw new UnrecoverableError(
        `Invalid image: ${error.message}`
      );
    }
  },
  {
    connection,
    concurrency: 3,
  }
);

フローの实现(流水线)

import { FlowProducer } from "bullmq";

const flowProducer = new FlowProducer({ connection });

// 文章公開フロー
async function publishPostFlow(postId: string) {
  await flowProducer.add({
    name: "publish-post",
    queueName: "orchestration",
    data: { postId },
    children: [
      {
        name: "generate-og-image",
        queueName: "image-processing",
        data: { postId, type: "og-image" },
      },
      {
        name: "send-notifications",
        queueName: "email",
        data: { postId, type: "new-post" },
      },
      {
        name: "update-sitemap",
        queueName: "seo",
        data: { postId },
      },
    ],
  });
}

监控

import { QueueEvents } from "bullmq";

const queueEvents = new QueueEvents("email", { connection });

// メトリクス収集
async function getQueueMetrics(queue: Queue) {
  const [waiting, active, completed, failed, delayed] =
    await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ]);

  return { waiting, active, completed, failed, delayed };
}

// 定期的にメトリクスを输出
setInterval(async () => {
  const metrics = await getQueueMetrics(emailQueue);
  console.log("Email queue metrics:", metrics);
}, 30000);

通过 Claude Codeの活用

作业队列の实现を让 Claude Code依頼する例です。Redisの设计相关内容请参阅Redis缓存设计、事件駆動の设计は事件駆動架构也请参阅。

BullMQでバックグラウンドジョブを実装して。
- メール送信キュー(レート制限付き)
- 画像リサイズキュー(進捗報告付き)
- 記事公開のフロー(複数ジョブの連携)
- リトライとエラーハンドリング
- ダッシュボード用のメトリクスAPI

BullMQ的详细信息请参阅BullMQ官方文档。Claude Codeの使い方は官方文档中可以查看。

总结

作业队列は生产环境の应用に不可欠な基础设施です。借助 Claude Code,BullMQのセットアップからリトライ戦略、监控まで一貫した异步処理基盤を构建可以。

#Claude Code #ジョブキュー #非同期処理 #BullMQ #Redis