Computer >> Hướng Dẫn Máy Tính >  >> Lập Trình >> Redis

Xây dựng hàng đợi tin nhắn tùy chỉnh với Redis Lists &TypeScript

Bạn đã bao giờ thử tạo Hàng đợi Tin nhắn của riêng mình nhưng gặp phải thử thách chưa? Nếu vậy, bạn không đơn độc. Trong hướng dẫn này, chúng ta sẽ xây dựng hàng đợi tin nhắn từ đầu bằng cách sử dụng danh sách Redis. Mặc dù có một số phương pháp để tạo hàng đợi tin nhắn với Redis, chẳng hạn như luồng, danh sách và pub/subs, nhưng chúng ta sẽ tập trung vào cách tiếp cận đơn giản và dễ hiểu nhất:danh sách. Hãy tham gia cùng tôi khi chúng ta đi sâu vào hướng dẫn thực tế này.

Những gì chúng tôi sẽ sử dụng

  • Nâng cấp
  • Đôi bàn tay

Những gì bạn cần

  • Bánh bao
  • Đôi bàn tay

Thiết lập Upstash Redis

Đầu tiên, hãy thiết lập một phiên bản Redis. Để thực hiện việc này, chỉ cần truy cập Upstash và nhấp vào Tạo cơ sở dữ liệu .Sau đó, cuộn xuống để tìm chuỗi kết nối của bạn, đây là chuỗi chúng tôi sẽ sử dụng để kết nối ứng dụng khách của mình. Tôi sẽ không đi sâu vào chi tiết ở đây nhưng về cơ bản đó là những gì bạn cần để bắt đầu.

Ví dụ chuỗi kết nối:

redis://XXXXe@social-XXX-39281.upstash.io:39281

Khởi động dự án

Hãy bắt đầu dự án TypeScript của chúng ta bằng Bun. Sự lựa chọn không chỉ vì nó nhanh hơn Node—mà còn dễ cài đặt hơn nhiều. Và vâng, nó cũng nhanh một cách ấn tượng! 🚀

mkdir upstash-mq
cd upstash-mq
 
bun init
> package name (upstash-mq-tutorial): upstash-mq
> entry point (index.ts):
> Done!
 
bun add ioredis

Cấu trúc dự án

 ┣ 📂src
 ┃ ┣ 📂lua-scripts
 ┃ ┃ ┣ 📜add-job.lua
 ┃ ┃ ┗ 📜remove-job.lua
 ┃ ┣ 📜index.ts
 ┃ ┣ 📜job.ts
 ┃ ┣ 📜queue.ts
 ┃ ┗ 📜utils.ts
 ┣ 📜.env
 ┣ 📜.gitignore
 ┣ 📜README.md
 ┣ 📜bun.lockb
 ┣ 📜index.ts
 ┣ 📜package.json
 ┗ 📜tsconfig.json

Trình bày trực quan về Queue của chúng tôi sẽ như thế này:

Xây dựng hàng đợi tin nhắn tùy chỉnh với Redis Lists &TypeScript

Công việc

Lớp Job của chúng ta cần một số thứ quan trọng. Trước tiên, chúng ta cần theo dõi trạng thái của từng công việc. Điều này giúp chúng ta quyết định nên xử lý chúng, thử lại hay chuyển chúng đi nơi khác nếu chúng đã hoàn thành. Mỗi công việc cũng có một ID và một số dữ liệu cần phải chung chung để chúng ta có thể cung cấp trải nghiệm tuyệt vời cho người dùng. Cuối cùng, chúng ta cần liên kết từng công việc với hàng đợi của nó và bao gồm tên hàng đợi để dễ quản lý.

Đây là cốt lõi của Job của chúng tôi lớp:

type OwnerQueue = {
 redis: Redis;
 queueName: string;
};
export type JobStatuses =
 | "created"
 | "waiting"
 | "active"
 | "succeeded"
 | "failed";
 
export class Job<T> {
 id: string;
 status: JobStatuses;
 config: OwnerQueue;
 data: T;
 
 constructor(ownerConfig: OwnerQueue, data: T, jobId = randomUUID()) {
 this.id = jobId;
 this.status = "created";
 this.data = data;
 this.config = ownerConfig;
 }
}

Để tạo data chung chung, trước tiên chúng ta cần tạo Job bản thân nó mang tính chung chung. Phần còn lại được thực hiện một cách đơn giản. Chúng ta có thể tạo một phiên bản Redis riêng cho mỗi Job nhưng việc quản lý phần tử này sẽ phức tạp.

