前言

最近去一家大公司面试,我遇到一道有趣的golang笔试题:

目前有一个日志系统,以文件的形式存储文件。外部服务会不断写入日志。请利用go语言编写一个程序,同时完成以下两个目标:
1、每隔1秒钟,将日志刷盘到文件;
2、日志条数每达到100条,将这100条日志刷盘到文件。

当时,由于笔试时间紧张,我来不及多想,这道题就空着。回来后,思来想去,自己决定实现一个简易的golang日志库,完成这个缺憾的梦,也和大家一起对“golang的并发之道”进行研究。

你可以收获
  • channel在高并发场景下的使用;

  • for-select-case对channel的优雅遍历处理;

  • 定时器在select中的使用;

  • golang如何用结束信号让程序优雅退出。

打造一个简易的golang日志库

接下来,我们用不超过130行的代码,通过一系列golang的特性,来打造一个简易的golang日志库。

内容脉络

为了帮助大家有个大致的轮廓,我先把后面的大纲展示出来。

在这里插入图片描述

标准日志库长啥样?

一个标准的日志库一般有以下特点:

  • 将日志内容持久化到文件中,并同时注意磁盘io。上述面试题涉及到的就是这个。

  • 日志的基本信息需要尽量详细,需要包含文件,函数名,时间等等。

  • 支持不同的日志级别。我们所熟知的DEBUG/INFO/ERROR等等,说的就是这个。

  • 支持日志切割。支持的维度一般是时间,当然也有根据文件大小的。

然而,在这里,我们只实现第一个诉求。

我们要做什么?

经过对需求的拆解,我们希望完成以下几个功能:

  • 定时刷盘:每隔1s,将这1s内的日志全部刷盘。

  • 超限刷盘:日志条数积压到100条,将这100条日志日志全部刷盘。

  • 退出刷盘:程序(或服务)退出时,积压在内存中的日志全部刷盘。

自己动手,丰衣足食

数据流

  • 用户先调用日志库的New()。

  • 此时程序会开启一个异步协程,循环监听logger对象的buf。

  • logger对象返回用户。

  • 用户通过调用logger对象的Info(),将日志输出到日志库。

  • logger对象的buf产生变更。

  • 当满足以下条件之一的时候,buf中的日志会统一刷盘。

    • buf日志数达到100刷盘一次;
    • 每隔1s刷盘一次(如果buf有日志);
    • 程序退出刷盘1次。

在这里插入图片描述

数据结构

既然,我们要做日志系统,那么,数据结构得先考虑好。

1、logger结构体

(1)f

既然是日志系统,那么必然有一个写文件的过程。那么logger数据结构中,应该有一个文件指针字段f。

(2)bufMessages

以上三个功能中,都需要一个缓冲区暂存一些日志,到达一定的条件后,就将缓冲区的日志批量刷盘。因此,在logger结构体中,需要有一个缓冲区切片bufMessages。

(3)mesChan

这里有一个问题,bufMessages是一个切片,线程不安全。如果日志并发写入的话,会存在问题。这里主要有两种方案解决,一种是加一个互斥锁,一种是用channel。这里,我用了第二种方案。因此,logger结构体多了一个channel。

2、message结构体

当然,我们还需要一个message结构体,来存储日志的详细信息。这里,我们做得比较简单,只有内容和时间。

// 日志对象
type logger struct {
   f           *os.File      // 日志文件指针
   bufMessages []*message    // 存储每一次需要同步的消息,相当于缓冲区,刷盘就会被清空
   mesChan     chan *message // 该管道接收每一条日志消息
}

// 日志消息
type message struct {
   content     string    // 日志内容
   currentTime time.Time // 日志写入时间
}

代码框架

基于上述数据结构,我们可以把代码的架子逐渐地搭起来:

1、调用日志库的第一步:通过New()初始化一个logger对象

const (
   MsgChanLength = 10000           // 日志消息管道最大长度
   MaxMsgBufLength = 100            // 日志消息缓冲区最大长度
   FlushTimeInterval = time.Second // 日志刷盘周期
)

// 初始化一个logger对象
func New(logPath string) *logger {
   // 打开一个文件,这里的模式为创建或者追加
   f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND, 0777)
   if err != nil {
      panic(err)
   }

   logger := &logger{
      f:           f,
      bufMessages: make([]*message, 0, MaxMsgBufLength),
      mesChan:     make(chan *message, MsgChanLength),
   }

   // todo: 这里需要做一点事儿,监听日志刷盘情况
   // ...
   return logger
}

