最近在用的一个 Node.js 消息队列库 - Bull

缘由

最近在工作中需要一个处理定时任务的库,起初找的是 Kue,但是 Kue 已经不维护了,同时还推荐了 Bull 这个库,就看了一下 Bull 的相关介绍,是一个用起来很简单,但是场景又足够覆盖的库。

于是在这里简单记录一下,相关的内容(此篇较水,大佬轻喷)。

Bull

官方说,Bull 是基于 Redis 实现的一个快速且强大的消息系统(队列)。

Bull 提供了可以很简单就能使用的消息队列、延时任务和定时任务。用过之后,只能说,真简单= =。

队列

通过构造函数可以创建一个队列,官方文档里的示例如下:

1
const myFirstQueue = new Bull('my-first-queue');

这个队列,有了队列,Bull 就提供了方法可以往队列里添加任务(Job)了:

1
2
3
const job = await myFirstQueue.add({
foo: 'bar'
});

添加的任务,会在正常的时机供消费者(Process)执行,普通的任务是在队伍排到了它的时候,延时任务和定时任务则是在拍到它且到了设定的执行时间。

1
2
3
4
5
const myFirstQueue = new Bull('my-first-queue');

myFirstQueue.process(async (job) => {
return doSomething(job.data);
});

还可以通过添加监听之后,在任务完成后进行处理

1
2
3
4
5
6
const myFirstQueue = new Bull('my-first-queue');

// Define a local completed event
myFirstQueue.on('completed', (job, result) => {
console.log(`Job completed with result ${result}`);
})

生命周期

贴一张官方的图,其实很清晰明了了。

life circle

其他特性

Bull 的队列还支持限速器,还有命名的任务,延时任务,定时任务,更改进出规则,设置优先级等功能,可谓是麻雀虽小,五脏俱全鸭。

具体的内容在官方文档里写的很详细,这里就不重复了。

Egg-bull

搜了下各种 egg-bull 的实现,基本都是把 bull 挂载到了 app 对象上,最多是根据配置自动创建队列。于是这两天不只是熟悉 Bull,按照自己的设想顺便撸了一个 Egg.js 的插件,主要使用注解来进行队列和处理方法的定义。

设计思路

原有的 Bull 类,会直接挂载到 app 对象上。在此基础上,插件会自动给 app 挂载一个 queue 的属性。

Egg-bull 提供一个基类(BaseQueue),所有的队列定义,都是继承自这个基类的类定义。BaseQueue 是继承自 Bull 类的一个类,同时在构造的时候注入了 Egg Application(app 对象),同时增加了一些内置方法,方便对队列进行一些处理。

使用插件的时候,只需要在特定目录下创建文件,并且默认导出一个继承自 BaseQueue 的类即可。这个类可以什么都没有,插件会根据文件名在 Bull 中创建一个队列。

定义的队列类中可以写任意不与原型链上的成员或方法重名的成员或方法,并且都可以通过 app.queue.name.xxx() 进行调用,name 为队列定义的名称,xxx是在类中定义的方法。

通过注解,定义一个或多个方法为队列中的一个回调函数,在插件启动时自动注册到队列中。

支持通过注解标识各种 Bull 事件的回调函数,如 completed 事件等。

对于 JavaScript 用户,这个等再想一下怎么来进行兼容。

挂载队列

首先使用 app.loadToApp() 方法将 app/queue 目录下的文件并实例化 Queue,将 Queue 对象挂载到 app.queue 对象上,再经过分析处理,自动注册指定名称的 Process,这样在代码里就可以这样写:

1
2
3
4
// Load app/queue/test.ts

app.queue.test.add();
app.queue.test.process();

消费者定义

插件定义了 Process() 这个装饰器,使得插件支持预定义指定名称的任务的处理回调。

给方法添加后,则会在插件启动的时候自动注册该消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { BaseQueue, Process } from "egg-bull";

export default class FirstQueue extends BaseQueue {

/**
* 定义一个设置了 name 和 concurrency 的处理器,在插件启动的时候自动注册
*/
@Process({ name: 'normal-job', concurrency: 2 })
async normalJob(job) {
console.log(job.data);
}

/**
* 定义一个默认处理器,在插件启动的时候自动注册,注册名为方法名
*/
@Process()
async secondJob(job) {
console.log(job.data);
}
}

全局回调

不受进程、服务影响,可在全局都能接收到的回调方法:

  • GlobalCompleted() 任务完成的回调函数,用来处理任务成功的相关操作
  • GlobalProgress() 定义全局的进度回调方法,可跨进程、跨服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { BaseQueue, Completed } from "egg-bull";

export default class FirstQueue extends BaseQueue {
/**
* 定义队列中任务完成的回调函数,在插件启动的时候自动注册
* 方法可接受两个参数,第一个为 JobId 值,第二个为 Process 返回的结果
*/
@GlobalCompleted()
async globalCompleted(jobId, result) {
const job = await this.getJob(jobId);
console.log(job, result);
}

/**
* 定义队列中监听进度的回调函数,在插件启动的时候自动注册
* 方法可接受两个参数,第一个为 JobId 值,第二个为当前的进度
*/
@GlobalProgress()
async globalProgress(jobId, progress) {
const job = await this.getJob(jobId);
console.log(job, progress);
}
}

队列回调

定义多个回调用于处理队列中的各种事件,区别于全局回调,这些回调方法监听在单一 Worker 上:

  • Completed() 监听队列中完成的任务

  • Progress() 监听队列进度变化的回调函数

  • Error() 监听队列中执行错误的回调函数

  • Waiting() 监听队列中正则等待的事件

  • Active() 监听队列中启动的任务,可以在此取消任务

  • Stalled()

自定义业务封装

由于在类定义中可以任意写方法,且这些可以通过 app.queue.name.xxx() 进行调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
export { BaseQueue } from 'egg-bull';

export default class FirstQueue extends BaseQueue {
/**
* 封装添加任务的方法,甚至可以在里面结合业务。
*/
async addNormoJob(data) {
// 添加任务
this.add('normal-job', {
data,
});
}
}

Bull UI(待添加)

有现成的库提供可视化的 Web 界面供查看,准备集成到插件中。下面是几个现成的库,等挑选一下。

逻辑封装

还需要对一些辅助功能进行封装,比如清除指定范围的成功记录,因为在一篇文章中看到讲日益增多的任务记录会导致网络性能问题和内存爆满的问题,这部分逻辑可以做下封装并暴露 API,方便调用。

小结

其实主要是想讲正在搞的 Egg-bull 插件,前面倒是不少废话了一堆 Bull 文档里的内容。暂时按照自己的理解和想法封装了这个插件,在最近开发业务时候也会进行一定的调整,等调整好之后,再发布到 npm 并开源到 Github。有看了文章之后有指导意见的大佬欢迎来提哦~

感谢。

参考文档: