项目介绍:日志收集项目
主要功能:收集日志并且可视化查询日志
开发环境 :windos+idea
项目描述:收集日志并且在可视化页面中查询日志信息
主要技术:etcd+kafka+ElasticSearch+Kibana
负责模块:kafka收集日志,送到ElasticSearch中。kibana展示

初始化etcd连接。然后从etcd中拉取药收集日志的配置项目 然后etcd派一个watch取监听kafka 收集到日志发送到kafka 然后程序把日志文件发送到ElasticSearch中 采用kibana读取出来。

接上篇代码解读

首先main看 用了无闻老师的go—ini的开源库 用来做读配置文件
加载kafka和colletc的配置项目
把配置加入configobj 后面加载这个configobj
打印日志

第二步初始化kafka 
连接kafka
初始化Msg Channel  气候态goroutine去往kafka里面发msg

根据配置文件中指定的路径创建了一个tailobj

从tailobj.lines 通道中一行一行的读日志包装成kafka的msg
丢到msgChan中
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"sync"
)

// kafka consumer(消费者)

func main() {
	// 创建新的消费者
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	// 拿到指定topic下面的所有分区列表
	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	var wg sync.WaitGroup
	for partition := range partitionList { //遍历所有的分区
		//针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("failed to strat consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		//异步从每个分区消费信息
		wg.Add(1)
		go func(partitionConsumer sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d offset:%d key:%s Value:%s",
					msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()

}

上面是kafka consumer demo模块

下面是etcd watch demo模块

package main

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"time"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: time.Second * 5,
	})
	if err != nil {
		fmt.Printf("connect to etcd failed,err:%v", err)
		return
	}
	defer cli.Close()

	//watch
	watchCh := cli.Watch(context.Background(), "s4")
	for wresp := range watchCh {
		for _, evt := range wresp.Events {
			fmt.Printf("type:%s key:%s value:%s\n", evt.Type, evt.Kv.Key, evt.Kv.Value)
		}
	}

}

main demo

package main

import clientv3 "go.etcd.io/etcd/client/v3"

import (
	"context"
	"fmt"
	_ "go.etcd.io/etcd/client/v3"
	"time"
)

//连接etcd
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: time.Second * 5,
	})
	if err != nil {
		fmt.Printf("connect to etcd failed,err:%v", err)
		return
	}
	defer cli.Close()

	//put
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "s4", "666")
	if err != nil {
		fmt.Printf("put to etcd failed,err:%v", err)
	}
	cancel()

	//get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	gr, err := cli.Get(ctx, "s4")
	if err != nil {
		fmt.Printf("get from etcd failed,err:%v", err)
		return
	}
	for _, ev := range gr.Kvs {
		fmt.Printf("key:%s value:%s\n", ev.Key, ev.Value)
	}
	cancel()
}


 

到此项目结束