redis queuego-zeroqueuequeuemq
应用
mq
producerconsumermq
queue
创立 queue
// 生产者创立工厂
producer := newMockedProducer()
// 消费者创立工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创立工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
NewQueue
producer constructorconsumer constructor
queue
mq
type (
// 开发者须要实现此接口
Producer interface {
AddListener(listener ProduceListener)
Produce() (string, bool)
}
...
// ProducerFactory定义了生成Producer的办法
ProducerFactory func() (Producer, error)
)
mqqueue
生产msg
生产音讯当然要回到生产者自身:
type mockedProducer struct {
total int32
count int32
// 应用waitgroup来模仿工作的实现
wait sync.WaitGroup
}
// 实现 Producer interface 的办法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
if atomic.AddInt32(&p.count, 1) <= p.total {
p.wait.Done()
return "item", true
}
time.Sleep(time.Second)
return "", false
}
queue
Produce()AddListener()
生产msg
和生产者相似:
type mockedConsumer struct {
count int32
}
func (c *mockedConsumer) Consume(string) error {
atomic.AddInt32(&c.count, 1)
return nil
}
启动 queue
启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:
func TestQueue(t *testing.T) {
producer := newMockedProducer(rounds)
consumer := newMockedConsumer()
// 创立 queue
q := NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
// 当生产者生产结束,执行 Stop() 敞开生产端生产
go func() {
producer.wait.Wait()
// mq生产端进行生产,不是mq自身 Stop 运行
q.Stop()
}()
// 启动
q.Start()
// 验证生产生产端是否音讯生产实现
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}
queue
整体设计
整体流程如上图:
channellistenereventproduceoneProduce()interfaceConsume()
channel
总结
queuechannel
go-zero
我的项目地址
https://github.com/tal-tech/go-zero
欢送应用 go-zero 并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实际』公众号