配置相关依赖包:

  1. mkdir -p $GOPATH/src/golang.org/x
  2. cd $GOPATH/src/golang.org/x
  3. git clone https://github.com/golang/net.git
  4. git clone https://github.com/golang/crypto.git
  5. go get github.com/Shopify/sarama

 

写入信息到 kafka

  1. package main
  2.  
  3. import (
  4. "fmt"
  5. "github.com/Shopify/sarama"
  6. "time"
  7. )
  8.  
  9. func main() {
  10. //初始化配置
  11. config := sarama.NewConfig()
  12. config.Producer.RequiredAcks = sarama.WaitForAll
  13. config.Producer.Partitioner = sarama.NewRandomPartitioner
  14. config.Producer.Return.Successes = true
  15. //生产者
  16. client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
  17. if err != nil {
  18. fmt.Println("producer close,err:", err)
  19. return
  20. }
  21.  
  22. defer client.Close()
  23.  
  24. var n int=0
  25. for n<20{
  26. n++
  27. //创建消息
  28. msg := &sarama.ProducerMessage{}
  29. msg.Topic = "topic_title"
  30. msg.Value = sarama.StringEncoder("this is a good test,hello gopher.cc!!")
  31. //发送消息
  32. pid, offset, err := client.SendMessage(msg)
  33. if err != nil {
  34. fmt.Println("send message failed,", err)
  35. return
  36. }
  37. fmt.Printf("pid:%v offset:%v\n,", pid, offset)
  38. time.Sleep(10 * time.Millisecond)
  39. }
  40. }