WebSocket是什么

全双工
golang
gingorilla/websocketgo getgit bashgit clone
gorilla/websocketGOPATHsrchttp://github.com
git clone https://github.com/gorilla/websocket
gin-gonic/ginGOPATHsrchttp://github.com
git clone https://github.com/gin-gonic/gin

WebSocket处理过程

WebSocket处理逻辑流程图

WebSocket处理逻辑示意图

说明:服务端(server)中的 inChanoutChan 这两个通道的操作都可封装成写入和读取两个接口;同时服务端的两个 for 即是两个必要的协程,它们分别是不断接收客户端数据和不断发送数据给客户端的操作。

WebSocket目录结构

WebSocket项目结构

说明:WebSocketHandler 下有两个 .go 文件,websocketBase.go 里面是做WebSocket处理的封装,serverBase.go 则是 WebSocket 服务的基本服务结构函数。

websocketBase.go 内部处理过程

  • 定义核心结构体
// wsConn TODO:封装的基本结构体
type wsConn struct {
	inChan    chan []byte
	outChan   chan []byte
	closeChan chan []byte
	isClose   bool // 通道closeChan是否已经关闭
	mutex     sync.Mutex
	conn      *websocket.Conn
}
  • 初始化webSocket调用
// InitWebSocket TODO:初始化Websocket
func InitWebSocket(conn *websocket.Conn) (ws *wsConn, err error) {
	ws = &wsConn{
		inChan:    make(chan []byte, 1024),
		outChan:   make(chan []byte, 1024),
		closeChan: make(chan []byte, 1024),
		conn:      conn,
	}
	// 完善必要协程:读取客户端数据协程/发送数据协程
	go ws.readMsgLoop()
	go ws.writeMsgLoop()
	return
}
  • inChan的读取和写入操作封装
