为 NestJS 打造一个 MQTT 功能模块
因为业务上的需要,在 NestJS 开发的后端中,需要一个 MQTT Client,来订阅并接收来自设备的消息。于是便诞生了这个插件。可能细心的同学会发现在 NestJS 的官方文档中,有一个 MQTT 的模块,但是很可惜,那个 MQTT 是 NestJS 微服务架构下的可选的一种传输介质。
我们所使用的的 Broker 是 EMQX,它支持共享订阅,考虑到后续服务的横向扩容,MQTT Client 需要能对共享订阅做特殊处理,各种原因,诞生了这个插件。具体的制作思路和过程,记录于此。
目标
- 可以随项目启动一个 MQTT 实例
- 能够订阅 MQTT 的主题并接收消息
- 支持调用(或间接调用) MQTT.js 中的方法
实现
NestJS 独立的模块开发其实跟写一个普通的 NestJS 模块完全一致,这也使得 NestJS 的模块依赖管理都很方便,使用起来很舒服。
不过开发过程中也遇到了一些问题,主要在于需要对装饰器 Provider 进行遍历,而涉及到 @nestjs/core 中的一些方法,又没有足够的文档。参考着其他模块的源码,解决了这个问题。还有对动态模块的支持,也费了一些功夫,最终查到一篇 NestJS 开发团队的一个开发者写的一篇文章才解决,译文在这里。
实例化 MQTT
这个项目中是通过一个 Provider 来实现一个单例的 MQTT 客户端,并提供给其他部分使用。
定义装饰器
这个模块中的装饰器主要有两种,一个是用作定义订阅 Topic 的回调方法,另一种用于在回调方法中用于注入参数。
TS中的装饰器本质上就是一个有固定格式的函数的引用,在使用时候如果有参数,就封装成一个函数返回去这个固定格式的函数,再配合 Metadata,来给类、方法、参数进行标记,最终在项目启动的时候,对标记的内容做特殊处理。
例如 @Subscribe()
装饰器,用来标记一个方法是处理订阅回调,同时接受参数来对一些属性进行定制,比如主题名称、是否是共享订阅等。
而 @Topic()
、@Payload()
等装饰器标记了参数的含义,会记录方法的参数列表中的指定下标的参数为对应的含义,这样在调用这个方法的时候,可以直接在对应的位置传入正确的参数,甚至还可以通过装饰器的参数来对注入的值进行预处理,具体的调用示例如下:
1 | () |
既然做了标记,那么何时使用呢?
扫描并处理标记的方法
这里用到的是一个叫 DiscoveryModule
的模块,来自 @nestjs/core
。它的作用是获取项目中的 Provider,再配合 MetadataScanner
对每一个 Provider 进行扫描,判断是不是有被 @Subscribe()
标记过的方法,如果有就加到待处理列表和路由表中,根据待处理列表对扫描出来的主题进行订阅。
订阅主题后,就能接收 MQTT 发来的消息了,所有的消息都会触发 mqttClient.on('message', callback)
这个时间,这时候再通过内置的路由表找到正确的回调函数,再传入正确的参数,对这个方法进行调用。
至此,一个 MQTT 的消息到达后的工作流程就完成了,MQTT 模块已经能正常的接收消息了。
那么还有发送消息怎么办呢?
封装一个 Service
这里我的做法是封装了一个 Provider,对外提供了三个方法:订阅,取消订阅和发布。其实还应该有一个方法来获取 MQTT 实例的,这是一个疏忽。
这三个方法对 MQTT 的对应方法进行了简单的封装,并返回了 Promise。
模块设为全局模块,并且导出了这个 Provider。只需要在一个地方引入模块,就可以在任意注入这个 Provider 了。
配置项的传入
这个模块实现了两种配置项的传入方式:
forRoot(options) 支持直接传入参数来进行配置,调用形式为
1
2
3
4
5
6
7
8({
imports: [
MqttModule.forRoot({
host: '127.0.0.1'
})
],
})
export TestModule {}
forRootAsync(dynamicOptions) 支持通过注入 Provider 来提供配置,这里的具体原理可以参考文首提到的文章,并且是固定套路,学会了基本所有的模块都可以这么写。使用 useFactory 来示例如何使用这种方式:
1
2
3
4
5
6
7({
imports: [MqttModule.forRootAsync({
useFactory: (configService: ConfigService) => configService.mqtt,
inject: [ConfigService]
})]
})
export class TestModule {}
配置项中,我还对 EMQX 进行了一些增强,比如支持全局配置为共享订阅,自定义 Logger 等。
小结
至此,这个模块就完成了。一篇文章写下来,发现其实并没有太多内容。不过踩坑的时候怪折腾人的。也算是总结在这里,之后再遇到问题,也好有个参考的地方。
感兴趣的小伙伴可以封装个自己需要的库来试试了。