转自:https://studygolang.com/articles/12483

bufio.Scanner

协议数据包定义

本文模拟一个日志服务器,该服务器接收客户端传到的数据包并显示出来

复制代码

// Package is one log packet of the custom wire protocol described in this
// article. Fields are listed in the order Pack writes them.
type Package struct {
    Version        [2]byte // protocol version, provisionally "V1"
    Length         int16   // length of the data section (everything after this field)
    Timestamp      int64   // timestamp
    HostnameLength int16   // length of Hostname
    Hostname       []byte  // host name
    TagLength      int16   // length of Tag
    Tag            []byte  // tag
    Msg            []byte  // log message
}

复制代码

 

协议定义部分没有什么好讲的,根据具体的业务逻辑定义即可。

数据打包

binary

复制代码

// Pack writes the packet to writer in wire order using big-endian encoding:
// Version, Length, Timestamp, HostnameLength, Hostname, TagLength, Tag, Msg.
//
// It stops at the first failed write and returns that error, so a failure in
// the middle of the packet is not masked by a later write that succeeds.
// The length fields are written exactly as stored; Pack does not recompute
// them from the slices.
func (p *Package) Pack(writer io.Writer) error {
    fields := []interface{}{
        p.Version[:],
        p.Length,
        p.Timestamp,
        p.HostnameLength,
        p.Hostname,
        p.TagLength,
        p.Tag,
        p.Msg,
    }
    for _, field := range fields {
        if err := binary.Write(writer, binary.BigEndian, field); err != nil {
            return err
        }
    }
    return nil
}

复制代码

 

Pack 按字段顺序把数据依次写入 io.Writer,字节序统一使用 binary.BigEndian(大端序)。

数据解包

数据无关

复制代码

// Unpack reads one big-endian encoded packet from reader into p, in the same
// field order that Pack writes.
//
// It returns the first read error it encounters instead of carrying on with
// zero-valued fields. It also rejects negative or inconsistent length fields
// with an error, because passing a negative size to make would panic.
func (p *Package) Unpack(reader io.Reader) error {
    if err := binary.Read(reader, binary.BigEndian, &p.Version); err != nil {
        return err
    }
    if err := binary.Read(reader, binary.BigEndian, &p.Length); err != nil {
        return err
    }
    if err := binary.Read(reader, binary.BigEndian, &p.Timestamp); err != nil {
        return err
    }

    if err := binary.Read(reader, binary.BigEndian, &p.HostnameLength); err != nil {
        return err
    }
    if p.HostnameLength < 0 {
        return fmt.Errorf("invalid hostname length %d", p.HostnameLength)
    }
    p.Hostname = make([]byte, p.HostnameLength)
    if _, err := io.ReadFull(reader, p.Hostname); err != nil {
        return err
    }

    if err := binary.Read(reader, binary.BigEndian, &p.TagLength); err != nil {
        return err
    }
    if p.TagLength < 0 {
        return fmt.Errorf("invalid tag length %d", p.TagLength)
    }
    p.Tag = make([]byte, p.TagLength)
    if _, err := io.ReadFull(reader, p.Tag); err != nil {
        return err
    }

    // Length covers the 8-byte timestamp, the two 2-byte length prefixes,
    // the hostname, the tag and the message. Compute in int so the
    // subtraction cannot wrap around in int16.
    msgLen := int(p.Length) - 8 - 2 - int(p.HostnameLength) - 2 - int(p.TagLength)
    if msgLen < 0 {
        return fmt.Errorf("invalid packet length %d: smaller than its header fields", p.Length)
    }
    p.Msg = make([]byte, msgLen)
    _, err := io.ReadFull(reader, p.Msg)
    return err
}

复制代码

由于主机名、标签这种数据是不固定长度的,所以需要两个字节来标识数据长度,否则读取的时候只知道一个总的数据长度是无法区分主机名、标签名、日志数据的。

数据包的粘包问题解决

编码/解码
  1. 定长分隔(每个数据包最大为该长度) 缺点是数据不足时会浪费传输资源
  2. 特定字符分隔(如\r\n) 缺点是如果正文中有\r\n就会导致问题
  3. 在数据包中添加长度字段(本文采用的)
bufio.Scanner

复制代码

