服务升级,需要对kafka消息持久化服务进行压测,预计每分钟要产生消息400w条。
目前使用Golang实现了批量发送kafka消息的接口,但100w条消息就要还是50s多,无法满足需求,因此需要对发送kafka接口进行性能调优
2.1发送的消息量是否已经达到了网络io的瓶颈
经过测试,本地调试时,确实存在这个问题。
在服务器上调试,则可以避免这个问题
2.2发送kafka接口的实现存在性能问题
- 组装kafka消息的逻辑(BenchMark测试也没啥问题)
// 使用jsonpath替换某个字段
// source {"name":{"first":"Janet","last":"Prichard"}}
// jsonKeyValue {"name.first": {"name.first", "_", "name.last"}}
// want {"name":{"first":"Janet_Prichard","last":"Prichard"}}
func replaceJsonMsg(source string, jsonKeyValue map[string][]string) string {
for keyPath, valueList := range jsonKeyValue {
var builder strings.Builder
for _, valuePath := range valueList {
value := gjson.Get(source, valuePath)
if value.Exists() {
builder.WriteString(value.Str)
} else {
builder.WriteString(valuePath)
}
}
source, _ = sjson.Set(source, keyPath, builder.String())
}
return source
}
rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceJsonMsg -benchtime=5s -cpuprofile jsonCpu.out
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceJsonMsg-12 7363956 814 ns/op
PASS
rikasai@huacainoMBP handlers % go tool pprof -http=:8081 jsonCpu.out
- 发送kafka的第三方库有问题(经过测试,直接发消息,不经过任何处理时,第三方库能满足需求)
2.3kafka服务端配置存在问题
- 经研发大佬确认过,配置正常
replaceJsonMsgstringmapreplaceMapMsg
func replaceMapMsg(msg *map[string]interface{}, jsonKeyValue *map[string][]string) *[]byte {
for keyPath, valueList := range *jsonKeyValue {
var builder strings.Builder
for _, valuePath := range valueList {
property, err := GetProperty(*msg, valuePath)
if err == nil {
s := fmt.Sprintf("%v", property)
builder.WriteString(s)
} else {
s := fmt.Sprintf("%v", valuePath)
builder.WriteString(s)
}
}
UpdateProperty(*msg, keyPath, builder.String())
}
b, _ := json.Marshal(msg)
return &b
}
func BenchmarkReplaceMapMsg(b *testing.B) {
msg := map[string]interface{}{"name": map[string]interface{}{"first": "Janet", "last": "Prichard"}}
j := map[string][]string{"name.first": {"name.first", "_", "name.last"}}
for i := 0; i < b.N; i++ {
ReplaceMapMsg(&msg, &j)
}
}
结果大吃一惊!
反向优化了!!!
rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceMapMsg -benchtime=5s -cpuprofile mapCpu.out
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceMapMsg-12 41468 419347 ns/op
PASS
ok BigDataTestTool/handlers 19.071s
rikasai@huacainoMBP handlers % go tool pprof -http=:8080 mapCpu.out Serving web UI on http://localhost:8080pprof
一语惊醒梦中人~
json.Marshal
原来是这个家伙占了大头通过map替换是很快,但终究还是要序列化成
[]byte
类型才能发送kafka消息那用
replaceJsonMsg
是事先序列化好了,怎么还会那么慢呢?
for begin <= end {
deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
for _, m := range b.MsgPayload {
var msg string
if b.MsgType == "json" {
m["deviceNo"] = deviceNo
marshal, _ := json.Marshal(m)
s := string(marshal)
msg = replaceJsonMsg(s, b.JsonKeyValue)
}
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
Value: []byte(msg),
}, nil)
}
if (end-begin)*int64(len(b.MsgPayload))%100000 == 0 {
p.Flush(5 * 1000)
}
begin++
}
json.Marshal(m)
4.优化结果
// 先序列化,下面用到的时候,只需要迭代strMsg这个切片取出即可
strMsg := make([]string, 0, len(b.MsgPayload)) // 定义切片要小心,要先指定好容量,否则append会触发自动扩容
if b.MsgType == "json" {
for _, m := range b.MsgPayload {
marshal, _ := json.Marshal(m)
s := string(marshal)
strMsg = append(strMsg, s)
}
}
for begin <= end {
deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
var msg string
if b.MsgType == "json" {
for _, s := range strMsg {
msg = replaceJsonMsg(s, deviceNo, b.JsonKeyValue)
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
Value: []byte(msg),
}, nil)
}
}
优化前发送100w条消息,耗时将近56s
{
"success_count": 1000001,
"fail_count": 0,
"parse_fail": [],
"elapsed": "55.874803s",
"msg": "success"
}
优化后发送100w条消息,耗时将近18s,提升了三倍的性能!
{
"success_count": 1000001,
"fail_count": 0,
"parse_fail": [],
"elapsed": "17.823006273s",
"msg": "success"
}