早在一年多以前,我就已经开始试图在项目中异步化一些业务,例如系统的行为日志。当时选择的就是大名鼎鼎的RabbitMQ,这也是调查过不少同类产品后最终的选择,直到今天也无怨无悔~

最喜欢的一点并不是它的业务模型丰富,而是它支持的语言很全面,从php到java,c/c++,甚至nodejs,都可以很方便的使用(虽然c/c++下的库文档真的很少~)!

虽然我一直记着它的好,但悲剧的是早先调研它时学习的很多概念,时至今日已经忘不少了~所以感觉还是要写一篇博文记录下来,以备后用!

那就一个一个来吧:

##Message acknowledgment
当队列中的任务被你的消费者进程取走后,如果消费者处理中挂掉了,那这个任务也就丢失了(虽然可能只做了一半)!很多情况下这并不是我们可以接受的,所以 Ack 机制出现了,它给了我们一个很优的解决方案: 当消费者连接断开后,如果RabbitMQ没有收到消费者针对该任务的Ack,那么RabbitMQ就会认为该消费者挂掉了,同时会把该任务分给其他消费者。

这里还要注意的是:任务是没有超时限制的,也就是说只要消费者的连接有效,RabbitMQ就不会把任务再发送给其他消费者,这样可以保证某些需要耗时很久的任务正常执行。

尤其注意的是,千万不要忘记发送Ack,否则RabbitMQ会不停的把任务重复发送并且一直积累,直到崩溃~可以通过下面这个命令来查看当前没有收到Ack的消息个数:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

##Message durability
一般情况下当RabbitMQ退出或崩溃,那么队列和任务将会丢失,这当然是不能容忍的。RabbitMQ提供了持久化方案,只需要把队列声明成持久的即可(注意,RabbitMQ并不允许修改当前已存在队列的持久性)。

此外我们还需要把消息也设置成持久化的,这些都有对应的属性参数让我们来设置。

但注意,RabbitMQ并不能百分之百保证消息一定不会丢失,因为为了提升性能,RabbitMQ会把消息暂存在内存缓存中,直到达到阀值才会批量持久化到磁盘,也就是说如果在持久化到磁盘之前RabbitMQ崩溃了,那么就会丢失一小部分数据,这对于大多数场景来说并不是不可接受的,如果确实需要保证任务绝对不丢失,那么应该使用事务机制。

##Round-robin dispatching
这个机制是RabbitMQ最常用业务模型中的。一般我们选择异步任务,除了降低模块间的依赖外,还有一个理由就是有效规避大并发负载,尤其是针对http。

举个场景,网站上的找回密码功能,系统会向对应用户的邮箱发送修改密码的连接。如果是同步流程的话,大量用户同时请求该功能,由于发送邮件比较耗时,那么你的web服务器会持续等待,这个时候就可能会被大量涌入的请求搞死,即便是没死,也会大大影响其响应速度。

那么如果使用异步的话,我们可以把找回密码的请求都存到队列里,然后由后台进程逐步完成邮件发送的任务,web服务器就可以快速响应用户。

好,说了这么多,那么到底 Round-robin 是做什么的?看图:

上图中我们有两个消费者(C1,C2),它们同时从队列中领取任务并执行,默认情况下RabbitMQ会按照顺序依次把消息发送给C1和C2,这样可以保证每个消费者领到的任务个数都是相同的,这种分配任务的方式就是Round-robin。

任务耗时不均匀的情况下,这种方式可能并不是最佳的。

##Fair dispatch
上面说到了,由于默认情况下RabbitMQ不会去管任务到底是什么类型的(特指其耗时情况),它只会一味的按照 Round-robin 的算法把队列中的消息平均分配给所有消费者。还是上面的那个图,我们假设队列中的任务很奇葩,奇数任务是耗时久的,偶数任务是耗时低的,那么C1可能一直很忙,而C2则几乎没事儿可做!

听上去很不公平是吧?这就是因为 Round-robin 机制并不考虑每个消费者当前正在处理的任务数(换句话说,就是当前该消费者仍没有Ack的任务数)。

我们可以设置 prefetch 来避免上述情况,该设置可以告诉RabbitMQ:直到该消费者处理完当前指定数目的任务之前,不要再给消费者分配新任务(这是依赖统计该消费者的Ack情况来实现的,可见两者必须欧同时开启哦)。

