LogAgent的工作流程:
1.读日志——tailf第三方库
2.往kafka中写日志 – sarama第三方库
kafka参考网站
介绍:
Kafaka是一个分布式数据流平台,可以运行在单台服务器上,也可以在多台服务器上部署形成集群。它提供了发布和订阅功能,使用者可以发送数据到Kafka中,也可以从Kafka中读取数据(以便进行后续的处理)。Kafka具有高吞吐、低延迟、高容错等特点。
1.Kafaka集群的架构:
1.broker:每台机器都是一个broker
2.topic:每个日志都给一个分类
3.partition:分区,把同一个topic分成不同的分区,提高负载
1.leader:分区的主节点(老大)
2.follower:分区的从节点(小弟)
4.Consumer Group
2.生产者往Kafka发送数据的流程(6步)
3.Kafka选择分区的模式(3种)
1.指定往哪个分区写
2.指定key,kafka根据key做hash然后决定写哪个分区
3.轮询方式
4.生产者往kafka发送数据的模式(3种)
1.0:把数据发给leader就成功,效率最高,安全性最低
2.1:把数据发送给leader,等待leader回ACK
3.all:把数据发给leader,follower从leader拉取数据回复ack给leader,leader再回复ACK;安全性最高
5.分区存储文件的原理
6.为什么kafka快?(随机读变成顺序读,记住了每个文件在磁盘中的位置)
7.消费者组
每个消费者实例可以消费多个分区,但是每个分区最多只能被消费者组中的一个实例消费。
zookeeper工作原理:
每当主机启动服务时,先登记到zookeeper中,zookeeper会生成一个目录项,当调用服务时,会先查zookeeper
tail库:
//tailf读文件实例
package main
import (
"fmt"
"time"
"github.com/hpcloud/tail"
)
func main() {
fileName := "./my.log"
config := tail.Config{
ReOpen: true, //重新打开
Follow: true, //是否跟随
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
MustExist: false, //文件不存在不报错
Poll: true,
}
tails, err := tail.TailFile(fileName, config)
if err != nil {
fmt.Println("tail file failed,err:", err)
return
}
var (
msg *tail.Line
ok bool
)
for {
msg, ok = <-tails.Lines
if !ok {
fmt.Println("tail file close reopen,filename:", err)
time.Sleep(time.Second)
continue
}
fmt.Println("msg:", msg.Text)
}
}
kafka的启用:
先执行zookeeper的zkServer.cmd
然后执行在D:\apache-zookeeper-3.8.0-bin下执行以下命令:
kafka启用命令
.\bin\windows\kafka-server-start.bat .\config\server.properties
执行下列代码:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
//tailf包使用
config.Producer.RequiredAcks = sarama.WaitForAll //发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
config.Producer.Return.Successes = true //成功交付的消息将在success channel返回
//构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")
//连接kafka
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer closed,err:", err)
return
}
fmt.Println("连接kafka成功!")
defer client.Close()
pid, offset, err := client.SendMessage(msg) //offset是写成功的文件的索引位置
if err != nil {
fmt.Println("send msg failed,err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
kafka消费实例
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
//kafka 消费者 实例
func main() {
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer,err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") //根据topic取到所有的分区
if err != nil {
fmt.Println("fail to get list of partition:", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList {
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v", partition, err)
return
}
defer pc.AsyncClose()
//异步从每个分区消费消息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
}
}(pc)
}
select {}
}
打开kafka终端消费者读取数据
bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=web_log --from-beginning