在本页
使用队列
Deno 运行时包含一个队列 API,支持卸载较大的工作负载以进行异步处理,并保证至少一次传递排队的消息。队列可用于卸载 Web 应用程序中的任务,或为将来某个时间安排工作单元。
您将在队列中使用的主要 API 位于 Deno.Kv
命名空间中,作为 enqueue
和 listenQueue
。
入队消息 跳转到标题
要将消息入队以供处理,请对 Deno.Kv
的实例使用 enqueue
方法。在下面的示例中,我们展示了将通知入队以供传递可能是什么样子。
// Describe the shape of your message object (optional)
interface Notification {
forUser: string;
body: string;
}
// Get a reference to a KV instance
const kv = await Deno.openKv();
// Create a notification object
const message: Notification = {
forUser: "alovelace",
body: "You've got mail!",
};
// Enqueue the message for immediate delivery
await kv.enqueue(message);
您可以通过在毫秒数中指定 delay
选项来将消息入队以供稍后传递。
// Enqueue the message for delivery in 3 days
const delay = 1000 * 60 * 60 * 24 * 3;
await kv.enqueue(message, { delay });
您也可以在 Deno KV 中指定一个键,如果您的消息由于任何原因未传递,您的消息值将存储在该键中。
// Configure a key where a failed message would be sent
const backupKey = ["failed_notifications", "alovelace", Date.now()];
await kv.enqueue(message, { keysIfUndelivered: [backupKey] });
// ... disaster strikes ...
// Get the unsent message
const r = await kv.get<Notification>(backupKey);
// This is the message that didn't get sent:
console.log("Found failed notification for:", r.value?.forUser);
监听消息 跳转到标题
您可以使用 Deno.Kv
的实例上的 listenQueue
方法配置一个将处理添加到您的队列中的项目的 JavaScript 函数。
// Define the shape of the object we expect as a message in the queue
interface Notification {
forUser: string;
body: string;
}
// Create a type guard to check the type of the incoming message
function isNotification(o: unknown): o is Notification {
return (
((o as Notification)?.forUser !== undefined &&
typeof (o as Notification).forUser === "string") &&
((o as Notification)?.body !== undefined &&
typeof (o as Notification).body === "string")
);
}
// Get a reference to a KV database
const kv = await Deno.openKv();
// Register a handler function to listen for values - this example shows
// how you might send a notification
kv.listenQueue((msg: unknown) => {
// Use type guard - then TypeScript compiler knows msg is a Notification
if (isNotification(msg)) {
console.log("Sending notification to user:", msg.forUser);
// ... do something to actually send the notification!
} else {
// If the message is of an unknown type, it might be an error
console.error("Unknown message received:", msg);
}
});
带 KV 原子事务的队列 API 跳转到标题
您可以将队列 API 与 KV 原子事务 结合使用,以原子方式将消息入队并在同一事务中修改键。
const kv = await Deno.openKv();
kv.listenQueue(async (msg: unknown) => {
const nonce = await kv.get(["nonces", msg.nonce]);
if (nonce.value === null) {
// This messaged was already processed
return;
}
const change = msg.change;
const bob = await kv.get(["balance", "bob"]);
const liz = await kv.get(["balance", "liz"]);
const success = await kv.atomic()
// Ensure this message was not yet processed
.check({ key: nonce.key, versionstamp: nonce.versionstamp })
.delete(nonce.key)
.sum(["processed_count"], 1n)
.check(bob, liz) // balances did not change
.set(["balance", "bob"], bob.value - change)
.set(["balance", "liz"], liz.value + change)
.commit();
});
// Modify keys and enqueue messages in the same KV transaction!
const nonce = crypto.randomUUID();
await kv
.atomic()
.check({ key: ["nonces", nonce], versionstamp: null })
.enqueue({ nonce: nonce, change: 10 })
.set(["nonces", nonce], true)
.sum(["enqueued_count"], 1n)
.commit();
队列行为 跳转到标题
消息传递保证 跳转到标题
运行时保证至少一次传递。这意味着对于大多数入队的消息,listenQueue
处理程序将为每条消息调用一次。在某些故障情况下,处理程序可能会多次调用同一消息以确保传递。重要的是要设计您的应用程序,以便正确处理重复消息。
您可以将队列与 KV 原子事务 原语结合使用,以确保您的队列处理程序 KV 更新对每条消息只执行一次。请参阅 带 KV 原子事务的队列 API。
自动重试 跳转到标题
listenQueue
处理程序在消息准备好传递时被调用以处理您的排队消息。如果您的处理程序抛出异常,运行时将自动重试调用处理程序,直到它成功或达到最大重试次数。一旦 listenQueue
处理程序调用成功完成,消息将被视为已成功处理。如果处理程序在重试时始终失败,消息将被丢弃。
消息传递顺序 跳转到标题
运行时尽最大努力按入队顺序传递消息。但是,没有严格的顺序保证。有时,消息可能会以乱序传递,以确保最大吞吐量。
Deno Deploy 上的队列 跳转到标题
Deno Deploy 提供了队列 API 的全局、无服务器、分布式实现,专为高可用性和吞吐量而设计。您可以使用它来构建可以扩展以处理大型工作负载的应用程序。
即时隔离启动 跳转到标题
在 Deno Deploy 中使用队列时,隔离区会根据需要自动启动,以在消息准备好处理时调用您的 listenQueue
处理程序。定义 listenQueue
处理程序是启用 Deno Deploy 应用程序中队列处理的唯一要求,无需任何其他配置。
队列大小限制 跳转到标题
未传递的队列消息的最大数量限制为 100,000。如果队列已满,enqueue
方法将失败并出现错误。
定价详细信息和限制 跳转到标题
enqueue
与其他Deno.Kv
写操作一样对待。入队的消息会消耗 KV 存储和写入单元。- 通过
listenQueue
传递的消息会消耗请求和 KV 写入单元。 - 有关更多信息,请参阅 定价详细信息。
用例 跳转到标题
队列在许多不同的场景中都很有用,但在构建 Web 应用程序时,您可能会看到一些用例。
卸载异步进程 跳转到标题
有时,由客户端发起的任务(例如发送通知或 API 请求)可能需要很长时间才能完成,您不希望让客户端等待该任务完成才能返回响应。在其他情况下,客户端实际上并不需要响应,例如当客户端向您的应用程序发送 webhook 请求 时,因此无需等待底层任务完成即可返回响应。
在这些情况下,您可以将工作卸载到队列中,以保持您的 Web 应用程序响应并立即向客户端发送反馈。要查看此用例的实际示例,请查看我们的 webhook 处理示例。
将工作安排到将来 跳转到标题
队列(以及像这样的队列 API)的另一个有用应用是将工作安排到将来适当的时间。也许您想在客户下单后的一天向他们发送通知,以发送满意度调查。您可以安排一个队列消息在 24 小时后传递,并设置一个监听器在那个时候发送通知。
要查看将通知安排到将来发送的示例,请查看我们的 通知示例。