scanner := bufio.NewScanner(reader) // reader为实现了io.Reader接口的对象,如net.Conn
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
    if !atEOF && data[0] == 'V' { // 由于我们定义的数据包头最开始为两个字节的版本号,所以只有以V开头的数据包才处理
        if len(data) > 4 { // 如果收到的数据>4个字节(2字节版本号+2字节数据包长度)
            length := int16(0)
            binary.Read(bytes.NewReader(data[2:4]), binary.BigEndian, &length) // 读取数据包第3-4字节(int16)=>数据部分长度
            if int(length)+4 <= len(data) { // 如果读取到的数据正文长度+2字节版本号+2字节数据长度不超过读到的数据(实际上就是成功完整的解析出了一个包)
                return int(length) + 4, data[:int(length)+4], nil
            }
        }
    }
    return
})
// 打印接收到的数据包
for scanner.Scan() {
    scannedPack := new(Package)
    scannedPack.Unpack(bytes.NewReader(scanner.Bytes()))
    log.Println(scannedPack)
}

复制代码

scanner.Split

完整源码

复制代码

package main

import (
    "bufio"
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
    "log"
    "os"
    "time"
)

// Package is one log packet of the custom wire protocol. Fields are listed in
// the order Pack writes them and Unpack reads them.
type Package struct {
    Version        [2]byte // protocol version
    Length         int16   // length of the data section (everything after this field)
    Timestamp      int64   // timestamp
    HostnameLength int16   // length of Hostname
    Hostname       []byte  // host name
    TagLength      int16   // length of Tag
    Tag            []byte  // tag
    Msg            []byte  // log message
}

// Pack writes the packet to writer in wire order using big-endian encoding.
//
// It stops at the first failed write and returns that error, so a failure in
// the middle of the packet is not masked by a later write that succeeds.
// The length fields are written exactly as stored; Pack does not recompute
// them from the slices.
func (p *Package) Pack(writer io.Writer) error {
    fields := []interface{}{
        p.Version[:],
        p.Length,
        p.Timestamp,
        p.HostnameLength,
        p.Hostname,
        p.TagLength,
        p.Tag,
        p.Msg,
    }
    for _, field := range fields {
        if err := binary.Write(writer, binary.BigEndian, field); err != nil {
            return err
        }
    }
    return nil
}

// Unpack reads one big-endian encoded packet from reader into p.
//
// It returns the first read error it encounters instead of carrying on with
// zero-valued fields. It also rejects negative or inconsistent length fields
// with an error, because passing a negative size to make would panic.
func (p *Package) Unpack(reader io.Reader) error {
    if err := binary.Read(reader, binary.BigEndian, &p.Version); err != nil {
        return err
    }
    if err := binary.Read(reader, binary.BigEndian, &p.Length); err != nil {
        return err
    }
    if err := binary.Read(reader, binary.BigEndian, &p.Timestamp); err != nil {
        return err
    }

    if err := binary.Read(reader, binary.BigEndian, &p.HostnameLength); err != nil {
        return err
    }
    if p.HostnameLength < 0 {
        return fmt.Errorf("invalid hostname length %d", p.HostnameLength)
    }
    p.Hostname = make([]byte, p.HostnameLength)
    if _, err := io.ReadFull(reader, p.Hostname); err != nil {
        return err
    }

    if err := binary.Read(reader, binary.BigEndian, &p.TagLength); err != nil {
        return err
    }
    if p.TagLength < 0 {
        return fmt.Errorf("invalid tag length %d", p.TagLength)
    }
    p.Tag = make([]byte, p.TagLength)
    if _, err := io.ReadFull(reader, p.Tag); err != nil {
        return err
    }

    // Length covers the 8-byte timestamp, the two 2-byte length prefixes,
    // the hostname, the tag and the message. Compute in int so the
    // subtraction cannot wrap around in int16.
    msgLen := int(p.Length) - 8 - 2 - int(p.HostnameLength) - 2 - int(p.TagLength)
    if msgLen < 0 {
        return fmt.Errorf("invalid packet length %d: smaller than its header fields", p.Length)
    }
    p.Msg = make([]byte, msgLen)
    _, err := io.ReadFull(reader, p.Msg)
    return err
}

// String renders the packet fields as a single human-readable line.
func (p *Package) String() string {
    return fmt.Sprintf("version:%s length:%d timestamp:%d hostname:%s tag:%s msg:%s",
        p.Version,
        p.Length,
        p.Timestamp,
        p.Hostname,
        p.Tag,
        p.Msg,
    )
}

