go git github.com/Shopify/sarama
生产者实现,基本注释的东西解释的听清楚的了,感兴趣的同学可以查看下注释,同步合异步的方式都基本实现了,本文主要介绍异步的方式。
package kafka
import (
"errors"
"fmt"
"strings"
"time"
"github.com/Shopify/sarama"
)
const (
kafkaTimeOut = time.Second * 5
// kafka生产者发送信息方式
Sync = "Sync"
Async = "Async"
KafkaMatchDataTopic = "test_producer" // kafka 匹配数据上报topic
)
var (
kafkaAddressError = errors.New("kafka address is error")
)
// NewProducer 新建kafka生产者并选择同步方式
// 一般使用异步方式
func NewProducer(address, topic string, duration time.Duration, syncOrAsync string) AbsKafkaProducer {
var producer AbsKafkaProducer
switch syncOrAsync {
case Sync:
producer = &SyncKafkaProducer{}
case Async:
producer = &AsyncKafkaProducer{}
default:
fmt.Println("kafka mode error please choose sync or async")
return nil
}
err := producer.NewKafkaProducer(address, topic, duration)
if err != nil {
fmt.Println("new kafka producer error")
return nil
}
return producer
}
// AbsKafkaProducer 生产者接口
type AbsKafkaProducer interface {
NewKafkaProducer(address, topic string, duration time.Duration) error
Send(value []byte)
}
// KafkaConfig kafka生产者
type KafkaConfig struct {
addressList []string //地址列表
topic string //kafka topic
config *sarama.Config //kafka配置信息
}
// NewProducerByMessage 创建kafka基本生产者
func NewProducerByMessage(address, topic string, duration time.Duration) *KafkaConfig {
//根据字符串解析地址列表
addressList := strings.Split(address, ",")
if len(addressList) < 1 || addressList[0] == "" {
fmt.Println("kafka addr error")
return nil
}
//配置producer参数
sendConfig := sarama.NewConfig()
sendConfig.Producer.Return.Successes = true
sendConfig.Producer.Timeout = kafkaTimeOut
if duration != 0 {
sendConfig.Producer.Timeout = duration
}
return &KafkaConfig{
addressList: addressList,
topic: topic,
config: sendConfig,
}
}
// SyncKafkaProducer 同步kafka生产者
type SyncKafkaProducer struct {
KafkaConfig *KafkaConfig
}
func (k *SyncKafkaProducer) NewKafkaProducer(address, topic string, duration time.Duration) error {
if len(address) == 0 {
return kafkaAddressError
}
k.KafkaConfig = NewProducerByMessage(address, topic, duration)
return nil
}
func (k *SyncKafkaProducer) Send(value []byte) {
if k == nil || k.KafkaConfig == nil || value == nil {
return
}
p, err := sarama.NewSyncProducer(k.KafkaConfig.addressList, k.KafkaConfig.config)
if err != nil {
fmt.Println("sarama.NewSyncProducer err")
return
}
defer p.Close()
msg := &sarama.ProducerMessage{
Topic: k.KafkaConfig.topic,
Value: sarama.ByteEncoder(value),
}
_, _, err = p.SendMessage(msg)
if err != nil {
fmt.Println("send kafka message err")
}
}
// AsyncKafkaProducer kafka异步生产者
type AsyncKafkaProducer struct {
KafkaConfig *KafkaConfig //共有的kafka生产者配置在这个里面
producer sarama.AsyncProducer //异步生产者
isClose chan struct{} // 监听producer是否可以关闭
}
// NewKafkaProducer 创建kafka异步生产者实例,并初始化参数
func (k *AsyncKafkaProducer) NewKafkaProducer(address, topic string, duration time.Duration) error {
if len(address) == 0 {
return kafkaAddressError
}
//配置异步producer启动参数
k.KafkaConfig = NewProducerByMessage(address, topic, duration)
k.isClose = make(chan struct{}, 2)
//启动kafka异步producer
k.Run()
return nil
}
// Send kafka异步发送消息
func (k *AsyncKafkaProducer) Send(value []byte) {
//如果实例或者配置为空,直接返回。如果发送数据为空也直接返回
if k == nil || k.KafkaConfig == nil || value == nil {
return
}
// 封装消息实例
msg := &sarama.ProducerMessage{
Topic: k.KafkaConfig.topic,
Value: sarama.ByteEncoder(value),
}
//这里一般不会出现,producer实例为空时,表示创建异步producer失败
if k.producer == nil {
k.Run()
}
select {
//producer 出现error需要重新启动
case <-k.isClose:
if k.producer != nil {
// 收到可以关闭producer的消息isClose,关闭producer并重启
k.producer.Close()
k.Run()
}
//直接返回,此条消息浪费掉了,如果后期需要收集未发送成功的消息可以在此收集,输出到日志或者等待producer重启成功后再发送
return
default:
// 正常情况发送消息
k.producer.Input() <- msg
}
}
// Run kafka异步生产者
func (k *AsyncKafkaProducer) Run() {
if k == nil || k.KafkaConfig == nil {
return
}
//创建异步producer
producer, err := sarama.NewAsyncProducer(k.KafkaConfig.addressList, k.KafkaConfig.config)
//如果创建失败主动置空k.producer,否则producer不为空,在重启的时候k.producer是会有值的
if err != nil {
k.isClose <- struct{}{}
k.producer = nil
fmt.Println("sarama.NewAsyncProducer err")
return
}
if producer == nil {
k.isClose <- struct{}{}
k.producer = nil
fmt.Println("sarama.NewSyncProducer is null")
return
}
//如果创建成功为实例k的prodcer赋值
k.producer = producer
go func(p sarama.AsyncProducer) {
errors := p.Errors()
success := p.Successes()
for {
select {
//出现了错误
case rc := <-errors:
if rc != nil {
//标记producer出现error,在send时会监听到这个标记
k.isClose <- struct{}{}
fmt.Println("send kafka data error")
}
return
case res:= <-success:
data ,_:= res.Value.Encode()
fmt.Printf("发送成功,value=%s \n", string(data))
}
}
}(producer)
}
测试方法
MatchDataReport := NewProducer("127.0.0.1:9092", KafkaMatchDataTopic, time.Second*5, Async)
for {
MatchDataReport.Send([]byte(time.Now().String()))
time.Sleep(time.Second)
}
本人也是第一次写,有很多不足的地方,不喜勿喷。
基本流程是这样的,有问题的同学可以评论交流下