Advanced

Claude Codeでジョブキュー・非同期処理を実装する

Claude Codeを使ったジョブキューと非同期処理の実装パターンを解説。BullMQ、リトライ戦略、優先度制御、モニタリングまで実践的なコード例付き。

ジョブキューの役割

Webアプリケーションでは、メール送信、画像処理、レポート生成など時間のかかる処理をバックグラウンドで実行する必要があります。ジョブキューを使えば、これらの処理を信頼性高く非同期実行できます。

BullMQ のセットアップ

import { Queue, Worker, Job } from "bullmq";
import { Redis } from "ioredis";

const connection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
});

// キューの定義
const emailQueue = new Queue("email", { connection });
const imageQueue = new Queue("image-processing", { connection });
const reportQueue = new Queue("report-generation", { connection });

ジョブの投入

// メール送信ジョブ
interface EmailJob {
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

async function sendEmail(payload: EmailJob) {
  await emailQueue.add("send", payload, {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  });
}

// 優先度付きジョブ
async function processImage(imageUrl: string, priority: number = 0) {
  await imageQueue.add(
    "resize",
    {
      url: imageUrl,
      sizes: [
        { width: 320, suffix: "sm" },
        { width: 768, suffix: "md" },
        { width: 1280, suffix: "lg" },
      ],
    },
    {
      priority,
      attempts: 2,
      timeout: 60000,
    }
  );
}

// 遅延ジョブ(スケジュール実行)
async function scheduleReport(userId: string) {
  await reportQueue.add(
    "weekly",
    { userId },
    {
      delay: 60 * 60 * 1000, // 1時間後に実行
      repeat: {
        pattern: "0 9 * * 1", // 毎週月曜9:00
      },
    }
  );
}

ワーカーの実装

// メール送信ワーカー
const emailWorker = new Worker<EmailJob>(
  "email",
  async (job: Job<EmailJob>) => {
    const { to, subject, template, data } = job.data;

    console.log(`Processing email job ${job.id}: ${subject} -> ${to}`);

    // テンプレートをレンダリング
    const html = await renderTemplate(template, data);

    // メール送信
    await transporter.sendMail({
      from: "[email protected]",
      to,
      subject,
      html,
    });

    return { sent: true, to };
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 10,
      duration: 1000, // 1秒に10件まで
    },
  }
);

// イベントハンドリング
emailWorker.on("completed", (job) => {
  console.log(`Email sent: ${job.id}`);
});

emailWorker.on("failed", (job, error) => {
  console.error(`Email failed: ${job?.id}`, error.message);
});

エラーハンドリングとリトライ

const imageWorker = new Worker(
  "image-processing",
  async (job) => {
    try {
      const { url, sizes } = job.data;

      // 進捗報告
      await job.updateProgress(0);

      const results = [];
      for (let i = 0; i < sizes.length; i++) {
        const resized = await resizeImage(url, sizes[i]);
        results.push(resized);
        await job.updateProgress(((i + 1) / sizes.length) * 100);
      }

      return { results };
    } catch (error) {
      // リトライ可能なエラーかどうか判定
      if (error instanceof NetworkError) {
        throw error; // BullMQがリトライ
      }

      // リトライ不要なエラー
      throw new UnrecoverableError(
        `Invalid image: ${error.message}`
      );
    }
  },
  {
    connection,
    concurrency: 3,
  }
);

フローの実装(パイプライン)

import { FlowProducer } from "bullmq";

const flowProducer = new FlowProducer({ connection });

// 記事公開フロー
async function publishPostFlow(postId: string) {
  await flowProducer.add({
    name: "publish-post",
    queueName: "orchestration",
    data: { postId },
    children: [
      {
        name: "generate-og-image",
        queueName: "image-processing",
        data: { postId, type: "og-image" },
      },
      {
        name: "send-notifications",
        queueName: "email",
        data: { postId, type: "new-post" },
      },
      {
        name: "update-sitemap",
        queueName: "seo",
        data: { postId },
      },
    ],
  });
}

モニタリング

import { QueueEvents } from "bullmq";

const queueEvents = new QueueEvents("email", { connection });

// メトリクス収集
async function getQueueMetrics(queue: Queue) {
  const [waiting, active, completed, failed, delayed] =
    await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ]);

  return { waiting, active, completed, failed, delayed };
}

// 定期的にメトリクスを出力
setInterval(async () => {
  const metrics = await getQueueMetrics(emailQueue);
  console.log("Email queue metrics:", metrics);
}, 30000);

Claude Codeでの活用

ジョブキューの実装をClaude Codeに依頼する例です。Redisの設計についてはRedisキャッシュ設計、イベント駆動の設計はイベント駆動アーキテクチャも参照してください。

BullMQでバックグラウンドジョブを実装して。
- メール送信キュー(レート制限付き)
- 画像リサイズキュー(進捗報告付き)
- 記事公開のフロー(複数ジョブの連携)
- リトライとエラーハンドリング
- ダッシュボード用のメトリクスAPI

BullMQの詳細はBullMQ公式ドキュメントを参照してください。Claude Codeの使い方は公式ドキュメントで確認できます。

まとめ

ジョブキューは本番環境のアプリケーションに不可欠なインフラです。Claude Codeを使えば、BullMQのセットアップからリトライ戦略、モニタリングまで一貫した非同期処理基盤を構築できます。

#Claude Code #ジョブキュー #非同期処理 #BullMQ #Redis