type SegmentWAL struct { ...... cur *bufio.Writer curN int64 segmentSize int64 // default 256 * 1024 * 1024 // 256 MB

先将这个bufio.Writer如何创建的过程暂且不表,只要知道这是一个bufio.Writer类型的即可。回到SegmentWAL的write方法中。这个方法主要执行的是w.writeTo()方法,但是当条件一成立的时候,先执行w.cut()方法。这个条件解释如下:

w.cur为nil
m.curN超过了最大的segmentSize,
加上新数据的长度超过segmentSize且新数据规模小于segmentSize

只要符合其中一个条件,就会执行w.cut(),接着再执行wr.WriteTo()方法。在wr.WriteTo()方法中主要有几个知识点:

crc32包实现了32位循环冗余校验(CRC-32)的校验和算法,参见:http://en.wikipedia.org/wiki/Cyclic_redundancy_check
wr = io.MultiWriter(crc32, wr), 类似于linux tee命令,详见http://man.linuxde.net/tee
写入到wr中的是这么五段 WALEntryType类型+ flag + buf长度 + buf本身 + 校验和
返回写入的总长度

  • 请尽量让自己的回复能够对别人有帮助
  • `单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传
  • 从两周前我就开始阅读prometheus时序数据库存储部分。此前我采用了顺序阅读的方式,虽然这种方式遵循着代码执行顺序,但经常由于不理解各模块的作用,以及各模块调用过程又比较复杂,整个阅读过程困难重重。必须改变阅读方式,模块化地阅读代码,或者才是正道。也决心写点文章,记录下阅读代码的过程。
    今天首先阅读的wal.go的代码,这部分代码是WriteAheadLog部分的代码实现。

    // WAL is a write ahead log that can log new series labels and samples.
    // It must be completely read before new entries are logged.
    type WAL interface {
        Reader() WALReader
        LogSeries([]RefSeries) error
        LogSamples([]RefSample) error
        LogDeletes([]Stone) error
        Truncate(mint int64, keep func(uint64) bool) error
        Close() error
    

    该接口有两个实现noWAL和SegmentWAL。noWAL即什么也不干的wal,所有方法几乎都是直接return,重点来看下SegmentWAL。

    // SegmentWAL is a write ahead log for series data.
    type SegmentWAL struct {
        mtx     sync.Mutex
        metrics *walMetrics
        dirFile *os.File
        files   []*segmentFile
        logger        log.Logger
        flushInterval time.Duration
        segmentSize   int64
        crc32 hash.Hash32
        cur   *bufio.Writer
        curN  int64
        stopc   chan struct{}
        donec   chan struct{}
        actorc  chan func() error // sequentialized background operations
        buffers sync.Pool
    

    先不要去管这些结构体成员, 等后面方法中用到了再去理解,这里贴图只是为了后面代码提到的时候方便翻阅。首先来看看SegementWAL实现的func (w *SegmentWAL)LogSeries(series []RefSeries)error 方法。这个方法接收一个slice,类型为结构体RefSeries,看名字,这个结构体似乎是用户指向一个Series,下图给出了这个结构体的定义。

    // RefSeries is the series labels with the series ID.
    type RefSeries struct {
        Ref    uint64
        Labels labels.Labels
    

    结构很简单,由一个uint64和lables.labels组成。Labels是时序数据的标签,其实质是KV组成的数组,prometheus里面将时序数据的metric也包含在Labels里面,即name: $name的形式。也就是说Lables能够唯一地表示一条时间序列。uint64则是这条时间序列的id。也就是说LogSeries方法就是存储标签名称以及id使用的。接下来看看这个方法是如何实现的。

    // LogSeries writes a batch of new series labels to the log.
    // The series have to be ordered.
    func (w *SegmentWAL) LogSeries(series []RefSeries) error {
        //获得一个buffer,至于buffer如何实现,后面再表
        buf := w.getBuffer()
        //将series存储到buffer,返回一个uint8类型的flag
        flag := w.encodeSeries(buf, series)
        //加锁,读写可能会竞争
        w.mtx.Lock()
        defer w.mtx.Unlock()
        //将buf内的内容写入到xx
        err := w.write(WALEntrySeries, flag, buf.get())
        w.putBuffer(buf)
        if err != nil {
            return errors.Wrap(err, "log series")
        tf := w.head()
        for _, s := range series {
            if tf.minSeries > s.Ref {
                tf.minSeries = s.Ref
        return nil
    

    这里用到了几个SegmentWAL结构体成员变量,首先是

    type SegmentWAL struct {
        ......
        buffers sync.Pool
    

    sync.Pool是官方的包,这里给出一段介绍

    众所周知,go是自动垃圾回收的(garbage collector),这大大减少了程序编程负担。但gc是一把双刃剑,带来了编程的方便但同时也增加了运行时开销,使用不当甚至会严重影响程序的性能。因此性能要求高的场景不能任意产生太多的垃圾(有gc但又不能完全依赖它挺恶心的),如何解决呢?那就是要重用对象了,我们可以简单的使用一个chan把这些可重用的对象缓存起来,但如果很多goroutine竞争一个chan性能肯定是问题.....由于golang团队认识到这个问题普遍存在,为了避免大家重造车轮,因此官方统一出了一个包Pool。原文:https://blog.csdn.net/yongjian_lian/article/details/42058893

    简单地说,就是缓存对象, 不会被gc清理掉。需要用的时候,再取出来。这里重用的对象或者说结构体是encbuf,是一个结构体,

    // encbuf is a helper type to populate a byte slice with various types.
    type encbuf struct {
        b []byte
        c [binary.MaxVarintLen64]byte
    

    encbuf使用了binary包,简单的数字与字节序列的转换以及变长值的编解码,prometheus采用了BigEndian方式进行编解码。下面来看下encodeSeries方法

    func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 {
        for _, s := range series {
            buf.putBE64(s.Ref)
            buf.putUvarint(len(s.Labels))
            for _, l := range s.Labels {
                buf.putUvarintStr(l.Name)
                buf.putUvarintStr(l.Value)
        return walSeriesSimple
    

    对于多组RefSeries,首先写入series的RefId,然后用变长编码写入labels个数,最后用变长字符串的形式分别写入标签的名字和值。
    写到buf里面之后,调用SegmentWAL的write方法

    func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
        // Cut to the next segment if the entry exceeds the file size unless it would also
        // exceed the size of a new segment.
        // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
        var (
            sz    = int64(len(buf)) + 6
            newsz = w.curN + sz
        // XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
        // Probably fine in general but may yield a lot of short files in some cases.
        if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
            if err := w.cut(); err != nil {
                return err
        n, err := w.writeTo(w.cur, w.crc32, t, flag, buf)
        w.curN += int64(n)
        return err
    func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) {
        if len(buf) == 0 {
            return 0, nil
        crc32.Reset()
        wr = io.MultiWriter(crc32, wr)
        var b [6]byte
        b[0] = byte(t)
        b[1] = flag
        binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
        n1, err := wr.Write(b[:])
        if err != nil {
            return n1, err
        n2, err := wr.Write(buf)
        if err != nil {
            return n1 + n2, err
        n3, err := wr.Write(crc32.Sum(b[:0]))
        return n1 + n2 + n3, err
    

    首先来看一下一个结构体成员

    type SegmentWAL struct {
        ......
        cur   *bufio.Writer
        curN  int64
        segmentSize   int64 // default 256 * 1024 * 1024 // 256 MB
    

    先将这个bufio.Writer如何创建的过程暂且不表,只要知道这是一个bufio.Writer类型的即可。回到SegmentWAL的write方法中。这个方法主要执行的是w.writeTo()方法,但是当条件一成立的时候,先执行w.cut()方法。这个条件解释如下:

    w.cur为nil
    m.curN超过了最大的segmentSize,
    加上新数据的长度超过segmentSize且新数据规模小于segmentSize

    只要符合其中一个条件,就会执行w.cut(),接着再执行wr.WriteTo()方法。在wr.WriteTo()方法中主要有几个知识点:

    crc32包实现了32位循环冗余校验(CRC-32)的校验和算法,参见:http://en.wikipedia.org/wiki/Cyclic_redundancy_check
    wr = io.MultiWriter(crc32, wr), 类似于linux tee命令,详见http://man.linuxde.net/tee
    写入到wr中的是这么五段 WALEntryType类型+ flag + buf长度 + buf本身 + 校验和
    返回写入的总长度

     最高记录 3727 ©2013-2019 studygolang.com Go语言中文网,中国 Golang 社区,致力于构建完善的 Golang 中文社区,Go语言爱好者的学习家园。 Powered by StudyGolang(Golang + MySQL)  • · CDN 采用 七牛云 VERSION: V3.5.0 · 8.541384ms · 为了更好的体验,本站推荐使用 Chrome 或 Firefox 浏览器