1.消费kafka消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | package main import ( "fmt" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" "time" ) var ( kafkaConsumer *cluster.Consumer kafkaBrokers = []string{<!-- -->"127.0.0.1:9092"} kafkaTopic = "test_topic_1" groupId = "csdn_test_1" ) func init() {<!-- --> var err error config := cluster.NewConfig() config.Consumer.Return.Errors = true config.Group.Return.Notifications = true config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange config.Consumer.Offsets.Initial = -2 config.Consumer.Offsets.CommitInterval = 1 * time.Second config.Group.Return.Notifications = true kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupId, []string{<!-- -->kafkaTopic}, config) if err != nil {<!-- --> panic(err.Error()) } if kafkaConsumer == nil {<!-- --> panic(fmt.Sprintf("consumer is nil. kafka info -> {brokers:%v, topic: %v, group: %v}", kafkaBrokers, kafkaTopic, groupId)) } fmt.Printf("kafka init success, consumer -> %v, topic -> %v, ", kafkaConsumer, kafkaTopic) } func main() {<!-- --> for {<!-- --> select {<!-- --> case msg, ok := <-kafkaConsumer.Messages(): if ok {<!-- --> fmt.Printf("kafka msg: %s \n", msg.Value) kafkaConsumer.MarkOffset(msg, "") } else {<!-- --> fmt.Printf("kafka 监听服务失败") } case err, ok := <-kafkaConsumer.Errors(): if ok {<!-- --> fmt.Printf("consumer error: %v" , err) } case ntf, ok := <-kafkaConsumer.Notifications(): if ok {<!-- --> fmt.Printf("consumer notification: %v" , ntf) } } } } |
2.生产kafka消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | package main import ( "bytes" "encoding/json" "fmt" "github.com/Shopify/sarama" "time" ) func main() {<!-- --> startProduce() } var ( producer sarama.SyncProducer brokers = []string{<!-- -->"127.0.0.1:9092"} topic = "test_topic_1" ) func init() {<!-- --> config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true brokers := brokers var err error producer, err = sarama.NewSyncProducer(brokers, config) if err != nil {<!-- --> fmt.Printf("init producer failed -> %v \n", err) panic(err) } fmt.Println("producer init success") } func produceMsg(msg string) {<!-- --> msgX := &sarama.ProducerMessage{<!-- --> Topic: topic, Value: sarama.StringEncoder(msg), } fmt.Printf("SendMsg -> %v\n", dumpString(msgX)) partition, offset, err := producer.SendMessage(msgX) if err != nil {<!-- --> fmt.Printf("send msg error:%s \n", err) } fmt.Printf("msg send success, message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset) } func startProduce() {<!-- --> tick := time.Tick(2 * time.Second) for {<!-- --> select {<!-- --> case <-tick: t := time.Now().Unix() * 1000 msg := fmt.Sprintf("{"timestamp":%d}", t) produceMsg(msg) } } } //解析为json字符串 func dumpString(v interface{<!-- -->}) (str string) {<!-- --> bs, err := json.Marshal(v) b := bytes.Buffer{<!-- -->} if err != nil {<!-- --> b.WriteString("{err:"json format error.") b.WriteString(err.Error()) b.WriteString(""}") } else {<!-- --> b.Write(bs) } str = b.String() return str } |