什么是Kafka
Apache kafka是消息中间件的一种,是一个分布式的流平台,可以应用于高吞吐,高性能的消息队列服务.具体说明可以参考Apache Kafka官网.下面简单的说下使用golang来实现Kafka的生产者和消费者.
安装Kafka
下载
http://kafka.apache.org/downloads
123
wget http://mirror.bit.edu.cn/apache/kafka/ 0.10 .2 .1/kafka_2 .12- 0.10 .2 .1.tgztar -zxvf kafka_2 .12- 0.10 .2 .1.tgzcd kafka_2 .12- 0.10 .2 .1
启动服务
1234
// 1.启动zookeeperbin/zookeeper-server-start .sh config/zookeeper .properties &// 2.启动kafkabin/kafka-server-start .sh config/server .properties &
创建Topic
12345
// 创建Topicbin/kafka-topics.sh --create --zookeeper localhost :2181 --replication-factor 1 --partitions 1 --topic test// 列出Topic是否创建成功bin/kafka-topics.sh --list --zookeeper localhost :2181
发送消息
向创建的test Topic 发送消息(生产者)
123
bin/kafka-console-producer. sh --broker- list localhos t:9092 --topic testThis is a messageThis is another message
创建消费者
订阅一个test Topic,并进行消费
123
bin/kafka- console-consumer.sh --bootstrap-server localhost: 9092 --topic test -- from-beginningThis is a messageThis is another message
如果你的生产者和消费者是成功的话,消费者开启的时候是可以收到所有生产者的消息的.
生产者消费者具体实现
下载Kafka客户端Go语言Library
1
go get github. com/Shopify/sarama
生产者的实现
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
package mainimport ("bufio""fmt""os""sarama""strings")func main() {config := sarama.NewConfig()config.Producer.Return.Successes = trueconfig.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerproducer, err := sarama.NewSyncProducer([] string{ "localhost:9092"}, config)if err != nil {panic(err)}defer producer.Close()msg := &sarama.ProducerMessage{Topic: "testGo",Partition: int32( -1),Key: sarama.StringEncoder( "key"),}var value stringfor {// 生产消息inputReader := bufio.NewReader(os.Stdin)value, err = inputReader.ReadString( '\n')if err != nil {panic(err)}value = strings.Replace(value, "\n", "", -1)msg.Value = sarama.ByteEncoder(value)paritition, offset, err := producer.SendMessage(msg)if err != nil {fmt.Println( "Send Message Fail")}fmt.Printf( "Partion = %d, offset = %d\n", paritition, offset)}}
消费者的实现
123456789101112131415161718192021222324252627282930313233343536373839404142434445
package mainimport ("fmt""sarama""sync")var (wg sync.WaitGroup)func main() {consumer, err := sarama.NewConsumer([] string{ "localhost:9092"}, nil)if err != nil {panic(err)}partitionList, err := consumer.Partitions( "testGo")if err != nil {panic(err)}for partition := range partitionList {pc, err := consumer.ConsumePartition( "testGo", int32(partition), sarama.OffsetNewest)if err != nil {panic(err)}defer pc.AsyncClose()wg.Add( 1)go func(sarama.PartitionConsumer) {defer wg.Done()for msg := range pc.Messages() {fmt.Printf( "Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))}}(pc)wg.Wait()consumer.Close()}}
config/server.properties
测试运行
运行生产者
1
go run producer/main.go
运行消费者
1
go run consumer/main.go