为 NestJS 打造一个 MQTT 功能模块

因为业务上的需要,在 NestJS 开发的后端中,需要一个 MQTT Client,来订阅并接收来自设备的消息。于是便诞生了这个插件。可能细心的同学会发现在 NestJS 的官方文档中,有一个 MQTT 的模块,但是很可惜,那个 MQTT 是 NestJS 微服务架构下的可选的一种传输介质。

我们所使用的的 Broker 是 EMQX,它支持共享订阅,考虑到后续服务的横向扩容,MQTT Client 需要能对共享订阅做特殊处理,各种原因,诞生了这个插件。具体的制作思路和过程,记录于此。

目标

  • 可以随项目启动一个 MQTT 实例
  • 能够订阅 MQTT 的主题并接收消息
  • 支持调用(或间接调用) MQTT.js 中的方法

实现

NestJS 独立的模块开发其实跟写一个普通的 NestJS 模块完全一致,这也使得 NestJS 的模块依赖管理都很方便,使用起来很舒服。

不过开发过程中也遇到了一些问题,主要在于需要对装饰器 Provider 进行遍历,而涉及到 @nestjs/core 中的一些方法,又没有足够的文档。参考着其他模块的源码,解决了这个问题。还有对动态模块的支持,也费了一些功夫,最终查到一篇 NestJS 开发团队的一个开发者写的一篇文章才解决,译文在这里

实例化 MQTT

这个项目中是通过一个 Provider 来实现一个单例的 MQTT 客户端,并提供给其他部分使用。

定义装饰器

这个模块中的装饰器主要有两种,一个是用作定义订阅 Topic 的回调方法,另一种用于在回调方法中用于注入参数。

TS中的装饰器本质上就是一个有固定格式的函数的引用,在使用时候如果有参数,就封装成一个函数返回去这个固定格式的函数,再配合 Metadata,来给类、方法、参数进行标记,最终在项目启动的时候,对标记的内容做特殊处理。

例如 @Subscribe() 装饰器,用来标记一个方法是处理订阅回调,同时接受参数来对一些属性进行定制,比如主题名称、是否是共享订阅等。

@Topic()@Payload() 等装饰器标记了参数的含义,会记录方法的参数列表中的指定下标的参数为对应的含义,这样在调用这个方法的时候,可以直接在对应的位置传入正确的参数,甚至还可以通过装饰器的参数来对注入的值进行预处理,具体的调用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Injectable()
class MqttService {
// @Subscribe() 标记 test() 方法会在收到 testtopic 的消息时被调用
@Subscribe('testtopic')
async test(
// 标记第一个参数是主题名称
@Topic() topic: string,
// 标记第二个参数为数据,并且传入了一个方法将 buffer 转为字符串
@Payload(buffer => buffer.toString()) payload: string,
) {
// do something here...
}
}

既然做了标记,那么何时使用呢?

扫描并处理标记的方法

这里用到的是一个叫 DiscoveryModule 的模块,来自 @nestjs/core 。它的作用是获取项目中的 Provider,再配合 MetadataScanner 对每一个 Provider 进行扫描,判断是不是有被 @Subscribe() 标记过的方法,如果有就加到待处理列表和路由表中,根据待处理列表对扫描出来的主题进行订阅。

订阅主题后,就能接收 MQTT 发来的消息了,所有的消息都会触发 mqttClient.on('message', callback) 这个时间,这时候再通过内置的路由表找到正确的回调函数,再传入正确的参数,对这个方法进行调用。

至此,一个 MQTT 的消息到达后的工作流程就完成了,MQTT 模块已经能正常的接收消息了。

那么还有发送消息怎么办呢?

封装一个 Service

这里我的做法是封装了一个 Provider,对外提供了三个方法:订阅,取消订阅和发布。其实还应该有一个方法来获取 MQTT 实例的,这是一个疏忽。

这三个方法对 MQTT 的对应方法进行了简单的封装,并返回了 Promise。

模块设为全局模块,并且导出了这个 Provider。只需要在一个地方引入模块,就可以在任意注入这个 Provider 了。

配置项的传入

这个模块实现了两种配置项的传入方式:

  • forRoot(options) 支持直接传入参数来进行配置,调用形式为

    1
    2
    3
    4
    5
    6
    7
    8
    @Module({
    imports: [
    MqttModule.forRoot({
    host: '127.0.0.1'
    })
    ],
    })
    export TestModule {}
  • forRootAsync(dynamicOptions) 支持通过注入 Provider 来提供配置,这里的具体原理可以参考文首提到的文章,并且是固定套路,学会了基本所有的模块都可以这么写。使用 useFactory 来示例如何使用这种方式:

    1
    2
    3
    4
    5
    6
    7
    @Module({
    imports: [MqttModule.forRootAsync({
    useFactory: (configService: ConfigService) => configService.mqtt,
    inject: [ConfigService]
    })]
    })
    export class TestModule {}

配置项中,我还对 EMQX 进行了一些增强,比如支持全局配置为共享订阅,自定义 Logger 等。

小结

至此,这个模块就完成了。一篇文章写下来,发现其实并没有太多内容。不过踩坑的时候怪折腾人的。也算是总结在这里,之后再遇到问题,也好有个参考的地方。

感兴趣的小伙伴可以封装个自己需要的库来试试了。