May mắn thay, cách tiếp cận của chúng tôi cho phép dễ dàng cấu hình phiên bản Redis trong hàng đợi và chúng tôi có thể chỉ cần chuyển phiên bản này nếu cần. Nguyên tắc tương tự cũng áp dụng cho queueName . Vì chúng ta thường sử dụng nó để lưu công việc vào hàng đợi nên công việc của chúng ta phải nhận biết được hàng đợi mẹ của chúng. Để có thể lưu công việc vào hàng đợi, chúng ta cần hai thứ:tập lệnh Lua để tương tác với Redis và một số tiện ích.

Trước tiên hãy tạo các tiện ích của chúng tôi:

import { JobStatuses } from "./job";
 
const MQ_PREFIX = "UpstashMQ";
 
export const formatMessageQueueKey = (queueName: string, key: string) => {
 return `${MQ_PREFIX}:${queueName}:${key}`;
};
 
export const convertToJSONString = <T>(
 data: T,
 status: JobStatuses,
): string => {
 return JSON.stringify({
 data,
 status,
 });
};

Vì việc tạo tên hàng đợi theo cách thủ công mỗi lần sử dụng Redis không lý tưởng nên chúng tôi đã tạo một tiện ích có tên formatMessageQueueKey .Tiện ích này chỉ đơn giản là nối các chuỗi lại với nhau. Ngoài ra, chúng tôi cần tuần tự hóa dữ liệu của mình để lưu trữ trong Redis - chúng tôi không thể chuyển các đối tượng JS làm nguồn dữ liệu; trước tiên chúng cần được chuyển đổi thành chuỗi. Do dữ liệu đó là dữ liệu chung nên chúng tôi đã triển khai một hàm chung, convertToJSONString , vì mục đích này.

Bây giờ, hãy thêm tập lệnh lua đầu tiên của chúng ta:

add-job.lua

--[[
key 1 -> [prefix]:name:jobs
key 2 -> [prefix]:name:waiting
arg 1 -> job id
arg 2 -> job data
]]
 
 
local jobId = ARGV[1]
local payload = ARGV[2]
 
if redis.call("hexists", KEYS[1], jobId) == 1 then return nil end
redis.call("hset", KEYS[1], jobId, payload)
redis.call("lpush", KEYS[2], jobId)
 
return jobId

Chúng ta có thể thực hiện các lệnh gọi này với phiên bản Redis riêng lẻ như sau:

  • redis.hexists(jobId)
  • redis.hset(jobId,payload)
  • redis.lpush(jobId,payload)

Tuy nhiên, cách tiếp cận này sẽ dẫn đến ba cuộc gọi riêng biệt. Để giảm thiểu các chuyến đi khứ hồi tới máy chủ Redis, chúng tôi mong muốn hợp nhất toàn bộ quy trình thành một lệnh gọi duy nhất.

Hãy thêm save() của chúng tôi phương pháp

 private createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
 
 async save(): Promise<string | null> {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/add-job.lua").text();
 const resJobId = (await this.config.redis.eval(
 addJobToQueueScript,
 2,
 this.createQueueKey("jobs"),
 this.createQueueKey("waiting"),
 this.id,
 convertToJSONString(this.data, this.status)
 )) as string | null;
 
 if (resJobId) {
 this.id = resJobId;
 return resJobId;
 }
 return null;
 }

Mã này rất đơn giản, nhưng hãy để tôi làm rõ hơn. Sau khi tạo tập lệnh Lua, chúng tôi gọi nó bằng redis.eval , cần thiết để thực thi tập lệnh Lua. Các tham số cho redis.eval như sau:

  • Tham số đầu tiên yêu cầu tập lệnh.
  • Tham số thứ hai chỉ định số lượng đối số.
  • Tham số thứ ba và thứ tư dành cho khóa.
  • Cuối cùng, chúng ta chuyển các đối số thực tế.

Trước khi chúng ta hoàn thành Job lớp hãy thêm một vài phương thức nữa cho tương lai.

fromId = async <T>(jobId: string): Promise<Job<T> | null> => {
 const jobData = await this.config.redis.hget(this.createQueueKey("jobs"), jobId);
 if (jobData) {
 return this.fromData<T>(jobId, jobData);
 }
 return null;
 };
 
private fromData = <T>(jobId: string, stringifiedJobData: string): Job<T> => {
 const parsedData = JSON.parse(stringifiedJobData) as Job<T>;
 const job = new Job<T>(this.config, parsedData.data, jobId);
 job.status = parsedData.status;
 return job;
};

