跳转到主要内容
在本页上

使用队列

Deno 运行时包含一个队列 API,支持将较大的工作负载卸载以进行异步处理,并保证至少一次传递队列消息。队列可用于卸载 Web 应用程序中的任务,或安排未来某个时间的工作单元。

您将使用的主要队列 API 位于 Deno.Kv 命名空间中,分别是 enqueuelistenQueue

入队消息 跳转到标题

要将消息加入队列以进行处理,请使用 Deno.Kv 实例上的 enqueue 方法。在下面的示例中,我们展示了将通知加入队列以进行传递的情况。

queue_example.ts
// Describe the shape of your message object (optional)
interface Notification {
  forUser: string;
  body: string;
}

// Get a reference to a KV instance
const kv = await Deno.openKv();

// Create a notification object
const message: Notification = {
  forUser: "alovelace",
  body: "You've got mail!",
};

// Enqueue the message for immediate delivery
await kv.enqueue(message);

您可以通过以毫秒为单位指定 delay 选项来将消息加入队列以延迟传递。

// Enqueue the message for delivery in 3 days
const delay = 1000 * 60 * 60 * 24 * 3;
await kv.enqueue(message, { delay });

您还可以指定 Deno KV 中的一个键,如果您的消息由于任何原因未传递,您的消息值将存储在该键中。

// Configure a key where a failed message would be sent
const backupKey = ["failed_notifications", "alovelace", Date.now()];
await kv.enqueue(message, { keysIfUndelivered: [backupKey] });

// ... disaster strikes ...

// Get the unsent message
const r = await kv.get<Notification>(backupKey);
// This is the message that didn't get sent:
console.log("Found failed notification for:", r.value?.forUser);

监听消息 跳转到标题

您可以使用 Deno.Kv 实例上的 listenQueue 方法配置一个 JavaScript 函数来处理添加到队列中的项目。

listen_example.ts
// Define the shape of the object we expect as a message in the queue
interface Notification {
  forUser: string;
  body: string;
}

// Create a type guard to check the type of the incoming message
function isNotification(o: unknown): o is Notification {
  return (
    ((o as Notification)?.forUser !== undefined &&
      typeof (o as Notification).forUser === "string") &&
    ((o as Notification)?.body !== undefined &&
      typeof (o as Notification).body === "string")
  );
}

// Get a reference to a KV database
const kv = await Deno.openKv();

// Register a handler function to listen for values - this example shows
// how you might send a notification
kv.listenQueue((msg: unknown) => {
  // Use type guard - then TypeScript compiler knows msg is a Notification
  if (isNotification(msg)) {
    console.log("Sending notification to user:", msg.forUser);
    // ... do something to actually send the notification!
  } else {
    // If the message is of an unknown type, it might be an error
    console.error("Unknown message received:", msg);
  }
});

使用 KV 原子事务的队列 API 跳转到标题

您可以将队列 API 与 KV 原子事务 结合使用,以在同一事务中原子地将消息加入队列并修改键。

kv_transaction_example.ts
const kv = await Deno.openKv();

kv.listenQueue(async (msg: unknown) => {
  const nonce = await kv.get(["nonces", msg.nonce]);
  if (nonce.value === null) {
    // This messaged was already processed
    return;
  }

  const change = msg.change;
  const bob = await kv.get(["balance", "bob"]);
  const liz = await kv.get(["balance", "liz"]);

  const success = await kv.atomic()
    // Ensure this message was not yet processed
    .check({ key: nonce.key, versionstamp: nonce.versionstamp })
    .delete(nonce.key)
    .sum(["processed_count"], 1n)
    .check(bob, liz) // balances did not change
    .set(["balance", "bob"], bob.value - change)
    .set(["balance", "liz"], liz.value + change)
    .commit();
});

// Modify keys and enqueue messages in the same KV transaction!
const nonce = crypto.randomUUID();
await kv
  .atomic()
  .check({ key: ["nonces", nonce], versionstamp: null })
  .enqueue({ nonce: nonce, change: 10 })
  .set(["nonces", nonce], true)
  .sum(["enqueued_count"], 1n)
  .commit();

队列行为 跳转到标题

消息传递保证 跳转到标题

运行时保证至少一次传递。这意味着对于大多数已入队的消息,listenQueue 处理程序将为每条消息调用一次。在某些故障情况下,处理程序可能会为同一条消息调用多次以确保传递。设计应用程序时,务必正确处理重复消息,这一点至关重要。

您可以将队列与 KV 原子事务 原语结合使用,以确保您的队列处理程序 KV 更新每条消息仅执行一次。请参阅使用 KV 原子事务的队列 API

自动重试 跳转到标题

当排队的消息准备好传递时,将调用 listenQueue 处理程序来处理您的消息。如果您的处理程序抛出异常,运行时将自动重试调用该处理程序,直到成功或达到最大重试次数。一旦 listenQueue 处理程序调用成功完成,则认为消息已成功处理。如果处理程序在重试时持续失败,则该消息将被丢弃。

消息传递顺序 跳转到标题

运行时会尽最大努力按照消息入队的顺序传递消息。但是,没有严格的顺序保证。有时,消息可能会无序传递以确保最大吞吐量。

Deno Deploy 上的队列 跳转到标题

Deno Deploy 提供了队列 API 的全局、无服务器、分布式实现,旨在实现高可用性和高吞吐量。您可以使用它来构建可扩展以处理大型工作负载的应用程序。

即时隔离启动 跳转到标题

将队列与 Deno Deploy 一起使用时,当消息可供处理时,将按需自动启动隔离区以调用您的 listenQueue 处理程序。定义 listenQueue 处理程序是在 Deno Deploy 应用程序中启用队列处理的唯一要求,无需其他配置。

队列大小限制 跳转到标题

未传递的队列消息的最大数量限制为 100,000 条。如果队列已满,enqueue 方法将失败并显示错误。

定价详情和限制 跳转到标题

  • enqueue 与其他 Deno.Kv 写操作一样。已入队的消息会消耗 KV 存储和写入单元。
  • 通过 listenQueue 传递的消息会消耗请求和 KV 写入单元。
  • 更多信息,请参阅定价详情

用例 跳转到标题

队列在许多不同的场景中都很有用,但在构建 Web 应用程序时,您可能会经常看到以下几个用例。

卸载异步进程 跳转到标题

有时,由客户端启动的任务(例如发送通知或 API 请求)可能需要很长时间,以至于您不希望客户端在返回响应之前等待该任务完成。其他时候,客户端实际上根本不需要响应,例如当客户端向您的应用程序发送Webhook 请求时,因此无需等待底层任务完成再返回响应。

在这些情况下,您可以将工作卸载到队列中,以保持 Web 应用程序的响应速度并立即向客户端发送反馈。要查看此用例的实际示例,请查看我们的Webhook 处理示例

安排未来工作 跳转到标题

队列(以及类似这样的队列 API)的另一个有用应用是安排工作在未来适当的时间进行。例如,您可能希望在新客户下单一天后向他们发送通知,以发送满意度调查。您可以安排一条队列消息在 24 小时后传递,并设置一个侦听器在该时间发送通知。

要查看安排未来通知的示例,请查看我们的通知示例