如果当前所有消费者都在忙,那么任务将会阻塞在队列中,你可能需要增加消费者数量来避免大量任务被阻塞。

##Exchanges
上图中的那种架构并不是RabbitMQ推荐的,为什么这么说呢?RabbitMQ核心思想是生产者绝对不应该直接将任务投递到目标队列中,换句话说,生产者根本不需要知道任务最终应该会投递到哪里。

取而代之的,生产者只需要把任务发送到一个 Exchange 中即可,如下图:

Exchange 非常容易理解,它负责根据映射关系和投递模型把任务投递到队列中,有效的投递模型有:direct,topic,headers,fanout。官方提供的例子中就已经把这些模型讲的很清楚了。

剩下要做的就是把Exchange和Queue绑定到一起了,如果向一个没有绑定任何队列的Exchange发送任务,则任务都会被丢弃。

你可以通过下面的命令来查看绑定关系:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

##Routing
其实这个机制是建立在 Exchanges 上的,有了Route,我们就可以实现根据类别,让Exchange来选择性的分发任务给匹配的队列。

要做到这点,只需要在为Exchange绑定queue时设置一个 routingKey 即可。注意,fanout类型的exchanges会忽略这个值,毕竟这种类型的exchange要实现的是广播机制。

如上图,我们这次使用的是 direct 投递模型的Exchage,这种模型下的路由逻辑非常简单:根据绑定时声明的routingKey来分发任务。

另外值得一提的是,绑定非常灵活,不仅可以像上图那样为一个队列绑定多个不同的routingKey,也可以为Exchage绑定多个队列同时监听相同的routingKey(这等同于fanout模型)。

##Topic exchange
我承认,可能排版上有点乱,因为按道理说这个概念应该合并到 Exchanges 中,但是由于它依赖 Routing ,所以我决定采用官方提供的学习步骤。

我们已经了解过direct,fanout两种投递模型。那么topic到底又是什么呢?

简单来说,topic只是为routingKey设置了一个规则(任意单词来描述主题,以”.”分割为不同层级,长度不能超过255位),有点命名空间的味道,这里称之为主题可能更加合适一些。

在规则中还提供了两个关键字:

  1. *:可以匹配任意1个单词;
  2. #:可以匹配任意0个或多个单词。

有点正则的味道,不过确实在direct模型的基础上进一步提升了灵活性。举个例子,如果我们用这么一个routingKey:*.love.*,那么投递任务时,任何这种模式主题的任务都会投递到对应队列,例如:everyone.love.kazaff。再如:kazaff.#,这意味着会匹配kazaff.me.is.cool,也会匹配kazaff.me,等等。

如果发送任务用的routingKey不能匹配声明的模式,那么任务就会被丢弃。

你可以把模式只定义为#来模仿fanout模型,也可以把模式定义为不包含*和#的具体字符串来模仿direct模型。

##Message properties
在AMQP协议中为每个任务预定义了14个属性,大多数属性都非常少用,常用的4个如下:

  1. deliveryMode:标识任务的持久性;
  2. contentType:用于描述任务数据的MIME-TYPE,例如application/json;
  3. replyTo:用于命名一个callback队列;
  4. correlationId:用于标识RPC任务的请求与响应的配对编号。

可想到为每一次RPC请求都创建一个回调队列是非常低效的。更高效的做法是为每一个发出RPC请求的客户端创建一个回调队列,但这又产生了一个新问题:如何知道回调队列中的响应是对应哪一个请求的呢?

这就是 correlationId 属性存在的意义,我们只要为每个请求设置一个唯一的值,我们就能在从回调队列中取到响应数据后根据correlationId找到对应的请求。

如果我们收到一个未知的correlationId响应,只需要忽略它既可。你可能会想,怎么可能收到未知的响应?确实,这种情况发生的概率不高,如下图:

假设S从rpc_queue中取得RPC任务后,进行处理,然后把响应发送给reply_to队列,此时S挂了,它还没来得及向rpc_queue发送ack!但是你知道的,其实整个RPC已经可以算完成了!

这个时候S重启完毕,它会再次取到刚才的那个RPC任务,再次处理,再次把结果发送给reply_to队列,这次它挺了下来没死机,并把ack发送到rpc_queue。但是,C端早已经在S第一次死机之前就拿到结果了,第二次发来的响应任务C自然找不到对应的correlationId。我这么说,你懂了么?


当目前为止,我已经把我认为重要的概念都提到了,有啥问题,私下讨论吧!