项目介绍:日志收集项目
主要功能:收集日志并且可视化查询日志
开发环境 :windos+idea
项目描述:收集日志并且在可视化页面中查询日志信息
主要技术:etcd+kafka+ElasticSearch+Kibana
负责模块:kafka收集日志,送到ElasticSearch中。kibana展示
初始化etcd连接。然后从etcd中拉取药收集日志的配置项目 然后etcd派一个watch取监听kafka 收集到日志发送到kafka 然后程序把日志文件发送到ElasticSearch中 采用kibana读取出来。
接上篇代码解读
首先main看 用了无闻老师的go—ini的开源库 用来做读配置文件
加载kafka和colletc的配置项目
把配置加入configobj 后面加载这个configobj
打印日志
第二步初始化kafka
连接kafka
初始化Msg Channel 气候态goroutine去往kafka里面发msg
根据配置文件中指定的路径创建了一个tailobj
从tailobj.lines 通道中一行一行的读日志包装成kafka的msg
丢到msgChan中
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
// kafka consumer(消费者)
func main() {
// 创建新的消费者
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
// 拿到指定topic下面的所有分区列表
partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
var wg sync.WaitGroup
for partition := range partitionList { //遍历所有的分区
//针对每个分区创建一个对应的分区消费者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetOldest)
if err != nil {
fmt.Printf("failed to strat consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
//异步从每个分区消费信息
wg.Add(1)
go func(partitionConsumer sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d offset:%d key:%s Value:%s",
msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)
}
wg.Wait()
}
上面是kafka consumer demo模块
下面是etcd watch demo模块
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second * 5,
})
if err != nil {
fmt.Printf("connect to etcd failed,err:%v", err)
return
}
defer cli.Close()
//watch
watchCh := cli.Watch(context.Background(), "s4")
for wresp := range watchCh {
for _, evt := range wresp.Events {
fmt.Printf("type:%s key:%s value:%s\n", evt.Type, evt.Kv.Key, evt.Kv.Value)
}
}
}
main demo
package main
import clientv3 "go.etcd.io/etcd/client/v3"
import (
"context"
"fmt"
_ "go.etcd.io/etcd/client/v3"
"time"
)
//连接etcd
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second * 5,
})
if err != nil {
fmt.Printf("connect to etcd failed,err:%v", err)
return
}
defer cli.Close()
//put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "s4", "666")
if err != nil {
fmt.Printf("put to etcd failed,err:%v", err)
}
cancel()
//get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
gr, err := cli.Get(ctx, "s4")
if err != nil {
fmt.Printf("get from etcd failed,err:%v", err)
return
}
for _, ev := range gr.Kvs {
fmt.Printf("key:%s value:%s\n", ev.Key, ev.Value)
}
cancel()
}
到此项目结束