相比于pika库,kombu考虑的更全面,如它支持重连策略、支持connection pool和producer pool、故障转移策略等。

查看kombu版本:

Kombu库用法详解(连接、连接池、生产者、消费者)

如果没有安装kombu的需要安装:pip install kombu

官方文档:

1、连接

        有多种传输方式可供选择(amqp、librabbitmq、redis、qpid、内存中等),甚至可以创建自己的传输方式。默认传输是 amqp。

使用默认传输创建连接:

from kombu import Connection
connection = Connection('amqp://guest:guest@localhost:5672//')

        连接还不会建立,因为连接是在需要的时候建立的。如果要显式建立连接,则必须调用该connect() 方法:

connection.connect()

检查连接是否已连接:

connection.connected

使用后必须始终关闭连接:

connection.close()

但最好的做法是释放连接,如果连接与连接池相关联,这将释放资源,否则关闭连接,并使以后更容易过渡到连接池:

connection.release()

当然,连接可以用作上下文,我们鼓励您这样做,因为这样更难忘记释放开放资源:

with Connection() as connection:
    # work with connection

连接url:

连接参数可以作为 URL 提供,格式如下:

transport://userid:password@hostname:port/virtual_host

没有选项的连接将使用默认连接设置,即使用本地主机、默认端口、用户名guest、密码guest和虚拟主机“/”。不带参数的连接等同于:

Connection('amqp://guest:guest@localhost:5672//')

2、连接池

Kombu 附带两个全局池:一个连接池和一个生产者池。

