前言
这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧。因为要用go语言写,这可给姐姐愁坏了。赶紧来求助我,我这么坚贞不屈一人,在姐姐的软磨硬泡下还是答应他了,所以接下来我就手把手教姐姐怎么写一个消息队列。下面我们就来看一看我是怎么写的吧~~~。
本代码已上传到我的github:
有需要的小伙伴,可自行下载,顺便给个小星星吧~~~
什么是消息队列
kafka
MQ(Message Queue)QueueMQ
MQ
欠欠的我开始了接下来的耐心讲解......
MQ
看这个图,邮件发送在请求登陆时进行,当密码验证成功后,就发送邮件,然后返回登陆成功。这样是可以的,但是他是有缺陷的。这让我们的登陆操作变得复杂了,每次请求登陆都需要进行邮件发送,如果这里出现错误,整个登陆请求也出现了错误,导致登陆不成功;还有一个问题,本来我们登陆请求调用接口仅仅需要100ms,因为中间要做一次发送邮件的等待,那么调用一次登陆接口的时间就要增长,这就是问题所在,一封邮件他的优先级 不是很高的,用户也不需要实时收到这封邮件,所以这时,就体现了消息队列的重要性了,我们用消息队列进行改进一下。
Mq
MQMQ
channel
channel
channel
channel
channel
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
创建通道
创建通道需要用到关键字 make ,格式如下:
通道实例 := make(chan 数据类型)
- 数据类型:通道内传输的元素类型。
- 通道实例:通过make创建的通道句柄。
无缓冲通道的使用
Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。
无缓冲通道的定义方式如下:
通道实例 := make(chan 通道类型)
- 通道类型:和无缓冲通道用法一致,影响通道发送和接收的数据类型。
- 缓冲大小:0
- 通道实例:被创建出的通道实例。
写个例子来帮助大家理解一下吧:
package main
import (
"sync"
"time"
)
func main() {
c := make(chan string)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c <- `Golang梦工厂`
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 1)
println(`Message: `+ <-c)
}()
wg.Wait()
}
带缓冲的通道的使用
Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
有缓冲通道的定义方式如下:
通道实例 := make(chan 通道类型, 缓冲大小)
- 通道类型:和无缓冲通道用法一致,影响通道发送和接收的数据类型。
- 缓冲大小:决定通道最多可以保存的元素数量。
- 通道实例:被创建出的通道实例。
来写一个例子讲解一下:
channel
消息队列编码实现
准备篇
终于开始进入主题了,姐姐都听的快要睡着了,我轰隆一嗓子,立马精神,但是呢,asong也是挨了一顿小电炮,代价惨痛呀,呜呜呜............
在开始编写代码编写直接,我需要构思我们的整个代码架构,这才是正确的编码方式。我们先来定义一个接口,把我们需要实现的方法先列出来,后期对每一个代码进行实现就可以了。因此可以列出如下方法:
type Broker interface {
publish(topic string, msg interface{}) error
subscribe(topic string) (<-chan interface{}, error)
unsubscribe(topic string, sub <-chan interface{}) error
close()
broadcast(msg interface{}, subscribers []chan interface{})
setConditions(capacity int)
}
publishtopicmsgsubscribechannelunsubscribeclosebroadCastsetConditions
细心的你们有没有发现什么问题,这些代码我都定义的是内部方法,也就是包外不可用。为什么这么做呢,因为这里属于代理要做的事情,我们还需要在封装一层,也就是客户端能直接调用的方法,这样才符合软件架构。因此可以写出如下代码:
package mq
type Client struct {
bro *BrokerImpl
}
func NewClient() *Client {
return &Client{
bro: NewBroker(),
}
}
func (c *Client)SetConditions(capacity int) {
c.bro.setConditions(capacity)
}
func (c *Client)Publish(topic string, msg interface{}) error{
return c.bro.publish(topic,msg)
}
func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
return c.bro.subscribe(topic)
}
func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
return c.bro.unsubscribe(topic,sub)
}
func (c *Client)Close() {
c.bro.close()
}
func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{
for val:= range sub{
if val != nil{
return val
}
}
return nil
}
上面只是准好了代码结构,但是消息队列实现的结构我们还没有设计,现在我们就来设计一下。
type BrokerImpl struct {
exit chan bool
capacity int
topics map[string][]chan interface{} // key: topic value : queue
sync.RWMutex // 同步锁
}
exitcapacitytopicstopicchansync.RWMutex
好啦,现在我们已经准备的很充分啦,开始接下来方法填充之旅吧~~~
Publishbroadcast
braodcastpublish
func (b *BrokerImpl) publish(topic string, pub interface{}) error {
select {
case <-b.exit:
return errors.New("broker closed")
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
b.broadcast(pub, subscribers)
return nil
}
func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
count := len(subscribers)
concurrency := 1
switch {
case count > 1000:
concurrency = 3
case count > 100:
concurrency = 2
default:
concurrency = 1
}
pub := func(start int) {
for j := start; j < count; j += concurrency {
select {
case subscribers[j] <- msg:
case <-time.After(time.Millisecond * 5):
case <-b.exit:
return
}
}
}
for i := 0; i < concurrency; i++ {
go pub(i)
}
}
publishbroadcast
goroutine
switch
subscribeunsubScribe
我们先来看代码:
func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
select {
case <-b.exit:
return nil, errors.New("broker closed")
default:
}
ch := make(chan interface{}, b.capacity)
b.Lock()
b.topics[topic] = append(b.topics[topic], ch)
b.Unlock()
return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
select {
case <-b.exit:
return errors.New("broker closed")
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
// delete subscriber
var newSubs []chan interface{}
for _, subscriber := range subscribers {
if subscriber == sub {
continue
}
newSubs = append(newSubs, subscriber)
}
b.Lock()
b.topics[topic] = newSubs
b.Unlock()
return nil
}
这里其实就很简单了:
subscribechanneltopicchannelunsubScribechannel
close
func (b *BrokerImpl) close() {
select {
case <-b.exit:
return
default:
close(b.exit)
b.Lock()
b.topics = make(map[string][]chan interface{})
b.Unlock()
}
return
}
b.topics = make(map[string][]chan interface{})
setConditionsGetPayLoad
还差最后两个方法,一个是设置我们的消息队列容量,另一个是封装一个方法来获取我们订阅的消息:
func (b *BrokerImpl)setConditions(capacity int) {
b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{
for val:= range sub{
if val != nil{
return val
}
}
return nil
}
测试
好啦,代码这么快就被写完了,接下来我们进行测试一下吧。
单元测试
bug
topic
func TestClient(t *testing.T) {
b := NewClient()
b.SetConditions(100)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
topic := fmt.Sprintf("Golang梦工厂%d", i)
payload := fmt.Sprintf("asong%d", i)
ch, err := b.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
wg.Add(1)
go func() {
e := b.GetPayLoad(ch)
if e != payload {
t.Fatalf("%s expected %s but get %s", topic, payload, e)
}
if err := b.Unsubscribe(topic, ch); err != nil {
t.Fatal(err)
}
wg.Done()
}()
if err := b.Publish(topic, payload); err != nil {
t.Fatal(err)
}
}
wg.Wait()
}
测试通过,没问题,接下来我们在写几个方法测试一下
测试
这里分为两种方式测试
测试一:使用一个定时器,向一个主题定时推送消息.
// 一个topic 测试
func OnceTopic() {
m := mq.NewClient()
m.SetConditions(10)
ch,err :=m.Subscribe(topic)
if err != nil{
fmt.Println("subscribe failed")
return
}
go OncePub(m)
OnceSub(ch,m)
defer m.Close()
}
// 定时推送
func OncePub(c *mq.Client) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <- t.C:
err := c.Publish(topic,"asong真帅")
if err != nil{
fmt.Println("pub message failed")
}
default:
}
}
}
// 接受订阅消息
func OnceSub(m <-chan interface{},c *mq.Client) {
for {
val := c.GetPayLoad(m)
fmt.Printf("get message is %s\n",val)
}
}
测试二:使用一个定时器,定时向多个主题发送消息:
//多个topic测试
func ManyTopic() {
m := mq.NewClient()
defer m.Close()
m.SetConditions(10)
top := ""
for i:=0;i<10;i++{
top = fmt.Sprintf("Golang梦工厂_%02d",i)
go Sub(m,top)
}
ManyPub(m)
}
func ManyPub(c *mq.Client) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <- t.C:
for i:= 0;i<10;i++{
//多个topic 推送不同的消息
top := fmt.Sprintf("Golang梦工厂_%02d",i)
payload := fmt.Sprintf("asong真帅_%02d",i)
err := c.Publish(top,payload)
if err != nil{
fmt.Println("pub message failed")
}
}
default:
}
}
}
func Sub(c *mq.Client,top string) {
ch,err := c.Subscribe(top)
if err != nil{
fmt.Printf("sub top:%s failed\n",top)
}
for {
val := c.GetPayLoad(ch)
if val != nil{
fmt.Printf("%s get message is %s\n",top,val)
}
}
}
总结
终于帮助姐姐解决了这个问题,姐姐开心死了,给我一顿亲,啊不对,是一顿夸,夸的人家都不好意思了。
这一篇你学会了吗?没学会不要紧,赶快去把源代码下载下来,好好通读一下,很好理解的~~~。
其实这一篇是为了接下来的kafka学习打基础的,学好了这一篇,接下来学习的kafka就会容易很多啦~~~
推荐往期文章: