docker 搭建kafka环境
version: '2'
services:
zk1:
image: confluentinc/cp-zookeeper:latest
hostname: zk1
container_name: zk1
restart: always
ports:
- "12181:2181"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zk1:12888:13888;zk2:22888:23888;zk3:32888:33888
zk2:
image: confluentinc/cp-zookeeper:latest
hostname: zk2
container_name: zk2
restart: always
ports:
- "22181:2181"
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zk1:12888:13888;zk2:22888:23888;zk3:32888:33888
zk3:
image: confluentinc/cp-zookeeper:latest
hostname: zk3
container_name: zk3
restart: always
ports:
- "32181:2181"
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zk1:12888:13888;zk2:22888:23888;zk3:32888:33888
kafka1:
image: confluentinc/cp-kafka:latest
hostname: kafka1
container_name: kafka1
restart: always
ports:
- "9092:9092"
depends_on:
- zk1
- zk2
- zk3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
kafka2:
image: confluentinc/cp-kafka:latest
hostname: kafka2
container_name: kafka2
restart: always
ports:
- "9093:9093"
depends_on:
- zk1
- zk2
- zk3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093
kafka3:
image: confluentinc/cp-kafka:latest
hostname: kafka3
container_name: kafka3
restart: always
ports:
- "9094:9094"
depends_on:
- zk1
- zk2
- zk3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094
创建topic
partitions为2个,replication有3个,topic的name为test_kafka
//创建topic
kafka-topics --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 2 --create --topic test_kafka .
//查看topic
kafka-topics --zookeeper zk1:2181,zk2:2181,zk3:2181 --describe --topic test_kafka
消费kafka数据
import (
"context"
"flag"
"log"
"github.com/segmentio/kafka-go"
)
const (
kafkaConn1 = "127.0.0.1:9092"
kafkaConn2 = "127.0.0.1:9093"
kafkaConn3 = "127.0.0.1:9094"
)
var (
topic = flag.String("t", "test_kafka", "kafka topic")
group = flag.String("g", "test-group", "kafka consumer group")
)
func main() {
flag.Parse()
config := kafka.ReaderConfig{
Brokers: []string{kafkaConn1, kafkaConn2, kafkaConn3},
Topic: *topic,
MinBytes: 1e3,
MaxBytes: 1e6,
GroupID: *group,
}
reader := kafka.NewReader(config)
ctx := context.Background()
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
log.Printf("fail to get msg:%v", err)
continue
}
log.Printf("msg content:topic=%v,partition=%v,offset=%v,content=%v",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
err = reader.CommitMessages(ctx, msg)
if err != nil {
log.Printf("fail to commit msg:%v", err)
}
}
}
生产kafka数据
import (
"bufio"
"context"
"fmt"
"os"
"github.com/segmentio/kafka-go"
)
const (
kafkaConn1 = "127.0.0.1:9092"
kafkaConn2 = "127.0.0.1:9093"
kafkaConn3 = "127.0.0.1:9094"
topic = "test_kafka"
)
var brokerAddrs = []string{kafkaConn1, kafkaConn2, kafkaConn3}
func main() {
// read command line input
reader := bufio.NewReader(os.Stdin)
writer := newKafkaWriter(brokerAddrs, topic)
defer writer.Close()
for {
fmt.Print("Enter msg: ")
msgStr, _ := reader.ReadString('\n')
msg := kafka.Message{
Value: []byte(msgStr),
}
err := writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println(err)
}
}
}
//消息分发策略默认使用轮训策略
func newKafkaWriter(kafkaURL []string, topic string) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: kafkaURL,
Topic: topic,
})
}