1.需求背景

服务升级,需要对kafka消息持久化服务进行压测,预计每分钟要产生消息400w条。
目前使用Golang实现了批量发送kafka消息的接口,但100w条消息就要还是50s多,无法满足需求,因此需要对发送kafka接口进行性能调优

2.问题分析

2.1发送的消息量是否已经达到了网络io的瓶颈

经过测试,本地调试时,确实存在这个问题。
在服务器上调试,则可以避免这个问题

2.2发送kafka接口的实现存在性能问题

  • 组装kafka消息的逻辑(BenchMark测试也没啥问题)
// 使用jsonpath替换某个字段
// source {"name":{"first":"Janet","last":"Prichard"}}
// jsonKeyValue {"name.first": {"name.first", "_", "name.last"}}
// want {"name":{"first":"Janet_Prichard","last":"Prichard"}}

func replaceJsonMsg(source string, jsonKeyValue map[string][]string) string {
    for keyPath, valueList := range jsonKeyValue {
        var builder strings.Builder
        for _, valuePath := range valueList {
            value := gjson.Get(source, valuePath)
            if value.Exists() {
                builder.WriteString(value.Str)
            } else {
                builder.WriteString(valuePath)
            }
        }
        source, _ = sjson.Set(source, keyPath, builder.String())
    }
    return source
}
rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceJsonMsg -benchtime=5s -cpuprofile jsonCpu.out
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceJsonMsg-12       7363956               814 ns/op
PASS
rikasai@huacainoMBP handlers % go tool pprof -http=:8081 jsonCpu.out
  • 发送kafka的第三方库有问题(经过测试,直接发消息,不经过任何处理时,第三方库能满足需求)

2.3kafka服务端配置存在问题

  • 经研发大佬确认过,配置正常
3.调试过程
replaceJsonMsgstringmapreplaceMapMsg
func replaceMapMsg(msg *map[string]interface{}, jsonKeyValue *map[string][]string) *[]byte {
    for keyPath, valueList := range *jsonKeyValue {
        var builder strings.Builder
        for _, valuePath := range valueList {
            property, err := GetProperty(*msg, valuePath)
            if err == nil {
                s := fmt.Sprintf("%v", property)
                builder.WriteString(s)
            } else {
                s := fmt.Sprintf("%v", valuePath)
                builder.WriteString(s)
            }
        }
        UpdateProperty(*msg, keyPath, builder.String())
    }
    b, _ := json.Marshal(msg)
    return &b
}
func BenchmarkReplaceMapMsg(b *testing.B) {
    msg := map[string]interface{}{"name": map[string]interface{}{"first": "Janet", "last": "Prichard"}}
    j := map[string][]string{"name.first": {"name.first", "_", "name.last"}}
    for i := 0; i < b.N; i++ {
        ReplaceMapMsg(&msg, &j)
    }
}

结果大吃一惊!
反向优化了!!!

rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceMapMsg -benchtime=5s -cpuprofile mapCpu.out 
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceMapMsg-12          41468            419347 ns/op
PASS
ok      BigDataTestTool/handlers        19.071s
rikasai@huacainoMBP handlers % go tool pprof -http=:8080 mapCpu.out Serving web UI on http://localhost:8080pprof

一语惊醒梦中人~
json.Marshal
原来是这个家伙占了大头
通过map替换是很快,但终究还是要序列化成
[]byte
类型才能发送kafka消息
那用
replaceJsonMsg
是事先序列化好了,怎么还会那么慢呢?
    for begin <= end {
        deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
        for _, m := range b.MsgPayload {
            var msg string
            if b.MsgType == "json" {
                m["deviceNo"] = deviceNo
                marshal, _ := json.Marshal(m)
                s := string(marshal)
                msg = replaceJsonMsg(s, b.JsonKeyValue)
            } 
            p.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
                Value:          []byte(msg),
            }, nil)
        }
        if (end-begin)*int64(len(b.MsgPayload))%100000 == 0 {
            p.Flush(5 * 1000)
        }
        begin++
    }
json.Marshal(m)
4.优化结果
// 先序列化,下面用到的时候,只需要迭代strMsg这个切片取出即可
    strMsg := make([]string, 0, len(b.MsgPayload))  // 定义切片要小心,要先指定好容量,否则append会触发自动扩容
    if b.MsgType == "json" {
        for _, m := range b.MsgPayload {
            marshal, _ := json.Marshal(m)
            s := string(marshal)
            strMsg = append(strMsg, s)
        }
    }

    for begin <= end {
                deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
        var msg string
        if b.MsgType == "json" {
            for _, s := range strMsg {
                msg = replaceJsonMsg(s, deviceNo, b.JsonKeyValue)
                p.Produce(&kafka.Message{
                    TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
                    Value:          []byte(msg),
                }, nil)
            }
        }

优化前发送100w条消息,耗时将近56s

{
    "success_count": 1000001,
    "fail_count": 0,
    "parse_fail": [],
    "elapsed": "55.874803s",
    "msg": "success"
}

优化后发送100w条消息,耗时将近18s,提升了三倍的性能!

{
    "success_count": 1000001,
    "fail_count": 0,
    "parse_fail": [],
    "elapsed": "17.823006273s",
    "msg": "success"
}