Queues và Durable Objects: async messaging và single-writer state

2 primitive khó nhất khi Worker cần state. Queues cho fire-and-forget job với retry và DLQ. Durable Objects cho single-writer coordination. Khi nào dùng cái nào, pattern và gotcha.

· 7 phút đọc · Read in English
Phân biệt Queues và Durable Objects: Queues fire-and-forget với ack/retry/DLQ cho job async, Durable Objects single-writer actor với storage SQL riêng, strong consistency và WebSocket hibernation

TL;DR

Queues: messaging bất đồng bộ fire-and-forget. Producer send(), consumer xử lý batch với ack() / retry(), dead letter queue cho message fail sau N lần.

Durable Objects (DO): single-writer actor. Mỗi ID hash tới 1 instance chạy trong 1 isolate duy nhất, có storage SQL riêng, strong consistency, WebSocket hibernation.

Luận điểm chính:

Dùng Queues khi xử lý có thể trễ (giây-phút). Dùng DO khi cần state nhất quán giữa nhiều request hoặc kết nối (counter, chat room, lock, rate limiter, session). Gộp chung khi quy trình có cả 2: request đến DO để điều phối, task nặng đẩy qua Queue.

Bài này đi qua: luồng Queues (producer, broker, consumer, retry, DLQ), mô hình DO (id, stub, instance, storage, hibernation), 6 pattern thực tế, gotcha từ blog này.


Dành cho ai

  • Dev đang xây quy trình có task nền (email digest, resize ảnh, phát lại webhook).
  • Người cần rate limiter, counter, WebSocket server, quản lý session.
  • Ai đã dùng Lambda + SQS và muốn tương đương trên Workers.

Nên đọc trước: Part 2 (môi trường chạy + giới hạn thời gian CPU), Part 3 (khi nào dùng primitive nào), Part 6 (D1 cho state dùng chung).

Sau bài này bạn sẽ:

  • Viết producer + consumer Queue cơ bản.
  • Tạo DO đầu tiên cho counter / rate limiter.
  • Quyết định Queue vs DO vs cả 2.
  • Hiểu giới hạn và gotcha production.

Bài này không nói về gì

  • WebSocket hibernation API chi tiết: Part 15 (Durable Objects realtime).
  • Tối ưu batch Queue nâng cao: ví dụ cơ bản, Part 17 sâu hơn.
  • Phiên bản migration DO: nâng cao, không phổ biến.

Queues: messaging bất đồng bộ

Queues flow: Worker producer gọi env.QUEUE.send gửi message, broker Cloudflare Queues hold message, batch tối đa 100 message một lần, deliver tới consumer Worker qua queue handler, track ack và retry. Message fail sau max_retries đi vào dead letter queue.

Khi nào dùng Queues

  • Job fire-and-forget: resize ảnh, gửi email, xây lại index — không cần user đợi.
  • Giới hạn tốc độ gửi ra: làm mượt traffic đột biến thành tốc độ đều.
  • Thử lại với backoff: gọi API ngoài hay fail, không muốn chặn user.
  • Fan-out processing: 1 event → N worker xử lý song song.
  • Xử lý dead letter: message fail → chuyển sang DLQ để debug.

Binding

{
  "queues": {
    "producers": [
      { "binding": "MY_QUEUE", "queue": "my-queue" }
    ],
    "consumers": [
      {
        "queue": "my-queue",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "max_retries": 3,
        "dead_letter_queue": "my-queue-dlq"
      }
    ]
  }
}

Producer

async fetch(request, env) {
  const body = await request.json();

  // Gửi 1 message
  await env.MY_QUEUE.send({ type: "image-resize", url: body.url });

  // Batch send nhiều message
  await env.MY_QUEUE.sendBatch([
    { body: { type: "email", to: "a@x.com" } },
    { body: { type: "email", to: "b@x.com" } },
  ]);

  return Response.json({ queued: true });
}

send() là subrequest, ~10ms. Không đợi consumer xử lý.

Consumer

export default {
  async fetch(request, env) { /* ... */ },

  async queue(batch: MessageBatch<MyMsg>, env: Env, ctx: ExecutionContext) {
    for (const msg of batch.messages) {
      try {
        if (msg.body.type === "image-resize") {
          await resizeImage(env, msg.body.url);
        } else if (msg.body.type === "email") {
          await sendEmail(env, msg.body.to);
        }
        msg.ack();  // thành công, xóa khỏi queue
      } catch (err) {
        console.error(`Failed msg ${msg.id}:`, err);
        msg.retry({ delaySeconds: 60 });  // thử lại sau 60s
      }
    }
  }
};

