Advanced

Implementing Job Queues and Async Processing: Claude Code 활용 가이드

implementing job queues and async processing: Claude Code 활용. 실용적인 코드 예시를 포함합니다.

잡큐の役割

Web애플리케이션では、メール전송、이미지処理、レポート생성など시간のかかる処理をバックグラウンドで実行해야 합니다。잡큐を使えば、이것らの処理を신뢰성高く비동기実行할 수 있습니다。

BullMQ のセットアップ

import { Queue, Worker, Job } from "bullmq";
import { Redis } from "ioredis";

const connection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
});

// 큐の定義
const emailQueue = new Queue("email", { connection });
const imageQueue = new Queue("image-processing", { connection });
const reportQueue = new Queue("report-generation", { connection });

잡の投入

// メール전송잡
interface EmailJob {
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

async function sendEmail(payload: EmailJob) {
  await emailQueue.add("send", payload, {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  });
}

// 優先度付き잡
async function processImage(imageUrl: string, priority: number = 0) {
  await imageQueue.add(
    "resize",
    {
      url: imageUrl,
      sizes: [
        { width: 320, suffix: "sm" },
        { width: 768, suffix: "md" },
        { width: 1280, suffix: "lg" },
      ],
    },
    {
      priority,
      attempts: 2,
      timeout: 60000,
    }
  );
}

// 遅延잡(スケジュール実行)
async function scheduleReport(userId: string) {
  await reportQueue.add(
    "weekly",
    { userId },
    {
      delay: 60 * 60 * 1000, // 1시간後に実行
      repeat: {
        pattern: "0 9 * * 1", // 毎週月曜9:00
      },
    }
  );
}

ワーカーの구현

// メール전송ワーカー
const emailWorker = new Worker<EmailJob>(
  "email",
  async (job: Job<EmailJob>) => {
    const { to, subject, template, data } = job.data;

    console.log(`Processing email job ${job.id}: ${subject} -> ${to}`);

    // 템플릿を렌더링
    const html = await renderTemplate(template, data);

    // メール전송
    await transporter.sendMail({
      from: "[email protected]",
      to,
      subject,
      html,
    });

    return { sent: true, to };
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 10,
      duration: 1000, // 1秒に10件まで
    },
  }
);

// 이벤트ハンドリング
emailWorker.on("completed", (job) => {
  console.log(`Email sent: ${job.id}`);
});

emailWorker.on("failed", (job, error) => {
  console.error(`Email failed: ${job?.id}`, error.message);
});

에러 핸들링とリトライ

const imageWorker = new Worker(
  "image-processing",
  async (job) => {
    try {
      const { url, sizes } = job.data;

      // 進捗報告
      await job.updateProgress(0);

      const results = [];
      for (let i = 0; i < sizes.length; i++) {
        const resized = await resizeImage(url, sizes[i]);
        results.push(resized);
        await job.updateProgress(((i + 1) / sizes.length) * 100);
      }

      return { results };
    } catch (error) {
      // リトライ可能な에러かどうか判定
      if (error instanceof NetworkError) {
        throw error; // BullMQがリトライ
      }

      // リトライ不要な에러
      throw new UnrecoverableError(
        `Invalid image: ${error.message}`
      );
    }
  },
  {
    connection,
    concurrency: 3,
  }
);

フローの구현(파이프라인)

import { FlowProducer } from "bullmq";

const flowProducer = new FlowProducer({ connection });

// 글公開フロー
async function publishPostFlow(postId: string) {
  await flowProducer.add({
    name: "publish-post",
    queueName: "orchestration",
    data: { postId },
    children: [
      {
        name: "generate-og-image",
        queueName: "image-processing",
        data: { postId, type: "og-image" },
      },
      {
        name: "send-notifications",
        queueName: "email",
        data: { postId, type: "new-post" },
      },
      {
        name: "update-sitemap",
        queueName: "seo",
        data: { postId },
      },
    ],
  });
}

모니터링

import { QueueEvents } from "bullmq";

const queueEvents = new QueueEvents("email", { connection });

// メトリクス収集
async function getQueueMetrics(queue: Queue) {
  const [waiting, active, completed, failed, delayed] =
    await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ]);

  return { waiting, active, completed, failed, delayed };
}

// 定期的にメトリクスを출력
setInterval(async () => {
  const metrics = await getQueueMetrics(emailQueue);
  console.log("Email queue metrics:", metrics);
}, 30000);

Claude Code로の활용

잡큐の구현をClaude Code에依頼する例です。Redisの설계에 대해서는Redis캐시설계、이벤트駆動の설계は이벤트駆動아키텍처도 참고하세요.

BullMQでバックグラウンドジョブを実装して。
- メール送信キュー(レート制限付き)
- 画像リサイズキュー(進捗報告付き)
- 記事公開のフロー(複数ジョブの連携)
- リトライとエラーハンドリング
- ダッシュボード用のメトリクスAPI

BullMQ의 상세 정보는BullMQ공식 문서를 참고하세요.Claude Codeの使い方は공식 문서에서 확인할 수 있습니다.

정리

잡큐は프로덕션 환경の애플리케이션に不可欠な인프라です。Claude Code를 활용하면 BullMQのセットアップからリトライ戦略、모니터링まで一貫した비동기処理基盤を구축할 수 있습니다。

#Claude Code #ジョブキュー #非同期処理 #BullMQ #Redis