redis queuego-zeroqueuequeuemq

应用

mq
producerconsumermq
queue

创立 queue

// 生产者创立工厂
producer := newMockedProducer()
// 消费者创立工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创立工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})
NewQueue
producer constructorconsumer constructor
queue
mq
type (
    // 开发者须要实现此接口
    Producer interface {
        AddListener(listener ProduceListener)
        Produce() (string, bool)
    }
    ...
    // ProducerFactory定义了生成Producer的办法
    ProducerFactory func() (Producer, error)
)
mqqueue

生产msg

生产音讯当然要回到生产者自身:

type mockedProducer struct {
    total int32
    count int32
  // 应用waitgroup来模仿工作的实现
    wait  sync.WaitGroup
}
// 实现 Producer interface 的办法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
    if atomic.AddInt32(&p.count, 1) <= p.total {
        p.wait.Done()
        return "item", true
    }
    time.Sleep(time.Second)
    return "", false
}
queue
Produce()AddListener()

生产msg

和生产者相似:

type mockedConsumer struct {
    count  int32
}

func (c *mockedConsumer) Consume(string) error {
    atomic.AddInt32(&c.count, 1)
    return nil
}

启动 queue

启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:

func TestQueue(t *testing.T) {
    producer := newMockedProducer(rounds)
    consumer := newMockedConsumer()
    // 创立 queue
    q := NewQueue(func() (Producer, error) {
        return producer, nil
    }, func() (Consumer, error) {
        return consumer, nil
    })
    // 当生产者生产结束,执行 Stop() 敞开生产端生产
    go func() {
        producer.wait.Wait()
    // mq生产端进行生产,不是mq自身 Stop 运行
        q.Stop()
    }()
    // 启动
    q.Start()
    // 验证生产生产端是否音讯生产实现
    assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}
queue

整体设计

整体流程如上图:

channellistenereventproduceoneProduce()interfaceConsume()
channel

总结

queuechannel
go-zero

我的项目地址

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!

微信交换群

关注『微服务实际』公众号并回复 进群 获取社区群二维码。

go-zero 系列文章见『微服务实际』公众号