gotify 架构设计
Gotify
官方的文档里有说这是一个简单的推送系统,在看完代码后发现,发现 gotify 关于推送功能的实现确实有些简单,不适合复杂的线上业务推送场景. gotify 在功能上有一些的缺失,比如它未实现消息回执ack确认,不支持离线消息,不支持按照 revison 来增量推送, 未解决慢消费者引起的阻塞问题等等. 另外他也不是分布式架构,当前属于单点架构.
虽然 gotify/server 有些功能缺失,但作者花费时间做开源还是很值得敬佩的,另外还对前端接入及安卓端做了实现,还有文档也较为完整. 😁
ntfy.sh
代码地址:
阻塞引发的问题
功能缺失还好,但有一个问题相对严重点. 调用 notify 接口给一个 userID 的所有客户端写消息时,如果某个客户端弱网慢消费, 短时间没法写入到 chan 缓冲, 那么就会陷入阻塞等待. 问题来了,在阻塞期间不仅其他人没法 notify 给其他客户端投递消息, 对于客户端的增删改也都是 hang 住的. 直到客户端 conn 写超时触发 close(write) 唤醒 notify 阻塞端.
但让人疑惑的是,client.write 管道已经关闭,那么继续写入必然 panic, 这个逻辑无疑是个bug,好奇为啥没人反应.
解决阻塞有几个方法:
- 加大 chan 缓冲, 单个缓冲对于高并发不友好.
- 写入 chan 时加入超时, 减少阻塞时长.
- 把数据放到 list 链表里,只使用 chan 做事件通知.
- 把 clients 结构做成多分片,增加性能, 尽减少锁竞争.
type API struct {
clients map[uint][]*client
lock sync.RWMutex
pingPeriod time.Duration
pongTimeout time.Duration
upgrader *websocket.Upgrader
}
func (a *API) Notify(userID uint, msg *model.MessageExternal) {
a.lock.RLock()
defer a.lock.RUnlock()
if clients, ok := a.clients[userID]; ok {
for _, c := range clients {
c.write <- msg
}
}
}
推送功能实现
主要就两点,客户端如何监听消息,消息如何投递.
客户端如何监听
websocket 客户端连接服务端后,server 针对该客户端进行协议升级,然后创建一个 client 对象, 然后注册到一个 map 里.
type API struct {
clients map[uint][]*client
lock sync.RWMutex
pingPeriod time.Duration
pongTimeout time.Duration
upgrader *websocket.Upgrader
}
func (a *API) register(client *client) {
a.lock.Lock()
defer a.lock.Unlock()
a.clients[client.userID] = append(a.clients[client.userID], client)
}
对 client 对象开两个协程,读和写协程. 读协程只管排空,不做业务逻辑. 写协程就是从 write 管道里监听数据,然后写到 conn 里.
func (a *API) Handle(ctx *gin.Context) {
conn, err := a.upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
ctx.Error(err)
return
}
client := newClient(conn, auth.GetUserID(ctx), auth.GetTokenID(ctx), a.remove)
a.register(client)
go client.startReading(a.pongTimeout)
go client.startWriteHandler(a.pingPeriod)
}
func (c *client) startReading(pongWait time.Duration) {
defer c.NotifyClose()
c.conn.SetReadLimit(64)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(appData string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
if _, _, err := c.conn.NextReader(); err != nil {
printWebSocketError("ReadError", err)
return
}
}
}
func (c *client) startWriteHandler(pingPeriod time.Duration) {
pingTicker := time.NewTicker(pingPeriod)
defer func() {
c.NotifyClose()
pingTicker.Stop()
}()
for {
select {
case message, ok := <-c.write:
if !ok {
return
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := writeJSON(c.conn, message); err != nil {
printWebSocketError("WriteError", err)
return
}
case <-pingTicker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := ping(c.conn); err != nil {
printWebSocketError("PingError", err)
return
}
}
}
}
func (c *client) NotifyClose() {
c.once.Do(func() {
c.conn.Close()
close(c.write)
c.onClose(c)
})
}
用户怎么投递信息
用户可以通过 http post 接口来投递 message 消息, gotify server 收到消息后进行基本的鉴权和消息格式适配后,把消息再广播给 userID 对应的 ws 客户端.
一个 userID 可以绑定多个 websocket client.
g.Group("/").Use(authentication.RequireApplicationToken()).POST("/message", messageHandler.CreateMessage)
func (a *MessageAPI) CreateMessage(ctx *gin.Context) {
message := model.MessageExternal{}
if err := ctx.Bind(&message); err == nil {
application, err := a.DB.GetApplicationByToken(auth.GetTokenID(ctx))
if success := successOrAbort(ctx, 500, err); !success {
return
}
...
a.Notifier.Notify(auth.GetUserID(ctx), toExternalMessage(msgInternal))
ctx.JSON(200, toExternalMessage(msgInternal))
}
}