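// This program reads entries appended to the Nginx log file access.log,
// parses each line with a regular expression, and finally exposes the
// request time (requestTime) on an HTTP port so that Prometheus can scrape
// it. The reading, parsing, and exporting stages run concurrently as
// goroutines connected by channels.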
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/prometheus/client_golang/prometheus"
)

// Reader is the input stage of the log pipeline. Implementations are
// expected to deliver raw log lines on the channel they are given.
type Reader interface {
	Read(lines chan []byte)
}

// Writer is the output stage of the log pipeline. Implementations consume
// the parsed messages delivered on the channel they are given.
type Writer interface {
	Write(msgs chan *Message)
}

// LogProcess wires the pipeline together: read sends raw lines on rc,
// Process turns them into Messages on wc, and write consumes wc.
type LogProcess struct {
	rc chan []byte   // raw log lines from the reader
	wc chan *Message // parsed entries for the writer

	read  Reader // input (reading) stage
	write Writer // output (writing) stage
}

// ReadFromFile is a Reader that follows the file located at path.
type ReadFromFile struct {
	path string // file to follow, e.g. "./access.log"
}

// WriteToProm is a Writer that exports received messages as Prometheus
// metrics. It carries no configuration of its own.
type WriteToProm struct{}

// Message is one parsed access-log entry, built by LogProcess.Process and
// handed to the Writer.
type Message struct {
	// TimeLocal is the timestamp taken from the bracketed time field.
	TimeLocal time.Time

	// BytesSent is the body_bytes_sent field.
	BytesSent int

	// Path is the path component of the requested URI.
	Path string

	// Method is the first word of the request line (e.g. "GET").
	Method string

	// Scheme is the lower-case word captured after the timestamp
	// (presumably $scheme — TODO confirm against the nginx log_format).
	Scheme string

	// Status is the three-digit response status, kept as text.
	Status string

	// UpStreamTime and RequestTime are the last two numeric fields of
	// the line (units assumed to be seconds — verify log_format).
	UpStreamTime float64
	RequestTime  float64
}

// Read follows the file at r.path and sends every complete line, without
// its trailing '\n', on rc. Reading starts at the current end of the file,
// so only lines appended after startup are delivered. Read never returns.
//
// It panics if the file cannot be opened or seeked, or if reading fails
// with an error other than io.EOF.
func (r *ReadFromFile) Read(rc chan []byte) {
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s", err.Error()))
	}
	defer f.Close()

	// Skip existing content; only newly appended lines are of interest.
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		panic(fmt.Sprintf("seek file error:%s", err.Error()))
	}

	rd := bufio.NewReader(f)
	// pending accumulates a line whose '\n' has not been written yet.
	// ReadBytes returns such partial data together with io.EOF; dropping
	// it would make the next read start in the middle of the line.
	var pending []byte
	for {
		chunk, err := rd.ReadBytes('\n')
		pending = append(pending, chunk...)
		if err == io.EOF {
			// Wait for the writer to append more data before polling again.
			time.Sleep(500 * time.Millisecond)
			continue
		}
		if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
		}
		// pending now ends with '\n'; strip it and start a fresh buffer so
		// the sent slice is never overwritten by later appends.
		line := pending[:len(pending)-1]
		pending = nil
		rc <- line
	}
}

// Write publishes the RequestTime of every Message received on wc as a
// Prometheus gauge. The gauge is served on /metrics at 10.22.170.143:1010
// via http.DefaultServeMux. Write returns only when wc is closed. The HTTP
// server runs in its own goroutine and terminates the process through
// logger.Fatal if it stops with an error.
func (w *WriteToProm) Write(wc chan *Message) {
	logger := log.New(os.Stdout, "[Nginx]", log.Lshortfile|log.Ldate|log.Ltime)

	http.Handle("/metrics", promhttp.Handler())

	// Gauge holding the most recently observed request time. The metric
	// and label names are what Prometheus scrapes, so they are unchanged.
	requestTimeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "Nginx_logs",
		Help: "Nginx_logs",
	}, []string{"percent"})
	prometheus.MustRegister(requestTimeVec)
	// The label value is constant, so resolve the child gauge once.
	requestTimeGauge := requestTimeVec.WithLabelValues("requestTime")

	const addr = "10.22.170.143:1010"
	srv := &http.Server{
		Addr: addr, // nil Handler => http.DefaultServeMux
		// Bound how long a client may take to send request headers.
		ReadHeaderTimeout: 10 * time.Second,
	}
	go func() {
		logger.Println("ListenAndServe at:" + addr)
		if err := srv.ListenAndServe(); err != nil {
			logger.Fatal("ListenAndServe: ", err)
		}
	}()

	// Consume messages as fast as they arrive. A gauge keeps its last
	// value between scrapes, so there is no need to pause here; sleeping
	// would only back up the unbuffered pipeline behind this loop.
	for v := range wc {
		logger.Println("get requestTime:", v.RequestTime)
		requestTimeGauge.Set(v.RequestTime)
	}
}

// Process parses every raw line received on l.rc and sends the resulting
// *Message on l.wc. It returns only when l.rc is closed.
//
// A line is logged and skipped when it does not match the regular
// expression below, when its request field does not split into exactly
// three space-separated parts, or when its URI cannot be parsed. A bad
// timestamp is logged, but the message is still forwarded with a zero
// TimeLocal.
func (l *LogProcess) Process() {
	// Capture groups used below: 4 time_local, 5 scheme, 6 request,
	// 7 status, 8 body_bytes_sent, 12 upstream time, 13 request time.
	r := regexp.MustCompile(`([\d\,]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)

	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// The zone database may be missing (e.g. minimal containers);
		// fall back instead of continuing with a nil *time.Location.
		log.Println("LoadLocation fail:", err.Error())
		loc = time.Local
	}

	for v := range l.rc {
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 14 {
			log.Println("FindStringSubmatch fail:", string(v))
			continue
		}
		message := &Message{}

		// time_local, e.g. "10/Oct/2000:13:55:36 +0800". The "-0700"
		// element parses the numeric UTC offset from the log itself; a
		// literal offset in the layout would reject every other offset
		// and ignore the real one.
		t, err := time.ParseInLocation("02/Jan/2006:15:04:05 -0700", ret[4], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
		}
		message.TimeLocal = t

		// body_bytes_sent: group 8 is `\d+`, so only an out-of-range value
		// could fail; that error is deliberately ignored.
		message.BytesSent, _ = strconv.Atoi(ret[8])

		// request: "METHOD URI PROTOCOL".
		reqSli := strings.Split(ret[6], " ")
		if len(reqSli) != 3 {
			log.Println("strings.Split fail", ret[6])
			continue
		}
		message.Method = reqSli[0]
		u, err := url.Parse(reqSli[1])
		if err != nil {
			log.Println("url parse fail:", err)
			continue
		}
		message.Path = u.Path
		message.Scheme = ret[5]
		message.Status = ret[7]

		// Timing fields may be "-" (group pattern allows it); values that
		// do not parse are recorded as 0 on a best-effort basis.
		message.UpStreamTime, _ = strconv.ParseFloat(ret[12], 64)
		message.RequestTime, _ = strconv.ParseFloat(ret[13], 64)

		l.wc <- message
	}
}

// main builds the pipeline and starts each stage in its own goroutine:
//   - ReadFromFile follows ./access.log (relative to the working directory).
//   - LogProcess parses each line.
//   - WriteToProm exports the request time to Prometheus.
//
// The stages are connected by unbuffered channels.
func main() {
	r := &ReadFromFile{
		path: "./access.log",
	}
	w := &WriteToProm{}

	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}

	go lp.read.Read(lp.rc)
	go lp.Process()
	go lp.write.Write(lp.wc)

	// The exporter is meant to run until it is killed (or a stage exits
	// the process on a fatal error), so block forever rather than
	// silently exiting after a fixed 20 minutes.
	select {}
}