2、当用户调用Info()的时候,日志消息直接进入管道。

// 将日志消息入队
func (l *logger) Info(content string) {
   // 只要mesChan中的日志消息不超过MsgChanLength,这里就不会阻塞。
   l.mesChan <- &message{
      content:     content,
      currentTime: time.Now(),
   }
}

3、格式化一下日志,让日志好看一点儿~~

// 格式化一下日志,让日志好看一点儿
func (l *logger) formatMsg(mes *message) string {
   builder := &strings.Builder{}
   builder.WriteString(mes.currentTime.Format("2006-01-02 15:04:05.999999"))
   builder.WriteString(" ")
   builder.WriteString(mes.content)
   builder.WriteString("\n")
   return builder.String()
}

4、批量将日志刷盘,实际上就是操作buf,然后将buf清空的过程。

// 公用方法:批量将buf内容刷新到日志文件
// 由于该方法放在同一个select中调用,因此线程安全
func (l *logger) batchFlush() (err error) {
   builder := strings.Builder{}
   for _, mes := range l.bufMessages {
      // 将所有的buffer内容全部拼接起来
      builder.WriteString(l.formatMsg(mes))
   }

   content := builder.String()

   if content == "" {
      return
   }

   // 重置bufMessages
   l.bufMessages = make([]*message, 0, MaxMsgBufLength)

   // 写入日志文件
   _, err = l.f.WriteString(content)
   if err != nil {
      fmt.Println("写入日志文件失败,", err)
      return
   }

   fmt.Println("成功写入日志文件,", time.Now().String())

   return
}

定时刷盘

既然是要定时刷盘,我们很容易想到用ticker来处理。

func (l *logger) listenFlush() {
   // 注册定时器
   ticker := time.NewTicker(FlushTimeInterval)
   
   // 这里我们使用select-case组合,对channel进行优雅处理
   for {
      select {
      case <-ticker.C:
         fmt.Println("每隔1s,将日志刷盘")
         l.batchFlush()
      }
   }
}

由于这个函数会阻塞,因此,我们要将其放在一个协程中,这个协程应该在New()中去创建。

// 初始化一个logger对象
func New(logPath string) *logger {
   // 打开一个文件,这里的模式为创建或者追加
   f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND, 0777)
   if err != nil {
      panic(err)
   }

   logger := &logger{
      f:           f,
      bufMessages: make([]*message, 0, MaxMsgBufLength),
      mesChan:     make(chan *message, MsgChanLength),
   }

   // 监听定时日志刷盘情况
   go logger.listenFlush()
   return logger
}

超限刷盘

“超限刷盘”的实现,其实就是接收mesChan的消息,将其塞到bufMessages中,当bufMessages达到100条,则触发批量刷盘。由于我们不清楚啥时候需要close这个mesChan,因此,我们需要用for-select-case来接收其消息。这里,我们可以把它加到ListenFlush中。

func (l *logger) listenFlush() {
   // 注册定时器
   ticker := time.NewTicker(FlushTimeInterval)
   for {
      select {
      case mes := <-l.mesChan:
         l.bufMessages = append(l.bufMessages, mes)
         if len(l.bufMessages) == MaxMsgBufLength {
            fmt.Println("缓冲区日志到达上限,将日志刷盘")
            l.batchFlush()
         }
      case <-ticker.C:
         fmt.Println("每隔1s,将日志刷盘")
         l.batchFlush()
      }
   }
}

退出刷盘

无论是上面的“定时刷盘”,还是“超限刷盘”,都有一个问题,那就是,当主进程退出后,如果bufMessages中还有日志,那么这部分日志就会丢失。为了解决这个问题,我们可以做一个优化,利用golang的结束信号,让程序优雅退出:退出前将buffer刷盘。

这部分代码,我们可以加到上限刷盘的代码里面:

func (l *logger) listenFlush() {
   // 这里,我们加一个结束信号,来优雅退出
   c := make(chan os.Signal)
   // 监听信号
   signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
   
   // 注册定时器
   ticker := time.NewTicker(FlushTimeInterval)

   for {
      select {
      case mes := <-l.mesChan:
         l.bufMessages = append(l.bufMessages, mes)
         if len(l.bufMessages) == MaxMsgBufLength {
            fmt.Println("缓冲区日志到达上限,将日志刷盘")
            l.batchFlush()
         }
      case <-ticker.C:
         fmt.Println("每隔1s,将日志刷盘")
         l.batchFlush()
      case <-c:
         fmt.Println("收到结束信号,将日志刷盘")
         l.batchFlush()
      }
   }
}