2.1 创建连接池

        通过kombu.pools.connections可以得到一个连接池,传入一个连接实例,kombu.pools.connections会返回一个连接池实例。如果创建连接实例所传入的参数是一样的,如Connection(‘redis://localhost:6379’),则会返回相同的l连接池。

示例代码:

from kombu import Connection
from kombu.pools import connections

conn1 = connections[Connection('redis://192.168.124.49:6379')]
print(conn1)
conn2 = connections[Connection('redis://192.168.124.49:6379')]
print(conn2)

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

2.2 从连接池获取和释放连接

示例代码:

from kombu import Connection
from kombu.pools import connections

Conn = Connection('redis://192.168.124.49:6379')
print(Conn)

"""
block=True意味着如果连接池中的连接都被占用,则会阻塞。注意如果你的代码中没有正确地释放
连接池的连接,则会造成死锁;为了防止该事情发生,可以设置timeout 参数,具体查看
kombu.connection.Resource.acquire()的使用
"""
with connections[Conn].acquire(block=True) as conn:
    print("Got connection:{0!r}".format(conn.as_uri()))

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

如果需要同时连接到多个代理:

from kombu import Connection
from kombu.pools import connections

c1 = Connection('amqp://')
c2 = Connection('redis://')

with connections[c1].acquire(block=True) as conn1:
    with connections[c2].acquire(block=True) as conn2:
        # ....

3、生产者

        可以通过Connection的实例来创建生产者对象。

from kombu import Connection
from kombu.pools import connections

conn = connections[Connection('redis://192.168.124.49:6379')]
print(conn)

producer = Connection.Producer
print(producer)

运行结果: 

Kombu库用法详解(连接、连接池、生产者、消费者)

也可以通过Producer来实例化一个生产者对象,但需要传入一个channel或connection对象:

from kombu import Connection, Producer

with Connection("redis://192.168.124.49:6379") as conn:
    with conn.channel() as channel:
        producer = Producer(channel)

        print(producer)

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

获得生产者对象后,就可以发布消息到连接的RabbitMQ队列了:

from kombu import Connection, Producer
from kombu import Exchange, Queue

with Connection("amqp://192.168.124.104") as conn:
    with conn.channel() as channel:
        producer = Producer(channel)
        print(producer)

        # 实例一个队列对象,默认durable=True
        task_queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")
        body = {"name": "dgw"}
        producer.publish(
            body=body,  # message to send
            exchange=task_queue.exchange,  # destination exchange
            routing_key=task_queue.routing_key,  # destination routing key,
            # declares exchange, queue and binds before sending message,make sure exchange is declared
            declare=[task_queue],
            retry=True,
            retry_policy={  # 重试策略
                'interval_start': 0,  # First retry immediately,
                'interval_step': 2,  # then increase by 2s for every retry.
                'interval_max': 30,  # but don't exceed 30s between retries.
                'max_retries': 30,  # give up after 30 tries.
            },
        )

注意:实例对象是需要在with里面,否则连接失败!

Kombu库用法详解(连接、连接池、生产者、消费者)

        declare参数允许您传递在发送消息之前必须声明的实体列表。这在使用重试标志时尤其重要,因为代理实际上可能会在重试期间重新启动,在这种情况下,非持久实体将被删除。 

        可以直接传递到队列,绕过代理路由机制,使用” anon-exchange “:将exchange参数设置为空字符串,并将路由键设置为队列的名称

producer.publish(
    {'hello': 'world'},
    exchange='',
    routing_key=task_queue.name,
)

        当传递一个非字符串对象来发布时,Json是默认的序列化器,但你也可以指定一个不同的序列化器:

producer.publish({'hello': 'world'}, serializer='pickle')

生产者池:

from kombu import Connection, Exchange
from kombu.pools import producers

# The exchange we send our news articles to.
news_exchange = Exchange('news')

# The article we want to send
article = {'title': 'No cellular coverage on the tube for 2012',
           'ingress': 'yadda yadda yadda'}

# The broker where our exchange is.
connection = Connection('amqp://guest:guest@localhost:5672//')

with producers[connection].acquire(block=True) as producer:
    producer.publish(
        article,
        exchange=news_exchange,
        routing_key='domestic',
        declare=[news_exchange],
        serializer='json',
        compression='zlib')

4、消费者

        通过Consumer来接收消息,需要传入connection实例对象,需要接收消息的队列(或列表),处理收到消息的回调函数列表,接收消息的格式。

        可以使用连接创建消费者。该消费者正在从名称为“queue”的单个队列中消费:

queue = Queue('queue', routing_key='queue')
consumer = connection.Consumer(queue)

        也可以直接实例化 Consumer,它将通道或连接作为参数。此消费者还使用名称为“queue”的单个队列进行消费:

queue = Queue('queue', routing_key='queue')
with Connection('amqp://192.168.124.104') as conn:
    with conn.channel() as channel:
        consumer = Consumer(channel, queue)

        消费者需要为接收到的数据指定一个处理程序。此处理程序以回调的形式指定。每次收到新消息时,kombu 都会调用回调函数。使用两个参数调用回调:body包含生产者发送的反序列化数据和Message实例message。设置手动确认时,用户负责确认消息。

def callback(body, message):
    print(body)
    message.ack()

consumer.register_callback(callback)

从单个消费者中的事件:

drain_events
with consumer:
    connection.drain_events(timeout=1)

来自多个消费者的情况事件:

每个消费者都有自己的队列列表。每个消费者都接受“json”格式的数据:

from kombu.utils.compat import nested

queues1 = [Queue('queue11', routing_key='queue11'),
               Queue('queue12', routing_key='queue12')]
queues2 = [Queue('queue21', routing_key='queue21'),
               Queue('queue22', routing_key='queue22')]
with connection.channel(), connection.channel() as (channel1, channel2):
    with nested(Consumer(channel1, queues1, accept=['json']),
                Consumer(channel2, queues2, accept=['json'])):
        connection.drain_events(timeout=1)

单个消费者的完整示例代码如下:

from kombu import Queue, Exchange, Connection, Consumer

conn = Connection("amqp://192.168.124.104")
queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")


# 必须接收body和message两个参数
def process_message(body, message):
    print(body)
    print("调用grpc服务")
    # 返回acknowledge,消费者已经处理了该消息
    message.ack()


with Consumer(conn, queues=queue, callbacks=[process_message], accept=["json"]):
    while True:
        print("Start receiving tasks:")
        conn.drain_events()

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

Kombu库用法详解(连接、连接池、生产者、消费者)

Kombu库用法详解(连接、连接池、生产者、消费者)

多个消费者1的完整示例代码如下:

from kombu import Connection, Consumer, Queue


def callback(body, message):
    print('RECEIVED MESSAGE: {0!r}'.format(body))
    message.ack()


queue1 = Queue('queue1', routing_key='queue1')
queue2 = Queue('queue2', routing_key='queue2')


with Connection('amqp://192.168.124.104') as conn:
    with conn.channel() as channel:
        consumer = Consumer(conn, [queue1, queue2], accept=['json'])
        consumer.register_callback(callback)
        with consumer:
            while True:
                conn.drain_events()

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

多个消费者2的完整示例代码如下:

from kombu import Connection, Consumer, Queue
from kombu.utils.compat import nested


def callback(body, message):
    print('RECEIVED MESSAGE: {0!r}'.format(body))
    message.ack()


queues1 = [Queue('queue11', routing_key='queue11'), Queue('queue12', routing_key='queue12')]
queues2 = [Queue('queue21', routing_key='queue21'), Queue('queue22', routing_key='queue22')]

with Connection("amqp://192.168.124.104") as conn:
    with conn.channel() as channel:
        with nested(Consumer(channel, queues1, callbacks=[callback], accept=['json']),
                    Consumer(channel, queues2, callbacks=[callback], accept=['json'])):
            while True:
                conn.drain_events()

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

        Kombu 在模块中提供预定义的 mixin 类mixins。它包含两个类: ConsumerMixin用于创建消费者和ConsumerProducerMixin 用于创建还支持发布消息的消费者。可以通过子类化 mixin 类并覆盖一些方法来创建消费者:

from kombu import Queue, Exchange, Connection
from kombu.mixins import ConsumerMixin

conn = Connection("amqp://192.168.124.104")
queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")


class SelfConsumer(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queue, callbacks=[self.on_message], accept=['json']),
        ]

    def on_message(self, body, message):
        print('RECEIVED MESSAGE: {0!r}'.format(body))
        message.ack()


