在Server和Client通讯中,由于网络等原因很可能会发生数据丢包的现象。如果数据缺失,服务端接收的信息不完整,就会造成混乱。

我们需要在Server和Client之间建立一个通讯协议,通过协议中的规则,判断当前接收到的信息是否完整。根据信息的完整情况,采取不同的处理方式。

通讯协议protocol的核心就是设计一个头部。如果传来的信息不包含这个头部,就说明当前信息和之前的信息是同一条。那么就把当前信息和之前的那条信息合并成一条。

而协议主要包含的功能是封装(Enpack)和解析(Depack)。Enpack是客户端对信息进行数据封装。封装之后可以传递给服务器。Depack是服务器对信息进行数据解析。

其中有个Const部分,用于定义头部、头部长度、客户端传入信息长度。

在代码中,我们这样定义:

const (

ConstHeader = "Headers"

ConstHeaderLength = 7

ConstMLength = 4

)

头部的内容为"Headers",长度为7。所以ConstHeaderLenth=7.

而信息传递过程中,我们会把int类型转换成byte类型。一个int的长度等于4个byte的长度。因此,我们设置ConstMLength=4.代表客户端的传来的信息大小。

自定义协议protocal的代码示例如下:

/**

* protocol

* @Author: Jian Junbo

* @Email: junbojian@qq.com

* @Create: 2017/9/14 11:49

*

* Description: 通讯协议处理

*/

package protocol

import (

"bytes"

"encoding/binary"

)

const (

ConstHeader = "Headers"

ConstHeaderLength = 7

ConstMLength = 4

)

//封包

func Enpack(message []byte) []byte {

return append(append([]byte(ConstHeader), IntToBytes(len(message))...), message...)

}

//解包

func Depack(buffer []byte) []byte {

length := len(buffer)

var i int

data := make([]byte, 32)

for i = 0; i < length; i++ {

if length < i + ConstHeaderLength + ConstMLength{

break

}

if string(buffer[i:i+ConstHeaderLength]) == ConstHeader {

messageLength := ByteToInt(buffer[i+ConstHeaderLength : i+ConstHeaderLength+ConstMLength])

if length < i+ConstHeaderLength+ConstMLength+messageLength {

break

}

data = buffer[i+ConstHeaderLength+ConstMLength : i+ConstHeaderLength+ConstMLength+messageLength]

}

}

if i == length {

return make([]byte, 0)

}

return data

}

//字节转换成整形

func ByteToInt(n []byte) int {

bytesbuffer := bytes.NewBuffer(n)

var x int32

binary.Read(bytesbuffer, binary.BigEndian, &x)

return int(x)

}

//整数转换成字节

func IntToBytes(n int) []byte {

x := int32(n)

bytesBuffer := bytes.NewBuffer([]byte{})

binary.Write(bytesBuffer, binary.BigEndian, x)

return bytesBuffer.Bytes()

}

Server端主要通过协议来解析客户端发送来的信息。建立一个函数,用来完成连接对接收信息的处理。其中建立了通道readerChannel,并把接收来的信息放在通道里。

在放入通道之前,使用protocol和Depack对信息进行解析。

//连接处理

func handleConnection(conn net.Conn) {

//缓冲区,存储被截断的数据

tmpBuffer := make([]byte, 0)

//接收解包

readerChannel := make(chan []byte, 10000)

go reader(readerChannel)

buffer := make([]byte, 1024)

for{

n, err := conn.Read(buffer)

if err != nil{

Log(conn.RemoteAddr().String(), "connection error: ", err)

return

}

tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))

readerChannel

}

defer conn.Close()

}

如果信息读取发生错误(包括读取到信息结束符EOF),都会打印错误信息,并跳出循环。

Log(conn.RemoteAddr().String(), "connection error: ", err)

return

由于通道内的数据是[]byte型的。需要转换成string。这个工作有专门的获取通道数据的reader(readerChannel chan []byte)来完成。

//获取通道数据

func reader(readerchannel chan []byte) {

for{

select {

case data :=

Log(string(data)) //打印通道内的信息

}

}

}

查看Server端代码示例:

/**

* MySocketProtocalServer

* @Author: Jian Junbo

* @Email: junbojian@qq.com

* @Create: 2017/9/14 13:54

* Copyright (c) 2017 Jian Junbo All rights reserved.

*

* Description: 服务端,接收客户端传来的信息

*/

package main

import (

"net"

"fmt"

"os"

"log"

"protocol"

)

