本代码实现了从Nginx日志access.log中读取信息,并用正则解析,最后将requestTime从端口暴露,以便让prometheus抓取。使用Golang并发实现。
package main
import (
"bufio"
"fmt"
"io"
"log"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus"
)
type Reader interface {
Read(rc chan []byte)
}
type Writer interface {
Write(wc chan *Message)
}
type LogProcess struct {
rc chan []byte
wc chan *Message
read Reader //接收和读取写入模块
write Writer //
}
type ReadFromFile struct {
path string
}
type WriteToProm struct {
}
type Message struct {
TimeLocal time.Time
BytesSent int
Path, Method, Scheme, Status string
UpStreamTime, RequestTime float64
}
func (r *ReadFromFile) Read(rc chan []byte) {
//读取模块
//打开文件
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file error:%s", err.Error()))
}
//从末位开始逐行读取
f.Seek(0, 2)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
time.Sleep(500 * time.Microsecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func (w *WriteToProm) Write(wc chan *Message) {
//写入模块
//初始化日志服务
logger := log.New(os.Stdout, "[Nginx]", log.Lshortfile | log.Ldate | log.Ltime)
//初始一个http handler
http.Handle("/metrics", promhttp.Handler())
//初始化一个容器
diskPercent := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "Nginx_logs",
Help: "Nginx_logs",
},
[]string {"percent"},
)
prometheus.MustRegister(diskPercent)
// 启动web服务,监听1010端口
go func() {
logger.Println("ListenAndServe at:10.22.170.143:1010")
err := http.ListenAndServe("10.22.170.143:1010", nil)
if err != nil {
logger.Fatal("ListenAndServe: ", err)
}
}()
for v := range wc {
fmt.Println(v)
requestTime := v.RequestTime
logger.Println("get requestTime:", requestTime)
diskPercent.WithLabelValues("requestTime").Set(requestTime)
time.Sleep(time.Second*2)
}
}
func (l *LogProcess) Process() {
//解析模块
//([\d\,]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)
r := regexp.MustCompile(`([\d\,]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
loc, _ := time.LoadLocation("Asia/Shanghai")
for v := range l.rc {
ret := r.FindStringSubmatch(string(v))
if len(ret) != 14 {
log.Println("FindStringSubmatch fail:", string(v))
continue
}
message := &Message{}
//time_local
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println("PardeInLocation fail:", err.Error(), ret[4])
}
message.TimeLocal = t
//body_bytes_sent
byteSent, _ := strconv.Atoi(ret[8])
message.BytesSent = byteSent
//request
reqSli := strings.Split(ret[6], " ")
if len(reqSli) != 3 {
log.Println("strings.Split fail", ret[6])
continue
}
//请求方式
message.Method = reqSli[0]
//url
u, err := url.Parse(reqSli[1])
if err != nil {
log.Println("url parse fail:", err)
continue
}
message.Path = u.Path
//协议
message.Scheme = ret[5]
//status
message.Status = ret[7]
//upstreamTime
upstreamTime, _ := strconv.ParseFloat(ret[12], 64)
requestTime, _ := strconv.ParseFloat(ret[13], 64)
message.UpStreamTime = upstreamTime
message.RequestTime = requestTime
l.wc <- message
}
}
func main() {
r := &ReadFromFile{
//从当前目录的access.log中读取
path: "./access.log",
}
w := &WriteToProm{
}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan *Message),
read: r,
write: w,
}
go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)
time.Sleep(1200 * time.Second)
}