C# 基于微服务开发框架的设计思路(四)
不管是独立的一个应用或者是微服务之间的通讯,甚至是分布式事务等,都需要建立一个消息通知机制。为什么要这么说:
一、内部通知。
在之前,已经提到微服务的内部消息通知,BLL横向之间是不能互相直接调用的,业务套业务,算什么事?哪怕有上下关系,也不能业务套业务,毕竟是独立的个体,也就是面向对象中描述的边界定义的问题。因此,需要建立一个单个应用的内部消息机制,这个机制也能很好解决了解耦的问题。举例说明,当时我做一个项目,是一个生产管理系统,在审核或者取消审核生产任务的时候,需要向第三方应用发送一个通知或者记录一个标记,一开始第三方应用是不支持API通讯的,使用数据库中间表作为传递数据的媒介,当时也没多想,就把这个设计成一个插件,在审核或者撤销审核成功后,发布消息
public RValue DoCheck(TaskInfo tsk)
{
//此处审核业务逻辑代码
PublishIM("checkedTask"); //发布内部消息
}
然后建立一个单独的项目,在里边建立一个业务层
public class BLLTaskInfoEx
{
[IMConsume("checkedTask")] public RList IMTaskChecked(object cuttingHdrs) {
//此处处理接口的逻辑代码
}
}
为什么要使用内部消息?其实很简单,毕竟这个接口也只是和生产有关,和其他微服务都没有关系,自己处理好自己的事情就好了。
后来客户更换了第三方软件,改用API接口。如果是以往经验,这次肯定要改程序了,可是因为采用插件开发了对接,这次需求,并没有任何“修改”的必要,只要把之前建立的项目文件(dll文件)直接从bin目录中移除,然后重新创建一个项目,并且同样接收相同的消息就可以了,也降低了维护风险。当然,如果两者都使用也没有问题的。
从这个例子可以看出,内部消息不仅仅是传递消息这么简单,也完美解决解耦的问题。
那么如何才能实现这样的功能?其实很多第三方框架都有MessageCenter之类的消息类,分析过第三方的一些消息类,都需要在应用的某个地方执行订阅的方法,消息才能通畅。而我写的这个消息通知,是不需要做任何订阅方的初始化的。
定义一个IStarter的内部消息类(框架自带的)
public class IMStarter:IStarter
{
public void Start()
{
//1、在程序集范围内搜索所有带有标注IMConsume的方法并且做缓存
//此处省略代码
}
}
这样,所有有订阅的方法都收集完毕了,一个不少,后续就是怎么调用这些方法了
IMConsume(string topic,int index)
topic:订阅的消息名称,只要发布和订阅的消息名称一致(区分大小写),然后把所有方法都拿出来,按index从小到大排序,小的优先级高,只要有一个调用出错,后续就终止调用后续方法。
发布端发布消息时,在缓存中找到消息,然后逐个调用,其实类似委托,只是完全解耦而已。这样的设计,在解决解耦的同时,也提高了调用时候的性能,毕竟要调用的方法都已经缓存成一个列表了,无需每次去程序集搜索。
好吧,内部消息就这么弄好了。
二、微服务间消息
微服务与微服务之间,也存在消息上的通知,这个就需要另外的插件支持了。其原理与内部消息也差不多,就是扩展一下,我这里选用了RabbitMQ,有丰富的文档且支持各种开发语言,使用起来还是很方便的,性能也不错,毕竟也不是高密度使用的,至于怎么使用RabbitMQ,这里不细说,看文档或者上网搜索一下相关知识就可以了。
应用举例,当新加入一个员工,其他微服务为了方便使用员工数据,就在各自微服务中冗余员工数据,这些冗余的数据又不需要前端维护的,那么当员工数据新增或者编辑成功后,就向外发布消息
[MQPublish]
public RValue Insert(Employee item)
{
}
[MQPublish]
public RValue Update(Employee item)
{
}
[MQPublish]
public RValue Delete(Employee item)
{
}
这里使用了AOP,每个BLL都赋予了对应的API资源名称,当成功写入数据后,就通过MQ发送数据,否则只是记录错误日志。
需要知道员工信息变更的微服务,在对应处理变更的方法上打上标签
[MQConsume("basic","employee","insert")]
public RValue mqInsert(thisEmployee item)
{
}
MQConsume的三个参数为:
appid : "basic" 微服务的标识,在哪个微服务传递过来的消息
resource:"employee" API资源名称,从哪个微服务下的资源发出的消息
action:"insert" 发布消息的方法名称
接收消息端的方法和内部消息类似,建立一个MQStarter就可以了,其中,需要根据订阅的方法,在RabbitMQ中建立对应的exchange,queue,为什么要建立exchange?如果订阅的微服务先于发布的微服务启动呢?MQStarter开始订阅会丢失数据的,因为没有绑定exchange和queue。注意,这里所有订阅的消息,都要经过MQStarter中转才能成功转发到每个订阅方法中,毕竟不可能在每个订阅方法启动订阅,为什么?还没有实例,怎么启动订阅?
在MQStarter中建立多个队列,每个队列对应一个Queue,把接收到的消息,原封不动的存到对应的队列中,队列是框架内自带的
var queue = new CYBQueue(msg)=>{
//处理队列取出的方法;
});
接收到的消息,只需要queue.Enqueue(T item)就可以了,队列对象会根据先进先出的规则把队列中的元素弹出。队列中的回调方法就类似于内部消息的方法了,找到对应的MethodInfo然后执行。
为了能够让前端看到队列处理异常,在调用方法失败或者异常后,把这些信息记录日志并且通过API发送到系统管理微服务中统一处理(这里后续再说,截图就明白了)
建立了这样的消息机制后,微服务之间的通讯畅通无阻,具体传递什么参数,这里也不作说明,根据自己设计需求做即可。
详细了解一下:微服务下,简单实现数据变更通知_golang微服务信息变更了怎么通知_kaka9的博客-CSDN博客
三、消息机制的扩展应用
内部消息的应用在内部消息的例子中有说明了。下面举例微服务间的消息,开发生产系统的同时,很多时候需要制作大屏,展示生产进度等信息。这样的应用需求,我在以前是在数据库查询数据后再推送到大屏上的,严重影响生产系统的性能!
通过MQ机制,把消息外发,至于存在多少个接收端,和生产系统没有关系,那是RabbitMQ的问题,外部建立一个大屏微服务,在大屏微服务内把每接收到的消息自行按照大屏展示需求处理数据即可,至于是怎么处理,八仙过海了,如果想了解,可以参考一下基于微服务架构的简单的IOT系统_简易iot数据采集系统架构_kaka9的博客-CSDN博客
信息处理系统和物联网系统就实现了联通,当然,这里也存在一定的安全风险,至于安全怎么做,那就是另外一个话题了。
通过上面说明,进一步说明了框架的扩展性上的方便性,程序员并不需要知道这些消息时怎么传递的,也无需去做任何调试,只需要知道发布和订阅即可,至于是内部还是外部,这个是很明显的事情,当然,内部消息也可以采用外部消息的方法,但是外部消息时不能传递对象的,因为参数传递时通过json格式传递的,而内部消息,是直接把对象作为参数传递的,这里各有好处了,关键是怎么使用。另外,由于调用方法存在复杂性,接收消息的方法只允许一个参数,如果实在要传递多个参数,那么把参数定义成一个对象就好了,类似于API的POST。设计和使用框架都需要有一个规则,不能太过随意,否则更乱。
另外,补充一点,分布式事务和消息还不太一样,需要具有一致性,在框架中,分布式事务是采用api的方法实现的,且分布式事务是一个黑盒子,程序员是完全接触不到的,具体参考
在微服务中,利用webapi实现分布式事务_webapi 微服务_kaka9的博客-CSDN博客