WebSocket是什么
全双工
golang
gingorilla/websocketgo getgit bashgit clone
gorilla/websocketGOPATHsrchttp://github.com
git clone https://github.com/gorilla/websocket
gin-gonic/ginGOPATHsrchttp://github.com
git clone https://github.com/gin-gonic/gin
WebSocket处理过程
WebSocket处理逻辑流程图
说明:服务端(server)中的 inChan 和 outChan 这两个通道的操作都可封装成写入和读取两个接口;同时服务端的两个 for 即是两个必要的协程,它们分别是不断接收客户端数据和不断发送数据给客户端的操作。
WebSocket目录结构
说明:WebSocketHandler 下有两个 .go 文件,websocketBase.go 里面是做WebSocket处理的封装,serverBase.go 则是 WebSocket 服务的基本服务结构函数。
websocketBase.go 内部处理过程
- 定义核心结构体
// wsConn TODO:封装的基本结构体
type wsConn struct {
inChan chan []byte
outChan chan []byte
closeChan chan []byte
isClose bool // 通道closeChan是否已经关闭
mutex sync.Mutex
conn *websocket.Conn
}
- 初始化webSocket调用
// InitWebSocket TODO:初始化Websocket
func InitWebSocket(conn *websocket.Conn) (ws *wsConn, err error) {
ws = &wsConn{
inChan: make(chan []byte, 1024),
outChan: make(chan []byte, 1024),
closeChan: make(chan []byte, 1024),
conn: conn,
}
// 完善必要协程:读取客户端数据协程/发送数据协程
go ws.readMsgLoop()
go ws.writeMsgLoop()
return
}
- inChan的读取和写入操作封装
// InChanRead TODO:读取inChan的数据
func (conn *wsConn) InChanRead() (data []byte, err error) {
select {
case data = <-conn.inChan:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
// InChanWrite TODO:inChan写入数据
func (conn *wsConn) InChanWrite(data []byte) (err error) {
select {
case conn.inChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
- outChan的读取和写入操作封装
// OutChanRead TODO:读取inChan的数据
func (conn *wsConn) OutChanRead() (data []byte, err error) {
select {
case data = <-conn.outChan:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
// OutChanWrite TODO:inChan写入数据
func (conn *wsConn) OutChanWrite(data []byte) (err error) {
select {
case conn.outChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
- 关闭WebSocket连接
// CloseConn TODO:关闭WebSocket连接
func (conn *wsConn) CloseConn() {
// 关闭closeChan以控制inChan/outChan策略,仅此一次
conn.mutex.Lock()
if !conn.isClose {
close(conn.closeChan)
conn.isClose = true
}
conn.mutex.Unlock()
//关闭WebSocket的连接,conn.Close()是并发安全可以多次关闭
_ = conn.conn.Close()
}
- 接受客户端数据协程函数
// readMsgLoop TODO:读取客户端发送的数据写入到inChan
func (conn *wsConn) readMsgLoop() {
for {
// 确定数据结构
var (
data []byte
err error
)
// 接收数据
if _, data, err = conn.conn.ReadMessage(); err != nil {
goto ERR
}
// 写入数据
if err = conn.InChanWrite(data); err != nil {
goto ERR
}
}
ERR:
conn.CloseConn()
}
- 发送数据给客户端协程函数
// writeMsgLoop TODO:读取outChan的数据响应给客户端
func (conn *wsConn) writeMsgLoop() {
for {
var (
data []byte
err error
)
// 读取数据
if data, err = conn.OutChanRead(); err != nil {
goto ERR
}
// 发送数据
if err = conn.conn.WriteMessage(1, data); err != nil {
goto ERR
}
}
ERR:
conn.CloseConn()
}
- 完整代码
package WebSocketHandler
import (
"errors"
"golang.org/websocket"
"sync"
)
// wsConn TODO:封装的基本结构体
type wsConn struct {
inChan chan []byte
outChan chan []byte
closeChan chan []byte
isClose bool // 通道closeChan是否已经关闭
mutex sync.Mutex
conn *websocket.Conn
}
// InitWebSocket TODO:初始化Websocket
func InitWebSocket(conn *websocket.Conn) (ws *wsConn, err error) {
ws = &wsConn{
inChan: make(chan []byte, 1024),
outChan: make(chan []byte, 1024),
closeChan: make(chan []byte, 1024),
conn: conn,
}
// 完善必要协程:读取客户端数据协程/发送数据协程
go ws.readMsgLoop()
go ws.writeMsgLoop()
return
}
// InChanRead TODO:读取inChan的数据
func (conn *wsConn) InChanRead() (data []byte, err error) {
select {
case data = <-conn.inChan:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
// InChanWrite TODO:inChan写入数据
func (conn *wsConn) InChanWrite(data []byte) (err error) {
select {
case conn.inChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
// OutChanRead TODO:读取inChan的数据
func (conn *wsConn) OutChanRead() (data []byte, err error) {
select {
case data = <-conn.outChan:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
// OutChanWrite TODO:inChan写入数据
func (conn *wsConn) OutChanWrite(data []byte) (err error) {
select {
case conn.outChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
// CloseConn TODO:关闭WebSocket连接
func (conn *wsConn) CloseConn() {
// 关闭closeChan以控制inChan/outChan策略,仅此一次
conn.mutex.Lock()
if !conn.isClose {
close(conn.closeChan)
conn.isClose = true
}
conn.mutex.Unlock()
//关闭WebSocket的连接,conn.Close()是并发安全可以多次关闭
_ = conn.conn.Close()
}
// readMsgLoop TODO:读取客户端发送的数据写入到inChan
func (conn *wsConn) readMsgLoop() {
for {
// 确定数据结构
var (
data []byte
err error
)
// 接受数据
if _, data, err = conn.conn.ReadMessage(); err != nil {
goto ERR
}
// 写入数据
if err = conn.InChanWrite(data); err != nil {
goto ERR
}
}
ERR:
conn.CloseConn()
}
// writeMsgLoop TODO:读取outChan的数据响应给客户端
func (conn *wsConn) writeMsgLoop() {
for {
var (
data []byte
err error
)
// 读取数据
if data, err = conn.OutChanRead(); err != nil {
goto ERR
}
// 发送数据
if err = conn.conn.WriteMessage(1, data); err != nil {
goto ERR
}
}
ERR:
conn.CloseConn()
}
serverBase.go 服务基本结构函数
package WebSocketHandler
import (
"github.com/gin-gonic/gin"
"golang.org/websocket"
"net/http"
"time"
)
// websocket 升级并跨域
var (
upgrade = &websocket.Upgrader{
// 允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
// WebSocketBase TODO:服务基本函数
func WebSocketBase(c *gin.Context) {
var (
err error
conn *websocket.Conn
ws *wsConn
)
if conn, err = upgrade.Upgrade(c.Writer,c.Request,nil); err != nil{
return
}
if ws, err = InitWebSocket(conn); err != nil{
return
}
// 使得inChan和outChan耦合起来
for {
var data []byte
if data, err = ws.InChanRead(); err != nil{
goto ERR
}
if err = ws.OutChanWrite(data); err != nil{
goto ERR
}
}
ERR:
ws.CloseConn()
}
- mian()函数结构
package main
import (
"github.com/gin-gonic/gin"
"testWebSocket/WebSocketHandler"
)
func main(){
r := gin.Default()
r.GET("/ws",WebSocketHandler.WebSocketBase)
err := r.Run(":9999")
if err != nil{
return
}
}
额外说明
这样处理便可实现并发安全。如果细心观察的话,大家会发现该 WebSocket 升级跨域处理中的 CheckOrigin 都默认 true,也就是说只要地址暴露,任何人不需要任何允准就可以随意连接,这免不了非法链接和恶行攻击的风险。如何杜绝这种风险呢,如果大家感兴趣,可以关注,后续我会再次基础上进行升级。当然欢迎大家评论转发。