Advanced

Claude Code के साथ Implementing Job Queues and Async Processing

Claude Code का उपयोग करके implementing job queues and async processing सीखें। Practical code examples शामिल हैं।

jobqueueの役割

Webapplicationでは、メール送信、画像processing、レポートgenerate आदि時बीचのかかるprocessingをバックグラウンドで実行するज़रूरीがあり है।jobqueueを使えば、これらのprocessingを信頼性高くasync実行でき है।

BullMQ のsetup

import { Queue, Worker, Job } from "bullmq";
import { Redis } from "ioredis";

const connection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
});

// queueの定義
const emailQueue = new Queue("email", { connection });
const imageQueue = new Queue("image-processing", { connection });
const reportQueue = new Queue("report-generation", { connection });

jobの投入

// メール送信job
interface EmailJob {
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

async function sendEmail(payload: EmailJob) {
  await emailQueue.add("send", payload, {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  });
}

// 優先度付きjob
async function processImage(imageUrl: string, priority: number = 0) {
  await imageQueue.add(
    "resize",
    {
      url: imageUrl,
      sizes: [
        { width: 320, suffix: "sm" },
        { width: 768, suffix: "md" },
        { width: 1280, suffix: "lg" },
      ],
    },
    {
      priority,
      attempts: 2,
      timeout: 60000,
    }
  );
}

// 遅延job(schedule実行)
async function scheduleReport(userId: string) {
  await reportQueue.add(
    "weekly",
    { userId },
    {
      delay: 60 * 60 * 1000, // 1時बीचबादに実行
      repeat: {
        pattern: "0 9 * * 1", // 毎週月曜9:00
      },
    }
  );
}

workerのimplementation

// メール送信worker
const emailWorker = new Worker<EmailJob>(
  "email",
  async (job: Job<EmailJob>) => {
    const { to, subject, template, data } = job.data;

    console.log(`Processing email job ${job.id}: ${subject} -> ${to}`);

    // templateをレンダリング
    const html = await renderTemplate(template, data);

    // メール送信
    await transporter.sendMail({
      from: "[email protected]",
      to,
      subject,
      html,
    });

    return { sent: true, to };
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 10,
      duration: 1000, // 1秒に10件 तक
    },
  }
);

// eventハンドリング
emailWorker.on("completed", (job) => {
  console.log(`Email sent: ${job.id}`);
});

emailWorker.on("failed", (job, error) => {
  console.error(`Email failed: ${job?.id}`, error.message);
});

error handlingとリトライ

const imageWorker = new Worker(
  "image-processing",
  async (job) => {
    try {
      const { url, sizes } = job.data;

      // 進捗報告
      await job.updateProgress(0);

      const results = [];
      for (let i = 0; i < sizes.length; i++) {
        const resized = await resizeImage(url, sizes[i]);
        results.push(resized);
        await job.updateProgress(((i + 1) / sizes.length) * 100);
      }

      return { results };
    } catch (error) {
      // リトライpossibleなerrorかどうか判定
      if (error instanceof NetworkError) {
        throw error; // BullMQがリトライ
      }

      // リトライ不要なerror
      throw new UnrecoverableError(
        `Invalid image: ${error.message}`
      );
    }
  },
  {
    connection,
    concurrency: 3,
  }
);

フローのimplementation(pipeline)

import { FlowProducer } from "bullmq";

const flowProducer = new FlowProducer({ connection });

// 記事公開フロー
async function publishPostFlow(postId: string) {
  await flowProducer.add({
    name: "publish-post",
    queueName: "orchestration",
    data: { postId },
    children: [
      {
        name: "generate-og-image",
        queueName: "image-processing",
        data: { postId, type: "og-image" },
      },
      {
        name: "send-notifications",
        queueName: "email",
        data: { postId, type: "new-post" },
      },
      {
        name: "update-sitemap",
        queueName: "seo",
        data: { postId },
      },
    ],
  });
}

monitoring

import { QueueEvents } from "bullmq";

const queueEvents = new QueueEvents("email", { connection });

// メトリクス収集
async function getQueueMetrics(queue: Queue) {
  const [waiting, active, completed, failed, delayed] =
    await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ]);

  return { waiting, active, completed, failed, delayed };
}

// 定期的にメトリクスを出力
setInterval(async () => {
  const metrics = await getQueueMetrics(emailQueue);
  console.log("Email queue metrics:", metrics);
}, 30000);

Claude Code सेのutilization

jobqueueのimplementationをClaude Code को requestする例 है।Redisの設計के बारे मेंはRediscache設計、event駆動の設計はevent駆動アーキテクチャもदेखें。

BullMQでバックグラウンドjobをimplement करो。
- メール送信queue(レート制限付き)
- 画像リsizequeue(進捗報告付き)
- 記事公開のフロー(複数jobのintegration)
- リトライとerror handling
- dashboard用のメトリクスAPI

BullMQके details के लिएBullMQofficial documentationをदेखें。Claude Codeのuse करने का तरीकाはofficial documentationでconfirmでき है।

Summary

jobqueueは本番環境のapplicationに不可欠なインフラ है।Claude Code का उपयोग करके、BullMQのsetup सेリトライ戦略、monitoring तक一貫したasyncprocessing基盤 build किया जा सकता है。

#Claude Code #jobqueue #asyncprocessing #BullMQ #Redis