官方已经给出如何消费者的简要案例:
func main() {
c, err := rocketmq.NewPushConsumer(consumer.WithGroupName("ID-01"), // 消费者组
consumer.WithNameServer([]string{"192.168.132.101:9876"}), // nameserver
consumer.WithRetry(2))
if err != nil {
fmt.Println("消费者实例创建失败")
}
c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println("Receive New messages : ", ext)
return consumer.ConsumeSuccess, nil // 回调函数
})
c.Start()
defer c.Shutdown()
time.Sleep(1000 * time.Second)
}
消费者组是消费者实例的集合,在官方文档中有这么一张图
在一个消费者组中,含有两个消费组实例都订阅了同一个Topic,Topic通过负载均衡将Topic中的队列和消费者实例对应起来,并且一个Topic的队列在同一个消费者组中对应一个消费者实例。
问题是,如何在Golang中创建同组的2个消费者实例呢? 你可能想使用goroutine开两个协程不就行了? 我们做以下实验,简称 同进程同组同Topic
func main() {
c, err := rocketmq.NewPushConsumer(consumer.WithGroupName("ID-01"),
consumer.WithNameServer([]string{"192.168.132.101:9876"}),
consumer.WithRetry(2))
if err != nil {
fmt.Println("消费者实例创建失败")
}
go ReceiveMsgTest(c, 0) // 第一个消费组实例
go ReceiveMsgTest(c, 1) // 第二个消费组实例
time.Sleep(1000 * time.Second)
}
func ReceiveMsgTest(c rocketmq.PushConsumer, i int) {
// 同Topic "test"
c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println("Receive New messages : ", ext)
fmt.Printf("Thread %v Consumer ReceiveMsg\n", i)
return consumer.ConsumeSuccess, nil
})
c.Start()
defer c.Shutdown()
fmt.Printf("Thread %v Consumer Start\n", i)
time.Sleep(1000 * time.Second)
}
生产者代码省略。结果如下,
会发现只有协程0 在工作,第二个创建的协程并没有消费。
然后我们尝试不同进程,同组同Topic的情况
进程0 :
进程1 :
可以看到两个进程创建了两个消费组实例。
如此一来,可以知道一个消费者组的消费者实例对应的是一个进程。它可以是一台机器中的多个进程,也可以是多个机器中的进程。
我们还可以构建如下情况,一个消费者实例订阅多个Topic。
func main() {
c, err := rocketmq.NewPushConsumer(consumer.WithGroupName("ID-01"),
consumer.WithNameServer([]string{"192.168.132.101:9876"}),
consumer.WithRetry(2))
if err != nil {
fmt.Println("消费者实例创建失败")
}
go ReceiveMsgTest(c, 0)
time.Sleep(1000 * time.Second)
}
func ReceiveMsgTest(c rocketmq.PushConsumer, i int) {
c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println("Receive New messages : ", ext)
fmt.Printf("Thread %v Consumer ReceiveMsg\n", i)
return consumer.ConsumeSuccess, nil
}) // 订阅 test
c.Subscribe("test1", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println("Receive New messages : ", ext)
fmt.Printf("Thread %v Consumer ReceiveMsg\n", i)
return consumer.ConsumeSuccess, nil
}) // 订阅test1
c.Start()
defer c.Shutdown()
fmt.Printf("Thread %v Consumer Start\n", i)
time.Sleep(1000 * time.Second)
}
结果如下 :
可以接受到两个topic的消息。
此外,在一个进程中,可以通过协程并发运行不同组的消费者实例。
func main() {
c, err := rocketmq.NewPushConsumer(consumer.WithGroupName("ID-01"),
consumer.WithNameServer([]string{"192.168.132.101:9876"}),
consumer.WithRetry(2))
if err != nil {
fmt.Println("消费者实例创建失败")
}
go ReceiveMsgTest(c, 0) // "ID-01"组的一个消费者实例 订阅 "test"和"test1"
d, err := rocketmq.NewPushConsumer(consumer.WithGroupName("ID-02"),
consumer.WithNameServer([]string{"192.168.132.101:9876"}),
consumer.WithRetry(2))
if err != nil {
fmt.Println("消费者实例创建失败")
}
go ReceiveMsgTest(d, 1) // "ID-02"组的一个消费者实例 订阅 "test"和"test1"
time.Sleep(1000 * time.Second)
}
func ReceiveMsgTest(c rocketmq.PushConsumer, i int) {
c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println("Receive New messages : ", ext)
fmt.Printf("Thread %v Consumer ReceiveMsg\n", i)
return consumer.ConsumeSuccess, nil
})
c.Subscribe("test1", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println("Receive New messages : ", ext)
fmt.Printf("Thread %v Consumer ReceiveMsg\n", i)
return consumer.ConsumeSuccess, nil
})
c.Start()
defer c.Shutdown()
fmt.Printf("Thread %v Consumer Start\n", i)
time.Sleep(1000 * time.Second)
}
结果如下:
两个协程可以同时接受消息,意味着一个进程可以并发运行不同组的一个消费者实例。
总结来说,
- 一个消费者组中的功能应该一致,防止遗漏消费
- 一个Topic中的所有队列和一个消费者组中的消费者是多对一的关系,防止重复消费
- 一个消费者组对应消费者集群,一个消费者对应一个进程。同组中的消费者可以是一个机器中的多个进程;也可以是多个机器中的进程(建立集群,防止宕机导致服务不可用)
- 在一个进程中,可以并发运行不同消费者组的消费者实例,但不能并发运行同组的消费者线程。Go SDK内部也没有提供单例线程数的设置。
- Java SDK 可以设置,提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现
- 一个消费者组可以订阅多个Topic
- 所有消费者组由nameserve统一管理