func main() {

netListen, err := net.Listen("tcp", "localhost:7373")

CheckErr(err)

defer netListen.Close()

Log("Waiting for client ...") //启动后,等待客户端访问。

for{

conn, err := netListen.Accept() //监听客户端

if err != nil {

Log(conn.RemoteAddr().String(), "发了了错误:", err)

continue

}

Log(conn.RemoteAddr().String(), "tcp connection success")

go handleConnection(conn)

}

}

//连接处理

func handleConnection(conn net.Conn) {

//缓冲区,存储被截断的数据

tmpBuffer := make([]byte, 0)

//接收解包

readerChannel := make(chan []byte, 10000)

go reader(readerChannel)

buffer := make([]byte, 1024)

for{

n, err := conn.Read(buffer)

if err != nil{

Log(conn.RemoteAddr().String(), "connection error: ", err)

return

}

tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))

readerChannel

}

defer conn.Close()

}

//获取通道数据

func reader(readerchannel chan []byte) {

for{

select {

case data :=

Log(string(data)) //打印通道内的信息

}

}

}

//日志处理

func Log(v ...interface{}) {

log.Println(v...)

}

//错误处理

func CheckErr(err error) {

if err != nil {

fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())

os.Exit(1)

}

}

客户端使用Enpack封装要发送到服务端的信息后,写入连接conn中。

/**

* MySocketProtocalClient

* @Author: Jian Junbo

* @Email: junbojian@qq.com

* @Create: 2017/9/14 15:23

* Copyright (c) 2017 Jian Junbo All rights reserved.

*

* Description:

*/

package main

import (

"net"

"time"

"strconv"

"protocol"

"fmt"

"os"

)

//发送100次请求

func send(conn net.Conn) {

for i := 0; i < 100; i++ {

session := GetSession()

words := "{\"ID\":\""+strconv.Itoa(i)+"\",\"Session\":\""+session+"20170914165908\",\"Meta\":\"golang\",\"Content\":\"message\"}"

conn.Write(protocol.Enpack([]byte(words)))

fmt.Println(words) //打印发送出去的信息

}

fmt.Println("send over")

defer conn.Close()

}

//用当前时间做识别。当前时间的十进制整数

func GetSession() string {

gs1 := time.Now().Unix()

gs2 := strconv.FormatInt(gs1, 10)

return gs2

}

func main() {

server := "localhost:7373"

tcpAddr, err := net.ResolveTCPAddr("tcp4", server)

if err != nil{

fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())

os.Exit(1)

}

conn, err := net.DialTCP("tcp", nil, tcpAddr)

if err != nil{

fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())

os.Exit(1)

}

fmt.Println("connect success")

send(conn)

}

补充:golang从0到1利用socket编程实现一个简单的http服务器

开始编程

第一份代码

package main

import (

"fmt"

"net"

)

func accept_request_thread(conn net.Conn) {

defer conn.Close()

for {

// 创建一个新切片, 用作保存数据的缓冲区

buf := make([]byte, 1024)

n, err := conn.Read(buf) // 从conn中读取客户端发送的数据内容

if err != nil {

fmt.Printf("客户端退出 err=%v\n", err)

return

}

fmt.Printf(" 接受消息 %s\n", string(buf[:n]))

}

}

func main() {

listen, err := net.Listen("tcp", ":8888") // 创建用于监听的 socket

if err != nil {

fmt.Println("listen err=", err)

return

}

fmt.Println("监听套接字,创建成功, 服务器开始监听。。。")

defer listen.Close() // 服务器结束前关闭 listener

// 循环等待客户端来链接

for {

fmt.Println("阻塞等待客户端来链接...")

conn, err := listen.Accept() // 创建用户数据通信的socket

if err != nil {

fmt.Println("Accept() err=", err)

} else {

fmt.Println("通信套接字,创建成功。。。")

}

// 这里准备起一个协程,为客户端服务

go accept_request_thread(conn)

}

}

浏览器发送一个get请求:

http://192.168.0.20:8888/api/camera/get_ptz?camera_id=1324566666789876543

服务端接受到的消息如下:

http://192.168.0.20:8888/api/camera/get_ptz?camera_id=1324566666789876543

6d19e787372ca6c54af636c83ccd6d51.png

我们接下来的任务就是 解析这些字符串,从中获取 当前是什么方法,什么请求,参数是什么?

先定义一个小目标,获取当前是什么方法。

处理一个简单的get请求

package main

import (

"encoding/json"

"fmt"

"log"

"net"

"strings"

)