// main builds one sample packet, writes it four times back to back into a
// buffer to simulate TCP packets sticking together, and then uses a
// bufio.Scanner with a custom split function to cut the byte stream back into
// individual packets, which are decoded and logged.
func main() {
    hostname, err := os.Hostname()
    if err != nil {
        log.Fatal(err)
    }

    now := time.Now()
    pack := &Package{
        Version:        [2]byte{'V', '1'},
        Timestamp:      now.Unix(),
        HostnameLength: int16(len(hostname)),
        Hostname:       []byte(hostname),
        TagLength:      4,
        Tag:            []byte("demo"),
        Msg:            []byte("现在时间是:" + now.Format("2006-01-02 15:04:05")),
    }
    // Data section: 8-byte timestamp + 2-byte hostname length + hostname +
    // 2-byte tag length + tag + message.
    pack.Length = 8 + 2 + pack.HostnameLength + 2 + pack.TagLength + int16(len(pack.Msg))

    buf := new(bytes.Buffer)
    // Write the packet four times to simulate coalesced TCP packets.
    for i := 0; i < 4; i++ {
        if err := pack.Pack(buf); err != nil {
            log.Fatal(err)
        }
    }

    scanner := bufio.NewScanner(buf)
    scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
        // Header: 2-byte version followed by a 2-byte big-endian data length.
        const headerSize = 4
        if len(data) == 0 {
            // Nothing buffered: ask for more data, or end the scan cleanly at EOF.
            return 0, nil, nil
        }
        if data[0] != 'V' {
            // Report invalid data instead of silently waiting for more.
            return 0, nil, fmt.Errorf("unexpected leading byte %#x", data[0])
        }
        if len(data) >= headerSize {
            length := int(int16(binary.BigEndian.Uint16(data[2:4])))
            if length < 0 {
                return 0, nil, fmt.Errorf("negative length %d", length)
            }
            // Checked even at EOF so a final packet that arrives together
            // with EOF is not dropped.
            if frame := headerSize + length; frame <= len(data) {
                return frame, data[:frame], nil
            }
        }
        if atEOF {
            // The stream ended in the middle of a packet.
            return 0, nil, io.ErrUnexpectedEOF
        }
        return 0, nil, nil
    })
    for scanner.Scan() {
        scannedPack := new(Package)
        if err := scannedPack.Unpack(bytes.NewReader(scanner.Bytes())); err != nil {
            log.Println("skipping malformed packet:", err)
            continue
        }
        log.Println(scannedPack)
    }
    if err := scanner.Err(); err != nil {
        log.Fatalf("无效数据包: %v", err)
    }
}

复制代码

写在最后

golang作为一门强大的网络编程语言,实现自定义协议是非常重要的,实际上实现自定义协议也不是很难,以下几个步骤:

  1. 数据包编码
  2. 数据包解码
  3. 处理TCP粘包问题
  4. 断线重连(可以使用心跳实现)(非必须)

本文引用自我自己的博客golang解决TCP粘包问题

 

 

深入理解 Go 标准库之 bufio.Scanner

yujiahaol68 · 2017-12-10 02:49:26 · 4966 次点击 · 预计阅读时间 11 分钟 · 大约5小时之前 开始浏览    

这是一个创建于 2017-12-10 02:49:26 的文章,其中的信息可能已经有所发展或是发生改变。

socketIO 缓冲区缓冲 IO
"foo  bar   baz"

如果我们只想得到上面字符串中的单词,那么扫描器能帮我们按顺序检索出 "foo","bar" 和 "baz" 这三个单词( 查看源码 )

package main

import (
    "bufio"
    "fmt"
    "strings"
)

// main splits a whitespace-separated string into words with bufio.ScanWords
// and prints each word on its own line.
func main() {
    const text = "foo  bar   baz"
    words := bufio.NewScanner(strings.NewReader(text))
    words.Split(bufio.ScanWords)
    for more := words.Scan(); more; more = words.Scan() {
        fmt.Println(words.Text())
    }
}

输出结果:

foo
bar
baz
Scannerio.Reader
bytesstrings
splitsplit
func(data []byte, atEOF bool) (advance int, token []byte, err error)
Split

1. 需要补充更多的数据

0, nil, nil
package main

import (
    "bufio"
    "fmt"
    "strings"
)

