WebSocket定义

  • websocket 是HTML5提供的一种在单个tcp连接上进行全双工通讯的协议,在websocket API中客服端和服务端只需要完成一次握手就可以创建长连接,然后进行双向数据传递。
  • websocket 不是一个全新的协议,而是基于http协议建立连接,创建过程:
- GET   ws://localhost:3000/ws?param 
- HOST:  localhost
-Upgrade: websocket
-Connection:  Upgrade
-Origin:  http:localhost:3000
-Sec-WebSocket-Key:  Client-random-string
-Sec-WebSocket-Version:  13
  • 然后服务端接收到该请求,将http协议升级为WebSocket协议,建立tcp连接,服务端响应如下
     -  HTTP/1.1 101 Switching Protocols
     -  Upgrade:  websocket
     -  Connection:  Upgrade
     -  Sec-WebSocket-Accept: sever-random-string
  • 目前支持WebSocket协议的浏览器

    1. Chrome
    2. Firefox
    3. IE >= 10
    4. Sarafi >= 6
    5. Android >= 4.4
    6. IOS >= 8
  • 目前网站实现推送技术的方式

    1. 拉模式(定时轮询访问接口获取数据)
      • 数据更新频率低
      • 在线用户数量多,则服务器查询负载很高
      • 定时轮询不能满足实时要求
    2. 推模式(服务端向客服端进行数据的推送)
      • 只有在数据更新时才有推送
      • 需要维护大量的在线长连接
      • 数据更新后可以立即推送
  • 基于WebSocket 协议做推送

    • 目前大多数浏览器都支持websocket协议
    • 协议升级后底层继续服用Http协议的底层socket完成通信
    • 以message进行数据传输,message会被切成多个frame帧进行传输
    • 真正全双工通信,建立连接后客服端和服务端是相等的,客户端与服务端建立连接以后不再需要发送http header就可以交换数据
  • Golang 下websocket的下的选择

    • 1.原生包 golang.org/x/net/websocket
      1. github.com/gorilla/websocket
        两者的比较区别


        image.png
  • 具体实现 基于github.com/gorilla/websocket

    1. connection.go
    // WsMessage ...
type WsMessage struct {
    messageType int
    data        []byte
}

// PushMessage 推送消息内容
type PushMessage struct {
    ID          int    `json:"id"`           // 编号
    EmployeeNO  string `json:"employee_no"`  // 员工号
    ReadFlag    int    `json:"read_flag"`    // 已读标记
    Extra       string `json:"extra"`        // 额外信息
    MessageType int    `json:"message_type"` // 消息类型
    Content     string `json:"content"`      // 消息内容
    Count       int    `json:"count"`        // 总数
}

// WsConnection 连接对象
type WsConnection struct {
    WsSocket  *websocket.Conn
    InChan    chan *WsMessage   // 读队列
    OutChan   chan *PushMessage // 写队列
    Mutex     sync.Mutex        // 避免重复关闭通道
    IsClosed  bool              // 是否关闭
    CloseChan chan byte         // 关闭通知
}

// WsReadLoop ...
func (wsConn *WsConnection) WsReadLoop() {
    for {
        msgType, data, err := wsConn.WsSocket.ReadMessage()
        if err != nil {
            goto error
        }
        req := &WsMessage{
            msgType,
            data,
        }
        fmt.Println(req)
        // 放入请求队列
        select {
        case wsConn.InChan <- req:
        case <-wsConn.CloseChan:
            fmt.Println("wsReadLoop close websocket")
            goto closed
        }
    }
error:
    wsConn.wsClose()
closed:
}

// WsWriteLoop ...
func (wsConn *WsConnection) WsWriteLoop() {
    for {
        select {
        //取一个应答
        case msg := <-wsConn.OutChan:
            if err := wsConn.WsSocket.WriteJSON(msg); err != nil {
                goto error
            }
        case <-wsConn.CloseChan:
            goto closed
        }
    }

error:
    wsConn.wsClose()
closed:
}

// WsWrite ...
func (wsConn *WsConnection) WsWrite(message PushMessage) error {
    select {
    case wsConn.OutChan <- &message:
    case <-wsConn.CloseChan:
        return errors.New("websocket closed")
    }
    return nil
}

// WsRead ...
func (wsConn *WsConnection) WsRead() (*WsMessage, error) {
    select {
    case msg := <-wsConn.InChan:
        return msg, nil
    case <-wsConn.CloseChan:
    }
    return nil, errors.New("websocket closed")
}

// ProcLoop 心跳检测
func (wsConn *WsConnection) ProcLoop() {
    // 启动一个goroutine 发送心跳
    go func() {
        for {
            time.Sleep(50 * time.Second)
            if err := wsConn.WsSocket.WriteMessage(websocket.PingMessage, []byte("heartbeat")); err != nil {
                logrus.Errorf("heartbeat fail %v", err.Error())
                wsConn.wsClose()
                break
            }
        }
    }()
}

