相比于pika库,kombu考虑的更全面,如它支持重连策略、支持connection pool和producer pool、故障转移策略等。
查看kombu版本:
如果没有安装kombu的需要安装:pip install kombu
官方文档:
1、连接
有多种传输方式可供选择(amqp、librabbitmq、redis、qpid、内存中等),甚至可以创建自己的传输方式。默认传输是 amqp。
使用默认传输创建连接:
from kombu import Connection
connection = Connection('amqp://guest:guest@localhost:5672//')
连接还不会建立,因为连接是在需要的时候建立的。如果要显式建立连接,则必须调用该connect() 方法:
connection.connect()
检查连接是否已连接:
connection.connected
使用后必须始终关闭连接:
connection.close()
但最好的做法是释放连接,如果连接与连接池相关联,这将释放资源,否则关闭连接,并使以后更容易过渡到连接池:
connection.release()
当然,连接可以用作上下文,我们鼓励您这样做,因为这样更难忘记释放开放资源:
with Connection() as connection:
# work with connection
连接url:
连接参数可以作为 URL 提供,格式如下:
transport://userid:password@hostname:port/virtual_host
没有选项的连接将使用默认连接设置,即使用本地主机、默认端口、用户名guest、密码guest和虚拟主机“/”。不带参数的连接等同于:
Connection('amqp://guest:guest@localhost:5672//')
2、连接池
Kombu 附带两个全局池:一个连接池和一个生产者池。
2.1 创建连接池
通过kombu.pools.connections可以得到一个连接池,传入一个连接实例,kombu.pools.connections会返回一个连接池实例。如果创建连接实例所传入的参数是一样的,如Connection(‘redis://localhost:6379’),则会返回相同的l连接池。
示例代码:
from kombu import Connection
from kombu.pools import connections
conn1 = connections[Connection('redis://192.168.124.49:6379')]
print(conn1)
conn2 = connections[Connection('redis://192.168.124.49:6379')]
print(conn2)
运行结果:
2.2 从连接池获取和释放连接
示例代码:
from kombu import Connection
from kombu.pools import connections
Conn = Connection('redis://192.168.124.49:6379')
print(Conn)
"""
block=True意味着如果连接池中的连接都被占用,则会阻塞。注意如果你的代码中没有正确地释放
连接池的连接,则会造成死锁;为了防止该事情发生,可以设置timeout 参数,具体查看
kombu.connection.Resource.acquire()的使用
"""
with connections[Conn].acquire(block=True) as conn:
print("Got connection:{0!r}".format(conn.as_uri()))
运行结果:
如果需要同时连接到多个代理:
from kombu import Connection
from kombu.pools import connections
c1 = Connection('amqp://')
c2 = Connection('redis://')
with connections[c1].acquire(block=True) as conn1:
with connections[c2].acquire(block=True) as conn2:
# ....
3、生产者
可以通过Connection的实例来创建生产者对象。
from kombu import Connection
from kombu.pools import connections
conn = connections[Connection('redis://192.168.124.49:6379')]
print(conn)
producer = Connection.Producer
print(producer)
运行结果:
也可以通过Producer来实例化一个生产者对象,但需要传入一个channel或connection对象:
from kombu import Connection, Producer
with Connection("redis://192.168.124.49:6379") as conn:
with conn.channel() as channel:
producer = Producer(channel)
print(producer)
运行结果:
获得生产者对象后,就可以发布消息到连接的RabbitMQ队列了:
from kombu import Connection, Producer
from kombu import Exchange, Queue
with Connection("amqp://192.168.124.104") as conn:
with conn.channel() as channel:
producer = Producer(channel)
print(producer)
# 实例一个队列对象,默认durable=True
task_queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")
body = {"name": "dgw"}
producer.publish(
body=body, # message to send
exchange=task_queue.exchange, # destination exchange
routing_key=task_queue.routing_key, # destination routing key,
# declares exchange, queue and binds before sending message,make sure exchange is declared
declare=[task_queue],
retry=True,
retry_policy={ # 重试策略
'interval_start': 0, # First retry immediately,
'interval_step': 2, # then increase by 2s for every retry.
'interval_max': 30, # but don't exceed 30s between retries.
'max_retries': 30, # give up after 30 tries.
},
)
注意:实例对象是需要在with里面,否则连接失败!
declare参数允许您传递在发送消息之前必须声明的实体列表。这在使用重试标志时尤其重要,因为代理实际上可能会在重试期间重新启动,在这种情况下,非持久实体将被删除。
可以直接传递到队列,绕过代理路由机制,使用” anon-exchange “:将exchange参数设置为空字符串,并将路由键设置为队列的名称
producer.publish(
{'hello': 'world'},
exchange='',
routing_key=task_queue.name,
)
当传递一个非字符串对象来发布时,Json是默认的序列化器,但你也可以指定一个不同的序列化器:
producer.publish({'hello': 'world'}, serializer='pickle')
生产者池:
from kombu import Connection, Exchange
from kombu.pools import producers
# The exchange we send our news articles to.
news_exchange = Exchange('news')
# The article we want to send
article = {'title': 'No cellular coverage on the tube for 2012',
'ingress': 'yadda yadda yadda'}
# The broker where our exchange is.
connection = Connection('amqp://guest:guest@localhost:5672//')
with producers[connection].acquire(block=True) as producer:
producer.publish(
article,
exchange=news_exchange,
routing_key='domestic',
declare=[news_exchange],
serializer='json',
compression='zlib')
4、消费者
通过Consumer来接收消息,需要传入connection实例对象,需要接收消息的队列(或列表),处理收到消息的回调函数列表,接收消息的格式。
可以使用连接创建消费者。该消费者正在从名称为“queue”的单个队列中消费:
queue = Queue('queue', routing_key='queue')
consumer = connection.Consumer(queue)
也可以直接实例化 Consumer,它将通道或连接作为参数。此消费者还使用名称为“queue”的单个队列进行消费:
queue = Queue('queue', routing_key='queue')
with Connection('amqp://192.168.124.104') as conn:
with conn.channel() as channel:
consumer = Consumer(channel, queue)
消费者需要为接收到的数据指定一个处理程序。此处理程序以回调的形式指定。每次收到新消息时,kombu 都会调用回调函数。使用两个参数调用回调:body包含生产者发送的反序列化数据和Message实例message。设置手动确认时,用户负责确认消息。
def callback(body, message):
print(body)
message.ack()
consumer.register_callback(callback)
从单个消费者中的事件:
drain_events
with consumer:
connection.drain_events(timeout=1)
来自多个消费者的情况事件:
每个消费者都有自己的队列列表。每个消费者都接受“json”格式的数据:
from kombu.utils.compat import nested
queues1 = [Queue('queue11', routing_key='queue11'),
Queue('queue12', routing_key='queue12')]
queues2 = [Queue('queue21', routing_key='queue21'),
Queue('queue22', routing_key='queue22')]
with connection.channel(), connection.channel() as (channel1, channel2):
with nested(Consumer(channel1, queues1, accept=['json']),
Consumer(channel2, queues2, accept=['json'])):
connection.drain_events(timeout=1)
单个消费者的完整示例代码如下:
from kombu import Queue, Exchange, Connection, Consumer
conn = Connection("amqp://192.168.124.104")
queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")
# 必须接收body和message两个参数
def process_message(body, message):
print(body)
print("调用grpc服务")
# 返回acknowledge,消费者已经处理了该消息
message.ack()
with Consumer(conn, queues=queue, callbacks=[process_message], accept=["json"]):
while True:
print("Start receiving tasks:")
conn.drain_events()
运行结果:
多个消费者1的完整示例代码如下:
from kombu import Connection, Consumer, Queue
def callback(body, message):
print('RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()
queue1 = Queue('queue1', routing_key='queue1')
queue2 = Queue('queue2', routing_key='queue2')
with Connection('amqp://192.168.124.104') as conn:
with conn.channel() as channel:
consumer = Consumer(conn, [queue1, queue2], accept=['json'])
consumer.register_callback(callback)
with consumer:
while True:
conn.drain_events()
运行结果:
多个消费者2的完整示例代码如下:
from kombu import Connection, Consumer, Queue
from kombu.utils.compat import nested
def callback(body, message):
print('RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()
queues1 = [Queue('queue11', routing_key='queue11'), Queue('queue12', routing_key='queue12')]
queues2 = [Queue('queue21', routing_key='queue21'), Queue('queue22', routing_key='queue22')]
with Connection("amqp://192.168.124.104") as conn:
with conn.channel() as channel:
with nested(Consumer(channel, queues1, callbacks=[callback], accept=['json']),
Consumer(channel, queues2, callbacks=[callback], accept=['json'])):
while True:
conn.drain_events()
运行结果:
Kombu 在模块中提供预定义的 mixin 类mixins。它包含两个类: ConsumerMixin用于创建消费者和ConsumerProducerMixin 用于创建还支持发布消息的消费者。可以通过子类化 mixin 类并覆盖一些方法来创建消费者:
from kombu import Queue, Exchange, Connection
from kombu.mixins import ConsumerMixin
conn = Connection("amqp://192.168.124.104")
queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")
class SelfConsumer(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [
Consumer(queue, callbacks=[self.on_message], accept=['json']),
]
def on_message(self, body, message):
print('RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()
SelfConsumer(conn).run()
运行结果:
使用多通道:
from kombu import Queue, Exchange, Connection, Consumer
from kombu.mixins import ConsumerMixin
conn = Connection("amqp://192.168.124.104")
queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")
queues1 = [Queue('queue11', routing_key='queue11'), Queue('queue12', routing_key='queue12')]
queues2 = [Queue('queue21', routing_key='queue21'), Queue('queue22', routing_key='queue22')]
class SelfConsumer(ConsumerMixin):
channel2 = None
def __init__(self, connection):
self.connection = connection
def get_consumers(self, _, default_channel):
self.channel2 = default_channel.connection.channel()
return [
Consumer(default_channel, queues1, callbacks=[self.on_message], accept=['json']),
Consumer(self.channel2, queues2, callbacks=[self.on_message2], accept=['json'])
]
def on_message(self, body, message):
print('queues1 RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()
def on_message2(self, body, message):
print('queues2 RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()
def on_consume_end(self, connection, default_channel):
if self.channel2:
self.channel2.close()
SelfConsumer(conn).run()
运行结果:
accept
Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])
消费者优先级:
RabbitMQ 定义了对 amqp 协议的消费者优先级扩展,可以通过将x-priority参数设置为 basic.consume。RabbitMQ Consumer Priorities:Consumer Priorities — RabbitMQ
在 kombu 中,您可以在 上指定此参数Queue,如下所示:
queue = Queue('name', Exchange('exchange_name', type='direct'),
consumer_arguments={'x-priority': 10})
参考博文: