扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
下载镜像docker pull rabbitmq:3.8-management
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名与空间、雅安服务器托管、营销软件、网站建设、吉利网站维护、网站推广。
执行下面的命令来运行MQ容器:docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=zhangbo123456* \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
什么是消息队列MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
RabbitMQ快速入门RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com
SpringAMQP1,Basic Queue 简单队列模型
2,Work Queue 工作队列模型
3,发布订阅模型 fanout
4,发布订阅模型 Direct
5,发布订阅模型 Topic
6,消息转换器
概念:
AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现
AMQP和JMS区别和联系MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模型;而AMQP的消息模型更加丰富
HelloWorld案例快速开始官方的helloword是基于最基础的消息队列模型来实现的,其中包括三个角色
1,publisher:消息发布者,要将消息发布到队列queue
2,queue:消息队列,负责接收并缓存消息
3,consumer:订阅队列,处理队列中的消息
基本消息队列的消息发送流程
1,建立connection
2,创建channel
3,利用channel声名队列
4,利用channel向队列发送消息
基本消息队列的消息接收流程
1,建立connection
2,创建channel
3,利用channel声名队列
4,定义consumer的消费行为handleDelivery
5,利用channel将消费者与队列绑定
第一步导入依赖
org.springframework.boot
spring-boot-starter-amqp
第二步编写配置文件
spring: rabbitmq: host: 47.99.139.160 #主机 port: 5672 #端口号 virtual-host: / #虚拟主机 username: itcast #用户名 password: zhangbo123456* #密码
第三步编写测试方法
@Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { String queueName = "simple.queue"; String message = "hello , spring amqp"; rabbitTemplate.convertAndSend(queueName,message); }
小注:这个消息不会 创建队列,所以要手动创建队列
第四步在Consumer中编写消费逻辑,监听队列
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimplateQueueMessage(String msg) throws InterruptedException{ System.out.println("spring消费者接收到消息:"+msg); } }
消息预取限制
修改application.yml,设置preFetch这个值,可以控制预取消息的上线
spring:
rabbitmq:
host: 47.99.139.160 #主机
port: 5672 #端口号
virtual-host: / #虚拟主机
username: itcast #用户名
password: zhangbo123456* #密码
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一条消息
发布 订阅发布订阅模式允许将同一消息发送个多个消费者,实现方式是加入了exchange
常见exchange类型包括
Fanout:广播
Direct:路由
Topic:话题
发布订阅-Fanout ExchangeFanout Exchange会将接收到的消息路由到每一个跟其绑定的queue(可以用于实现广播模式)
实现思路:
1,在consumer服务中,利用代码声明队列,交换机,并将两者绑定
2,在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
3,在publisher中编写测试方法,向itcast.fanout发送消息
步骤一 :在consumer服务声名exchange,queue,binding,在consumer服务声名一个配置类,添加@Configuration注解,并声明FanoutExchange,queue和绑定关系对象binding
@Configuration public class FanoutConfig { //声名FanoutChange交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //声名第一个队列 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //绑定队列一和交换机 @Bean public Binding bindingQueue1(Queue fanoutQueue1 , FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } //...略,以相同的方式声名第二个队列,并完成绑定 }
consumer代码
//fanout 模式 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueueMessage(String msg) throws InterruptedException{ System.out.println("spring消费者接收到fanout.queue1消息:"+msg); } //fanout 模式 @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueueMessage2(String msg) throws InterruptedException{ System.out.println("spring消费者接收到fanout.queue2消息:"+msg); }
publisher代码
//fanout 模式 @Test public void testSendFanoutExchange(){ //交换机名称 String exchangeName = "itcast.fanout"; //消息 String message = "hello , every one"; //发送消息 rabbitTemplate.convertAndSend(exchangeName,"",message); }
总结:
交换机的作用?
1,接收publisher发送的消息
2,将消息按照路由规则路由到与之绑定的队列
3,不能缓存消息,路由失败,消息丢失
4,FanoutExchange的会将消息路由到每个绑定的队列
声名队列,交换机,绑定关系的bean是什么?
queue
fanoutExchange
Binding
发布订阅-DirectExchangeDirect Exchange会将接收到的消息根据规则路由到指定的queue,因此称之为路由模式(routes)
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例实现思路
1,利用@RabbitListener声名Exchange,Queue,RoutingKey
2,zaiconsumer服务中,编写两个消费者方法,分别监听direct.queue和direct.queue2
3,在publisher中编写测试方法,向itcast.direct发送消息
consumer
//direct模式 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDirectQueue(String msg){ System.out.println("spring消费者接收到direct.queue1消息:"+msg); } //direct模式 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("spring消费者接收到direct.queue2消息:"+msg); }
publisher
//direct 模式 @Test public void testSendDirectExchange(){ //交换机名称 String exchangeName = "itcast.direct"; //消息 String message = "hello , smoky"; //发送消息 参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息 rabbitTemplate.convertAndSend(exchangeName,"smoky",message); }
总结:
描述direct交换机和fanout交换机的差异?
fanout交换机将消息发送给每一个与之绑定的队列
directii交换机根据RoutingKey判断路由给那个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声名队列和交换机有哪些常见注解?
@Queue
@Exchange
发布订阅-TopicExchangeTopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割
Queue与Exchange指定BIndingKey时可以指定通配符
#:代指0个或多个单词
*:代指一个单词
案例实现思路
1,利用@RabbitListener声名Exchange Queue RoutingKey
2,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3,在publisher中编写测试方法,向itcast.topic发送消息
consumer
//topic模式 @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue1"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "chain.#" )) public void listenTopictQueue1(String msg){ System.out.println("spring消费者接收到topic.queue1消息:"+msg); } //topic模式 @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue2"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "*.news" )) public void listenTopictQueue2(String msg){ System.out.println("spring消费者接收到topic.queue2消息:"+msg); }
publisher
//direct 模式 @Test public void testSendTopictExchange(){ //交换机名称 String exchangeName = "itcast.topic"; //消息 String message = "今天天气很好呀"; //发送消息 参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息 rabbitTemplate.convertAndSend(exchangeName,"chain.weather",message); }
测试发送Object类型消息,消息转换器说明:在SpringAMQP的发送方法中,接收到的消息类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送,用的jdk的序列化器
补充: 使用jdk的序列化器的缺点:1,性能比较差 2,安全性不好,容易出现注入的问题 3,数据长度长,占用额外内存
测试代码
//测试Object类型消息 @Test public void sendObjectQueue(){ Map
msg = new HashMap<>(); msg.put("name","柳岩"); msg.put("age",21); rabbitTemplate.convertAndSend("object.queue",msg); }
Spring的对消息对象的处理是由import org.springframework.messaging.converter.MessageConverter;来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化
如果要修改只需要定义一个MessageConverter类型的bean即可,推荐使用JSON的方式序列化
引入依赖
com.fasterxml.jackson.core
jackson-databind
声名一个MessageConverter类型的bean
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
consumer
引入依赖
com.fasterxml.jackson.core
jackson-databind
consumer服务定义MessageConverter
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
SpringAMQP中消息的序列化和反序列化是怎么实现的
利用MessageConverter实现的,默认是JDK的序列化
注意发送方接收必须使用相同的MessageConverter
MQ的一些常见问题1,消息可靠性:如何确保发送的消息至少被消费一次
2,延迟消息问题:如何实现消息的延迟投递
3,高可用问题:如何避免单点的MQ故障而导致的不可用问题
4,消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
消息可靠性问题
消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
发送时丢失,
生产者发送的消息未到达exchange
消息到达exchange后未到达queue
MQ宕机,queue将消息丢失
consumer接收到消息后未消费就宕机
生产者确认机制
RabbitMq提供了publisher confirm机制避免消息发送到MQ的过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功,结果有两种请求
publisher-confirm,发送者确认
消息成功投递到交换机返回ack
消息未投递到交换机,返回nack
publisher-return
消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突
消费者确认
RabbitMQ支持消费者确认机制,即消费者成功处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息,
而SpringAMQP允许配置三种确认模式
manual:手动ack,需要在业务代码结束后,调用api发送ack
auto:自动ack,由spring检测listener代码是否出现异常,没有异常则返回ack,抛出异常则返回nack
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后会立即被删除
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流