Advanced

Mengimplementasikan Job Queues and Async Processing dengan Claude Code

Learn about implementing job queues and async processing using Claude Code. Includes practical code examples.

役割 ジョブキュー

Webaplikasi 、メールpengiriman、gambarpemrosesan、レポートgenerate dll.時間 かかるpemrosesan バックグラウンド 実行 diperlukan あり.ジョブキュー 使えば、これら pemrosesan reliabilitas高く非同期実行 bisa dilakukan.

セットアップ BullMQ

import { Queue, Worker, Job } from "bullmq";
import { Redis } from "ioredis";

const connection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
});

// キュー definisi
const emailQueue = new Queue("email", { connection });
const imageQueue = new Queue("image-processing", { connection });
const reportQueue = new Queue("report-generation", { connection });

投入 ジョブ

// メールpengirimanジョブ
interface EmailJob {
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

async function sendEmail(payload: EmailJob) {
  await emailQueue.add("send", payload, {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  });
}

// 優先度付きジョブ
async function processImage(imageUrl: string, priority: number = 0) {
  await imageQueue.add(
    "resize",
    {
      url: imageUrl,
      sizes: [
        { width: 320, suffix: "sm" },
        { width: 768, suffix: "md" },
        { width: 1280, suffix: "lg" },
      ],
    },
    {
      priority,
      attempts: 2,
      timeout: 60000,
    }
  );
}

// 遅延ジョブ(スケジュール実行)
async function scheduleReport(userId: string) {
  await reportQueue.add(
    "weekly",
    { userId },
    {
      delay: 60 * 60 * 1000, // 1時間後 実行
      repeat: {
        pattern: "0 9 * * 1", // 毎週月曜9:00
      },
    }
  );
}

implementasi ワーカー

// メールpengirimanワーカー
const emailWorker = new Worker<EmailJob>(
  "email",
  async (job: Job<EmailJob>) => {
    const { to, subject, template, data } = job.data;

    console.log(`Processing email job ${job.id}: ${subject} -> ${to}`);

    // template rendering
    const html = await renderTemplate(template, data);

    // メールpengiriman
    await transporter.sendMail({
      from: "[email protected]",
      to,
      subject,
      html,
    });

    return { sent: true, to };
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 10,
      duration: 1000, // 1秒 10件ま
    },
  }
);

// eventハンドリング
emailWorker.on("completed", (job) => {
  console.log(`Email sent: ${job.id}`);
});

emailWorker.on("failed", (job, error) => {
  console.error(`Email failed: ${job?.id}`, error.message);
});

errorハンドリングとリトライ

const imageWorker = new Worker(
  "image-processing",
  async (job) => {
    try {
      const { url, sizes } = job.data;

      // 進捗報告
      await job.updateProgress(0);

      const results = [];
      for (let i = 0; i < sizes.length; i++) {
        const resized = await resizeImage(url, sizes[i]);
        results.push(resized);
        await job.updateProgress(((i + 1) / sizes.length) * 100);
      }

      return { results };
    } catch (error) {
      // リトライdimungkinkanなerrorかどうか判定
      if (error instanceof NetworkError) {
        throw error; // BullMQ リトライ
      }

      // リトライ不要なerror
      throw new UnrecoverableError(
        `Invalid image: ${error.message}`
      );
    }
  },
  {
    connection,
    concurrency: 3,
  }
);

implementasi(pipeline) フロー

import { FlowProducer } from "bullmq";

const flowProducer = new FlowProducer({ connection });

// artikelpublikasiフロー
async function publishPostFlow(postId: string) {
  await flowProducer.add({
    name: "publish-post",
    queueName: "orchestration",
    data: { postId },
    children: [
      {
        name: "generate-og-image",
        queueName: "image-processing",
        data: { postId, type: "og-image" },
      },
      {
        name: "send-notifications",
        queueName: "email",
        data: { postId, type: "new-post" },
      },
      {
        name: "update-sitemap",
        queueName: "seo",
        data: { postId },
      },
    ],
  });
}

monitoring

import { QueueEvents } from "bullmq";

const queueEvents = new QueueEvents("email", { connection });

// メトリクス収集
async function getQueueMetrics(queue: Queue) {
  const [waiting, active, completed, failed, delayed] =
    await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ]);

  return { waiting, active, completed, failed, delayed };
}

// 定期的 メトリクス output
setInterval(async () => {
  const metrics = await getQueueMetrics(emailQueue);
  console.log("Email queue metrics:", metrics);
}, 30000);

Pemanfaatan dengan Claude Code

ジョブキュー implementasi Claude Code 依頼 例.Redis 設計 mengenai Rediscache設計、event駆動 設計 event駆動arsitektur juga bisa dijadikan referensi.

BullMQ dengan バックグラウンドジョブ implementasikan.
- メールpengirimanキュー(レート制限付き)
- gambarリサイズキュー(進捗報告付き)
- artikelpublikasi フロー(複数ジョブ integrasi)
- リトライ dan errorハンドリング
- dashboard用 メトリクスAPI

BullMQ 詳細 BullMQ公式dokumen silakan lihat.Claude Code 使い方 公式dokumen konfirmasi bisa dilakukan.

Summary

ジョブキュー environment produksi aplikasi 不可欠なinfra.Claude Code 使えば、BullMQ セットアップ dari リトライ戦略、monitoringま 一貫 非同期pemrosesanfondasi pembangunan bisa dilakukan.

#Claude Code #ジョブキュー #非同期pemrosesan #BullMQ #Redis