最近要在一个go实现的K线服务器上追加一个websocket推送功能。
初步设想是在各品种每分钟的数据完成后立即向各订阅单位推送对应的K线数据。
一、场景抽象
1用户连接上服务器
2用户订阅某品种的某周期
3用户退订某品种某周期
4用户断开服务器
订阅:
推送:
二、程序框架
由于go语言不熟,幸好网上有人做了成熟的框架:github - gorilla/websocket: A fast, well-tested and widely used WebSocket implementation for Go.这个解决了怎样使用websocket的问题
三、主要业务逻辑
每个连接开两个线程
一个线程进行数据写入writePump,一个线程进行数据读取readPump,每当有客户端连接上服务器后,立即开启这两个线程。
读线程逻辑图:
写线程逻辑图:
程序启动时开一个总控线程func (h *Hub) Run()
此线程用来处理客户端的连接,断开,订阅和向订阅信息的客户端作相应信息的广播。
逻辑图如下:www.cppcns.com
处理注册
处理注册的逻辑非常简单,只是将客户端的指针放置于指针map中即可,代码如下:
处理注销
注销需要分两步走,首先把客户端指针集合中的对应指针删除,同时删除对应的发送通道。然后遍历内容订阅集合,删除对应的客户端指针的元素,最后,若发现某订阅内容上已经没有任何客户端指针,将其订阅内容一并删除。
程序逻辑如下:
订阅处理
订阅最主要的是订阅关系,我们可以用合约.周期作为key,使用客户端连接上来的指针集作为value,每当有用户订阅某内容时,将相应内容下面的指针集上加上对应此用户的指针即可,订阅关系如下:
推送处理
推送处理分两种类型,一种为用户订阅的数据推送给到他们,一种为心跳包推送给到30秒内没有数据推送或订阅动作的用户。
推送订阅的内容:当用户订阅的类型数据到达时,系统检查此订阅上的用户指针,将数据推送到对应指针的通道中,由各通道自行VMdlxCXB推送给各自的客户端。
推送心跳包:当某个连接上来的客户端30秒内没有订阅请求或是推送数据,系统将自动推送心跳包,以维持连接不被断开。