Hiện tại, chúng ta có thể không cần những chức năng này nhưng chúng sẽ trở nên quan trọng khi chúng ta bắt đầu xử lý công việc trong tương lai. Khi đó, chúng ta sẽ chỉ có ID công việc (jobId) và chúng ta sẽ cần một phương pháp để xây dựng lại Công việc của mình từ đầu. Đây chính xác là những gì fromId hoàn thành. Nó lấy dữ liệu công việc từ Redis, chuyển đổi nó thành một phiên bản Công việc và trả về dữ liệu đó, để sau này hàng đợi có thể xử lý công việc này.

Chuyển sang hàng đợi

Đã hoàn thành save() Hãy cùng đi sâu vào chi tiết về Hàng đợi. Đây là mục tiêu của chúng tôi:

  • Chúng tôi mong muốn hàng đợi của mình có thể giữ lại hoặc xóa dữ liệu khi thành công hay thất bại vì có thể có trường hợp cần xử lý lại sau này.
  • Chúng tôi dự định thiết kế hàng đợi chạy đồng thời, cho phép nhiều công việc chạy đồng thời.
  • Mục tiêu của chúng tôi là cho phép truyền hàm gọi lại để xử lý dữ liệu. Hàm này sẽ suy ra loại Công việc để mang lại trải nghiệm tốt hơn cho nhà phát triển.
  • Chúng tôi dự định kích hoạt tính năng gọi điện Job.save() từ trong Hàng đợi, cho phép chúng tôi chuyển phiên bản Redis và queueName .
  • Cuối cùng, chúng tôi muốn đảm bảo khả năng hủy hàng đợi nếu cần thiết và xóa Công việc khỏi hàng đợi.

Hãy bắt đầu với việc xác định Hàng đợi của chúng ta

export type QueueConfig = {
 redis: Redis;
 queueName: string;
 keepOnSuccess?: boolean;
 keepOnFailure?: boolean;
};
 
export class Queue extends EventEmitter {
 config: QueueConfig;
 concurrency = 0;
 worker: any;
 running = 0;
 queued = 0;
 
 constructor(config: QueueConfig) {
 super();
 this.config = {
 redis: config.redis,
 queueName: config.queueName,
 keepOnFailure: config.keepOnFailure ?? true,
 keepOnSuccess: config.keepOnSuccess ?? true,
 };
 }
 
 createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
}

Lớp Hàng đợi bao gồm một cấu hình để chấp nhận thông tin bên ngoài, chẳng hạn như tên hàng đợi, phiên bản Redis và cài đặt để giữ lại hoặc xóa dữ liệu. Chúng tôi hoan nghênh mọi triển khai Redis mà người dùng ưa thích, mặc dù chúng tôi có ưu tiên đặc biệt cho Upstash 😌. Tính linh hoạt này cho phép người dùng tích hợp hàng đợi vào hệ thống hiện có của họ một cách dễ dàng.

Và lớp của chúng tôi mở rộng Trình phát sự kiện để thông báo cho họ khi có điều gì đó xảy ra.

Dưới đây là ví dụ khởi tạo:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "upstash-rocks",
 keepOnFailure: true,
 keepOnSuccess: true,
});

Thêm

async add<T>(payload: T) {
 return new Job<T>(this.config, payload).save();
 }

Công việc của chúng tôi lấy chi tiết cấu hình của hàng đợi gốc và tải trọng, đây là dữ liệu sẽ được lưu trữ trong Redis. Sau đó chúng tôi chỉ cần lưu nó.

Bây giờ chúng ta có thể làm điều đó:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL!),
 queueName: "mytest-queue",
 keepOnFailure: true,
 keepOnSuccess: true,
});
 
const payload = {
 upstash: "best-redis-ever",
};
 
await queue.add(payload);

Bây giờ, chúng ta cần một cách để xử lý chúng.

Đang xử lý

Đây là phần thử thách nhất trong quá trình triển khai hàng đợi của chúng tôi. Chúng tôi yêu cầu người dùng chỉ định số lượng quy trình đồng thời và cung cấp một nhân viên – chức năng gọi lại để xử lý các công việc sẽ suy ra loại công việc. Ngoài ra, chúng tôi cần một cơ chế để theo dõi số lượng công việc hiện đang chạy và xếp hàng đợi, cho phép chúng tôi chọn công việc tiếp theo từ hàng đợi một cách an toàn.

 async process<TJobPayload>(
 worker: (job: TJobPayload) => void,
 concurrency: number
 ): Promise<void> {
 this.concurrency = concurrency;
 this.worker = worker;
 this.running = 0;
 this.queued = 1;
 
 this.jobTick();
 }

