扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记。
成都创新互联公司专注于巴中网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供巴中营销型网站建设,巴中网站制作、巴中网页设计、巴中网站官网定制、微信平台小程序开发服务,打造巴中网络公司原创品牌,更为您提供巴中网站排名全网营销落地服务。上一篇介绍了AMQP消息通信,包括队列、交换器和绑定,通过虚拟主机还可以隔离数据和权限,消息持久化和发送方确认模式确保了消息不丢失。
本篇主要介绍如何运行和管理RabbitMQ,在介绍之前,会有个DEMO演示消息发送和接收,一方面对AMQP的元素有更直观的认识,一方面为后面介绍监控做数据来源。
通过介绍,你会了解到:
消息发送和接收简单实现 服务器管理-启动和停止节点 权限配置 使用统计 消息发送和接收简单实现该Demo主要用于收集日志,消息发送者是各个应用子系统,消息接收者是日志收集服务,使用RabbitMQ可以很容易实现。
基于SpringBoot框架实现,主要类的作用如下:
LogRabbitConfig:创建队列、交换器、绑定等初始化操作; Sender:消息发送者; AllReceiver:所有级别日志接收者,接收所有级别的日志; ErrorReceiver:错误级别日志接受者,只接收错误级别的日志; LogSenderTest:测试用例类;消息模型如下:
配置首先,配置springboot和rabbitmq依赖:
然后在application.properties文件中配置rabbitmq地址:
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/ LogRabbitConfig实现
使用Spring的@Configuration定义配置类,可替换xml配置文件,被注解的类内部包含有一个或多个被@Bean注解的方法,用于构建bean定义,初始化Spring容器。
@Configuration publicclassLogRabbitConfig{ finalstaticStringQUEUE_LOG_ERROR="log.error"; finalstaticStringQUEUE_LOG_ALL="log.all"; //创建log.error队列 @Bean publicQueuelogError(){ returnnewQueue(QUEUE_LOG_ERROR); } //创建log.all队列 @Bean publicQueuelogAll(){ returnnewQueue(QUEUE_LOG_ALL); } //创建exchange,命名为log @Bean TopicExchangeexchange(){ returnnewTopicExchange("log"); } //绑定log.error队列到exchange,routingkey为log.error @Bean BindingbindingExchangeError(QueuelogError,TopicExchangeexchange){ returnBindingBuilder.bind(logError).to(exchange).with("log.error"); } //绑定log.all队列到exchange,routingkey为log.# @Bean BindingbindingExchangeAll(QueuelogAll,TopicExchangeexchange){ returnBindingBuilder.bind(logAll).to(exchange).with("log.#"); } } Sender实现
各个子系统向rabbitmq服务器发送消息:
@Component publicclassSender{ @Autowired privateAmqpTemplaterabbitTemplate; publicvoidsend(){ //向mq服务端发送消息,exchange为log,routingkey为log.error Stringcontext="errorlog"; this.rabbitTemplate.convertAndSend("log","log.error",context); //向mq服务端发送消息,exchange为log,routingkey为log.info context="infolog"; System.out.println("sendmsg:"+context); this.rabbitTemplate.convertAndSend("log","log.info",context); //向mq服务端发送消息,exchange为log,routingkey为log.warn context="warnlog"; System.out.println("sendmsg:"+context); this.rabbitTemplate.convertAndSend("log","log.warn",context); } } AllReceiver和ErrorReceiver实现
从rabbitmq服务器接收消息。
AllReceiver从服务器的log.all队列获取消息,因为它绑定的routingkey为"log.#",所以,会收到所有级别的日志:
@Component @RabbitListener(queues="log.all") publicclassAllReceiver{ @RabbitHandler publicvoidprocess(Stringcontext){ System.out.println("receivelog:"+context); } }
ErrorReceiver从服务器的log.error队列获取消息,因为它绑定的routingkey为"log.error",所以,只会收到error级别的日志:
@Component @RabbitListener(queues="log.error") publicclassErrorReceiver{ @RabbitHandler publicvoidprocess(Stringcontext){ System.out.println("receiveerror:"+context); } } LogSenderTest测试用例
测试用例很简单,就是调用Sender发送消息,观察消息的接收情况。
@RunWith(SpringRunner.class) @SpringBootTest @SpringBootApplication publicclassLogSenderTest{ @Autowired Sendersender; @Test publicvoidsendLog(){ sender.send(); } }
运行日志如下:
可以看到,error收到了2次,说明exchange同时分发给了log.all和log.error队列,其他级别的日志分发给了log.all队列。
服务器管理-启动和停止节点RabbitMQ是用Erlang编写的,Erlang天生就能让应用程序无需知道对方是否在同一台机器上即可相互通信,这让集群和可靠的消息路由变得简单。
理解节点和Erlang应用程序和Java有JVM虚拟机类似,Erlang也有虚拟机,虚拟机的每个实例称之为「节点」,不同的是,多个Erlang应用程序可以运行在同一个节点之上,如果应用程序崩溃了,Erlang节点会自动尝试自动重启应用程序。
节点的操作:
后台启动节点:./rabbitmq-server-detached 停止节点:./rabbitmqctlstop 仅停止rabbit应用程序:./rabbitmqctlstop_app 配置文件配置文件的格式本质上是原始的Erlang数据结构,是一个包含了嵌套哈希表的数组,如下:
[ [mnesia,[{dump_log_write_threshold,1000}]], [rabbit,[{vm_memory_high_wateremark,0.4}]] ]
上面配置了2个应用,每个应用会有自己的哈希表来配置选项:
mnesia:是rabbitmq用来存储交换器和队列元数据的; rabbit:是rabbitmq特定的配置选项;每个应用如果有多个选项,用逗号隔开。
权限配置RabbitMQ权限系统中,单个用户可以跨越多个vhost进行授权,而且可以对读、写、配置分别授权。
首先创建一个用户dongqingqing,密码为123456:
./rabbitmqctladd_userdongqingqing123456
授予dongqingqing用户权限,可以读取所有队列和交换器,只可写log.*格式的队列和交换器,无法创建或删除队列和交换器
./rabbitmqctlset_permissionsdongqingqing".*""log.*"""
set_permissions后面的参数分别为用户名、读权限、写权限、配置权限。
其他详细用法可查看文档。
使用统计 查看数据统计可通过rabbitmqctl命令查看数据统计信息,比如队列和消息数目、交换器和绑定等。
查看所有队列,包含上面demo定义的log.all和log.error:
查看所有交换器,包含上面demo定义的log
另外,rabbitmq提供了管理界面插件,更方便的查看各种统计,可以通过下面的命令开启:
sudo./rabbitmq-pluginsenablerabbitmq_management
查看日志可以在文件系统中查看日志,启动rabbitmq后,会显示日志的路径:
另外,可以通过AMQP获取实时日志信息,有一个amq.rabbitmq.log的topic交换器,监听对应的队列即可。
下一篇将介绍消息通信模式和实践,感谢大家持续关注。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流