最近去一家大公司面试,我遇到一道有趣的golang笔试题:
目前有一个日志系统,以文件的形式存储文件。外部服务会不断写入日志。请利用go语言编写一个程序,同时完成以下两个目标:
1、每隔1秒钟,将日志刷盘到文件;
2、日志条数每达到100条,将这100条日志刷盘到文件。
当时,由于笔试时间紧张,我来不及多想,这道题就空着。回来后,思来想去,自己决定实现一个简易的golang日志库,完成这个缺憾的梦,也和大家一起对“golang的并发之道”进行研究。
你可以收获-
channel在高并发场景下的使用;
-
for-select-case对channel的优雅遍历处理;
-
定时器在select中的使用;
-
golang如何用结束信号让程序优雅退出。
接下来,我们用不超过130行的代码,通过一系列golang的特性,来打造一个简易的golang日志库。
内容脉络
为了帮助大家有个大致的轮廓,我先把后面的大纲展示出来。
标准日志库长啥样?
一个标准的日志库一般有以下特点:
-
将日志内容持久化到文件中,并同时注意磁盘io。上述面试题涉及到的就是这个。
-
日志的基本信息需要尽量详细,需要包含文件,函数名,时间等等。
-
支持不同的日志级别。我们所熟知的DEBUG/INFO/ERROR等等,说的就是这个。
-
支持日志切割。支持的维度一般是时间,当然也有根据文件大小的。
然而,在这里,我们只实现第一个诉求。
我们要做什么?
经过对需求的拆解,我们希望完成以下几个功能:
-
定时刷盘:每隔1s,将这1s内的日志全部刷盘。
-
超限刷盘:日志条数积压到100条,将这100条日志日志全部刷盘。
-
退出刷盘:程序(或服务)退出时,积压在内存中的日志全部刷盘。
自己动手,丰衣足食
数据流
-
用户先调用日志库的New()。
-
此时程序会开启一个异步协程,循环监听logger对象的buf。
-
logger对象返回用户。
-
用户通过调用logger对象的Info(),将日志输出到日志库。
-
logger对象的buf产生变更。
-
当满足以下条件之一的时候,buf中的日志会统一刷盘。
- buf日志数达到100刷盘一次;
- 每隔1s刷盘一次(如果buf有日志);
- 程序退出刷盘1次。
数据结构
既然,我们要做日志系统,那么,数据结构得先考虑好。
1、logger结构体
(1)f
既然是日志系统,那么必然有一个写文件的过程。那么logger数据结构中,应该有一个文件指针字段f。
(2)bufMessages
以上三个功能中,都需要一个缓冲区暂存一些日志,到达一定的条件后,就将缓冲区的日志批量刷盘。因此,在logger结构体中,需要有一个缓冲区切片bufMessages。
(3)mesChan
这里有一个问题,bufMessages是一个切片,线程不安全。如果日志并发写入的话,会存在问题。这里主要有两种方案解决,一种是加一个互斥锁,一种是用channel。这里,我用了第二种方案。因此,logger结构体多了一个channel。
2、message结构体
当然,我们还需要一个message结构体,来存储日志的详细信息。这里,我们做得比较简单,只有内容和时间。
// 日志对象
type logger struct {
f *os.File // 日志文件指针
bufMessages []*message // 存储每一次需要同步的消息,相当于缓冲区,刷盘就会被清空
mesChan chan *message // 该管道接收每一条日志消息
}
// 日志消息
type message struct {
content string // 日志内容
currentTime time.Time // 日志写入时间
}
代码框架
基于上述数据结构,我们可以把代码的架子逐渐地搭起来:
1、调用日志库的第一步:通过New()初始化一个logger对象
const (
MsgChanLength = 10000 // 日志消息管道最大长度
MaxMsgBufLength = 100 // 日志消息缓冲区最大长度
FlushTimeInterval = time.Second // 日志刷盘周期
)
// 初始化一个logger对象
func New(logPath string) *logger {
// 打开一个文件,这里的模式为创建或者追加
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND, 0777)
if err != nil {
panic(err)
}
logger := &logger{
f: f,
bufMessages: make([]*message, 0, MaxMsgBufLength),
mesChan: make(chan *message, MsgChanLength),
}
// todo: 这里需要做一点事儿,监听日志刷盘情况
// ...
return logger
}
2、当用户调用Info()的时候,日志消息直接进入管道。
// 将日志消息入队
func (l *logger) Info(content string) {
// 只要mesChan中的日志消息不超过MsgChanLength,这里就不会阻塞。
l.mesChan <- &message{
content: content,
currentTime: time.Now(),
}
}
3、格式化一下日志,让日志好看一点儿~~
// 格式化一下日志,让日志好看一点儿
func (l *logger) formatMsg(mes *message) string {
builder := &strings.Builder{}
builder.WriteString(mes.currentTime.Format("2006-01-02 15:04:05.999999"))
builder.WriteString(" ")
builder.WriteString(mes.content)
builder.WriteString("\n")
return builder.String()
}
4、批量将日志刷盘,实际上就是操作buf,然后将buf清空的过程。
// 公用方法:批量将buf内容刷新到日志文件
// 由于该方法放在同一个select中调用,因此线程安全
func (l *logger) batchFlush() (err error) {
builder := strings.Builder{}
for _, mes := range l.bufMessages {
// 将所有的buffer内容全部拼接起来
builder.WriteString(l.formatMsg(mes))
}
content := builder.String()
if content == "" {
return
}
// 重置bufMessages
l.bufMessages = make([]*message, 0, MaxMsgBufLength)
// 写入日志文件
_, err = l.f.WriteString(content)
if err != nil {
fmt.Println("写入日志文件失败,", err)
return
}
fmt.Println("成功写入日志文件,", time.Now().String())
return
}
定时刷盘
既然是要定时刷盘,我们很容易想到用ticker来处理。
func (l *logger) listenFlush() {
// 注册定时器
ticker := time.NewTicker(FlushTimeInterval)
// 这里我们使用select-case组合,对channel进行优雅处理
for {
select {
case <-ticker.C:
fmt.Println("每隔1s,将日志刷盘")
l.batchFlush()
}
}
}
由于这个函数会阻塞,因此,我们要将其放在一个协程中,这个协程应该在New()中去创建。
// 初始化一个logger对象
func New(logPath string) *logger {
// 打开一个文件,这里的模式为创建或者追加
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND, 0777)
if err != nil {
panic(err)
}
logger := &logger{
f: f,
bufMessages: make([]*message, 0, MaxMsgBufLength),
mesChan: make(chan *message, MsgChanLength),
}
// 监听定时日志刷盘情况
go logger.listenFlush()
return logger
}
超限刷盘
“超限刷盘”的实现,其实就是接收mesChan的消息,将其塞到bufMessages中,当bufMessages达到100条,则触发批量刷盘。由于我们不清楚啥时候需要close这个mesChan,因此,我们需要用for-select-case来接收其消息。这里,我们可以把它加到ListenFlush中。
func (l *logger) listenFlush() {
// 注册定时器
ticker := time.NewTicker(FlushTimeInterval)
for {
select {
case mes := <-l.mesChan:
l.bufMessages = append(l.bufMessages, mes)
if len(l.bufMessages) == MaxMsgBufLength {
fmt.Println("缓冲区日志到达上限,将日志刷盘")
l.batchFlush()
}
case <-ticker.C:
fmt.Println("每隔1s,将日志刷盘")
l.batchFlush()
}
}
}
退出刷盘
无论是上面的“定时刷盘”,还是“超限刷盘”,都有一个问题,那就是,当主进程退出后,如果bufMessages中还有日志,那么这部分日志就会丢失。为了解决这个问题,我们可以做一个优化,利用golang的结束信号,让程序优雅退出:退出前将buffer刷盘。
这部分代码,我们可以加到上限刷盘的代码里面:
func (l *logger) listenFlush() {
// 这里,我们加一个结束信号,来优雅退出
c := make(chan os.Signal)
// 监听信号
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// 注册定时器
ticker := time.NewTicker(FlushTimeInterval)
for {
select {
case mes := <-l.mesChan:
l.bufMessages = append(l.bufMessages, mes)
if len(l.bufMessages) == MaxMsgBufLength {
fmt.Println("缓冲区日志到达上限,将日志刷盘")
l.batchFlush()
}
case <-ticker.C:
fmt.Println("每隔1s,将日志刷盘")
l.batchFlush()
case <-c:
fmt.Println("收到结束信号,将日志刷盘")
l.batchFlush()
}
}
}
这样,我们就完成了整个日志库。
完整代码
package log
import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
const (
MsgChanLength = 10000 // 日志消息管道最大长度
MaxMsgBufLength = 100 // 日志消息缓冲区最大长度
FlushTimeInterval = time.Second // 日志刷盘周期
)
// 日志对象
type logger struct {
f *os.File // 日志文件指针
bufMessages []*message // 存储每一次需要同步的消息,相当于缓冲区,刷盘就会被清空
mesChan chan *message // 该管道接收每一条日志消息
}
// 日志消息
type message struct {
content string // 日志内容
currentTime time.Time // 日志写入时间
}
// 初始化一个logger对象
func New(logPath string) *logger {
// 打开一个文件,这里的模式为创建或者追加
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND, 0777)
if err != nil {
panic(err)
}
logger := &logger{
f: f,
bufMessages: make([]*message, 0, MaxMsgBufLength),
mesChan: make(chan *message, MsgChanLength),
}
// 监听日志buf刷盘情况
go logger.listenFlush()
return logger
}
// 格式化一下日志,让日志好看一点儿
func (l *logger) formatMsg(mes *message) string {
builder := &strings.Builder{}
builder.WriteString(mes.currentTime.Format("2006-01-02 15:04:05.999999"))
builder.WriteString(" ")
builder.WriteString(mes.content)
builder.WriteString("\n")
return builder.String()
}
// 将日志入队
func (l *logger) Info(content string) {
l.mesChan <- &message{
content: content,
currentTime: time.Now(),
}
}
// 批量将buf内容刷新到日志文件
func (l *logger) batchFlush() (err error) {
builder := strings.Builder{}
for _, mes := range l.bufMessages {
// 将所有的buffer内容全部拼接起来
builder.WriteString(l.formatMsg(mes))
}
content := builder.String()
if content == "" {
return
}
// 重置bufMessages
l.bufMessages = make([]*message, 0, MaxMsgBufLength)
// 写入日志文件
_, err = l.f.WriteString(content)
if err != nil {
fmt.Println("写入日志文件失败,", err)
return
}
fmt.Println("成功写入日志文件,", time.Now().String())
return
}
// 监听刷盘情况
func (l *logger) listenFlush() {
// 这里,我们加一个结束信号,来优雅退出
c := make(chan os.Signal)
// 监听信号
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// 注册定时器
ticker := time.NewTicker(FlushTimeInterval)
for {
select {
case mes := <-l.mesChan:
l.bufMessages = append(l.bufMessages, mes)
if len(l.bufMessages) == MaxMsgBufLength {
fmt.Println("缓冲区日志到达上限,将日志刷盘")
l.batchFlush()
}
case <-ticker.C:
fmt.Println("每隔1s,将日志刷盘")
l.batchFlush()
case <-c:
fmt.Println("收到结束信号,将日志刷盘")
l.batchFlush()
}
}
}
测试文件:log_test.go
package log
import (
"math/rand"
"testing"
"time"
)
// 模拟生产场景进行日志输出
func TestBatchLog(t *testing.T) {
// 初始化一个logger对象
logger := New("./batch.log")
for i := 0; i < 1000; i++ {
// 开启1000个协程,每个协程都是一个死循环,不定期地往batch.log里面写日志
go func() {
for {
n := rand.Intn(100)
logger.Info("hello" + time.Now().String())
time.Sleep(time.Duration(n) * time.Millisecond)
}
}()
}
// 阻塞程序
select {}
}
小结
虽然,这个日志库和zerolog等标准日志库相比,还有一定的差距,但是,通过这个小巧的项目,我们可以对golang的并发处理有更加清晰的认识,例如channel、select、定时器的使用,golang程序退出的优雅处理,切片非线程安全的应对等等。
不仅如此,我们还可以在此基础上继续探索,一个标准日志库的实现,需要具备哪些要素,甚至打造出一套深受业界青睐的好库。