Mục đích chính của việc chấp nhận TJobPayload chung là để nâng cao trải nghiệm của nhà phát triển cho người dùng của chúng tôi. Chúng tôi mong muốn cho phép họ hưởng lợi từ intellisense khi sử dụng hàng đợi của chúng tôi. Người dùng biết rằng họ đã lưu trữ dữ liệu như {hello: "world"} trong Công việc, nhưng TypeScript cần hỗ trợ để cung cấp intellisense chính xác. Đây là lý do tại sao chúng tôi áp dụng cơ chế này để thông báo cho TypeScript và buộc nó suy luận để mang lại trải nghiệm tốt hơn cho nhà phát triển.

Trước khi chuyển sang jobTick() , hãy xem xét kỹ quá trình:

  • Vì hàng đợi của chúng tôi hoạt động trên cơ sở FIFO (Vào trước ra trước), chúng tôi cần bắt đầu bằng cách hiển thị một công việc từ bên phải hàng đợi.
  • Tiếp theo, chúng ta chạy hàm worker cho công việc này.
  • Sau khi công việc hoàn thành, chúng tôi sẽ gửi kết quả cho người dùng của mình.
  • Cuối cùng, chúng tôi gọi jobTick() một lần nữa để xử lý công việc tiếp theo.

Vì vậy, jobTick() sẽ bao gồm ba phần quan trọng này."

private jobTick() {
 this.getNextJob()
 .then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
 })
 .catch((error) => {
 console.error("Error in jobTick:", error);
 })
 .finally(() => {
 setImmediate(() => this.jobTick());
 });
 }

Tôi sẽ cố gắng giải thích từng chức năng. Hãy bắt đầu với getNextJob()

 private async getNextJob() {
 try {
 const jobId = await this.config.redis.brpoplpush(
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 0
 );
 return jobId;
 } catch (error) {
 console.error("Error fetching the next job:", error);
 throw error;
 }
 }
 

Chúng tôi chỉ đơn giản thực hiện lệnh gọi tới Redis nhưng với cách tiếp cận chiến lược:chúng tôi sử dụng lệnh gọi chặn kết hợp với lpush để giảm thiểu số chuyến đi khứ hồi. Việc sử dụng chặn cuộc gọi là có chủ ý; chúng tôi muốn ngăn chặn những công nhân khác xử lý cùng một công việc cùng một lúc, do đó tránh được tình trạng cạnh tranh. Ngoài ra, chúng tôi chuyển công việc từ trạng thái 'chờ' sang 'đang hoạt động', chuẩn bị hiệu quả cho các bước tiếp theo trong quy trình.

this.getNextJob().then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
});

Bây giờ chúng ta có jobId , chúng tôi tăng số lượng công việc đang chạy lên một và giảm số lượng công việc được xếp hàng đợi xuống một. Chúng tôi cũng cố gắng bắt đầu nhiều công việc mới nhất có thể trong khi vẫn tôn trọng giới hạn đồng thời:

if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
}

Nếu mọi việc suôn sẻ, chúng tôi tiến hành xây dựng Job của mình sử dụng fromId .Sau khi xây dựng lại thành công công việc của mình bằng ID của nó, chúng tôi sẽ chuyển sang thực thi công việc bằng hàm worker của mình.

Hãy chuyển sang executeJob

private async executeJob<TJobPayload>(jobCreatedById: Job<TJobPayload>) {
 let hasError = false;
 try {
 await this.worker(jobCreatedById.data);
 this.running -= 1;
 this.queued += 1;
 } catch (error) {
 hasError = true;
 } finally {
 const [jobStatus, job] = await this.finishJob<TJobPayload>(jobCreatedById, hasError);
 this.emit(jobStatus, job.id);
 return;
 }
 }

Bây giờ chúng ta đã có dữ liệu của Công việc, chúng ta chuyển dữ liệu này tới worker của mình . Nếu nó thực thi thành công, chúng tôi sẽ tăng số lượng công việc được xếp hàng đợi lên một và giảm số lượng đang chạy đi một. Bước này rất quan trọng; nếu không được xử lý đúng cách, nó có thể ảnh hưởng đến khả năng chúng tôi triển khai Công việc mới cùng lúc. Trong trường hợp xảy ra lỗi trong quá trình thực thi của worker, chúng ta chỉ cần chuyển đổi hasError cờ. Cuối cùng, chúng tôi gọi finishJob với jobCreatedById của chúng tôi và hasError gắn cờ và phát ra trạng thái với jobId .

Lưu ý bên lề:Người dùng hiện có thể nghe các bản cập nhật được phát ra như thế này.

queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));

Hãy chuyển sang finishJob

