在实际业务场景中,为了提高系统的实时性,减轻日志存储压力,需要将日志直接生产至消息中间件,减少flume或fluentd收集所导致的延时及性能压力,本文实现了以下功能:
实现了一个静态调用的异步生产者 AsyncProducer
封装了一个用于异步发送的生产器 Agent
//@description kafka代理
//@author chenbintao
//@date 2017-09-27 10:30 初稿
// 2017-09-27 11:15 规范代码
// 2017-09-28 14:15 对发送逻辑进行了优化
package kafkaAgent
import (
"fmt"
"log"
"runtime/debug"
"strings"
"time"
"github.com/Shopify/sarama"
)
// Package-level constants.
const (
	// _BROKER_LIST_ is a local broker address; it is not referenced by the
	// code visible in this file.
	_BROKER_LIST_ = `localhost:9092`

	// _LABEL_ is the prefix put in front of every log line this package emits.
	_LABEL_ = "[_kafkaAgent_]"
)

// Package-level switches.
var (
	// IS_DEBUG gates the optional debug logging; change it through SetDebug.
	IS_DEBUG = false

	// _PAUSE_ is not referenced by the code visible in this file.
	_PAUSE_ = false
)
// SetDebug sets the package-level IS_DEBUG flag, which gates the debug
// logging performed by this package.
func SetDebug(debug bool) {
IS_DEBUG = debug
}
// Agent bundles a sarama.AsyncProducer with the settings used to create it.
// It is configured with Set and sends messages with Send.
type Agent struct {
// flag is set by Set to mark the agent as initialized.
flag bool
// BrokerList holds the broker addresses; getProducer splits it on ",".
BrokerList string
// TopicList is used as the Topic of every message sent by Send.
// NOTE(review): despite the name it is passed on as a single topic string.
TopicList string
// SendTimeOut is passed to getProducer as the producer timeout.
SendTimeOut time.Duration
// ReceiveTimeOut is stored and inspected by Check; no other use is visible
// in this file.
ReceiveTimeOut time.Duration
// AsyncProducer is the producer created by Set and used by Send.
AsyncProducer sarama.AsyncProducer
}
// Set stores the connection settings on the agent and creates its monitored
// asynchronous producer.
//
// BrokerList is a comma-separated list of broker addresses, TopicList is the
// topic messages are sent to, and SendTimeOut is used as the producer timeout.
//
// It returns true when the agent was initialized by this call. It returns
// false when the agent is already initialized, when the settings do not pass
// Check, or when the producer could not be created. A failed call does not
// mark the agent as initialized, so Set may be called again to retry.
func (a *Agent) Set(BrokerList, TopicList string, SendTimeOut, ReceiveTimeOut time.Duration) bool {
	// Only one successful initialization is allowed.
	if a.flag {
		return false
	}

	a.BrokerList = BrokerList
	a.TopicList = TopicList
	a.SendTimeOut = SendTimeOut
	a.ReceiveTimeOut = ReceiveTimeOut

	// Validate before connecting, so that a rejected configuration does not
	// leave a live producer and its monitor goroutine behind.
	if !a.Check() {
		return false
	}

	producer := getProducer(a.BrokerList, a.SendTimeOut, true)
	if producer == nil {
		return false
	}

	a.AsyncProducer = producer
	// Mark the agent as initialized only once it is actually usable.
	a.flag = true
	return true
}
// Check reports whether the agent's settings are usable: both BrokerList and
// TopicList must be non-empty, and at least one of SendTimeOut and
// ReceiveTimeOut must be non-zero.
func (a *Agent) Check() bool {
	hasEndpoints := a.BrokerList != "" && a.TopicList != ""
	hasTimeout := a.SendTimeOut != 0 || a.ReceiveTimeOut != 0
	return hasEndpoints && hasTimeout
}
// Send queues msg on the agent's producer for the configured topic.
//
// It returns false when the settings do not pass Check or when the producer
// is nil, and true once the message has been handed to the producer's input
// channel. If queuing panics, the panic is logged with a stack trace, the
// current producer is closed and replaced with a new one, and Send returns
// false.
func (a *Agent) Send(msg string) bool {
	defer func() {
		// Handle every panic value, not only those that implement error, so
		// that no panic is swallowed without being logged.
		r := recover()
		if r == nil {
			return
		}
		log.Printf("%s WARN: panic in %v", _LABEL_, r)
		log.Println(_LABEL_, string(debug.Stack()))

		// Guard against a nil producer so the recovery path cannot panic itself.
		if a.AsyncProducer != nil {
			if err := a.AsyncProducer.Close(); err != nil && IS_DEBUG {
				log.Println(_LABEL_, err)
			}
		}
		a.AsyncProducer = getProducer(a.BrokerList, a.SendTimeOut, true)
	}()

	if !a.Check() {
		return false
	}
	return asyncProducer(a.AsyncProducer, a.TopicList, msg)
}
//=========================================================================
// AsyncProducer sends the single message s to topics through a producer that
// is created for this call and closed before the call returns.
//
// kafka_list is a comma-separated list of broker addresses and timeout is
// used as the producer timeout. It returns false when kafka_list or topics is
// empty or when the producer cannot be created; otherwise it returns the
// result of queuing the message.
func AsyncProducer(kafka_list, topics, s string, timeout time.Duration) bool {
	if kafka_list == "" || topics == "" {
		return false
	}

	producer := getProducer(kafka_list, timeout, false)
	if producer == nil {
		return false
	}
	defer producer.Close()

	// Consume the first delivery report in the background, logging an error
	// when debug logging is enabled, then stop.
	go func(p sarama.AsyncProducer) {
		failures := p.Errors()
		delivered := p.Successes()
		select {
		case failure := <-failures:
			if failure != nil && IS_DEBUG {
				log.Println(_LABEL_, failure)
			}
		case <-delivered:
		}
	}(producer)

	return asyncProducer(producer, topics, s)
}
// asyncProducer wraps s in a message for topics and writes it to p's input
// channel. It returns false when p is nil and true once the message has been
// written. With IS_DEBUG enabled the message is also printed to standard
// output.
func asyncProducer(p sarama.AsyncProducer, topics, s string) bool {
	if p == nil {
		return false
	}

	msg := new(sarama.ProducerMessage)
	msg.Topic = topics
	msg.Value = sarama.ByteEncoder(s)

	p.Input() <- msg

	if IS_DEBUG {
		fmt.Println(_LABEL_, msg)
	}
	return true
}
// getProducer creates a sarama asynchronous producer for the comma-separated
// broker addresses in kafka_list, with success reporting enabled and timeout
// as the producer timeout.
//
// When monitor is true, a background goroutine consumes the producer's error
// and success reports so that they cannot fill up and block the producer.
// That goroutine ends once both report channels have been closed.
//
// It returns nil when the producer cannot be created. The creation error is
// logged only when IS_DEBUG is set.
func getProducer(kafka_list string, timeout time.Duration, monitor bool) sarama.AsyncProducer {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = timeout

	producer, err := sarama.NewAsyncProducer(strings.Split(kafka_list, ","), config)
	if err != nil {
		if IS_DEBUG {
			log.Println(_LABEL_, err)
		}
		if monitor {
			log.Println(_LABEL_, "getProducer() producer error!")
		}
		return nil
	}

	if monitor {
		// Consume status reports to prevent the producer from deadlocking.
		go func(p sarama.AsyncProducer) {
			failures := p.Errors()
			delivered := p.Successes()
			// A receive from a closed channel never blocks, so a closed
			// channel is replaced with nil (which is never selected) and the
			// loop ends when both are closed, instead of spinning forever.
			for failures != nil || delivered != nil {
				select {
				case failure, open := <-failures:
					if !open {
						failures = nil
						continue
					}
					if failure != nil && IS_DEBUG {
						log.Println(_LABEL_, failure)
					}
				case _, open := <-delivered:
					if !open {
						delivered = nil
					}
				}
			}
		}(producer)
	}
	return producer
}
有疑问加站长微信联系(非本文作者)