ghz is a gRPC load testing tool implemented in Go:

https://github.com/bojand/ghz

After analyzing its source code, I implemented a MySQL load testing tool similar to mysqlslap:

https://github.com/xiazemin/mysqlslap-go

Along the way, the source reading also turned up a bug in ghz:

https://github.com/bojand/ghz/pull/323/files

Using ghz:

./ghz --skipTLS --insecure --protoset ./bundle.protoset \
-B ./grpc_payload --call tensorflow.serving.PredictionService/Predict \
127.0.0.1:8500

See the wiki for detailed usage: https://ghz.sh/docs/examples. You can also use the -D flag to load a JSON-formatted list of request payloads; successive requests then take entries from the list in round-robin order.
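For instance, a data file passed via -D might contain a JSON array like the following (the field names here are hypothetical and must match the request message of the target method):

[
  {"name": "alice"},
  {"name": "bob"},
  {"name": "carol"}
]

With this file, request 1 uses "alice", request 2 uses "bob", request 3 uses "carol", request 4 wraps back to "alice", and so on. Now let's walk through the source.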

The entry point is cmd/ghz/main.go, which starts with flag parsing:

isCallSet = false
call      = kingpin.Flag("call", `A fully-qualified method name in 'package.Service/method' or 'package.Service.Method' format.`).
        PlaceHolder(" ").IsSetByUser(&isCallSet).String()

// Concurrency
isCSet = false
c      = kingpin.Flag("concurrency", "Number of request workers to run concurrently for const concurrency schedule. Default is 50.").
        Short('c').Default("50").IsSetByUser(&isCSet).Uint()

isCPUSet = false
cpus     = kingpin.Flag("cpus", "Number of cpu cores to use.").
        Default(strconv.FormatUint(uint64(nCPUs), 10)).IsSetByUser(&isCPUSet).Uint()
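As a minimal, self-contained sketch of this pattern (assuming kingpin v2; IsSetByUser records whether the user explicitly supplied the flag, which is how ghz distinguishes explicit values from defaults):

package main

import (
    "fmt"

    "gopkg.in/alecthomas/kingpin.v2"
)

var (
    isCallSet bool
    call      = kingpin.Flag("call", "Fully-qualified method name.").
            IsSetByUser(&isCallSet).String()
)

func main() {
    kingpin.Parse()
    fmt.Printf("call=%q set by user=%v\n", *call, isCallSet)
}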

Once the command-line flags are parsed, the command is executed:

report, err := runner.Run(cfg.Call, cfg.Host, options...)

The implementation lives in runner/run.go:

func Run(call, host string, options ...Option) (*Report, error) {
    c, err := NewConfig(call, host, options...)

    oldCPUs := runtime.NumCPU()
    runtime.GOMAXPROCS(c.cpus)
    defer runtime.GOMAXPROCS(oldCPUs)

    reqr, err := NewRequester(c)

    // Resolve the method descriptor from a .proto file, a protoset,
    // or server reflection, depending on what was configured:
    mtd, err = protodesc.GetMethodDescFromProto(c.call, c.proto, c.importPaths)
    mtd, err = protodesc.GetMethodDescFromProtoSet(c.call, c.protoset)
    cc, err = reqr.newClientConn(false)
    refClient := grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(cc))
    mtd, err = protodesc.GetMethodDescFromReflect(c.call, refClient)

    // Stop on cancellation or when the -z duration elapses:
    go func() {
        <-cancel
        reqr.Stop(ReasonCancel)
    }()
    if c.z > 0 {
        go func() {
            time.Sleep(c.z)
            reqr.Stop(ReasonTimeout)
        }()
    }

    rep, err := reqr.Run()

Run initializes the config from the options and resolves the method to call: if neither a proto file nor a protoset is specified, it fetches the method descriptor for the given call via gRPC server reflection.
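A minimal, self-contained sketch of that reflection path (using github.com/jhump/protoreflect, which ghz builds on; error handling elided, host and service names taken from the example command above):

package main

import (
    "context"
    "fmt"

    "github.com/jhump/protoreflect/grpcreflect"
    "google.golang.org/grpc"
    reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)

func main() {
    cc, _ := grpc.Dial("127.0.0.1:8500", grpc.WithInsecure())
    defer cc.Close()

    refClient := grpcreflect.NewClient(context.Background(),
        reflectpb.NewServerReflectionClient(cc))
    defer refClient.Reset()

    // Ask the server which file declares the service, then walk to the method.
    fd, _ := refClient.FileContainingSymbol("tensorflow.serving.PredictionService")
    svc := fd.FindService("tensorflow.serving.PredictionService")
    fmt.Println(svc.FindMethodByName("Predict").GetFullyQualifiedName())
}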

The benchmark options are defined in runner/options.go:

func NewConfig(call, host string, options ...Option) (*RunConfig, error)

type RunConfig struct {
    // call settings
    call              string
    host              string
    proto             string
    importPaths       []string
    protoset          string
    enableCompression bool

    // security settings
    creds      credentials.TransportCredentials
    cacert     string
    cert       string
    key        string
    cname      string
    skipVerify bool
    insecure   bool
    authority  string

    // load
    rps              int
    loadStart        uint
    loadEnd          uint
    loadStep         int
    loadSchedule     string
    loadDuration     time.Duration
    loadStepDuration time.Duration

    pacer load.Pacer

    // concurrency
    c             int
    cStart        uint
    cEnd          uint
    cStep         int
    cSchedule     string
    cMaxDuration  time.Duration
    cStepDuration time.Duration

    workerTicker load.WorkerTicker

    // test
    n     int
    async bool

    // number of connections
    nConns int

    // timeouts
    z             time.Duration
    timeout       time.Duration
    dialTimeout   time.Duration
    keepaliveTime time.Duration

    zstop string

    streamInterval        time.Duration
    streamCallDuration    time.Duration
    streamCallCount       uint
    streamDynamicMessages bool

    // lbStrategy
    lbStrategy string

    // TODO consolidate these actual value fields to be implemented via provider funcs
    // data & metadata
    data     []byte
    metadata []byte
    binary   bool

    dataFunc         BinaryDataFunc
    dataProviderFunc DataProviderFunc
    dataStreamFunc   StreamMessageProviderFunc
    mdProviderFunc   MetadataProviderFunc

    funcs template.FuncMap

    // reflection metadata
    rmd map[string]string

    // debug
    hasLog bool
    log    Logger

    // misc
    name        string
    cpus        int
    tags        []byte
    skipFirst   int
    countErrors bool
    recvMsgFunc StreamRecvMsgInterceptFunc
}
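Each of these fields is populated through an Option, which also means ghz can be driven programmatically as a library. A minimal sketch using options from ghz's documented runner API (the call, host, proto file, and data here are placeholders):

package main

import (
    "fmt"
    "os"

    "github.com/bojand/ghz/runner"
)

func main() {
    report, err := runner.Run(
        "helloworld.Greeter.SayHello", // call
        "0.0.0.0:50051",               // host
        runner.WithProtoFile("greeter.proto", []string{}),
        runner.WithDataFromJSON(`{"name":"Joe"}`),
        runner.WithInsecure(true),
        runner.WithConcurrency(50),
        runner.WithTotalRequests(200),
    )
    if err != nil {
        fmt.Println(err.Error())
        os.Exit(1)
    }
    fmt.Printf("fastest=%v slowest=%v rps=%.2f\n",
        report.Fastest, report.Slowest, report.Rps)
}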

runner/requester.go handles assembling the request parameters and sending the requests:

func NewRequester(c *RunConfig) (*Requester, error)
    md := mtd.GetInputType()
    payloadMessage := dynamic.NewMessage(md)

    reqr.dataProvider = c.dataProviderFunc
    defaultDataProvider, err := newDataProvider(reqr.mtd, c.binary, c.dataFunc, c.data, c.funcs)

    reqr.metadataProvider = c.mdProviderFunc
    defaultMDProvider, err := newMetadataProvider(reqr.mtd, c.metadata, c.funcs)

The Requester struct is defined as follows:

type Requester struct {
    conns    []*grpc.ClientConn
    stubs    []grpcdynamic.Stub
    handlers []*statsHandler

    mtd      *desc.MethodDescriptor
    reporter *Reporter

    config *RunConfig

    results chan *callResult
    stopCh  chan bool
    start   time.Time

    dataProvider     DataProviderFunc
    metadataProvider MetadataProviderFunc

    lock       sync.Mutex
    stopReason StopReason
    workers    []*Worker
}

Run is the core function: it sends the requests, records the results, and ultimately returns a report:

func (b *Requester) Run() (*Report, error)
    cc, err := b.openClientConns()
    start := time.Now()
    stub := grpcdynamic.NewStub(cc[n])

    b.reporter = newReporter(b.results, b.config)
    go func() {
        b.reporter.Run()
    }()

    wt := createWorkerTicker(b.config)
    p := createPacer(b.config)
    err = b.runWorkers(wt, p)

    report := b.Finish()
    b.closeClientConns()

The call chain for opening the gRPC connections looks like this:

func (b *Requester) openClientConns() ([]*grpc.ClientConn, error)
    c, err := b.newClientConn(true)

func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, error)
    return grpc.DialContext(ctx, b.config.host, opts...)
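A minimal sketch of that dial (the real code derives the options from RunConfig: credentials, keepalive, dial timeout, load-balancing strategy, and an optional stats handler; the host below is the one from the example command):

package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
)

func main() {
    ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancelFn()

    conn, err := grpc.DialContext(ctx, "127.0.0.1:8500",
        grpc.WithInsecure(), // what --insecure maps to
        grpc.WithBlock(),    // block until the connection is up
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
}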

Next, the worker ticker and the pacer that drive the load schedule are created: the ticker adjusts the number of workers over time, and the pacer decides when each request may fire.

func createWorkerTicker(config *RunConfig) load.WorkerTicker 
func createPacer(config *RunConfig) load.Pacer 

runWorkers does the actual work, ultimately calling w.runWorker():

func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error
    wct := wt.Ticker()
    go func() {
        wt.Run()
    }()

    // Grow or shrink the worker pool as TickValues arrive:
    go func() {
        for tv := range wct {
            // positive delta: spawn new workers
            b.workers = append(b.workers, &w)
            go func() {
                errC <- w.runWorker()
            }()
            // negative delta: stop the most recently started workers
            go func() {
                b.workers[i].Stop()
            }()
        }
    }()

    // Pacing loop: ask the pacer when the next request may fire
    for {
        wait, stop := p.Pace(time.Since(began), counter.Get())
    }

    // On exit, stop all remaining workers:
    for _, wrk := range b.workers {
        wrk.Stop()
    }

Finish renders the benchmark report:

func (b *Requester) Finish() *Report
    total := time.Since(b.start)
    <-b.reporter.done
    return b.reporter.Finalize(r, total)

To convert JSON data into proto messages, the call name first has to be resolved to a method descriptor; that code lives in protodesc/protodesc.go:

func GetMethodDescFromReflect(call string, client *grpcreflect.Client) (*desc.MethodDescriptor, error)
    call = strings.Replace(call, "/", ".", -1)
    file, err := client.FileContainingSymbol(call)
    return getMethodDesc(call, files)

func getMethodDesc(call string, files map[string]*desc.FileDescriptor) (*desc.MethodDescriptor, error)
    svc, mth, err := parseServiceMethod(call)
    dsc, err := findServiceSymbol(files, svc)
    mtd := sd.FindMethodByName(mth)

func GetMethodDescFromProto(call, proto string, imports []string) (*desc.MethodDescriptor, error)
    fds, err := p.ParseFiles(filename)
    return getMethodDesc(call, files)
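parseServiceMethod splits the call name into its service and method parts. A simplified sketch of that kind of split (illustrative only, not the exact ghz implementation):

package main

import (
    "errors"
    "fmt"
    "strings"
)

// splitServiceMethod accepts "package.Service/method" or
// "package.Service.method" and returns ("package.Service", "method").
func splitServiceMethod(call string) (string, string, error) {
    call = strings.Replace(call, "/", ".", -1)
    pos := strings.LastIndex(call, ".")
    if pos <= 0 || pos == len(call)-1 {
        return "", "", errors.New("invalid call format")
    }
    return call[:pos], call[pos+1:], nil
}

func main() {
    svc, mth, _ := splitServiceMethod("tensorflow.serving.PredictionService/Predict")
    fmt.Println(svc, mth) // tensorflow.serving.PredictionService Predict
}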

The final data handling lives in runner/data.go:

func newDataProvider(mtd *desc.MethodDescriptor,
    binary bool, dataFunc BinaryDataFunc, data []byte,
    funcs template.FuncMap) (*dataProvider, error)
    ctd := newCallData(mtd, funcs, "", 0)
    ha, err := ctd.hasAction(string(dp.data))

type dataProvider struct {
    binary   bool
    data     []byte
    mtd      *desc.MethodDescriptor
    dataFunc BinaryDataFunc

    arrayJSONData []string
    hasActions    bool

    // cached messages only for binary
    mutex          sync.RWMutex
    cachedMessages []*dynamic.Message
}

runner/calldata.go assembles the per-call context information:

type CallData struct {
    WorkerID           string // unique worker ID
    RequestNumber      int64  // unique incremented request number for each request
    FullyQualifiedName string // fully-qualified name of the method call
    MethodName         string // shorter call method name
    ServiceName        string // the service name
    InputName          string // name of the input message type
    OutputName         string // name of the output message type
    IsClientStreaming  bool   // whether this call is client streaming
    IsServerStreaming  bool   // whether this call is server streaming
    Timestamp          string // timestamp of the call in RFC3339 format
    TimestampUnix      int64  // timestamp of the call as unix time in seconds
    TimestampUnixMilli int64  // timestamp of the call as unix time in milliseconds
    TimestampUnixNano  int64  // timestamp of the call as unix time in nanoseconds
    UUID               string // generated UUIDv4 for each call

    t *template.Template
}

func newCallData(
    mtd *desc.MethodDescriptor,
    funcs template.FuncMap,
    workerID string, reqNum int64) *CallData
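These fields are what makes template actions in the request data work: ghz parses the data as a Go text/template and executes it against the CallData of each request. A simplified, self-contained illustration of that mechanism (not the exact ghz code):

package main

import (
    "bytes"
    "fmt"
    "text/template"
)

// callData is a trimmed-down stand-in for ghz's CallData.
type callData struct {
    UUID          string
    RequestNumber int64
}

func main() {
    // The payload may reference CallData fields via template actions.
    tmpl := template.Must(template.New("data").
        Parse(`{"trace_id": "{{.UUID}}", "n": {{.RequestNumber}}}`))

    var buf bytes.Buffer
    _ = tmpl.Execute(&buf, callData{UUID: "4d2c6e1a", RequestNumber: 42})
    fmt.Println(buf.String()) // {"trace_id": "4d2c6e1a", "n": 42}
}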

The worker ticker is implemented in load/worker_ticker.go:

type WorkerTicker interface {
    // Ticker returns a channel which sends TickValues.
    // When a value is received the number of workers should be appropriately
    // increased or decreased given by the delta property.
    Ticker() <-chan TickValue

    // Run starts the worker ticker
    Run()

    // Finish closes the channel
    Finish()
}

load/pacer.go defines the interface that paces the requests:

type Pacer interface {
    // Pace returns the duration the attacker should wait until
    // making the next hit, given an already elapsed duration and
    // completed hits. If the second return value is true, an attacker
    // should stop sending hits.
    Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)

    // Rate returns a Pacer's instantaneous hit rate (per second)
    // at the given elapsed duration of an attack.
    Rate(elapsed time.Duration) float64
}
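To make the contract concrete, here is a minimal constant-rate pacer satisfying this interface (illustrative only; ghz ships its own pacers in the load package):

package main

import (
    "fmt"
    "time"
)

// ConstPacer fires requests at a fixed frequency (hits per second).
type ConstPacer struct {
    Freq uint64
}

func (p ConstPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
    expected := uint64(elapsed.Seconds() * float64(p.Freq))
    if hits < expected {
        return 0, false // behind schedule: fire immediately
    }
    // Wait until the (hits+1)-th request is due.
    interval := time.Duration(float64(time.Second) / float64(p.Freq))
    return time.Duration(hits+1)*interval - elapsed, false
}

func (p ConstPacer) Rate(elapsed time.Duration) float64 {
    return float64(p.Freq)
}

func main() {
    p := ConstPacer{Freq: 100}
    wait, stop := p.Pace(50*time.Millisecond, 5)
    fmt.Println(wait, stop) // 10ms false: the 6th hit is due at t=60ms
}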

The actual request invocation happens in runner/worker.go:

type Worker struct {
    stub grpcdynamic.Stub
    mtd  *desc.MethodDescriptor

    config   *RunConfig
    workerID string
    active   bool
    stopCh   chan bool
    ticks    <-chan TickValue

    dataProvider     DataProviderFunc
    metadataProvider MetadataProviderFunc
    msgProvider      StreamMessageProviderFunc

    streamRecv StreamRecvMsgInterceptFunc
}

Sending the requests:

func (w *Worker) runWorker() error
    g := new(errgroup.Group)
    for {
        // async mode: fire each request in its own goroutine
        g.Go(func() error {
            return w.makeRequest(tv)
        })
        // sync mode: make the request inline
        rErr := w.makeRequest(tv)
    }
    return g.Wait()

Assembling the request parameters:

func (w *Worker) makeRequest(tv TickValue) error
    ctd := newCallData(w.mtd, w.config.funcs, w.workerID, reqNum)
    inputs, err := w.dataProvider(ctd)
    mp, err := newDynamicMessageProvider(w.mtd, w.config.data, w.config.streamCallCount)
    mp, err := newStaticMessageProvider(w.config.streamCallCount, inputs)

    // dispatch by call type:
    _ = w.makeBidiRequest(&ctx, ctd, msgProvider)
    _ = w.makeClientStreamingRequest(&ctx, ctd, msgProvider)
    _ = w.makeServerStreamingRequest(&ctx, inputs[0])
    _ = w.makeUnaryRequest(&ctx, reqMD, inputs[0])

And the place where InvokeRpc is finally called:

func (w *Worker) makeUnaryRequest(ctx *context.Context, reqMD *metadata.MD, input *dynamic.Message) error
    res, resErr = w.stub.InvokeRpc(*ctx, w.mtd, input, callOptions...)

The errgroup used above is a thin wrapper around sync.WaitGroup:

pkg/mod/golang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go

func (g *Group) Go(f func() error)
    g.wg.Add(1)
    defer g.wg.Done()
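A quick self-contained example of the errgroup pattern ghz relies on:

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    g := new(errgroup.Group)
    for i := 0; i < 3; i++ {
        i := i // capture the loop variable (needed before Go 1.22)
        g.Go(func() error {
            fmt.Println("request", i)
            return nil // a non-nil error would be returned by Wait
        })
    }
    // Wait blocks until all goroutines finish and returns the first error.
    if err := g.Wait(); err != nil {
        fmt.Println("failed:", err)
    }
}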

The report implementation is in runner/reporter.go:

func newReporter(results chan *callResult, c *RunConfig) *Reporter

type Reporter struct {
    config *RunConfig

    results chan *callResult
    done    chan bool

    totalLatenciesSec float64

    details []ResultDetail

    errorDist      map[string]int
    statusCodeDist map[string]int
    totalCount     uint64
}

The maximum number of stored results is hard-coded:

const maxResult = 1000000

func (r *Reporter) Run()
    for res := range r.results
        r.totalCount++
        r.totalLatenciesSec += res.duration.Seconds()
        // details are appended only while under the hard-coded maxResult cap
        r.details = append(r.details, ResultDetail{
            Latency:   res.duration,
            Timestamp: res.timestamp,
            Status:    res.status,
            Error:     errStr,
        })

type ResultDetail struct {
    Timestamp time.Time     `json:"timestamp"`
    Latency   time.Duration `json:"latency"`
    Error     string        `json:"error"`
    Status    string        `json:"status"`
}

Generating the final report:

func (r *Reporter) Finalize(stopReason StopReason, total time.Duration) *Report
    rep := &Report{ /* name, end reason, date, ... */ }
    rep.Options = Options{ /* copied from RunConfig */ }
    if len(r.details) > 0 {
        average := r.totalLatenciesSec / float64(r.totalCount)
        rep.Average = time.Duration(average * float64(time.Second))
        rep.Rps = float64(r.totalCount) / total.Seconds()
        for _, d := range r.details {
            okLats = append(okLats, d.Latency.Seconds())
        }
        sort.Float64s(okLats)
        fastestNum = okLats[0]
        slowestNum = okLats[len(okLats)-1]
        rep.Histogram = histogram(okLats, slowestNum, fastestNum)
        rep.LatencyDistribution = latencies(okLats)

The report contents:

type Report struct {
    Name      string     `json:"name,omitempty"`
    EndReason StopReason `json:"endReason,omitempty"`

    Options Options   `json:"options,omitempty"`
    Date    time.Time `json:"date"`

    Count   uint64        `json:"count"`
    Total   time.Duration `json:"total"`
    Average time.Duration `json:"average"`
    Fastest time.Duration `json:"fastest"`
    Slowest time.Duration `json:"slowest"`
    Rps     float64       `json:"rps"`

    ErrorDist      map[string]int `json:"errorDistribution"`
    StatusCodeDist map[string]int `json:"statusCodeDistribution"`

    LatencyDistribution []LatencyDistribution `json:"latencyDistribution"`
    Histogram           []Bucket              `json:"histogram"`
    Details             []ResultDetail        `json:"details"`

    Tags map[string]string `json:"tags,omitempty"`
}

Computing the histogram:

func histogram(latencies []float64, slowest, fastest float64) []Bucket
    bc := 10
    buckets := make([]float64, bc+1)
    counts := make([]int, bc+1)
    bs := (slowest - fastest) / float64(bc)
    for i := 0; i < bc; i++ {
        buckets[i] = fastest + bs*float64(i)
    }
    buckets[bc] = slowest
    var bi int
    var max int
    // latencies are already sorted, so a single pass assigns each
    // sample to the first bucket whose upper bound covers it
    for i := 0; i < len(latencies); {
        if latencies[i] <= buckets[bi] {
            i++
            counts[bi]++
            if max < counts[bi] {
                max = counts[bi]
            }
        } else if bi < len(buckets)-1 {
            bi++
        }
    }
    // each resulting Bucket carries:
    Mark:      buckets[i],
    Count:     counts[i],
    Frequency: float64(counts[i]) / float64(len(latencies)),
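For example, with fastest = 1.003 s and slowest = 1.006 s (as in the sample report at the end of this post), the bucket width is (1.006 − 1.003) / 10 = 0.3 ms, which is exactly the spacing between the histogram marks shown there.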

Computing the latency distribution:

func latencies(latencies []float64) []LatencyDistribution
    pctls := []int{10, 25, 50, 75, 90, 95, 99}
    data := make([]float64, len(pctls))
    lt := float64(len(latencies))
    for i, p := range pctls {
        ip := (float64(p) / 100.0) * lt
        di := int(ip)
        if ip == float64(di) {
            di = di - 1
        }
        if di < 0 {
            di = 0
        }
        data[i] = latencies[di]
    }
    res := make([]LatencyDistribution, len(pctls))
    for i := 0; i < len(pctls); i++ {
        if data[i] > 0 {
            lat := time.Duration(data[i] * float64(time.Second))
            res[i] = LatencyDistribution{Percentage: pctls[i], Latency: lat}
        }
    }

In other words, the sorted latencies are sliced at the given percentile points. For example, with 40 samples the 90th percentile gives ip = 0.9 × 40 = 36; since ip is a whole number the index is decremented to 35, so the 36th smallest latency is reported.

If you are interested in the MySQL load testing tool, feel free to try it out:

https://github.com/xiazemin/mysqlslap-go


Usage:

mysqlslap -Hhost -uuser -ppassword -P3306 -ddatabase -q"select id from deviceattr where name='attr10' or name='attr20' group by id;" -ffilename -c 50 -i 100


Sample report output:

Summary:
Name: mysqlslap
Count: 40
Total: 1.01 s
Slowest: 1.01 s
Fastest: 1.00 s
Average: 1.00 s
Requests/sec: 39.76


Response time histogram (ms):
1003.000 [1] |∎∎
1003.300 [0] |
1003.600 [0] |
1003.900 [0] |
1004.200 [24] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
1004.500 [0] |
1004.800 [0] |
1005.100 [12] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
1005.400 [0] |
1005.700 [0] |
1006.000 [3] |∎∎∎∎∎


Latency distribution:
10 % in 1.00 s
25 % in 1.00 s
50 % in 1.00 s
75 % in 1.00 s
90 % in 1.00 s
95 % in 1.01 s
99 % in 1.01 s


Status code distribution:
[success] 40 responses
[failed] 0 responses