queue handler nhận batch tối đa max_batch_size message hoặc đợi max_batch_timeout giây.

Thử lại và DLQ

  • Mặc định max_retries: 3. Sau 3 lần fail, message tự chuyển vào DLQ.
  • Cấu hình dead_letter_queue trong khối consumer.
  • DLQ là queue riêng, consumer khác đọc để debug/cảnh báo/phát lại.
// DLQ consumer riêng
async queue(batch, env) {
  for (const msg of batch.messages) {
    await env.DB.prepare("INSERT INTO failed_jobs (msg_id, body, retry_count) VALUES (?, ?, ?)")
      .bind(msg.id, JSON.stringify(msg.body), 3)
      .run();
    msg.ack();  // đã ghi xuống DB, ack để xóa khỏi DLQ
  }
}

Giới hạn

  • Batch tối đa: 100 message / batch.
  • Số lần thử lại tối đa: 100 (mặc định 3).
  • Message body tối đa: 128 KB (mặc định), lên 1 MB với cấu hình.
  • Delivery: at-least-once. Message có thể được gửi 2 lần → consumer phải idempotent.

At-least-once = consumer phải idempotent

// SAI: nếu thử lại, user nhận email 2 lần
async function handleMsg(msg) {
  await sendEmail(msg.body.email);
  msg.ack();
}

// ĐÚNG: kiểm tra đã xử lý chưa
async function handleMsg(msg, env) {
  const processed = await env.DB.prepare("SELECT 1 FROM processed WHERE msg_id = ?")
    .bind(msg.id).first();
  if (processed) { msg.ack(); return; }

  await sendEmail(msg.body.email);
  await env.DB.prepare("INSERT INTO processed (msg_id) VALUES (?)")
    .bind(msg.id).run();
  msg.ack();
}

Hoặc thiết kế tác dụng phụ idempotent (UPSERT, PUT thay INSERT, đặt cờ thay increment).


Durable Objects: stateful actor

Durable Object model: Worker client binding tới namespace, gọi idFromName tạo stable ID, ns.get trả stub, stub.fetch hoặc RPC call đi tới DO instance. DO instance chạy trong 1 isolate duy nhất, có in-memory state, storage SQL riêng, WebSocket hibernation khi idle.

Mô hình tư duy

  • Namespace: kiểu class Worker đăng ký với Cloudflare. Bind vào env.
  • ID: định danh cho 1 instance. Lấy qua idFromName(string) (hash ổn định) hoặc newUniqueId().
  • Instance: 1 isolate duy nhất trên toàn mạng Cloudflare cho ID đó. Single-writer, strong consistency.
  • Stub: handle client dùng để gọi instance (stub.fetch() hoặc RPC method).
  • Storage: SQL database riêng cho mỗi instance, có transaction.

Khi nào dùng DO

  • Counter, rate limiter: cần tăng nguyên tử.
  • WebSocket server: chat room, multiplayer game, live collab.
  • Lock / mutex: điều phối lên tài nguyên bên ngoài.
  • Session với state trong bộ nhớ: shopping cart, form wizard.
  • Transaction cross-key: D1 khó làm, DO storage dễ.

Define một DO

// src/durable-objects/counter.ts
export class Counter implements DurableObject {
  state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    let count = (await this.state.storage.get<number>("count")) ?? 0;

    const url = new URL(request.url);
    if (url.pathname === "/increment") {
      count++;
      await this.state.storage.put("count", count);
    }

    return Response.json({ count });
  }
}

// src/index.ts
export { Counter } from "./durable-objects/counter";

export default {
  async fetch(request, env) {
    const id = env.COUNTER.idFromName("global");
    const stub = env.COUNTER.get(id);
    return stub.fetch(request);
  }
};

Binding

{
  "durable_objects": {
    "bindings": [
      { "name": "COUNTER", "class_name": "Counter" }
    ]
  },
  "migrations": [
    { "tag": "v1", "new_sqlite_classes": ["Counter"] }
  ]
}

new_sqlite_classes (mới) cho DO dùng SQLite storage (mặc định bây giờ). Cũ dùng new_classes với storage kiểu KV.

Storage API