这样,我们就完成了整个日志库。

完整代码

package log

import (
   "fmt"
   "os"
   "os/signal"
   "strings"
   "syscall"
   "time"
)

const (
   MsgChanLength = 10000       // 日志消息管道最大长度
   MaxMsgBufLength = 100        // 日志消息缓冲区最大长度
   FlushTimeInterval = time.Second // 日志刷盘周期
)

// 日志对象
type logger struct {
   f           *os.File      // 日志文件指针
   bufMessages []*message    // 存储每一次需要同步的消息,相当于缓冲区,刷盘就会被清空
   mesChan     chan *message // 该管道接收每一条日志消息
}

// 日志消息
type message struct {
   content     string    // 日志内容
   currentTime time.Time // 日志写入时间
}

// 初始化一个logger对象
func New(logPath string) *logger {
   // 打开一个文件,这里的模式为创建或者追加
   f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND, 0777)
   if err != nil {
      panic(err)
   }

   logger := &logger{
      f:           f,
      bufMessages: make([]*message, 0, MaxMsgBufLength),
      mesChan:     make(chan *message, MsgChanLength),
   }

   // 监听日志buf刷盘情况
   go logger.listenFlush()
   return logger
}

// 格式化一下日志,让日志好看一点儿
func (l *logger) formatMsg(mes *message) string {
   builder := &strings.Builder{}
   builder.WriteString(mes.currentTime.Format("2006-01-02 15:04:05.999999"))
   builder.WriteString(" ")
   builder.WriteString(mes.content)
   builder.WriteString("\n")
   return builder.String()
}

// 将日志入队
func (l *logger) Info(content string) {
   l.mesChan <- &message{
      content:     content,
      currentTime: time.Now(),
   }
}

// 批量将buf内容刷新到日志文件
func (l *logger) batchFlush() (err error) {
   builder := strings.Builder{}
   for _, mes := range l.bufMessages {
      // 将所有的buffer内容全部拼接起来
      builder.WriteString(l.formatMsg(mes))
   }

   content := builder.String()

   if content == "" {
      return
   }

   // 重置bufMessages
   l.bufMessages = make([]*message, 0, MaxMsgBufLength)

   // 写入日志文件
   _, err = l.f.WriteString(content)
   if err != nil {
      fmt.Println("写入日志文件失败,", err)
      return
   }

   fmt.Println("成功写入日志文件,", time.Now().String())

   return
}

// 监听刷盘情况
func (l *logger) listenFlush() {
   // 这里,我们加一个结束信号,来优雅退出
   c := make(chan os.Signal)
   // 监听信号
   signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

   // 注册定时器
   ticker := time.NewTicker(FlushTimeInterval)

   for {
      select {
      case mes := <-l.mesChan:
         l.bufMessages = append(l.bufMessages, mes)
         if len(l.bufMessages) == MaxMsgBufLength {
            fmt.Println("缓冲区日志到达上限,将日志刷盘")
            l.batchFlush()
         }
      case <-ticker.C:
         fmt.Println("每隔1s,将日志刷盘")
         l.batchFlush()
      case <-c:
         fmt.Println("收到结束信号,将日志刷盘")
         l.batchFlush()
      }
   }
}

测试文件:log_test.go

package log

import (
   "math/rand"
   "testing"
   "time"
)

// 模拟生产场景进行日志输出
func TestBatchLog(t *testing.T) {
   // 初始化一个logger对象
   logger := New("./batch.log")
   for i := 0; i < 1000; i++ {
      // 开启1000个协程,每个协程都是一个死循环,不定期地往batch.log里面写日志
      go func() {
         for {
            n := rand.Intn(100)
            logger.Info("hello" + time.Now().String())
            time.Sleep(time.Duration(n) * time.Millisecond)
         }
      }()
   }

   // 阻塞程序
   select {}
}
小结

虽然,这个日志库和zerolog等标准日志库相比,还有一定的差距,但是,通过这个小巧的项目,我们可以对golang的并发处理有更加清晰的认识,例如channel、select、定时器的使用,golang程序退出的优雅处理,切片非线程安全的应对等等。

不仅如此,我们还可以在此基础上继续探索,一个标准日志库的实现,需要具备哪些要素,甚至打造出一套深受业界青睐的好库。