先将这个bufio.Writer如何创建的过程暂且不表,只要知道这是一个bufio.Writer类型的即可。回到SegmentWAL的write方法中。这个方法主要执行的是w.writeTo()方法,但是当条件一成立的时候,先执行w.cut()方法。这个条件解释如下:
w.cur为nil
m.curN超过了最大的segmentSize,
加上新数据的长度超过segmentSize且新数据规模小于segmentSize
只要符合其中一个条件,就会执行w.cut(),接着再执行wr.WriteTo()方法。在wr.WriteTo()方法中主要有几个知识点:
crc32包实现了32位循环冗余校验(CRC-32)的校验和算法,参见:http://en.wikipedia.org/wiki/Cyclic_redundancy_check
wr = io.MultiWriter(crc32, wr), 类似于linux tee命令,详见http://man.linuxde.net/tee
写入到wr中的是这么五段 WALEntryType类型+ flag + buf长度 + buf本身 + 校验和
返回写入的总长度
`单行代码`
从两周前我就开始阅读prometheus时序数据库存储部分。此前我采用了顺序阅读的方式,虽然这种方式遵循着代码执行顺序,但经常由于不理解各模块的作用,以及各模块调用过程又比较复杂,整个阅读过程困难重重。必须改变阅读方式,模块化地阅读代码,或者才是正道。也决心写点文章,记录下阅读代码的过程。
今天首先阅读的wal.go的代码,这部分代码是WriteAheadLog部分的代码实现。
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
LogSamples([]RefSample) error
LogDeletes([]Stone) error
Truncate(mint int64, keep func(uint64) bool) error
Close() error
该接口有两个实现noWAL和SegmentWAL。noWAL即什么也不干的wal,所有方法几乎都是直接return,重点来看下SegmentWAL。
// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics
dirFile *os.File
files []*segmentFile
logger log.Logger
flushInterval time.Duration
segmentSize int64
crc32 hash.Hash32
cur *bufio.Writer
curN int64
stopc chan struct{}
donec chan struct{}
actorc chan func() error // sequentialized background operations
buffers sync.Pool
先不要去管这些结构体成员, 等后面方法中用到了再去理解,这里贴图只是为了后面代码提到的时候方便翻阅。首先来看看SegementWAL实现的func (w *SegmentWAL)LogSeries(series []RefSeries)error 方法。这个方法接收一个slice,类型为结构体RefSeries,看名字,这个结构体似乎是用户指向一个Series,下图给出了这个结构体的定义。
// RefSeries is the series labels with the series ID.
type RefSeries struct {
Ref uint64
Labels labels.Labels
结构很简单,由一个uint64和lables.labels组成。Labels是时序数据的标签,其实质是KV组成的数组,prometheus里面将时序数据的metric也包含在Labels里面,即name: $name的形式。也就是说Lables能够唯一地表示一条时间序列。uint64则是这条时间序列的id。也就是说LogSeries方法就是存储标签名称以及id使用的。接下来看看这个方法是如何实现的。
// LogSeries writes a batch of new series labels to the log.
// The series have to be ordered.
func (w *SegmentWAL) LogSeries(series []RefSeries) error {
//获得一个buffer,至于buffer如何实现,后面再表
buf := w.getBuffer()
//将series存储到buffer,返回一个uint8类型的flag
flag := w.encodeSeries(buf, series)
//加锁,读写可能会竞争
w.mtx.Lock()
defer w.mtx.Unlock()
//将buf内的内容写入到xx
err := w.write(WALEntrySeries, flag, buf.get())
w.putBuffer(buf)
if err != nil {
return errors.Wrap(err, "log series")
tf := w.head()
for _, s := range series {
if tf.minSeries > s.Ref {
tf.minSeries = s.Ref
return nil
这里用到了几个SegmentWAL结构体成员变量,首先是
type SegmentWAL struct {
......
buffers sync.Pool
sync.Pool是官方的包,这里给出一段介绍
众所周知,go是自动垃圾回收的(garbage collector),这大大减少了程序编程负担。但gc是一把双刃剑,带来了编程的方便但同时也增加了运行时开销,使用不当甚至会严重影响程序的性能。因此性能要求高的场景不能任意产生太多的垃圾(有gc但又不能完全依赖它挺恶心的),如何解决呢?那就是要重用对象了,我们可以简单的使用一个chan把这些可重用的对象缓存起来,但如果很多goroutine竞争一个chan性能肯定是问题.....由于golang团队认识到这个问题普遍存在,为了避免大家重造车轮,因此官方统一出了一个包Pool。原文:https://blog.csdn.net/yongjian_lian/article/details/42058893
简单地说,就是缓存对象, 不会被gc清理掉。需要用的时候,再取出来。这里重用的对象或者说结构体是encbuf,是一个结构体,
// encbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte
encbuf使用了binary包,简单的数字与字节序列的转换以及变长值的编解码,prometheus采用了BigEndian方式进行编解码。下面来看下encodeSeries方法
func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 {
for _, s := range series {
buf.putBE64(s.Ref)
buf.putUvarint(len(s.Labels))
for _, l := range s.Labels {
buf.putUvarintStr(l.Name)
buf.putUvarintStr(l.Value)
return walSeriesSimple
对于多组RefSeries,首先写入series的RefId,然后用变长编码写入labels个数,最后用变长字符串的形式分别写入标签的名字和值。
写到buf里面之后,调用SegmentWAL的write方法
func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
// Cut to the next segment if the entry exceeds the file size unless it would also
// exceed the size of a new segment.
// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
var (
sz = int64(len(buf)) + 6
newsz = w.curN + sz
// XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
// Probably fine in general but may yield a lot of short files in some cases.
if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
if err := w.cut(); err != nil {
return err
n, err := w.writeTo(w.cur, w.crc32, t, flag, buf)
w.curN += int64(n)
return err
func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) {
if len(buf) == 0 {
return 0, nil
crc32.Reset()
wr = io.MultiWriter(crc32, wr)
var b [6]byte
b[0] = byte(t)
b[1] = flag
binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
n1, err := wr.Write(b[:])
if err != nil {
return n1, err
n2, err := wr.Write(buf)
if err != nil {
return n1 + n2, err
n3, err := wr.Write(crc32.Sum(b[:0]))
return n1 + n2 + n3, err
首先来看一下一个结构体成员
type SegmentWAL struct {
......
cur *bufio.Writer
curN int64
segmentSize int64 // default 256 * 1024 * 1024 // 256 MB
先将这个bufio.Writer如何创建的过程暂且不表,只要知道这是一个bufio.Writer类型的即可。回到SegmentWAL的write方法中。这个方法主要执行的是w.writeTo()方法,但是当条件一成立的时候,先执行w.cut()方法。这个条件解释如下:
w.cur为nil
m.curN超过了最大的segmentSize,
加上新数据的长度超过segmentSize且新数据规模小于segmentSize
只要符合其中一个条件,就会执行w.cut(),接着再执行wr.WriteTo()方法。在wr.WriteTo()方法中主要有几个知识点:
crc32包实现了32位循环冗余校验(CRC-32)的校验和算法,参见:http://en.wikipedia.org/wiki/Cyclic_redundancy_check
wr = io.MultiWriter(crc32, wr), 类似于linux tee命令,详见http://man.linuxde.net/tee
写入到wr中的是这么五段 WALEntryType类型+ flag + buf长度 + buf本身 + 校验和
返回写入的总长度
最高记录 3727
©2013-2019 studygolang.com Go语言中文网,中国 Golang 社区,致力于构建完善的 Golang 中文社区,Go语言爱好者的学习家园。
Powered by StudyGolang(Golang + MySQL) • · CDN 采用 七牛云
VERSION: V3.5.0 · 8.541384ms · 为了更好的体验,本站推荐使用 Chrome 或 Firefox 浏览器