// main shows what a scanner does when the split function always asks for more
// data: every call is traced (atEOF flag, buffered length, buffered bytes)
// and no token is ever produced, so the loop body never runs.
func main() {
    const text = "abcdefghijkl"
    sc := bufio.NewScanner(strings.NewReader(text))
    // Start from a tiny 2-byte buffer so the buffer growth is visible in the trace.
    sc.Buffer(make([]byte, 2), bufio.MaxScanTokenSize)
    sc.Split(func(data []byte, atEOF bool) (int, []byte, error) {
        fmt.Printf("%t\t%d\t%s\n", atEOF, len(data), data)
        // (0, nil, nil) means "no token yet, read more input".
        return 0, nil, nil
    })
    for sc.Scan() {
        fmt.Printf("%s\n", sc.Text())
    }
}

输出结果:

false    2    ab
false    4    abcd
false    8    abcdefgh
false    12    abcdefghijkl
true    12    abcdefghijkl
splitScanner
buf := make([]byte, 2)
scanner.Buffer(buf, bufio.MaxScanTokenSize)
splitscannersplitsplit

缓冲区的默认大小是 4096 个字节。

atEOFsplitscannerflagscanner.Split()falseErr
package main

import (
    "bufio"
    "errors"
    "fmt"
    "strings"
)

// main shows a split function that keeps asking for more data and finally
// reports an error once it is told that the input has ended (atEOF). The
// error is then available through the scanner's Err method.
func main() {
    const text = "abcdefghijkl"
    sc := bufio.NewScanner(strings.NewReader(text))
    sc.Buffer(make([]byte, 12), bufio.MaxScanTokenSize)
    sc.Split(func(data []byte, atEOF bool) (int, []byte, error) {
        fmt.Printf("%t\t%d\t%s\n", atEOF, len(data), data)
        if !atEOF {
            // Still reading: request more input.
            return 0, nil, nil
        }
        return 0, nil, errors.New("bad luck")
    })
    for sc.Scan() {
        fmt.Printf("%s\n", sc.Text())
    }
    if err := sc.Err(); err != nil {
        fmt.Printf("error: %s\n", err)
    }
}

输出结果:

false    12    abcdefghijkl
true    12    abcdefghijkl
error: bad luck
atEOFsplit
foo
bar
baz
\n
package main

import (
    "bufio"
    "fmt"
    "strings"
)

// main prints the input line by line using bufio.ScanLines.
func main() {
    const text = "foo\nbar\nbaz"
    lines := bufio.NewScanner(strings.NewReader(text))
    // Passing ScanLines is not strictly necessary: it is already the
    // standard library's default split function.
    lines.Split(bufio.ScanLines)
    for more := lines.Scan(); more; more = lines.Scan() {
        fmt.Println(lines.Text())
    }
}

输出结果:

foo
bar
baz

2. 已找到字符标记(token)

splitsplit
(4, "foo")
(4, "foo")
(3, "foo")
foo
package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "strings"
)

// main shows that the token returned by a split function does not have to be
// a sub-slice of the input: every "foo" found at the front of the buffer is
// consumed (advance 3) and reported as the single-byte token "F".
func main() {
    input := "foofoofoo"
    scanner := bufio.NewScanner(strings.NewReader(input))
    split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
        // HasPrefix checks the length first. The previous data[:3] sliced
        // past len(data) when fewer than 3 bytes were buffered, which only
        // avoided a panic because the scanner's buffer had spare capacity.
        if bytes.HasPrefix(data, []byte("foo")) {
            return 3, []byte{'F'}, nil
        }
        if atEOF {
            // io.EOF stops the scan; Scanner.Err reports io.EOF as nil.
            return 0, nil, io.EOF
        }
        return 0, nil, nil
    }
    scanner.Split(split)
    for scanner.Scan() {
        fmt.Printf("%s\n", scanner.Text())
    }
}

输出结果:

F
F
F

3. 报错

split
package main

import (
    "bufio"
    "errors"
    "fmt"
    "strings"
)

// main shows that an error returned by the split function stops the scan
// and is reported afterwards by the scanner's Err method.
func main() {
    const text = "abcdefghijkl"
    sc := bufio.NewScanner(strings.NewReader(text))
    sc.Split(func(data []byte, atEOF bool) (int, []byte, error) {
        return 0, nil, errors.New("bad luck")
    })
    for sc.Scan() {
        fmt.Printf("%s\n", sc.Text())
    }
    if err := sc.Err(); err != nil {
        fmt.Printf("error: %s\n", err)
    }
}

输出结果:

error: bad luck

