RabbitMQ 基于AMQP的开源消息代理软件 一、AMQP简介 1AMQP是什么? AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是进程之间传递异步消息的网络协议 。
2AMQP工作过程 发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取 。
Publisher和Consumer就是两个Java项目,AMQP实体后面就是RabbitMQ
3队列 队列是数据结构中概念 。数据存储在一个队列中,数据是有顺序的,先进的先出,后进后出 。其中一侧负责进数据,另一侧负责出数据 。
MQ(消息队列)很多功能都是基于此队列结构实现的,队列也是解决高并发下排队问题的解决方案,即使恰巧出现同一时刻向队列添加的两个数据,也会有CPU帮助判断,放入到队列后,一定会有先后顺序 。
二、RabbitMQ简介 1RabbitMQ介绍 RabbitMQ是由Erlang语言编写的基于AMQP的消息中间件 。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题 。
2RabbitMQ适用场景 2.1两大核心特性 RabbitMQ两大核心特性:异步消息、队列 。异步消息:只要异步消息就不阻塞线程,减少了主线程执行时间 。所有需要这种效果场景都可以使用MQ 。队列:进入队列的数据一定有先后之分 。只要应用程序要对内容分先后的场景都可以使用MQ 。 2.2具体场景 2.2.1排队算法 使用队列特性 。把数据发送给MQ,进入到队列就有了排队的效果
2.2.2秒杀活动 使用队列特性 。例如:抢红包、限时秒杀、直播卖货时抢商品 。使用了MQ按照顺序一个一个操作,当商品库存操作到0个时,秒杀结束 。
2.2.3消息分发 在程序中同时向多个其他程序发送消息 。应用了AMQP中交换机,实现消息分发 。
2.2.4异步处理 利用MQ异步消息特性 。大大提升主线程效率 。
2.2.5数据同步 利用异步特性 。我们电商中使用RabbitMQ绝大多数的事情就是在实现数据同步 。
2.2.6处理耗时任务 利用异步特性 。可以把程序中耗时任务(例如:发送邮件、发送验证码)交给MQ去处理,减少当前项目的耗时时间 。
2.2.7流量削峰 在互联网项目中,可能会出现某一段时间范围内,访问流量骤增的情况(如:双11、双12、618等),如果使用监控工具,会发现这段时间访问出现顶峰 。使用MQ可以把这些访问分摊到多个项目中,把流量分摊,去除了顶峰效果,这就叫做流量削锋 。利用RabbitMQ中交换机实现的 。
三、执行原理理解 客户端应用程序向RabbitMQ发送消息Message,在Message会包含路由键Routing Key,交换器Exchange接收到消息Message后会根据交换器类型Exchange Type解决把消息如何发送给绑定的队列Queue中,如果交换器类型是Direct这个消息只放入到路由键对应的队列中,如果是topic交换器消息放入到routing key匹配的多个队列中,如果是fanout交换器消息会放入到所有绑定到交换器的队列中 。等放入到队列中之后RabbitMQ的事情就结束了 。剩下的事情是由Consumer进行完成,Consumer一直在监听队列,当队列里面有消息就会把消息取出,取出后根据程序的逻辑对消息进行处理 。以上这些就是RabbitMQ的运行原理 。
1.Message
消息 。消息是不具名的,它由消息头消息体组成 。消息体是不透明的,而消息头则由
一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等 。
2.Publisher
消息的生产者 。也是一个向交换器发布消息的客户端应用程序 。
通俗说明:哪些项目向RabbitMQ发送消息,哪些项目就是Publisher
3.Consumer
消息的消费者 。表示一个从消息队列中取得消息的客户端应用程序 。
Consumer会一直监听指定的队列,只要队列中有消息,就会按照顺序依次取出 。
使用MQ做耗时任务时,耗时任务就交给Consumer进行完成 。
4.Exchange
交换器 。用来接收生产者发送的消息并将这些消息路由给服务器中的队列 。
一共支持四种的交换器类型
1.direct(发布与订阅 完全匹配)
2.fanout(广播)
3.topic(主题,规则匹配)
4.header(使用较少,相比direct就多了一些头信息)
5.Binding
绑定 。用于消息队列和交换器之间的关联 。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表 。
一个交换器里面可以绑定多个队列 。一个队列一般都是只绑定到一个交换器上 。消息发送给交换器,交换器会把效果按照特定规则发送给绑定的队列 。
6.Queue
消息队列 。用来保存消息直到发送给消费者 。它是消息的容器,也是消息的终点 。一个消息可投入一个或多个队列 。消息一直在队列里面,等待消费者连接到这个队列将其取走 。
7.Routing-key
路由键 。RabbitMQ决定消息该投递到哪个队列的规则 。(也可以理解为队列的名称,路由键是key,队列是value)
队列通过路由键绑定到交换器 。
消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配 。
如果相匹配,消息将会投递到该队列 。
如果不匹配,消息将会进入黑洞 。
通俗理解:队列绑定到交换器时有路由键,这个路邮件就相当于key-value中的key,value是队列 。当Publisher发送消息时一定会携带路由键,有了路由键就让交换器知道了这个消息要发送给哪个队列 。
四、支持的四种交换机 交换器负责接收客户端传递过来的消息,并转发到对应的队列中 。在RabbitMQ中支持四种交换器 1.Direct Exchange:直连交换器(默认) 。通过路由键明确指定存储消息的一个队列 。
2.Fanout Exchange:扇形交换器 。把消息发送给所有绑定的队列 。
3.Topic Exchange:主题交换器 。按照路由规则,把消息发送给多个队列 。
4.Header Exchange:首部交换器 。比Direct多了一些头部消息,平时使用较少 。
在RabbitMq的Web管理界面中Exchanges选项卡就可以看见这四个交换器:
五、执行流程 MQ常规处理流程:
代码示例:
@RabbitListener(bindings = {@QueueBinding(exchange = @Exchange(value = https://tazarkount.com/read/ClientGrpcServerRabbitMqConstant.SUBSCRIBE_RECEIPT_EXCHANGE_DIRECT),value = @Queue(value = ClientGrpcServerRabbitMqConstant.SUBSCRIBE_RECEIPT_QUEUE, durable ="true"),key = ClientGrpcServerRabbitMqConstant.SUBSCRIBE_RECEIPT_KEY)})@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)public void consume(@Payload SubscribeCustomsDecMessage subscribeCustomsDecMessage, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {try {// 业务逻辑编写channel.basicAck(deliveryTag, false); } catch (Exception ex) {log.error("");try {channel.basicReject(deliveryTag, false);} catch (IOException e) {log.error("拒绝消息失败 subscribeCustomsDecMessage:{}", JsonUtils.toJson(subscribeCustomsDecMessage), e);} }} channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);【false为手动确认,true为自动确认】log.info("消息已经确认");channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);【true为重新放入队列,false不放入队列】log.info("消息拒绝"); 升级版MQ处理流程,在常规基础上增加死信队列: 将异常的消费可采取以下步骤进行处理: 1、消费异常,进入死信队列
2、死信固定ttl为1分钟
3、按业务评估最大重试次数
4、超过最大重试次数后发送告警到企业微信
5、独立服务处理失败消息,并依赖xxx 服务发送告警到企业微信,告警可配置开关
6、开发人员根据业务,与消息id快速定位日志,人工介入处理
为什么要使用mq的死信队列? 事务问题,事务没提交就消费了,业务代码错误,直接丢掉,很难发现问题
如果不丢掉,重新入队,陷入死循环,导致日志暴增 。
不能正常消费的消息叫死信,消费异常进入死信队列
利用TTL机制,一分钟重试一次,直到到达最大重试次数 。再发送告警到企业微信
MQ不适合延迟队列,为什么? 【RabbitMQ 基于AMQP的开源消息代理软件】例如:第一条延迟1分钟,第二条延迟5秒,但队列是排队的,第二条并不会先消费,只能等第一条消费之后才消费
- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
