1.消费kafka消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
    "time"
)


var (
    kafkaConsumer *cluster.Consumer
    kafkaBrokers = []string{<!-- -->"127.0.0.1:9092"}
    kafkaTopic   = "test_topic_1"
    groupId = "csdn_test_1"
)

func init() {<!-- -->
    var err error
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    config.Consumer.Offsets.Initial = -2
    config.Consumer.Offsets.CommitInterval = 1 * time.Second
    config.Group.Return.Notifications = true
    kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupId, []string{<!-- -->kafkaTopic}, config)
    if err != nil {<!-- -->
        panic(err.Error())
    }
    if kafkaConsumer == nil {<!-- -->
        panic(fmt.Sprintf("consumer is nil. kafka info -> {brokers:%v, topic: %v, group: %v}", kafkaBrokers, kafkaTopic, groupId))
    }
    fmt.Printf("kafka init success, consumer -> %v, topic -> %v, ", kafkaConsumer, kafkaTopic)
}

func main() {<!-- -->
    for {<!-- -->
        select {<!-- -->
        case msg, ok := <-kafkaConsumer.Messages():
            if ok {<!-- -->
                fmt.Printf("kafka msg: %s \n", msg.Value)
                kafkaConsumer.MarkOffset(msg, "")
            } else {<!-- -->
                fmt.Printf("kafka 监听服务失败")
            }
        case err, ok := <-kafkaConsumer.Errors():
            if ok {<!-- -->
                fmt.Printf("consumer error: %v" , err)
            }
        case ntf, ok := <-kafkaConsumer.Notifications():
            if ok {<!-- -->
                fmt.Printf("consumer notification: %v" , ntf)
            }
        }
    }
}

2.生产kafka消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "github.com/Shopify/sarama"
    "time"
)

func main() {<!-- -->
    startProduce()
}

var (
    producer sarama.SyncProducer
    brokers = []string{<!-- -->"127.0.0.1:9092"}
    topic   = "test_topic_1"
)

func init() {<!-- -->
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    config.Producer.Retry.Max = 5
    config.Producer.Return.Successes = true
    brokers := brokers
    var err error
    producer, err = sarama.NewSyncProducer(brokers, config)
    if err != nil {<!-- -->
        fmt.Printf("init producer failed -> %v \n", err)
        panic(err)
    }
    fmt.Println("producer init success")
}

func produceMsg(msg string) {<!-- -->
    msgX := &sarama.ProducerMessage{<!-- -->
        Topic: topic,
        Value: sarama.StringEncoder(msg),
    }
    fmt.Printf("SendMsg -> %v\n", dumpString(msgX))

    partition, offset, err := producer.SendMessage(msgX)
    if err != nil {<!-- -->
        fmt.Printf("send msg error:%s \n", err)
    }
    fmt.Printf("msg send success, message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
}

func startProduce() {<!-- -->
    tick := time.Tick(2 * time.Second)
    for {<!-- -->
        select {<!-- -->
        case <-tick:
            t := time.Now().Unix() * 1000
            msg := fmt.Sprintf("{"timestamp":%d}", t)
            produceMsg(msg)
        }
    }
}

//解析为json字符串
func dumpString(v interface{<!-- -->}) (str string) {<!-- -->

    bs, err := json.Marshal(v)
    b := bytes.Buffer{<!-- -->}
    if err != nil {<!-- -->
        b.WriteString("{err:"json format error.")
        b.WriteString(err.Error())
        b.WriteString(""}")
    } else {<!-- -->
        b.Write(bs)
    }
    str = b.String()
    return str
}