private async finishJob<TJobPayload>(
 job: Job<TJobPayload>,
 hasFailed?: boolean
 ): Promise<[JobStatuses, Job<TJobPayload>]> {
 const multi = this.config.redis.multi();
 
 multi.lrem(this.createQueueKey("active"), 0, job.id);
 
 if (hasFailed) {
 if (this.config.keepOnFailure) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("failed"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "failed";
 } else {
 if (this.config.keepOnSuccess) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("succeeded"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "succeeded";
 }
 
 await multi.exec();
 return [job.status, job];
 }

Một khía cạnh quan trọng ở đây là việc sử dụng multi() vì mục tiêu của chúng tôi là luôn giảm thiểu các chuyến đi khứ hồi. Bằng cách sử dụng multi , Redis trì hoãn việc thực thi cho đến khi chúng ta gọi exec() .Nếu người dùng đã đặt keepOnFailurekeepOnSuccess để bảo toàn dữ liệu, chúng tôi sẽ tạo hai bộ:một bộ có dữ liệu của công việc và bộ kia có danh sách ID công việc để truy cập dữ liệu này. Cách tiếp cận này áp dụng cho cả kịch bản thành công và thất bại. Đương nhiên, chúng ta điều chỉnh trạng thái công việc cho phù hợp. Cuối cùng, chúng ta thực thi lệnh multi bằng exec và trả về trạng thái của công việc cũng như chính công việc đó cho mục đích phát tán sự kiện.

Cuối cùng, chúng ta có hai phương pháp còn lại mà tôi sẽ không trình bày chi tiết vì chúng sử dụng các khái niệm mà chúng ta đã quen thuộc.

 async removeJob(jobId: string) {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/remove-job.lua").text();
 return await this.config.redis.eval(
 addJobToQueueScript,
 5,
 this.createQueueKey("succeeded"),
 this.createQueueKey("failed"),
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 this.createQueueKey("jobs"),
 jobId
 );
 }
 
 async destroy() {
 const args = ["id", "jobs", "waiting", "active", "succeeded", "failed"].map((key) =>
 this.createQueueKey(key)
 );
 const res = await this.config.redis.del(...args);
 return res;
 }

remove-job.lua

--[[
key 1 -> [prefix]:test:succeeded
key 2 -> [prefix]:test:failed
key 3 -> [prefix]:test:waiting
key 4 -> [prefix]:test:active
key 5 -> [prefix]:test:jobs
arg 1 -> jobId
]]
 
local jobId = ARGV[1]
 
if (redis.call("sismember", KEYS[1], jobId) + redis.call("sismember", KEYS[2], jobId)) == 0 then
 redis.call("lrem", KEYS[3], 0, jobId)
 redis.call("lrem", KEYS[4], 0, jobId)
end
 
redis.call("srem", KEYS[1], jobId)
redis.call("srem", KEYS[2], jobId)
redis.call("hdel", KEYS[5], jobId)
 

destroy() là xóa hoàn toàn toàn bộ hàng đợi và hai là xóa một công việc cụ thể khỏi hàng đợi.

Hãy xem mọi thứ hoạt động như thế nào

import { sleep } from "bun";
import Redis from "ioredis";
 
import { Queue } from "./queue";
 
type Payload = {
 id: number;
 data: string;
};
 
const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "mytest-queue",
});
 
async function main() {
 await generateQueueItems(queue, 20);
 console.log("Sleep starting for 5 sec");
 await sleep(5000);
 
 queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
 await queue.process<Payload>((job) => {
 console.log("Processing job:", job.data);
 sleep(1000);
 }, 3);
}
 
main();
 
async function generateQueueItems(queue: Queue, itemCount: number) {
 for (let i = 0; i < itemCount; i++) {
 const payload = {
 id: i,
 data: `dummy-data-${i}`,
 // Add more properties as needed for your testing
 };
 const jobId = await queue.add(payload);
 console.log(`Added item ${i} with jobId: ${jobId}`);
 }
}

Thử thách thưởng

  • Triển khai logic thử lại với thời gian đợi theo cấp số nhân cho cả quy trình truy cập Redis và quy trình công nhân.
  • Phát triển cơ chế đảm bảo 'ít nhất một lần'.
  • Cố gắng thực thi các công nhân trong Service Workers để có hiệu suất tốt hơn
  • Thêm công việc đã lên lịch

Kết thúc

Cách tốt nhất để học điều gì đó là xây dựng nó và cách tiếp cận tốt hơn nữa là sử dụng Upstash Redis. Hãy tiếp tục rung chuyển.

🔗 Địa chỉ Github của dự án