扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这期内容当中小编将会给大家带来有关如何进行RabbitMq的简单使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
创新互联是一家从事企业网站建设、做网站、成都网站建设、行业门户网站建设、网页设计制作的专业的建站公司,拥有经验丰富的网站建设工程师和网页设计人员,具备各种规模与类型网站建设的实力,在网站建设领域树立了自己独特的设计风格。自公司成立以来曾独立设计制作的站点超过千家。
1.pom文件中加入依赖
org.springframework.boot spring-boot-starter-amqp 2.3.3.RELEASE
2.配置文件,配置mq
自动配置信息 这里我开启ACK消息确认server.port=8088#服务器配置spring.application.name=rabbitmq-test-sending#rabbitmq连接参数spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guest spring.rabbitmq.password=guest# 开启发送确认spring.rabbitmq.publisher-confirms=true# 开启发送失败退回spring.rabbitmq.publisher-returns=true# 开启ACKspring.rabbitmq.listener.direct.acknowledge-mode=manual
3.Rabbit配置类,使用topic交换器,使用通配符,一个交换器对应多个queue
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class RabbitmqConfig {//队列 @Bean public Queue queueTest1(){return new Queue("queueTest1",true); }/* * 设置消息队列的TTL(过期时间) * */ @Bean public Queue queueTest2(){/** * 队列的名称,是否持久化,是否独享、排外的,是否自动删除,队列的其他属性参数 * (1)x-message-ttl:消息的过期时间,单位:毫秒; * (2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒; * (3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息; * (4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息; * (5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。 * (6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中; * (7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值 * (8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false) * (9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级; * (10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息; * (11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。 */ Maparguments = new HashMap<>(); arguments.put("x-message-ttl", 5000);return new Queue("queueTest2", true, false, false, arguments); }//交换机 @Bean public TopicExchange exchangeTest(){//可以传exchange名字,是否支持持久化,是否可以自动删除 return new TopicExchange("exchangeTest",true,false); }@Bean public Binding bindQueueTest1AndExchange(){return BindingBuilder.bind(queueTest1()).to(exchangeTest()).with("phone.routing.*"); }@Bean public Binding bindQueueTest2AndExchange(){return BindingBuilder.bind(queueTest2()).to(exchangeTest()).with("web.routing.*"); } }
4.生产者
import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.Date;import java.util.UUID;/** 生产者,带消息确认* */@Servicepublic class PruSender implements RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate;//routing_key,把消息发送到相应的队列中 public void sendMessage(String routing_key){//发送内容 String context = "你好现在是 " + new Date();this.rabbitTemplate.setReturnCallback(this);//发送失败退回 this.rabbitTemplate.setConfirmCallback((correlationData,ack,message)->{//手动发送消息确认 if(!ack){ System.out.println("消息发送失败" + message + correlationData.toString()); }else{ System.out.println("消息发送成功" + correlationData.toString()); } }); CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString());//交换机名称、routingKey、内容、消息Id this.rabbitTemplate.convertAndSend("exchangeTest",routing_key, context,correlationData); }@Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("sender return success" + message.toString() + "===" + i + "===" + s1 + "===" + s2); } }
5.消费者
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.io.IOException;/** 消费者,带消息确认** */@Service@RabbitListener(queues = "queueTest")public class Receiver {//消息内容,通道,消息的属性信息 @RabbitHandler public void immediateProcess(String text,Channel channel,Message message) throws IOException {try { System.out.println("Receiver" + text);/** * 手动确认,通知mq已经成功消费改条信息,可以删除了 * //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace();/* *消费消息失败 * 第二个参数是否应用于多消息,第三个参数是否从新计入队列 * */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); } } }
交换机类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
上述就是小编为大家分享的如何进行RabbitMq的简单使用了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流