然而,其中有一种特殊的错误并不会使扫描器立即停止工作。

ErrFinalToken

扫描器给信号(signal) 提供了一个叫做 最终标记 的选项,这是一个不会打破循环(扫描过程依然返回真)的特殊标记,但随后的一系列调用会使扫描动作立刻终止。

func (s *Scanner) Scan() bool {
    if s.done {
        return false
    }
    ...

在 Go 语言官方 issue #11836 中提供了一种方法使得当发现特殊标记时也能够立即停止扫描。查看源码

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "strings"
)

// split behaves like bufio.ScanWords, except that the word "end" is turned
// into the token "END" and returned together with bufio.ErrFinalToken, which
// tells the scanner that this is the last token to deliver.
func split(data []byte, atEOF bool) (advance int, token []byte, err error) {
    advance, token, err = bufio.ScanWords(data, atEOF)
    // Anything other than a successfully scanned "end" is passed through unchanged.
    if err != nil || token == nil || !bytes.Equal(token, []byte("end")) {
        return advance, token, err
    }
    return 0, []byte("END"), bufio.ErrFinalToken
}

// main scans the input with the custom split function defined in this
// program and prints every token, then prints the scanner error, if any.
func main() {
    const text = "foo end bar"
    sc := bufio.NewScanner(strings.NewReader(text))
    sc.Split(split)
    for more := sc.Scan(); more; more = sc.Scan() {
        fmt.Println(sc.Text())
    }
    if err := sc.Err(); err != nil {
        fmt.Printf("Error: %s\n", err)
    }
}

输出结果:

foo
END
io.EOF 和 ErrFinalToken 都不被视为“真正”的错误:如果扫描器因为这两者之一而停止,Err 方法会返回 nil。

最大标记大小 / ErrTooLong

64 * 1024
package main

import (
    "bufio"
    "fmt"
    "strings"
)

// main feeds the default scanner a single line of bufio.MaxScanTokenSize
// bytes with no line terminator and prints the resulting scanner error.
func main() {
    oversized := strings.Repeat("x", bufio.MaxScanTokenSize)
    sc := bufio.NewScanner(strings.NewReader(oversized))
    for more := sc.Scan(); more; more = sc.Scan() {
        fmt.Println(sc.Text())
    }
    if err := sc.Err(); err != nil {
        fmt.Println(err)
    }
}
bufio.Scanner: token too long
buf := make([]byte, 10)
input := strings.Repeat("x", 20)
scanner := bufio.NewScanner(strings.NewReader(input))
scanner.Buffer(buf, 20)

for scanner.Scan() {
    fmt.Println(scanner.Text())
}

if scanner.Err() != nil {
    fmt.Println(scanner.Err())
}

输出结果:

bufio.Scanner: token too long

防止死循环

atEOFsplit
package main

import (
    "bufio"
    "bytes"
    "fmt"
    "strings"
)

// main splits the input on '|' and prints the non-empty tokens.
//
// NOTE: this example deliberately demonstrates a pitfall. At EOF the split
// function returns all remaining data as a token even when nothing is left,
// i.e. a non-nil empty token with zero advance. The scanner keeps returning
// that token without making progress and eventually panics.
func main() {
    const text = "foo|bar"
    sc := bufio.NewScanner(strings.NewReader(text))
    sc.Split(func(data []byte, atEOF bool) (int, []byte, error) {
        sep := bytes.IndexByte(data, '|')
        switch {
        case sep >= 0:
            // Token is everything before the separator; skip the separator too.
            return sep + 1, data[:sep], nil
        case atEOF:
            // Hand out whatever is left, even if that is nothing at all.
            return len(data), data, nil
        default:
            return 0, nil, nil
        }
    })
    for sc.Scan() {
        if word := sc.Text(); word != "" {
            fmt.Println(word)
        }
    }
}
如果 split 函数在 atEOF 为真且已没有剩余数据时仍然返回 (0, [], nil),即一个不推进输入的空标记,扫描器会反复调用 split,最终 panic:
foo
bar
panic: bufio.Scan: 100 empty tokens without progressing

当我第一次阅读有关 Scanner 或是 SplitFunc 的文档时我并没能弄明白在所有情况下它们是如何工作的,即便是阅读源代码也帮助甚微,因为 Scan 看上去真的很复杂,希望这篇文章能够帮助其他人更好地理清这块的细节。

本文由 GCTT 原创编译,Go语言中文网 荣誉推出