// Kiểu KV
await this.state.storage.get("key");
await this.state.storage.put("key", value);
await this.state.storage.delete("key");
await this.state.storage.list({ prefix: "user:" });

// SQL (SQLite backed, strong consistency per-instance)
this.state.storage.sql.exec(
  "CREATE TABLE IF NOT EXISTS rooms (id TEXT PRIMARY KEY, users TEXT)"
);
const rows = this.state.storage.sql.exec(
  "SELECT * FROM rooms WHERE id = ?", roomId
).toArray();

// Transaction
await this.state.storage.transaction(async (txn) => {
  const current = await txn.get<number>("balance");
  await txn.put("balance", current + 100);
});

Đảm bảo single-writer

1 instance ID = 1 isolate chạy tại 1 thời điểm. Mọi request đến ID đó xếp hàng trong instance, xử lý tuần tự.

Hệ quả:

  • Nguyên tử: get + put trong cùng request là nguyên tử, không race.
  • Ghim vị trí: instance chạy ở PoP gần request đầu tiên, gắn ở đó. Request sau từ region xa = độ trễ cao hơn.
  • Nghẽn cổ chai: 1 ID là 1 actor đơn luồng. Throughput giới hạn.

Shard nếu throughput cao:

// Thay vì 1 counter global
env.COUNTER.idFromName("global");

// Shard thành 16 counter
const shard = userId.charCodeAt(0) % 16;
env.COUNTER.idFromName(`shard-${shard}`);
// Sum lại khi đọc

Hibernation cho WebSocket

WebSocket giữ kết nối lâu → cần DO chạy lâu → tốn tài nguyên. Hibernation API: idle → đẩy khỏi bộ nhớ, giữ storage, thức dậy khi có message.

export class ChatRoom implements DurableObject {
  async fetch(request: Request): Promise<Response> {
    if (request.headers.get("Upgrade") !== "websocket") {
      return new Response("Expected WebSocket", { status: 426 });
    }

    const [client, server] = Object.values(new WebSocketPair());

    // Chấp nhận với hibernation
    this.state.acceptWebSocket(server);

    return new Response(null, { status: 101, webSocket: client });
  }

  async webSocketMessage(ws: WebSocket, message: string) {
    // Phát tới tất cả kết nối
    for (const peer of this.state.getWebSockets()) {
      peer.send(message);
    }
  }

  async webSocketClose(ws: WebSocket, code: number, reason: string) {
    // Dọn dẹp
  }
}

Không dùng server.addEventListener("message", ...) truyền thống — đó là không hibernation. acceptWebSocket() + webSocketMessage handler mới cho hibernation.


6 pattern thực tế

① Rate limiter (DO)

export class RateLimiter implements DurableObject {
  state: DurableObjectState;

  constructor(state, env) {
    this.state = state;
  }

  async fetch(request: Request) {
    const now = Date.now();
    const windowMs = 60_000;

    let timestamps = (await this.state.storage.get<number[]>("requests")) ?? [];
    timestamps = timestamps.filter(t => now - t < windowMs);

    if (timestamps.length >= 60) {
      return Response.json({ allowed: false, reset: timestamps[0] + windowMs }, { status: 429 });
    }

    timestamps.push(now);
    await this.state.storage.put("requests", timestamps);

    return Response.json({ allowed: true, remaining: 60 - timestamps.length });
  }
}

// Client Worker
async fetch(request, env) {
  const ip = request.headers.get("CF-Connecting-IP") ?? "unknown";
  const id = env.RATE_LIMITER.idFromName(`ip:${ip}`);
  const res = await env.RATE_LIMITER.get(id).fetch(request.url);
  const { allowed } = await res.json();
  if (!allowed) return new Response("Rate limited", { status: 429 });
  // ... handle request
}

1 DO instance per IP. Tăng nguyên tử, không race.

② Job queue với thử lại (Queue)

// Producer
async fetch(request, env) {
  await env.JOBS.send({ type: "digest", userId: body.userId });
  return Response.json({ queued: true });
}

// Consumer
async queue(batch, env) {
  for (const msg of batch.messages) {
    try {
      if (msg.body.type === "digest") {
        await sendWeeklyDigest(env, msg.body.userId);
      }
      msg.ack();
    } catch (err) {
      msg.retry({ delaySeconds: Math.pow(2, msg.attempts) * 60 });
    }
  }
}

Backoff theo hàm mũ: 1m, 2m, 4m… Dead letter sau max_retries.

