grpc压测工具golang实现的版本ghz:
https://github.com/bojand/ghz
通过对源码进行分析,实现了类似mysqlap的mysql压测工具:
https://github.com/xiazemin/mysqlslap-go
当然,在源码分析的过程中也发现了ghz的一个bug:
https://github.com/bojand/ghz/pull/323/files
ghz的使用:
./ghz --skipTLS --insecure --protoset ./bundle.protoset \
-B ./grpc_payload --call tensorflow.serving.PredictionService/Predict \
127.0.0.1:8500
具体用法可以参考wiki:https://ghz.sh/docs/examples,也可以使用-D参数来加载json格式的参数list,请求的时候会通过round robin的形式依次取对应数据,下面我们开始看下源码:
入口是:cmd/ghz/main.go,首先是参数解析:
isCallSet = false
call = kingpin.Flag("call", `A fully-qualified method name in 'package.Service/method' or 'package.Service.Method' format.`).
PlaceHolder(" ").IsSetByUser(&isCallSet).String()
// Concurrency
isCSet = false
c = kingpin.Flag("concurrency", "Number of request workers to run concurrently for const concurrency schedule. Default is 50.").
Short('c').Default("50").IsSetByUser(&isCSet).Uint()
isCPUSet = false
cpus = kingpin.Flag("cpus", "Number of cpu cores to use.").
Default(strconv.FormatUint(uint64(nCPUs), 10)).IsSetByUser(&isCPUSet).Uint()
解析完命令行参数,就开始执行命令:
report, err := runner.Run(cfg.Call, cfg.Host, options...)
具体代码实现在runner/run.go
func Run(call, host string, options ...Option) (*Report, error) {
c, err := NewConfig(call, host, options...)
oldCPUs := runtime.NumCPU()
runtime.GOMAXPROCS(c.cpus)
defer runtime.GOMAXPROCS(oldCPUs)
reqr, err := NewRequester(c)
mtd, err = protodesc.GetMethodDescFromProto(c.call, c.proto, c.importPaths)
mtd, err = protodesc.GetMethodDescFromProtoSet(c.call, c.protoset)
cc, err = reqr.newClientConn(false)
refClient := grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(cc))
mtd, err = protodesc.GetMethodDescFromReflect(c.call, refClient)
go func() {
<-cancel
reqr.Stop(ReasonCancel)
}()
if c.z > 0 {
go func() {
time.Sleep(c.z)
reqr.Stop(ReasonTimeout)
}()
}
rep, err := reqr.Run()
通过配置初始化option,获取对应请求,如果没有指定proto文件或者proto文件目录,通过grpc reflection 获取指定方法的describe。
runner/options.go里定义压测的选项参数:
func NewConfig(call, host string, options ...Option) (*RunConfig, error)
type RunConfig struct {
// call settings
call string
host string
proto string
importPaths []string
protoset string
enableCompression bool
// security settings
creds credentials.TransportCredentials
cacert string
cert string
key string
cname string
skipVerify bool
insecure bool
authority string
// load
rps int
loadStart uint
loadEnd uint
loadStep int
loadSchedule string
loadDuration time.Duration
loadStepDuration time.Duration
pacer load.Pacer
// concurrency
c int
cStart uint
cEnd uint
cStep int
cSchedule string
cMaxDuration time.Duration
cStepDuration time.Duration
workerTicker load.WorkerTicker
// test
n int
async bool
// number of connections
nConns int
// timeouts
z time.Duration
timeout time.Duration
dialTimeout time.Duration
keepaliveTime time.Duration
zstop string
streamInterval time.Duration
streamCallDuration time.Duration
streamCallCount uint
streamDynamicMessages bool
// lbStrategy
lbStrategy string
// TODO consolidate these actual value fields to be implemented via provider funcs
// data & metadata
data []byte
metadata []byte
binary bool
dataFunc BinaryDataFunc
dataProviderFunc DataProviderFunc
dataStreamFunc StreamMessageProviderFunc
mdProviderFunc MetadataProviderFunc
funcs template.FuncMap
// reflection metadata
rmd map[string]string
// debug
hasLog bool
log Logger
// misc
name string
cpus int
tags []byte
skipFirst int
countErrors bool
recvMsgFunc StreamRecvMsgInterceptFunc
}
在runner/requester.go里处理参数的组装和发送请求相关的工作:
func NewRequester(c *RunConfig) (*Requester, error)
md := mtd.GetInputType()
payloadMessage := dynamic.NewMessage(md)
reqr.dataProvider = c.dataProviderFunc
defaultDataProvider, err := newDataProvider(reqr.mtd, c.binary, c.dataFunc, c.data, c.funcs)
reqr.metadataProvider = c.mdProviderFunc
defaultMDProvider, err := newMetadataProvider(reqr.mtd, c.metadata, c.funcs)
request的定义如下
type Requester struct {
conns []*grpc.ClientConn
stubs []grpcdynamic.Stub
handlers []*statsHandler
mtd *desc.MethodDescriptor
reporter *Reporter
config *RunConfig
results chan *callResult
stopCh chan bool
start time.Time
dataProvider DataProviderFunc
metadataProvider MetadataProviderFunc
lock sync.Mutex
stopReason StopReason
workers []*Worker
}
Run函数是核心函数,发送请求,记录返回结果,并且最终会返回一个报告信息:
func (b *Requester) Run() (*Report, error)
cc, err := b.openClientConns()
start := time.Now()
stub := grpcdynamic.NewStub(cc[n])
b.reporter = newReporter(b.results, b.config)
go func() {
b.reporter.Run()
}()
wt := createWorkerTicker(b.config)
p := createPacer(b.config)
err = b.runWorkers(wt, p)
report := b.Finish()
b.closeClientConns()
打开grpc连接请求的链路如下:
func (b *Requester) openClientConns() ([]*grpc.ClientConn, error)
c, err := b.newClientConn(true)
func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, error)
return grpc.DialContext(ctx, b.config.host, opts...)
创建定时器,定时将返回结果写入一个chan里面,供reporter消费:
func createWorkerTicker(config *RunConfig) load.WorkerTicker
func createPacer(config *RunConfig) load.Pacer
runWorkers执行最后的具体工作,最终调用w.runWorker():
func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error
wct := wt.Ticker()
go func()
wt.Run()
}()
go func() {
for tv := range wct {
b.workers = append(b.workers, &w)
go func() {
errC <- w.runWorker()
}()
for _, wrk := range b.workers {
wrk.Stop()
go func() {
b.workers[i].Stop()
for {
wait, stop := p.Pace(time.Since(began), counter.Get())
Finish渲染压测报告:
func (b *Requester) Finish() *Report
total := time.Since(b.start)
<-b.reporter.done
return b.reporter.Finalize(r, total)
中间需要做json到proto的转换,代码在protodesc/protodesc.go
func GetMethodDescFromReflect(call string, client *grpcreflect.Client) (*desc.MethodDescriptor, error)
call = strings.Replace(call, "/", ".", -1)
file, err := client.FileContainingSymbol(call)
return getMethodDesc(call, files)
func getMethodDesc(call string, files map[string]*desc.FileDescriptor) (*desc.MethodDescriptor, error)
svc, mth, err := parseServiceMethod(call)
dsc, err := findServiceSymbol(files, svc)
mtd := sd.FindMethodByName(mth)
func GetMethodDescFromProto(call, proto string, imports []string) (*desc.MethodDescriptor, error)
fds, err := p.ParseFiles(filename)
return getMethodDesc(call, files)
最终的数据格式runner/data.go
func newDataProvider(mtd *desc.MethodDescriptor,
binary bool, dataFunc BinaryDataFunc, data []byte,
funcs template.FuncMap) (*dataProvider, error)
ctd := newCallData(mtd, funcs, "", 0)
ha, err := ctd.hasAction(string(dp.data))
type dataProvider struct {
binary bool
data []byte
mtd *desc.MethodDescriptor
dataFunc BinaryDataFunc
arrayJSONData []string
hasActions bool
// cached messages only for binary
mutex sync.RWMutex
cachedMessages []*dynamic.Message
}
在runner/calldata.go里组装最终的请求上下文信息:
type CallData struct {
WorkerID string // unique worker ID
RequestNumber int64 // unique incremented request number for each request
FullyQualifiedName string // fully-qualified name of the method call
MethodName string // shorter call method name
ServiceName string // the service name
InputName string // name of the input message type
OutputName string // name of the output message type
IsClientStreaming bool // whether this call is client streaming
IsServerStreaming bool // whether this call is server streaming
Timestamp string // timestamp of the call in RFC3339 format
TimestampUnix int64 // timestamp of the call as unix time in seconds
TimestampUnixMilli int64 // timestamp of the call as unix time in milliseconds
TimestampUnixNano int64 // timestamp of the call as unix time in nanoseconds
UUID string // generated UUIDv4 for each call
t *template.Template
}
func newCallData(
mtd *desc.MethodDescriptor,
funcs template.FuncMap,
workerID string, reqNum int64) *CallData
定时器实现在load/worker_ticker.go
type WorkerTicker interface {
// Ticker returns a channel which sends TickValues
// When a value is received the number of workers should be appropriately
// increased or decreased given by the delta property.
Ticker() <-chan TickValue
// Run starts the worker ticker
Run()
// Finish closes the channel
Finish()
}
load/pacer.go里定义了分发任务的接口:
type Pacer interface {
// Pace returns the duration the attacker should wait until
// making next hit, given an already elapsed duration and
// completed hits. If the second return value is true, an attacker
// should stop sending hits.
Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)
// Rate returns a Pacer's instantaneous hit rate (per seconds)
// at the given elapsed duration of an attack.
Rate(elapsed time.Duration) float64
}
最终完成请求调用的是runner/worker.go
type Worker struct {
stub grpcdynamic.Stub
mtd *desc.MethodDescriptor
config *RunConfig
workerID string
active bool
stopCh chan bool
ticks <-chan TickValue
dataProvider DataProviderFunc
metadataProvider MetadataProviderFunc
msgProvider StreamMessageProviderFunc
streamRecv StreamRecvMsgInterceptFunc
}
发送请求:
func (w *Worker) runWorker() error
g := new(errgroup.Group)
for {
return g.Wait()
g.Go(func() error {
return w.makeRequest(tv)
})
rErr := w.makeRequest(tv)
组装请求参数
func (w *Worker) makeRequest(tv TickValue) error
ctd := newCallData(w.mtd, w.config.funcs, w.workerID, reqNum)
inputs, err := w.dataProvider(ctd)
mp, err := newDynamicMessageProvider(w.mtd, w.config.data, w.config.streamCallCount)
mp, err := newStaticMessageProvider(w.config.streamCallCount, inputs)
_ = w.makeBidiRequest(&ctx, ctd, msgProvider)
_ = w.makeClientStreamingRequest(&ctx, ctd, msgProvider)
_ = w.makeServerStreamingRequest(&ctx, inputs[0])
_ = w.makeUnaryRequest(&ctx, reqMD, inputs[0])
InvokeRpc请求的地方:
func (w *Worker) makeUnaryRequest(ctx *context.Context, reqMD *metadata.MD, input *dynamic.Message) error
res, resErr = w.stub.InvokeRpc(*ctx, w.mtd, input, callOptions...)
中间的group是对waitgroup的一个简单封装:
pkg/mod/golang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go
func (g *Group) Go(f func() error)
g.wg.Add(1)
defer g.wg.Done()
压测报告的实现代码在:runner/reporter.go
func newReporter(results chan *callResult, c *RunConfig) *Reporter
type Reporter struct {
config *RunConfig
results chan *callResult
done chan bool
totalLatenciesSec float64
details []ResultDetail
errorDist map[string]int
statusCodeDist map[string]int
totalCount uint64
}
最大允许存储的结果是写死的
const maxResult = 1000000
func (r *Reporter) Run()
for res := range r.results
r.totalCount++
r.totalLatenciesSec += res.duration.Seconds()
r.details = append(r.details, ResultDetail{
Latency: res.duration,
Timestamp: res.timestamp,
Status: res.status,
Error: errStr,
})
type ResultDetail struct {
Timestamp time.Time `json:"timestamp"`
Latency time.Duration `json:"latency"`
Error string `json:"error"`
Status string `json:"status"`
}
生成最终的报告
func (r *Reporter) Finalize(stopReason StopReason, total time.Duration) *Report
rep := &Report
rep.Options = Options
if len(r.details) > 0
average := r.totalLatenciesSec float64(r.totalCount)
rep.Average = time.Duration(average * float64(time.Second))
rep.Rps = float64(r.totalCount) total.Seconds()
for _, d := range r.details {
okLats = append(okLats, d.Latency.Seconds())
sort.Float64s(okLats)
fastestNum = okLats[0]
slowestNum = okLats[len(okLats)-1]
rep.Histogram = histogram(okLats, slowestNum, fastestNum)
rep.LatencyDistribution = latencies(okLats)
报告的内容:
type Report struct {
Name string `json:"name,omitempty"`
EndReason StopReason `json:"endReason,omitempty"`
Options Options `json:"options,omitempty"`
Date time.Time `json:"date"`
Count uint64 `json:"count"`
Total time.Duration `json:"total"`
Average time.Duration `json:"average"`
Fastest time.Duration `json:"fastest"`
Slowest time.Duration `json:"slowest"`
Rps float64 `json:"rps"`
ErrorDist map[string]int `json:"errorDistribution"`
StatusCodeDist map[string]int `json:"statusCodeDistribution"`
LatencyDistribution []LatencyDistribution `json:"latencyDistribution"`
Histogram []Bucket `json:"histogram"`
Details []ResultDetail `json:"details"`
Tags map[string]string `json:"tags,omitempty"`
}
计算直方图:
func histogram(latencies []float64, slowest, fastest float64) []Bucket
bc := 10
buckets := make([]float64, bc+1)
counts := make([]int, bc+1)
bs := (slowest - fastest) float64(bc)
for i := 0; i < bc; i++ {
buckets[i] = fastest + bs*float64(i)
}
buckets[bc] = slowest
var bi int
var max int
for i := 0; i < len(latencies); {
if latencies[i] <= buckets[bi] {
i++
counts[bi]++
if max < counts[bi] {
max = counts[bi]
}
} else if bi < len(buckets)-1 {
bi++
}
}
Mark: buckets[i],
Count: counts[i],
Frequency: float64(counts[i]) float64(len(latencies)),
计算耗时分布:
func latencies(latencies []float64) []LatencyDistribution
pctls := []int{10, 25, 50, 75, 90, 95, 99}
data := make([]float64, len(pctls))
lt := float64(len(latencies))
for i, p := range pctls {
ip := (float64(p) 100.0) * lt
di := int(ip)
if ip == float64(di) {
di = di - 1
}
if di < 0 {
di = 0
}
data[i] = latencies[di]
res := make([]LatencyDistribution, len(pctls))
for i := 0; i < len(pctls); i++ {
if data[i] > 0 {
lat := time.Duration(data[i] * float64(time.Second))
res[i] = LatencyDistribution{Percentage: pctls[i], Latency: lat}
就是将排序好的延迟按照百分比切割
如果对mysql压测工具感兴趣可以体验下:
https://github.com/xiazemin/mysqlslap-go
useage:
mysqlslap -Hhost -uuser -ppassword -P3306 -ddatabase -q"select id from deviceattr where name='attr10' or name='attr20' group by id;" -ffilename -c 50 -i 100
输出报告:
Summary:
Name: mysqlslap
Count: 40
Total: 1.01 s
Slowest: 1.00 s
Fastest: 1.01 s
Average: 1.00 s
Requests/sec: 39.76
Response time histogram(ms):
1003.000 [1] |∎∎
1003.300 [0] |
1003.600 [0] |
1003.900 [0] |
1004.200 [24] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
1004.500 [0] |
1004.800 [0] |
1005.100 [12] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
1005.400 [0] |
1005.700 [0] |
1006.000 [3] |∎∎∎∎∎
Latency distribution:
10 % in 1.00 s
25 % in 1.00 s
50 % in 1.00 s
75 % in 1.00 s
90 % in 1.00 s
95 % in 1.01 s
99 % in 1.01 s
Status code distribution:
[success] 40 responses
[failed] 0 responses