func unimplemented(conn net.Conn){

var buf string

buf = "HTTP/1.0 501 Method Not Implemented\r\n"

_, _ = conn.Write([]byte(buf))

buf = "Server: httpd/0.1.0\r\n"

_, _ = conn.Write([]byte(buf))

buf = "Content-Type: text/html\r\n"

_, _ = conn.Write([]byte(buf))

buf = "\r\n"

_, _ = conn.Write([]byte(buf))

buf = "

Method Not Implemented\r\n"

_, _ = conn.Write([]byte(buf))

buf = "

\r\n"

_, _ = conn.Write([]byte(buf))

buf = "

HTTP request method not supported.\r\n"

_, _ = conn.Write([]byte(buf))

buf = "\r\n"

_, _ = conn.Write([]byte(buf))

}

func accept_request_thread(conn net.Conn) {

defer conn.Close()

var i int

buf := make([]byte, 1024)

n, err := conn.Read(buf) // 从conn中读取客户端发送的数据内容

if err != nil {

fmt.Printf("客户端退出 err=%v\n", err)

return

}

// 获取方法

i = 0

var method_bt strings.Builder

for(i < n && buf[i] != ' '){

method_bt.WriteByte(buf[i])

i++;

}

method := method_bt.String()

if(method != "GET"){

unimplemented(conn)

return

}

for(i < n && buf[i] == ' '){

i++

}

//api/camera/get_ptz?camera_id=1324566666789876543

var url_bt strings.Builder

for(i < n && buf[i] != ' '){

url_bt.WriteByte(buf[i])

i++;

}

url := url_bt.String()

if(method == "GET"){

//url ---> /api/camera/get_ptz?camera_id=1324566666789876543

// 跳到第一个?

var path, query_string string

j := strings.IndexAny(url, "?")

if(j != -1){

path = url[:j]

if(j + 1 < len(url)){

query_string = url[j+1:]

}

}else{

path = url

}

fmt.Print(path + "请求已经创建\t")

resp := execute(path, query_string)// =1324566666789876543

fmt.Println("返回", string(resp))

header(conn, "application/json", len(resp));

_ , err := conn.Write(resp)

if(err != nil){

fmt.Println(err)

}

}

}

//回应客户端必须先设置好head头,浏览器才能解析

func header(conn net.Conn, content_type string , length int ) {

var buf string

buf = "HTTP/1.0 200 OK\r\n"

_, _ = conn.Write([]byte(buf))

buf = "Server: httpd/0.1.0\r\n"

_, _ = conn.Write([]byte(buf))

buf = "Content-Type: " + content_type + "\r\n"

_, _ = conn.Write([]byte(buf))

_, _ = fmt.Sscanf(buf, "Content-Length: %d\r\n", length)

buf = "Content-Type: " + content_type + "\r\n"

_, _ = conn.Write([]byte(buf))

buf = "\r\n"

_, _ = conn.Write([]byte(buf))

}

func execute(path string, query_string string) ([]byte) {

query_params := make(map[string]string)

parse_query_params(query_string, query_params)

if("/api/camera/get_ptz" == path){

/*

* do something

*/

camera_id := query_params["camera_id"]

resp := make(map[string]interface{})

resp["camera_id"] = camera_id

resp["code"] = 200

resp["msg"] = "ok"

rs, err := json.Marshal(resp)

if err != nil{

log.Fatalln(err)

}

return rs

}else if("get_abc" == path){

/*

* do something

*/

return []byte("abcdcvfdswa")

}

return []byte("do't match")

}

/*map作为函数入参是作为指针进行传递的

函数里面对map进行修改时,会同时修改源map的值,但是将map修改为nil时,则达不到预期效果。*/

// camera_id=1324566666789876543&tt=%E5%88%9B%E5%BB%BA%E6%88%90%E5%8A%9F

func parse_query_params(query_string string, query_params map[string]string) {

kvs := strings.Split(query_string, "&")

if(len(kvs) == 0){

return

}

for _, kv := range kvs {

kv := strings.Split(kv, "=")

if(len(kv) != 2){

continue

}

query_params[kv[0]] = kv[1]

}

}

func main() {

listen, err := net.Listen("tcp", ":8888") // 创建用于监听的 socket

if err != nil {

fmt.Println("listen err=", err)

return

}

fmt.Println("监听套接字,创建成功, 服务器开始监听。。。")

defer listen.Close() // 服务器结束前关闭 listener

// 循环等待客户端链接

for {

fmt.Println("阻塞等待客户端链接...")

conn, err := listen.Accept() // 创建用户数据通信的socket

if err != nil {

panic("Accept() err= " + err.Error())

}

// 这里准备起一个协程,为客户端服务

go accept_request_thread(conn)

}

}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。如有错误或未考虑完全的地方,望不吝赐教。