// InChanRead TODO:读取inChan的数据
func (conn *wsConn) InChanRead() (data []byte, err error) {
	select {
	case data = <-conn.inChan:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}

// InChanWrite TODO:inChan写入数据
func (conn *wsConn) InChanWrite(data []byte) (err error) {
	select {
	case conn.inChan <- data:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}
  • outChan的读取和写入操作封装
// OutChanRead TODO:读取inChan的数据
func (conn *wsConn) OutChanRead() (data []byte, err error) {
	select {
	case data = <-conn.outChan:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}

// OutChanWrite TODO:inChan写入数据
func (conn *wsConn) OutChanWrite(data []byte) (err error) {
	select {
	case conn.outChan <- data:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}
  • 关闭WebSocket连接
// CloseConn TODO:关闭WebSocket连接
func (conn *wsConn) CloseConn() {
	// 关闭closeChan以控制inChan/outChan策略,仅此一次
	conn.mutex.Lock()
	if !conn.isClose {
		close(conn.closeChan)
		conn.isClose = true
	}
	conn.mutex.Unlock()
	//关闭WebSocket的连接,conn.Close()是并发安全可以多次关闭
	_ = conn.conn.Close()
}
  • 接受客户端数据协程函数
// readMsgLoop TODO:读取客户端发送的数据写入到inChan
func (conn *wsConn) readMsgLoop() {
	for {
		// 确定数据结构
		var (
			data []byte
			err  error
		)
		// 接收数据
		if _, data, err = conn.conn.ReadMessage(); err != nil {
			goto ERR
		}
		// 写入数据
		if err = conn.InChanWrite(data); err != nil {
			goto ERR
		}
	}
ERR:
	conn.CloseConn()
}
  • 发送数据给客户端协程函数
// writeMsgLoop TODO:读取outChan的数据响应给客户端
func (conn *wsConn) writeMsgLoop() {
	for {
		var (
			data []byte
			err  error
		)
		// 读取数据
		if data, err = conn.OutChanRead(); err != nil {
			goto ERR
		}
		// 发送数据
		if err = conn.conn.WriteMessage(1, data); err != nil {
			goto ERR
		}
	}
ERR:
	conn.CloseConn()
}
  • 完整代码
package WebSocketHandler

import (
	"errors"
	"golang.org/websocket"
	"sync"
)

// wsConn TODO:封装的基本结构体
type wsConn struct {
	inChan    chan []byte
	outChan   chan []byte
	closeChan chan []byte
	isClose   bool // 通道closeChan是否已经关闭
	mutex     sync.Mutex
	conn      *websocket.Conn
}

// InitWebSocket TODO:初始化Websocket
func InitWebSocket(conn *websocket.Conn) (ws *wsConn, err error) {
	ws = &wsConn{
		inChan:    make(chan []byte, 1024),
		outChan:   make(chan []byte, 1024),
		closeChan: make(chan []byte, 1024),
		conn:      conn,
	}
	// 完善必要协程:读取客户端数据协程/发送数据协程
	go ws.readMsgLoop()
	go ws.writeMsgLoop()
	return
}

// InChanRead TODO:读取inChan的数据
func (conn *wsConn) InChanRead() (data []byte, err error) {
	select {
	case data = <-conn.inChan:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}

// InChanWrite TODO:inChan写入数据
func (conn *wsConn) InChanWrite(data []byte) (err error) {
	select {
	case conn.inChan <- data:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}

// OutChanRead TODO:读取inChan的数据
func (conn *wsConn) OutChanRead() (data []byte, err error) {
	select {
	case data = <-conn.outChan:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}

// OutChanWrite TODO:inChan写入数据
func (conn *wsConn) OutChanWrite(data []byte) (err error) {
	select {
	case conn.outChan <- data:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return
}

// CloseConn TODO:关闭WebSocket连接
func (conn *wsConn) CloseConn() {
	// 关闭closeChan以控制inChan/outChan策略,仅此一次
	conn.mutex.Lock()
	if !conn.isClose {
		close(conn.closeChan)
		conn.isClose = true
	}
	conn.mutex.Unlock()
	//关闭WebSocket的连接,conn.Close()是并发安全可以多次关闭
	_ = conn.conn.Close()
}

// readMsgLoop TODO:读取客户端发送的数据写入到inChan
func (conn *wsConn) readMsgLoop() {
	for {
		// 确定数据结构
		var (
			data []byte
			err  error
		)
		// 接受数据
		if _, data, err = conn.conn.ReadMessage(); err != nil {
			goto ERR
		}
		// 写入数据
		if err = conn.InChanWrite(data); err != nil {
			goto ERR
		}
	}
ERR:
	conn.CloseConn()
}

// writeMsgLoop TODO:读取outChan的数据响应给客户端
func (conn *wsConn) writeMsgLoop() {
	for {
		var (
			data []byte
			err  error
		)
		// 读取数据
		if data, err = conn.OutChanRead(); err != nil {
			goto ERR
		}
		// 发送数据
		if err = conn.conn.WriteMessage(1, data); err != nil {
			goto ERR
		}
	}
ERR:
	conn.CloseConn()
}

serverBase.go 服务基本结构函数

package WebSocketHandler

import (
	"github.com/gin-gonic/gin"
	"golang.org/websocket"
	"net/http"
	"time"
)

// websocket 升级并跨域
var (
	upgrade = &websocket.Upgrader{
		// 允许跨域
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)

// WebSocketBase TODO:服务基本函数
func WebSocketBase(c *gin.Context) {
	var (
		err error
		conn *websocket.Conn
		ws *wsConn
	)
	if conn, err = upgrade.Upgrade(c.Writer,c.Request,nil); err != nil{
		return
	}
	if ws, err = InitWebSocket(conn); err != nil{
		return
	}
	// 使得inChan和outChan耦合起来
	for {
		var data []byte
		if data, err = ws.InChanRead(); err != nil{
			goto ERR
		}
		if err = ws.OutChanWrite(data); err != nil{
			goto ERR
		}
	}
ERR:
	ws.CloseConn()
}
  • mian()函数结构
package main

import (
	"github.com/gin-gonic/gin"
	"testWebSocket/WebSocketHandler"
)

func main(){
	r := gin.Default()
	r.GET("/ws",WebSocketHandler.WebSocketBase)
	err := r.Run(":9999")
	if err != nil{
		return
	}
}

额外说明

这样处理便可实现并发安全。如果细心观察的话,大家会发现该 WebSocket 升级跨域处理中的 CheckOrigin 都默认 true,也就是说只要地址暴露,任何人不需要任何允准就可以随意连接,这免不了非法链接和恶行攻击的风险。如何杜绝这种风险呢,如果大家感兴趣,可以关注,后续我会再次基础上进行升级。当然欢迎大家评论转发。