func (wsConn *WsConnection) wsClose() {
    wsConn.WsSocket.Close()
    wsConn.Mutex.Lock()
    defer wsConn.Mutex.Unlock()
    if !wsConn.IsClosed {
        wsConn.IsClosed = true
        close(wsConn.CloseChan)
    }
}

2.建立连接,实现对每个用户推送,采用map缓存用户和连接之间的关系

var upGrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

//读写锁
var mutex sync.RWMutex

// Relation save websocket connection
var Relation = make(map[string]helpers.WsConnection)

// DoConnection websocket 协议
func DoConnection(ctx *gin.Context) {
    employeeNo := ctx.GetString("employeeNo")
    if employeeNo == "" {
        ctx.JSON(http.StatusUnprocessableEntity, http.StatusText(http.StatusUnprocessableEntity))
        return
    }
    // 升级为WebSocket协议
    wsSocket, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
    if err != nil {
        logrus.Errorf("upgrade websocket fail")
        return
    }
    wsConn := helpers.WsConnection{
        WsSocket:  wsSocket,
        InChan:    make(chan *helpers.WsMessage, 1000),
        OutChan:   make(chan *helpers.PushMessage, 1000),
        CloseChan: make(chan byte),
        IsClosed:  false,
    }
    // 添加连接对应关系
    handleConnect(employeeNo, wsConn)
}

func handleConnect(employeeNo string, wsConn helpers.WsConnection) {
    mutex.Lock()
    Relation[employeeNo] = wsConn
    mutex.Unlock()
    go wsConn.WsWriteLoop()
    go wsConn.ProcLoop()
}

3.测试客服端代码 index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <script>
        window.addEventListener("load", function(evt) {
            var output = document.getElementById("output");
            var input = document.getElementById("input");
            var ws;
            var print = function(message) {
                var d = document.createElement("div");
                d.innerHTML = message;
                output.appendChild(d);
            };
            document.getElementById("open").onclick = function(evt) {
                if (ws) {
                    return false;
                }
                ws = new WebSocket("ws://localhost:5000/ws?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjaXR5Ijp7ImlkIjpudWxsLCJuYW1lIjoiIn0sImNvbXBhbnkiOnsiaWQiOjEwMDAxLCJuYW1lIjoi5oiR5p2l5ZWm5oC76YOoIn0sImRlcGFydG1lbnQiOnsiaWQiOjEwMDE3MCwibmFtZSI6IuWuouacjemDqCJ9LCJlbWFpbCI6InpodWJvQHN1ZGl5aS5jbiIsImVtcGxveWVlX25vIjoiNjAzNTc3IiwiZXhwIjoxNTY1MTYwMTQ4LCJleHRlbnNpb25fbm8iOiIiLCJpZCI6MzAxNzEyLCJtb2JpbGUiOiIxNzYwMjQ0OTUwNyIsIm1vYmlsZV8xIjoiMTc2MDI0NDk1MDciLCJtb2JpbGVfMiI6IiIsIm5hbWUiOiLmnLHms6IiLCJuYW1lX3Nob3J0Ijoiemh1Ym8iLCJwaG9uZV9ubyI6IiIsInBvc2l0aW9uIjp7ImlkIjoyMDAzNzMsIm5hbWUiOiLlrqLmnI3nu4_nkIYifX0.7Em1UIKJs0q8Ns6tNzHn3FRccZ-xiPMnSMFVQq2OU3U");
                ws.onopen = function(evt) {
                    print("OPEN");
                }
                ws.onclose = function(evt) {
                    print("CLOSE");
                    ws = null;
                }
                ws.onmessage = function(evt) {
                    print("RESPONSE: " + evt.data);
                }
                ws.onerror = function(evt) {
                    print("ERROR: " + evt.data);
                }
                return false;
            };
            document.getElementById("send").onclick = function(evt) {
                if (!ws) {
                    return false;
                }
               // print("SEND: " + input.value);
                ws.send(input.value);
                return false;
            };
            document.getElementById("close").onclick = function(evt) {
                if (!ws) {
                    return false;
                }
                ws.close();
                return false;
            };
        });
    </script>
</head>
<body>
<table>
    <tr><td valign="top" width="50%">
            <p>Click "Open" to create a connection to the server,
                "Send" to send a message to the server and "Close" to close the connection.
                You can change the message and send multiple times.
            </p>
            <form>
                <button id="open">Open</button>
                <button id="close">Close</button>
                <input id="input" type="text" value="Hello world!">
                <button id="send">Send</button>
            </form>
        </td><td valign="top" width="50%">
            <div id="output"></div>
        </td></tr></table>
</body>
</html>
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;