③ WebSocket chat room (DO)

export class ChatRoom implements DurableObject {
  state: DurableObjectState;

  async fetch(request: Request) {
    if (request.headers.get("Upgrade") === "websocket") {
      const [client, server] = Object.values(new WebSocketPair());
      this.state.acceptWebSocket(server);
      return new Response(null, { status: 101, webSocket: client });
    }
    return new Response("WebSocket only", { status: 426 });
  }

  async webSocketMessage(ws: WebSocket, message: string) {
    const peers = this.state.getWebSockets();
    for (const peer of peers) peer.send(message);

    // Persist last 50 messages
    this.state.storage.sql.exec(
      "INSERT INTO messages (ts, body) VALUES (?, ?)",
      Date.now(), message
    );
    this.state.storage.sql.exec(
      "DELETE FROM messages WHERE id NOT IN (SELECT id FROM messages ORDER BY ts DESC LIMIT 50)"
    );
  }
}

Client Worker route /chat/:roomIdenv.CHAT_ROOMS.idFromName(roomId).fetch().

④ Fan-out task nền (Queue)

// Producer
async scheduled(event, env) {
  const users = await env.DB.prepare("SELECT id FROM subscribers WHERE active = 1").all();
  const messages = users.results.map(u => ({ body: { userId: u.id } }));
  await env.DIGEST_QUEUE.sendBatch(messages);
}

// Consumer (parallel workers)
async queue(batch, env) {
  await Promise.all(batch.messages.map(async msg => {
    try {
      await sendDigestEmail(env, msg.body.userId);
      msg.ack();
    } catch (err) {
      msg.retry();
    }
  }));
}

10k subscriber → 10k message → consumer xử lý song song theo batch.

⑤ Counter với shard (DO)

const SHARDS = 32;

async function incrementViewCount(env: Env, slug: string) {
  const shard = hash(slug + Math.random()) % SHARDS;
  const id = env.COUNTER.idFromName(`view:${slug}:shard:${shard}`);
  await env.COUNTER.get(id).fetch("https://do/increment");
}

async function getViewCount(env: Env, slug: string) {
  let total = 0;
  await Promise.all([...Array(SHARDS)].map(async (_, i) => {
    const id = env.COUNTER.idFromName(`view:${slug}:shard:${i}`);
    const res = await env.COUNTER.get(id).fetch("https://do/read");
    const { count } = await res.json();
    total += count;
  }));
  return total;
}

Tại sao shard: 1 DO xử lý ~1k req/s. View count trang phổ biến > 1k/s = nghẽn cổ chai. Shard 32 → 32k req/s.

⑥ Lock mutex (DO)

export class Lock implements DurableObject {
  async fetch(request: Request) {
    const url = new URL(request.url);
    if (url.pathname === "/acquire") {
      const held = await this.state.storage.get<number>("heldUntil");
      const now = Date.now();
      if (held && held > now) {
        return Response.json({ acquired: false, waitMs: held - now });
      }
      await this.state.storage.put("heldUntil", now + 30_000);  // 30s TTL
      return Response.json({ acquired: true, ttl: 30 });
    }
    if (url.pathname === "/release") {
      await this.state.storage.delete("heldUntil");
      return Response.json({ released: true });
    }
  }
}

// Client
const id = env.LOCK.idFromName("publish-cron");
const res = await env.LOCK.get(id).fetch("https://do/acquire");
const { acquired } = await res.json();
if (!acquired) return; // Worker khác đang chạy, bỏ qua
try {
  await runExpensivePublish(env);
} finally {
  await env.LOCK.get(id).fetch("https://do/release");
}

Dùng khi nhiều Worker có thể chạy cùng một job (cron trigger trải rộng, phát lại webhook) — tránh chạy trùng.


Queues vs Durable Objects: chọn cái nào

Tình huốngPrimitive
Job nền fire-and-forgetQueue
Rate limiter per user/IPDO
Thử lại với backoff cho API ngoàiQueue
WebSocket chat / realtimeDO
Counter nguyên tửDO
Gửi email hàng loạtQueue
Session với state trong bộ nhớDO
Phát lại webhook khi downtimeQueue
Lock / mutexDO
Transaction cross-keyDO
Fanout theo lịch (cron → workers)Queue (producer trong scheduled)

Kết hợp

Pattern mạnh: DO điều phối + Queue task.

