扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
最近由于业务需要进行性能升级,将原来需要经过http进行数据交互的方式修改为消息队列的形式。于是原来的同步处理的方式变成了异步处理,在一定程度上提升我们系统的性能,不过debug的时候,不免哭了出来。因为每个环节都需要进行详细检查。
对于RabbitMQ,我们知道,其是AMQP的一种代理服服务器,具有一套严格的通信方式,即在核心产品进行通信的各个方面几乎都采用了RPC(Remote Procedure Call, 远程过程调用)
模式。
RabbitMQ通信时用到的类和方法与AMQP协议层面的类和方法一一对应。因此AMQP本质上是RPC的一种传输机制
AMQ(Advanced Message Queuing)
模型,这个模型是针对代理服务器软件例如(RabbitMQ)设计的,该模型在逻辑上定义了三种抽象组件用于指定消息的路由行为,分别是:
Exchange
,消息代理服务器中用于把消息路由到队列的组件Queue
,用来存储消息的数据结构,位于硬盘或内存中,以FIFO的顺序进行投递Binding
,一套规则,用于告诉交换器消息应该被存储到哪个队列
在将消息发布到队列之前,我们需要经历过以下若干个步骤。至少,必须要设置交换器和队列,然后将他们绑定再一起。接下来我们将通过python来实现AMQP机制。
我用到了pika这个库,需要的话,需要通过以下指令安装。该库实现了绝大部分rabbitmq的api以及提供了相关的调优参数,后续有机会不妨可以详谈。
pip install pika
交换器在AMQ模型中是非常重要的角色存在。因此,在AMQP规范中都有自己的类。声明一个交换器,我们可以直接在控制台界面进行创建。
不过这样仅仅是在极少数的情况下才适合,动手调戏鼠标对开发工程师的来说实在是太蠢啦,能玩键盘就别玩鼠标啊,我们不妨通过以下代码来声明(创建)一个交换器。pika内置函数会事先通过get的方式来检查我们待声明的交换器是否存在,如果存在则不创建,否则创建一个新的交换器。
self.channel.exchange_declare(
exchange=exchange,
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
一旦交换器创建成功,就可以通过发送类似queue.declare命令让rabbitmq创建一个队列。同样的,我们仍然可以在图形化界面里面创建队列。
还是那句话,动手调戏鼠标对开发工程师的来说实在是太蠢啦,能玩键盘就别玩鼠标啊,我们不妨通过以下代码来声明(创建)若干个队列。pika内置函数会事先通过get的方式来检查我们待声明的队列是否存在,如果存在则不创建,否则创建一个新的队列。
self.channel.queue_declare(queue=queue, durable=True)
当队列同名时,即如果我们多次发送同一个queue.declare命令并不会有任何副作用,因为RabbitMQ并不会处理后续的队列声明,究其原因,每次创建都会先通过get的方式调用消息队列引擎查询队列是否存在。如果需要返回队列相关的有用信息,则将会返回队列中待处理消息的数量以及该队列的消费者数量。当然了如果队列同名,而且新队列的属性与原有的队列不一样,那么RabbitMQ将关闭发出的RPC请求的信道,返回403错误
一旦创建了交换器和队列,之后就可以将它们绑定在一起了,如同queue.declare命令,将队列绑定到交换器Queue.Bind每次只能指定一个队列。我们既可以通过图形化界面进行绑定,也可以通过代码实现这个效果
self.channel.queue_bind(
queue=queue, exchange=exchange, routing_key=rk)
发布消息到RabbitMQ时,多个帧封装了发送到服务器的消息数据。在实际的消息内容到达rabbitMQ之前,客户端应用程序会发送一个basic.publish方法帧、一个内容头帧和至少一个消息体帧。
默认情况下,只要没有消费者正在监听队列,消息就会被存储在队列中。当添加更多消息时,队列大小也会随之增加。RabbitMQ可以将这些消息保存在内存或者写入磁盘。
def produce(self, body):
self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)
)
一旦发布消息被路由并且保存在一个或者多个队列中,剩下的就是如何对其进行消费。注意到,发送和消费是异步的。 消费时,可以让RabbitMQ知道如何消费他们
Basic.Consume命令中
no_ack为true时,RabbitMQ将连续发送消息直到消费者发送一个Basic.Cancel命令或者断开连接为止
如果为false,则需要发送一个Basic.Ack来确认收到每条消息的请求
def on_message(chan, method_frame, _header_frame, body, userdata=None):
"""Called when a message is received. Log message and ack it."""
# LOGGER.info('Userdata: %s Message body: %s', userdata, body)
# print(" [x] Received %r" % body.decode())
data = body.decode()
result = alarmFun(data)
publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
publish.produce(result)
# chan.basic_ack(delivery_tag=method_frame.delivery_tag)
on_message_callback = functools.partial(on_message)
self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue,
auto_ack=True
)
经过前面的描述,我们需要理论联系实践,让我们通过python开发消费者角色和发布者角色。
按照配置流程,我们需要初始化连接、配置交换器、队列、绑定,然后才能通过连接件信息推送(publish)到队列中。
import logging
from random import randint
import pika
BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)
# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
# '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)
class Publish(object):
def __init__(self, exchange, queue, rk):
# LOGGER.info('Connecting to %s', BROKER_URL)
# logging.basicConfig(level=logging.DEBUG)
self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
# 通过这个方式设置备用链路,保证connection稳定性
self.parameters = (
pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
retry_delay=1))
self.connection = pika.BlockingConnection(self.parameters)
self.channel = self.connection.channel()
self.exchange = exchange
self.channel.exchange_declare(
exchange=exchange,
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
self.channel.queue_declare(queue=queue, durable=True)
self.route_key = rk
def produce(self, body):
self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)
)
def close(self):
self.connection.close()
def test():
publish = Publish(exchange='test_yerik', queue='test_test', rk='rk-test_test')
for i in range(1, 10000):
publish.produce(randint(1, 100).__str__())
publish.close()
if __name__ == '__main__':
test()
消费者的设计和生产者在初始化的时候设计大致相同,都是通过建立连接、开启channel、exange、queue、bind等过程,主要的区别在于commsum
import functools
import logging
import pika
BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)
# print('pika version: %s' % pika.__version__)
# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
# '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)
from apps.alarm.alarmfun import alarmFun
from apps.utils.rabbitmq.publish import Publish
class Consummer(object):
def __init__(self, exchange, queue, rk):
# LOGGER.info('Connecting to %s', BROKER_URL)
self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
self.parameters = (
pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
retry_delay=1))
self.connection = pika.BlockingConnection(self.parameters)
self.channel = self.connection.channel()
self.exchange = exchange
self.channel.basic_qos(prefetch_count=1)
self.exchange = exchange
self.queue = queue
self.channel.exchange_declare(
exchange=exchange,
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
self.channel.queue_declare(queue=queue, durable=True)
self.channel.queue_bind(
queue=queue, exchange=exchange, routing_key=rk)
self.channel.basic_qos(prefetch_count=1)
def consum_message(self):
# LOGGER.info('Comsummer by {}'.format(name))
def on_message(chan, method_frame, _header_frame, body, userdata=None):
"""Called when a message is received. Log message and ack it."""
# LOGGER.info('Userdata: %s Message body: %s', userdata, body)
# print(" [x] Received %r" % body.decode())
data = body.decode()
result = alarmFun(data)
publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
publish.produce(result)
# chan.basic_ack(delivery_tag=method_frame.delivery_tag)
on_message_callback = functools.partial(on_message)
self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue,
auto_ack=True
)
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.channel.stop_consuming()
def cancel(self):
self.connection.close()
def test():
consummer = Consummer('test_yerik', 'test_test', 'rk-test_test')
consummer.consum_message()
print(consummer.receive)
if __name__ == '__main__':
test()
参考文档:
- 深入RabbitMQ, Gavin M.Roy 著 汪佳南 郑天民 译
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流