SelfConsumer(conn).run()

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

使用多通道:

from kombu import Queue, Exchange, Connection, Consumer
from kombu.mixins import ConsumerMixin

conn = Connection("amqp://192.168.124.104")
queue = Queue("tasks", Exchange("tasks"), routing_key="tasks")

queues1 = [Queue('queue11', routing_key='queue11'), Queue('queue12', routing_key='queue12')]
queues2 = [Queue('queue21', routing_key='queue21'), Queue('queue22', routing_key='queue22')]


class SelfConsumer(ConsumerMixin):
    channel2 = None

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, _, default_channel):
        self.channel2 = default_channel.connection.channel()
        return [
            Consumer(default_channel, queues1, callbacks=[self.on_message], accept=['json']),
            Consumer(self.channel2, queues2, callbacks=[self.on_message2], accept=['json'])
        ]

    def on_message(self, body, message):
        print('queues1 RECEIVED MESSAGE: {0!r}'.format(body))
        message.ack()

    def on_message2(self, body, message):
        print('queues2 RECEIVED MESSAGE: {0!r}'.format(body))
        message.ack()

    def on_consume_end(self, connection, default_channel):
        if self.channel2:
            self.channel2.close()


SelfConsumer(conn).run()

运行结果:

Kombu库用法详解(连接、连接池、生产者、消费者)

accept
Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])

消费者优先级:

        RabbitMQ 定义了对 amqp 协议的消费者优先级扩展,可以通过将x-priority参数设置为 basic.consume。RabbitMQ Consumer Priorities:Consumer Priorities — RabbitMQ

在 kombu 中,您可以在 上指定此参数Queue,如下所示:

queue = Queue('name', Exchange('exchange_name', type='direct'),
              consumer_arguments={'x-priority': 10})

参考博文: