用Golang实现百万级Websocket服务
前言: 本文为国外大佬的一篇文章,因为我最近在研究和学习使用go写一个消息服务器,所以找到这篇文章,于是将它翻译过来,希望能够帮到其他的同学。这是我的处女作翻译作品。希望大神能够帮助指导修正。以后可能每周都会有一篇国外技术文章的翻译,有兴趣的同学可以加QQ群共同讨论(511247400)。
Websocket
如果你很熟悉websocket,但是不了解Go, 我希望你先掌握一些Go的知识
1.介绍
要定义我们的故事背景,应该说下我们为什么需要这样的一个服务。
我们有一个状态系统,用户邮箱存储就是其中之一,这里有一些方法去跟踪系统状态的变化和系统事件。大多是通过定期轮询或系统通知来改变状态。
两种方法各有利弊,但是提到email,用户越快接受到邮件越好。
50000HTTP60%304
email发布者订阅者
之前
现在:
第一个方案展示的就我们之前说的第一个,浏览器定期轮询Api并且询问邮箱存储服务的改变。
Websocket发布者
所以,今天我们来讨论这个API和Websocket服务,展望未来,我将告诉你这个服务将又有300万在线连接。
2. 通常解决方法
让我们看下如何在没有任何优化的情况下使用普通GO功能实现服务器的某些功能。
net/http
channel
2.1 channel struct
// Packet represents application level data.
type Packet struct {
...
}
// Channel wraps user connection.
type Channel struct {
conn net.Conn // WebSocket connection.
send chan Packet // Outgoing packets queue.
}
func NewChannel(conn net.Conn) *Channel {
c := &Channel{
conn: conn,
send: make(chan Packet, N),
}
go c.reader()
go c.writer()
return c
}
Websocket channel 实现
我想,你应该注意到两个读写的goroutines, 每个goroutines拥有一个堆内存,并且初始大小为2K到8K大小,这取决于操作系统和Go的版本。
关于上面我们提到的300万在线连接,我们就需要24GB的内存(按每个goroutine 4Kb 堆内存计算),而且没有为channel结构体分配内存。
2.2 I/O goroutines
reader
func (c *Channel) reader() {
// We make a buffered read to reduce read syscalls.
buf := bufio.NewReader(c.conn)
for {
pkt, _ := readPacket(buf)
c.handle(pkt)
}
}
这里我们使用bufio.Reader减少read()系统调用的次数,并读取buf缓冲区大小允许的数量, 在一个无限循环内,我们接受一个新的数据进来。 请记住:(等待新的数据到来)。我们稍后将返回它。
bufwriter
func (c *Channel) writer() {
// We make buffered write to reduce write syscalls.
buf := bufio.NewWriter(c.conn)
for pkt := range c.send {
_ := writePacket(buf, pkt)
buf.Flush()
}
}
c.send
2.3 HTTP
channelwebsockt
TCP
import (
"net/http"
"some/websocket"
)
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(r, w)
ch := NewChannel(conn)
//...
})
http.ResponseWriterbufio.Readerbufio.Writer*http.Request
WebsocketresponseWriter.Hijack()I/O
提示 :在某些情况下,go:linkname可用于通过调用 net / http.putBufio {Reader,Writer} 将缓冲区返回到net / http内的sync.Pool。
从而,我们300万的在线连接又需要额外24GB的内存, 所以,我们的应用程序就需要72GB内存,并且什么也没做。
3. 优化
Websocketping/pong
连接的生命周期可能是几秒到几天
Channel.reader()Channel.writer()
现在我们已经清楚哪些能够做的更好,哪些不能。
3.1. Netpoll
Channel.reader()bufio.Reader.Read()conn.Read()goroutinegoroutinegoroutine
conn.Read()net.netFD.Read()
func (fd *netFD) Read(p []byte) (n int, err error) {
//...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
//...
break
}
//...
}
Go 使用非阻塞模式,“ EAGAIN ” 说没有数据在socket连接里并且不要锁定读取空的Websocket连接。OS将控制权返回给我们
read" EAGAIN "pollDesc.waitRead()
// net/fd_poll_runtime.go
func (pd *pollDesc) waitRead() error {
return pd.wait('r')
}
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
//...
}
netpollLinuxepollBSDkqueuegoroutinesocket
netpoll
3.2 摆脱goroutines
Channel.reader()goroutine
并订阅连接中可读数据的事件
ch := NewChannel(conn)
// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
// We spawn goroutine here to prevent poller wait loop
// to become locked during receiving packet from ch.
go Receive(ch)
})
// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
buf := bufio.NewReader(ch.conn)
pkt := readPacket(buf)
c.handle(pkt)
}
Channel.writer()
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
go ch.writer()
}
ch.send <- p
}
write()EAGAIN
ch.sendwritergoroutine
I/O
3.3 控制资源
大量的连接并不只是涉及高内存消耗,当开发服务时,我们经历了反复的竞争条件和死锁之后通常会出现所谓的自我DDoS--应用程序客户端试图连接到服务器从而进一步破坏它的情况。
ping/pong
如果锁定或重载的服务器刚刚停止接受新连接,并且它之前的平衡器(例如,nginx)将请求传递给下一个服务器实例,那将是很好的。
此外,服务器负载如何,如果所有客户突然想以任何理由向我们发送数据包,先前保存的48 GB将再次使用,我们实际上会回到每个连接的goroutine和缓冲区的初始状态。
goroutine 池
goroutine
package gopool
func New(size int) *Pool {
return &Pool{
work: make(chan func()),
sem: make(chan struct{}, size),
}
}
func (p *Pool) Schedule(task func()) error {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}
现在我们的netpoll代码如下:
pool := gopool.New(128)
poller.Start(conn, netpoll.EventRead, func() {
// We will block poller wait loop when
// all pool workers are busy.
pool.Schedule(func() {
Receive(ch)
})
})
socketgoroutine
Send():
pool := gopool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}
go ch.writer()goroutinesgoroutinesAccept()Upgrade()
3.4 零拷贝升级
WebsocketWebsocket
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
Websockethttp.Request
http.Request
但是该怎么返回呢?
WebSocket 实现
我们的服务器优化时存在的所有库都允许我们仅为标准的net / http服务器进行升级, 而且,(两个)库都不能使用所有上述读写优化,使这些优化工作,我们必须有一个相当低级的API来处理WebSocket。重用缓冲区,我们需要procotol函数看起来像这样:
func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error
如果我们有一个带有这种API的库,我们可以按如下方式从连接中读取数据包(数据包写入看起来一样):
// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)
// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
buf := getReadBuf()
defer putReadBuf(buf)
buf.Reset(conn)
frame, _ := ReadFrame(buf)
parsePacket(frame.Payload)
//...
}
简而言之,是时候建立自己的库了。
github.com/gobwas/ws
io.Readerio.WriterI / O
net/httpwsws.Upgrade()io.ReadWriternet.Connnet.Listenln.Accept()ws.Upgrade()CookieSession
下面是升级请求处理的基准:标准net / http服务器与net.Listen()以及零拷贝升级:
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
wsnet / httpI / O
3.5 概要
让我们构建一下我告诉你的优化
goroutinenetpollgoroutinenetpoll
这就是服务器代码:
import (
"net"
"github.com/gobwas/ws"
)
ln, _ := net.Listen("tcp", ":8080")
for {
// Try to accept incoming connection inside free pool worker.
// If there no free workers for 1ms, do not accept anything and try later.
// This will help us to prevent many self-ddos or out of resource limit cases.
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn := ln.Accept()
_ = ws.Upgrade(conn)
// Wrap WebSocket connection with our Channel struct.
// This will help us to handle/send our app's packets.
ch := NewChannel(conn)
// Wait for incoming bytes from connection.
poller.Start(conn, netpoll.EventRead, func() {
// Do not cross the resource limits.
pool.Schedule(func() {
// Read and handle incoming packet(s).
ch.Recevie()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
}
4. 总结
过早优化是编程中所有邪恶(或至少大部分)的根源。唐纳德克努特
当然,上述优化是相对应的,并非在所有情况下。例如,如果自由资源(内存,CPU)与在线连接数之间的比率相当高,则优化可能没有意义。但是,通过了解改进的位置和内容,您可以从中受益匪浅。
感谢您的关注!
5. 参考
本作品采用《CC 协议》,转载必须注明作者和本文链接