// DO nhận event real-time, validate, enqueue
export class OrderRoom implements DurableObject {
  async fetch(request: Request) {
    const order = await request.json();
    // Xác thực + lưu state
    await this.state.storage.put(`order:${order.id}`, order);
    // Đưa task bất đồng bộ vào queue
    await this.env.EMAIL_QUEUE.send({ type: "order-confirm", orderId: order.id });
    await this.env.SHIPPING_QUEUE.send({ type: "ship", orderId: order.id });
    return Response.json({ status: "queued" });
  }
}

DO = nguồn chân lý + điều phối; Queue = chạy task.


Gotcha

① Giới hạn kích thước message Queue

Mặc định 128 KB. Lớn hơn → lưu vào R2/D1, gửi pointer qua Queue:

// SAI: gửi image binary qua Queue
await env.QUEUE.send({ type: "resize", image: bigImage });  // overflow

// ĐÚNG
await env.BUCKET.put(`tmp/${id}`, bigImage);
await env.QUEUE.send({ type: "resize", bucketKey: `tmp/${id}` });

② Hết thời gian consumer

Mỗi batch handler có giới hạn wall time (mặc định 30s). Batch 100 message × 500ms mỗi cái = 50s → timeout.

Giảm max_batch_size hoặc tăng song song với Promise.all.

③ Khóa vị trí DO

Instance ghim ở PoP gần request đầu tiên. User ở VN tạo room → DO chạy ở SIN. User từ Mỹ join → 250ms RTT.

Sửa:

  • Shard theo region nếu trường hợp sử dụng cho phép.
  • Hoặc chấp nhận đánh đổi (real-time đa region tự nó khó).
  • Hoặc dùng DO locationHint khi tạo (tính năng beta).

④ Migration DO phá storage

// SAI: xóa class trong migration
"migrations": [
  { "tag": "v1", "new_classes": ["Counter"] },
  { "tag": "v2", "deleted_classes": ["Counter"] }  // XÓA HẾT DATA
]

deleted_classes xóa sạch storage. Chỉ dùng khi chắc chắn không còn dữ liệu cần.

⑤ At-least-once nhưng consumer throw → thử lại vô hạn?

Không. Cloudflare theo dõi số lần thử. Sau max_retries, tự chuyển DLQ.

Nhưng nếu consumer không ack() và không retry(), message sẽ bị tự thử lại khi batch timeout. Pattern an toàn: luôn explicit ack() hoặc retry().

⑥ Giới hạn kích thước storage DO

  • Storage kiểu KV: không giới hạn (chặn bằng chi phí).
  • SQLite storage (mới): 10 GB per DO.
  • Vượt → shard.

⑦ RPC method vs fetch

API cũ: stub.fetch(request). API mới: gọi RPC method trực tiếp.

// Old
await stub.fetch("https://do/increment", { method: "POST" });

// New (from worker 2024)
export class Counter extends DurableObject {
  async increment() {
    // ...
  }
}

// Client
await stub.increment();  // type-safe, no URL construction

RPC đơn giản hơn. Check Cloudflare docs cho compatibility date cần thiết.


Production checklist

Queue

  • Consumer idempotent (at-least-once delivery).
  • max_retries hợp lý (3-5), có DLQ.
  • Message body < 128 KB, dữ liệu lớn qua R2 pointer.
  • Kích thước batch + timeout cân nhắc wall time 30s.
  • Consumer DLQ cảnh báo/log, không âm thầm bỏ.

Durable Object

  • Chiến lược ID rõ ràng (idFromName ổn định hoặc newUniqueId).
  • Shard nếu throughput > 1k req/s.
  • WebSocket dùng hibernation API cho kết nối dài hạn.
  • Theo dõi kích thước storage, shard trước khi chạm 10 GB.
  • Migration không xóa class đang production.

Kết

Queues và DO là 2 primitive “stateful” duy nhất của Workers. Queue cho bất đồng bộ, DO cho điều phối đồng bộ. Biết dùng 2 cái này mở khóa phần lớn trường hợp sử dụng full-stack còn lại.

Pattern mạnh: DO làm điều phối + Queue làm chạy task. Một đơn hàng e-commerce, một app chat, một hệ thống thông báo đều có pattern này.

Block Storage (Parts 5-8) kết thúc. Block 3 (Frameworks) bắt đầu với Part 9: Router choice (Hono, Itty, vanilla).


Tham khảo