在实际业务场景中,为了提高系统的实时性、减轻日志存储压力,需要将日志直接生产至消息中间件,以减少 flume 或 fluentd 收集所导致的延时及性能压力。本文实现了以下功能:
实现了一个静态调用的异步生产者    AsyncProducer
封装了一个用于异步发送的生产器    Agent

 

//@description	kafka代理
//@author chenbintao
//@date	2017-09-27	10:30	初稿
//		2017-09-27	11:15	规范代码
//		2017-09-28	14:15	对发送逻辑进行了优化

package kafkaAgent

import (
	"fmt"
	"log"
	"runtime/debug"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

// Package-level constants.
const (
	// _BROKER_LIST_ is a single local broker address. It is not referenced
	// by the code visible in this file.
	_BROKER_LIST_ = "localhost:9092"

	// _LABEL_ is the prefix put in front of every log line this package emits.
	_LABEL_ = "[_kafkaAgent_]"
)

var (
	// IS_DEBUG gates the diagnostic output of this package; it starts out false.
	IS_DEBUG bool

	// _PAUSE_ starts out false. It is not referenced by the code visible in this file.
	_PAUSE_ bool
)

// SetDebug switches the package's diagnostic output on or off by storing
// the given value in IS_DEBUG.
func SetDebug(debug bool) {
	IS_DEBUG = debug
}

// Agent bundles a sarama asynchronous producer with the settings needed to
// send string messages to one Kafka topic. Configure it with Set, then call
// Send for each message.
type Agent struct {
	// flag is set by Set; once it is true, further Set calls return false.
	flag           bool
	// BrokerList is a comma-separated list of broker addresses.
	BrokerList     string
	// TopicList is used as the Topic of every message sent through the agent.
	TopicList      string
	// SendTimeOut is passed to the producer configuration as Producer.Timeout.
	SendTimeOut    time.Duration
	// ReceiveTimeOut is stored and validated by Check; the code visible in
	// this file does not otherwise use it.
	ReceiveTimeOut time.Duration
	// AsyncProducer is the underlying sarama producer used by Send.
	AsyncProducer  sarama.AsyncProducer
}

// Set configures the agent and creates its asynchronous producer.
//
// BrokerList is a comma-separated list of broker addresses, TopicList is the
// topic messages are sent to, and SendTimeOut is used as the producer
// timeout. ReceiveTimeOut is only stored and validated.
//
// It returns true when the agent was initialized by this call. It returns
// false when the agent is already initialized, when the settings do not pass
// Check, or when the producer could not be created. A failed call leaves the
// agent uninitialized, so Set may be called again with corrected settings.
func (this *Agent) Set(BrokerList, TopicList string, SendTimeOut, ReceiveTimeOut time.Duration) bool {
	// Only one successful initialization is allowed.
	if this.flag {
		return false
	}

	this.BrokerList = BrokerList
	this.TopicList = TopicList
	this.SendTimeOut = SendTimeOut
	this.ReceiveTimeOut = ReceiveTimeOut

	// Validate before creating the producer: a producer created for settings
	// that are then rejected would never be closed, and neither would its
	// monitor goroutine.
	if !this.Check() {
		return false
	}

	producer := getProducer(this.BrokerList, this.SendTimeOut, true)
	if nil == producer {
		return false
	}
	this.AsyncProducer = producer

	// Mark the agent initialized only after everything succeeded, so that a
	// failed attempt does not lock the agent in an unusable state.
	this.flag = true

	return true
}
// Check reports whether the agent's settings are usable: both BrokerList and
// TopicList must be non-empty, and at least one of SendTimeOut and
// ReceiveTimeOut must be non-zero.
func (this *Agent) Check() bool {
	hasEndpoints := this.BrokerList != "" && this.TopicList != ""
	hasTimeout := this.SendTimeOut != 0 || this.ReceiveTimeOut != 0

	return hasEndpoints && hasTimeout
}
// Send queues msg on the agent's asynchronous producer, addressed to
// TopicList.
//
// It returns false when the settings do not pass Check, when the producer is
// nil, or when a panic was recovered while sending; otherwise it returns
// true. A true result only means the message was handed to the producer, not
// that the broker acknowledged it.
//
// If sending panics, the panic is logged with a stack trace, the current
// producer is closed and a new one is created in its place.
func (this *Agent) Send(msg string) bool {
	defer func() {
		// Recover every panic value, not only those implementing error;
		// otherwise a non-error panic would be swallowed without a log
		// line and without rebuilding the producer.
		r := recover()
		if nil == r {
			return
		}
		log.Printf("%s WARN: panic in Send: %v", _LABEL_, r)
		log.Println(_LABEL_, string(debug.Stack()))

		// Guard against a nil producer so the recovery path cannot itself
		// panic.
		if nil != this.AsyncProducer {
			if err := this.AsyncProducer.Close(); err != nil && IS_DEBUG {
				log.Println(_LABEL_, err)
			}
		}
		this.AsyncProducer = getProducer(this.BrokerList, this.SendTimeOut, true)
	}()
	if !this.Check() {

		return false
	}

	return asyncProducer(
		this.AsyncProducer,
		this.TopicList,
		msg,
	)
}

//=========================================================================
// AsyncProducer sends the single message s to topics through a producer that
// is created for this call only.
//
// kafka_list is a comma-separated list of broker addresses and timeout is
// used as the producer timeout. It returns false when kafka_list or topics is
// empty or when the producer could not be created; otherwise it returns the
// result of queueing the message. The deferred Close shuts the producer down
// when this function returns.
func AsyncProducer(kafka_list, topics, s string, timeout time.Duration) bool {
	if kafka_list == "" || topics == "" {
		return false
	}

	producer := getProducer(kafka_list, timeout, false)
	if producer == nil {
		return false
	}
	defer producer.Close()

	// Wait for the first delivery report (error or success) and then stop;
	// an error is logged only in debug mode.
	go func(p sarama.AsyncProducer) {
		select {
		case sendErr := <-p.Errors():
			if sendErr != nil && IS_DEBUG {
				log.Println(_LABEL_, sendErr)
			}
		case <-p.Successes():
		}
	}(producer)

	return asyncProducer(producer, topics, s)
}
// asyncProducer wraps s in a producer message for the given topic and writes
// it to the producer's input channel. It returns false when p is nil and true
// once the message has been handed over. In debug mode the message is also
// printed to standard output.
func asyncProducer(p sarama.AsyncProducer, topics, s string) bool {
	if p == nil {
		return false
	}

	record := new(sarama.ProducerMessage)
	record.Topic = topics
	record.Value = sarama.ByteEncoder(s)

	p.Input() <- record

	if IS_DEBUG {
		fmt.Println(_LABEL_, record)
	}
	return true
}
// getProducer creates a sarama asynchronous producer for the comma-separated
// broker addresses in kafka_list, with success reporting enabled and timeout
// used as the producer timeout.
//
// When monitor is true, a goroutine drains the producer's Errors and
// Successes channels so that unread delivery reports cannot stall the
// producer; errors are logged only in debug mode. The goroutine ends once
// both channels have been closed.
//
// It returns nil when the producer could not be created.
func getProducer(kafka_list string, timeout time.Duration, monitor bool) sarama.AsyncProducer {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = timeout

	producer, err := sarama.NewAsyncProducer(strings.Split(kafka_list, ","), config)
	if err != nil || nil == producer {
		if err != nil && IS_DEBUG {
			log.Println(_LABEL_, err)
		}
		if monitor {
			log.Println(_LABEL_, "getProducer() producer error!")
		}

		// Return an explicit nil so callers' nil checks are reliable.
		return nil
	}

	if monitor {
		// Consume delivery reports to prevent the producer from blocking.
		go func(p sarama.AsyncProducer) {
			errCh := p.Errors()
			okCh := p.Successes()
			// A receive from a closed channel returns immediately, so a
			// closed channel is replaced by nil (which is never selected)
			// instead of being polled in a tight loop. The loop, and with
			// it the goroutine, ends when both channels are closed.
			for errCh != nil || okCh != nil {
				select {
				case sendErr, open := <-errCh:
					if !open {
						errCh = nil
						continue
					}
					if sendErr != nil && IS_DEBUG {
						log.Println(_LABEL_, sendErr)
					}
				case _, open := <-okCh:
					if !open {
						okCh = nil
					}
				}
			}
		}(producer)
	}

	return producer
}

 


有疑问加站长微信联系(非本文作者)