ghz is a gRPC load-testing tool implemented in Go:
https://github.com/bojand/ghz
By analyzing its source code, I implemented a mysqlslap-like MySQL load-testing tool:
https://github.com/xiazemin/mysqlslap-go
Along the way, the source-code analysis also turned up a bug in ghz:
https://github.com/bojand/ghz/pull/323/files
Using ghz:
./ghz --skipTLS --insecure --protoset ./bundle.protoset \
  -B ./grpc_payload \
  --call tensorflow.serving.PredictionService/Predict \
  127.0.0.1:8500
For more usage examples see the docs: https://ghz.sh/docs/examples. You can also use the -D flag to load a JSON-formatted list of request data; ghz then walks the list in round-robin order, taking the next entry for each request. Now let's dig into the source.
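For example, a data file along these lines could be passed via -D (the message fields here are only illustrative; they have to match the input type of the method you call):

[
  { "name": "alice" },
  { "name": "bob" },
  { "name": "carol" }
]

Each request consumes the next entry and wraps around once the list is exhausted.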
The entry point is cmd/ghz/main.go; first comes argument parsing:
isCallSet = false
call = kingpin.Flag("call", `A fully-qualified method name in 'package.Service/method' or 'package.Service.Method' format.`).
    PlaceHolder(" ").IsSetByUser(&isCallSet).String()

// Concurrency
isCSet = false
c = kingpin.Flag("concurrency", "Number of request workers to run concurrently for const concurrency schedule. Default is 50.").
    Short('c').Default("50").IsSetByUser(&isCSet).Uint()

isCPUSet = false
cpus = kingpin.Flag("cpus", "Number of cpu cores to use.").
    Default(strconv.FormatUint(uint64(nCPUs), 10)).IsSetByUser(&isCPUSet).Uint()
Once the command-line arguments are parsed, the command is executed:
report, err := runner.Run(cfg.Call, cfg.Host, options...)
The implementation is in runner/run.go:
func Run(call, host string, options ...Option) (*Report, error) {
    c, err := NewConfig(call, host, options...)
    oldCPUs := runtime.NumCPU()
    runtime.GOMAXPROCS(c.cpus)
    defer runtime.GOMAXPROCS(oldCPUs)
    reqr, err := NewRequester(c)
    mtd, err = protodesc.GetMethodDescFromProto(c.call, c.proto, c.importPaths)
    mtd, err = protodesc.GetMethodDescFromProtoSet(c.call, c.protoset)
    cc, err = reqr.newClientConn(false)
    refClient := grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(cc))
    mtd, err = protodesc.GetMethodDescFromReflect(c.call, refClient)
    go func() {
        <-cancel
        reqr.Stop(ReasonCancel)
    }()
    if c.z > 0 {
        go func() {
            time.Sleep(c.z)
            reqr.Stop(ReasonTimeout)
        }()
    }
    rep, err := reqr.Run()
Run builds the config from the options and resolves the target method: from a proto file, from a protoset, or, when neither is specified, via gRPC server reflection to obtain the method descriptor.
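For completeness, runner.Run can also be called directly when ghz is used as a Go library. The sketch below follows the example in the ghz docs; the option helpers and printer usage are assumed from those docs and may differ between versions, and the proto file and target address are placeholders:

package main

import (
    "fmt"
    "os"

    "github.com/bojand/ghz/printer"
    "github.com/bojand/ghz/runner"
)

func main() {
    // Hypothetical proto file and target; adjust to your own service.
    report, err := runner.Run(
        "helloworld.Greeter.SayHello",
        "localhost:50051",
        runner.WithProtoFile("greeter.proto", []string{}),
        runner.WithDataFromJSON(`{"name":"Joe"}`),
        runner.WithInsecure(true),
    )
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    // Print the report to stdout in the built-in "pretty" format.
    p := printer.ReportPrinter{Out: os.Stdout, Report: report}
    p.Print("pretty")
}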
runner/options.go defines the load-test options:
func NewConfig(call, host string, options ...Option) (*RunConfig, error)

type RunConfig struct {
    // call settings
    call              string
    host              string
    proto             string
    importPaths       []string
    protoset          string
    enableCompression bool

    // security settings
    creds      credentials.TransportCredentials
    cacert     string
    cert       string
    key        string
    cname      string
    skipVerify bool
    insecure   bool
    authority  string

    // load
    rps              int
    loadStart        uint
    loadEnd          uint
    loadStep         int
    loadSchedule     string
    loadDuration     time.Duration
    loadStepDuration time.Duration
    pacer            load.Pacer

    // concurrency
    c             int
    cStart        uint
    cEnd          uint
    cStep         int
    cSchedule     string
    cMaxDuration  time.Duration
    cStepDuration time.Duration
    workerTicker  load.WorkerTicker

    // test
    n     int
    async bool

    // number of connections
    nConns int

    // timeouts
    z                     time.Duration
    timeout               time.Duration
    dialTimeout           time.Duration
    keepaliveTime         time.Duration
    zstop                 string
    streamInterval        time.Duration
    streamCallDuration    time.Duration
    streamCallCount       uint
    streamDynamicMessages bool

    // lbStrategy
    lbStrategy string

    // TODO consolidate these actual value fields to be implemented via provider funcs
    // data & metadata
    data             []byte
    metadata         []byte
    binary           bool
    dataFunc         BinaryDataFunc
    dataProviderFunc DataProviderFunc
    dataStreamFunc   StreamMessageProviderFunc
    mdProviderFunc   MetadataProviderFunc
    funcs            template.FuncMap

    // reflection metadata
    rmd map[string]string

    // debug
    hasLog bool
    log    Logger

    // misc
    name        string
    cpus        int
    tags        []byte
    skipFirst   int
    countErrors bool
    recvMsgFunc StreamRecvMsgInterceptFunc
}
runner/requester.go handles assembling the request payload and sending the requests:
func NewRequester(c *RunConfig) (*Requester, error)
    md := mtd.GetInputType()
    payloadMessage := dynamic.NewMessage(md)
    reqr.dataProvider = c.dataProviderFunc
    defaultDataProvider, err := newDataProvider(reqr.mtd, c.binary, c.dataFunc, c.data, c.funcs)
    reqr.metadataProvider = c.mdProviderFunc
    defaultMDProvider, err := newMetadataProvider(reqr.mtd, c.metadata, c.funcs)
The Requester is defined as follows:
type Requester struct {
    conns    []*grpc.ClientConn
    stubs    []grpcdynamic.Stub
    handlers []*statsHandler

    mtd      *desc.MethodDescriptor
    reporter *Reporter

    config  *RunConfig
    results chan *callResult
    stopCh  chan bool
    start   time.Time

    dataProvider     DataProviderFunc
    metadataProvider MetadataProviderFunc

    lock       sync.Mutex
    stopReason StopReason
    workers    []*Worker
}
Run is the core function: it sends the requests, records the results, and finally returns a report:
func (b *Requester) Run() (*Report, error)
    cc, err := b.openClientConns()
    start := time.Now()
    stub := grpcdynamic.NewStub(cc[n])
    b.reporter = newReporter(b.results, b.config)
    go func() {
        b.reporter.Run()
    }()
    wt := createWorkerTicker(b.config)
    p := createPacer(b.config)
    err = b.runWorkers(wt, p)
    report := b.Finish()
    b.closeClientConns()
The call chain for opening the gRPC client connections:
func (b *Requester) openClientConns() ([]*grpc.ClientConn, error)
    c, err := b.newClientConn(true)

func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, error)
    return grpc.DialContext(ctx, b.config.host, opts...)
Next, the worker ticker and the pacer are created: the ticker drives how many workers are active over time, and the pacer controls how fast requests are dispatched. Each call's result is written to the results channel and consumed by the reporter:
func createWorkerTicker(config *RunConfig) load.WorkerTicker
func createPacer(config *RunConfig) load.Pacer
runWorkers does the actual work, eventually calling w.runWorker():
func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error
    wct := wt.Ticker()
    go func() {
        wt.Run()
    }()
    go func() {
        for tv := range wct {
            b.workers = append(b.workers, &w)
            go func() {
                errC <- w.runWorker()
            }()
            for _, wrk := range b.workers {
                wrk.Stop()
            }
            go func() {
                b.workers[i].Stop()
            }()
        }
    }()
    for {
        wait, stop := p.Pace(time.Since(began), counter.Get())
Finish renders the load-test report:
func (b *Requester) Finish() *Report
    total := time.Since(b.start)
    <-b.reporter.done
    return b.reporter.Finalize(r, total)
To turn JSON payloads into proto messages, ghz first needs the method descriptor; that resolution lives in protodesc/protodesc.go:
func GetMethodDescFromReflect(call string, client *grpcreflect.Client) (*desc.MethodDescriptor, error)
    call = strings.Replace(call, "/", ".", -1)
    file, err := client.FileContainingSymbol(call)
    return getMethodDesc(call, files)
func getMethodDesc(call string, files map[string]*desc.FileDescriptor) (*desc.MethodDescriptor, error)
    svc, mth, err := parseServiceMethod(call)
    dsc, err := findServiceSymbol(files, svc)
    mtd := sd.FindMethodByName(mth)
func GetMethodDescFromProto(call, proto string, imports []string) (*desc.MethodDescriptor, error)
    fds, err := p.ParseFiles(filename)
    return getMethodDesc(call, files)
The request data handling lives in runner/data.go:
func newDataProvider(mtd *desc.MethodDescriptor, binary bool, dataFunc BinaryDataFunc, data []byte, funcs template.FuncMap) (*dataProvider, error)
    ctd := newCallData(mtd, funcs, "", 0)
    ha, err := ctd.hasAction(string(dp.data))
type dataProvider struct {
    binary        bool
    data          []byte
    mtd           *desc.MethodDescriptor
    dataFunc      BinaryDataFunc
    arrayJSONData []string
    hasActions    bool

    // cached messages only for binary
    mutex          sync.RWMutex
    cachedMessages []*dynamic.Message
}
runner/calldata.go assembles the per-call context information:
type CallData struct {
    WorkerID           string // unique worker ID
    RequestNumber      int64  // unique incremented request number for each request
    FullyQualifiedName string // fully-qualified name of the method call
    MethodName         string // shorter call method name
    ServiceName        string // the service name
    InputName          string // name of the input message type
    OutputName         string // name of the output message type
    IsClientStreaming  bool   // whether this call is client streaming
    IsServerStreaming  bool   // whether this call is server streaming
    Timestamp          string // timestamp of the call in RFC3339 format
    TimestampUnix      int64  // timestamp of the call as unix time in seconds
    TimestampUnixMilli int64  // timestamp of the call as unix time in milliseconds
    TimestampUnixNano  int64  // timestamp of the call as unix time in nanoseconds
    UUID               string // generated UUIDv4 for each call

    t *template.Template
}
func newCallData(mtd *desc.MethodDescriptor, funcs template.FuncMap, workerID string, reqNum int64) *CallData
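These fields back ghz's data templating: template actions in the -d/-D payload are expanded per call against the CallData above (the hasAction check in newDataProvider is what detects them). For instance, a data entry roughly like the following (the field name is purely illustrative) would yield a distinct value for every request:

{ "name": "worker-{{.WorkerID}}-req-{{.RequestNumber}}" }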
The worker ticker is defined in load/worker_ticker.go:
type WorkerTicker interface {
    // Ticker returns a channel which sends TickValues
    // When a value is received the number of workers should be appropriately
    // increased or decreased given by the delta property.
    Ticker() <-chan TickValue

    // Run starts the worker ticker
    Run()

    // Finish closes the channel
    Finish()
}
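ghz implements const, step and line schedules behind this interface. To make it concrete, a minimal constant ticker might look roughly like the sketch below, written as if it lived inside the load package and assuming TickValue carries the Delta field the interface comment refers to:

// constTicker is a sketch, not ghz's actual implementation: it asks
// runWorkers to start a fixed number of workers once and never changes it.
type constTicker struct {
    n  int            // desired number of concurrent workers
    ch chan TickValue // channel consumed by Requester.runWorkers
}

func (t *constTicker) Ticker() <-chan TickValue { return t.ch }

func (t *constTicker) Run() {
    // Emit a single tick requesting n workers.
    t.ch <- TickValue{Delta: t.n} // assumes TickValue has a Delta field
}

func (t *constTicker) Finish() { close(t.ch) }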
load/pacer.go defines the interface that paces request dispatch:
type Pacer interface {
    // Pace returns the duration the attacker should wait until
    // making next hit, given an already elapsed duration and
    // completed hits. If the second return value is true, an attacker
    // should stop sending hits.
    Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)

    // Rate returns a Pacer's instantaneous hit rate (per seconds)
    // at the given elapsed duration of an attack.
    Rate(elapsed time.Duration) float64
}
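As an illustration (a sketch, not ghz's own code), a constant-rate pacer satisfying this interface could look like this:

// constPacer targets freq hits per second: it tells the caller how long to
// wait so that the actual hit count tracks the expected hit count.
type constPacer struct {
    freq float64 // hits per second
}

func (p constPacer) Rate(elapsed time.Duration) float64 { return p.freq }

func (p constPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
    expected := p.freq * elapsed.Seconds()
    if float64(hits) < expected {
        return 0, false // behind schedule: fire immediately
    }
    // Wait until the (hits+1)-th hit is due.
    next := float64(hits+1) / p.freq
    return time.Duration((next - elapsed.Seconds()) * float64(time.Second)), false
}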
The actual request invocation happens in runner/worker.go:
type Worker struct {
    stub     grpcdynamic.Stub
    mtd      *desc.MethodDescriptor
    config   *RunConfig
    workerID string
    active   bool
    stopCh   chan bool
    ticks    <-chan TickValue

    dataProvider     DataProviderFunc
    metadataProvider MetadataProviderFunc
    msgProvider      StreamMessageProviderFunc

    streamRecv StreamRecvMsgInterceptFunc
}
Sending requests:
func (w *Worker) runWorker() error
    g := new(errgroup.Group)
    for {
        // async mode: fire the request in a goroutine via errgroup
        g.Go(func() error {
            return w.makeRequest(tv)
        })
        // sync mode: issue the request inline
        rErr := w.makeRequest(tv)
    }
    return g.Wait()
Assembling the request parameters:
func (w *Worker) makeRequest(tv TickValue) error
    ctd := newCallData(w.mtd, w.config.funcs, w.workerID, reqNum)
    inputs, err := w.dataProvider(ctd)
    mp, err := newDynamicMessageProvider(w.mtd, w.config.data, w.config.streamCallCount)
    mp, err := newStaticMessageProvider(w.config.streamCallCount, inputs)
    _ = w.makeBidiRequest(&ctx, ctd, msgProvider)
    _ = w.makeClientStreamingRequest(&ctx, ctd, msgProvider)
    _ = w.makeServerStreamingRequest(&ctx, inputs[0])
    _ = w.makeUnaryRequest(&ctx, reqMD, inputs[0])
This is where InvokeRpc is actually called:
func (w *Worker) makeUnaryRequest(ctx *context.Context, reqMD *metadata.MD, input *dynamic.Message) error
    res, resErr = w.stub.InvokeRpc(*ctx, w.mtd, input, callOptions...)
The errgroup.Group used above is a thin wrapper around sync.WaitGroup:
pkg/mod/golang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go
func (g *Group) Go(f func() error) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        ...
    }()
}
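A tiny standalone example of the semantics runWorker relies on: Wait blocks until every goroutine started with Go has returned, and yields the first non-nil error:

package main

import (
    "errors"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group
    for i := 0; i < 3; i++ {
        i := i
        g.Go(func() error {
            if i == 1 {
                return errors.New("request 1 failed")
            }
            return nil // simulate a successful request
        })
    }
    // Wait returns after all goroutines finish; prints "request 1 failed".
    fmt.Println(g.Wait())
}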
The report implementation is in runner/reporter.go:
func newReporter(results chan *callResult, c *RunConfig) *Reporter
type Reporter struct {
    config  *RunConfig
    results chan *callResult
    done    chan bool

    totalLatenciesSec float64
    details           []ResultDetail

    errorDist      map[string]int
    statusCodeDist map[string]int
    totalCount     uint64
}
The maximum number of result details that can be stored is hard-coded:
const maxResult = 1000000
func (r *Reporter) Run()
    for res := range r.results {
        r.totalCount++
        r.totalLatenciesSec += res.duration.Seconds()
        r.details = append(r.details, ResultDetail{
            Latency:   res.duration,
            Timestamp: res.timestamp,
            Status:    res.status,
            Error:     errStr,
        })
    }
type ResultDetail struct {
    Timestamp time.Time     `json:"timestamp"`
    Latency   time.Duration `json:"latency"`
    Error     string        `json:"error"`
    Status    string        `json:"status"`
}
Generating the final report:
func (r *Reporter) Finalize(stopReason StopReason, total time.Duration) *Report
    rep := &Report{ ... }
    rep.Options = Options{ ... }
    if len(r.details) > 0 {
        average := r.totalLatenciesSec / float64(r.totalCount)
        rep.Average = time.Duration(average * float64(time.Second))
        rep.Rps = float64(r.totalCount) / total.Seconds()
        for _, d := range r.details {
            okLats = append(okLats, d.Latency.Seconds())
        }
        sort.Float64s(okLats)
        fastestNum = okLats[0]
        slowestNum = okLats[len(okLats)-1]
        rep.Histogram = histogram(okLats, slowestNum, fastestNum)
        rep.LatencyDistribution = latencies(okLats)
    }
The contents of the report:
type Report struct {
    Name      string     `json:"name,omitempty"`
    EndReason StopReason `json:"endReason,omitempty"`
    Options   Options    `json:"options,omitempty"`
    Date      time.Time  `json:"date"`

    Count   uint64        `json:"count"`
    Total   time.Duration `json:"total"`
    Average time.Duration `json:"average"`
    Fastest time.Duration `json:"fastest"`
    Slowest time.Duration `json:"slowest"`
    Rps     float64       `json:"rps"`

    ErrorDist      map[string]int `json:"errorDistribution"`
    StatusCodeDist map[string]int `json:"statusCodeDistribution"`

    LatencyDistribution []LatencyDistribution `json:"latencyDistribution"`
    Histogram           []Bucket              `json:"histogram"`
    Details             []ResultDetail        `json:"details"`

    Tags map[string]string `json:"tags,omitempty"`
}
Computing the histogram:
func histogram(latencies []float64, slowest, fastest float64) []Bucket
    bc := 10
    buckets := make([]float64, bc+1)
    counts := make([]int, bc+1)
    bs := (slowest - fastest) / float64(bc)
    for i := 0; i < bc; i++ {
        buckets[i] = fastest + bs*float64(i)
    }
    buckets[bc] = slowest
    var bi int
    var max int
    for i := 0; i < len(latencies); {
        if latencies[i] <= buckets[bi] {
            i++
            counts[bi]++
            if max < counts[bi] {
                max = counts[bi]
            }
        } else if bi < len(buckets)-1 {
            bi++
        }
    }
    ...
    Mark:      buckets[i],
    Count:     counts[i],
    Frequency: float64(counts[i]) / float64(len(latencies)),
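As a worked example using the sample report at the end of this post: with fastest = 1003.0 ms and slowest = 1006.0 ms, the bucket width is (1006.0 - 1003.0) / 10 = 0.3 ms, giving the 11 bucket marks 1003.000, 1003.300, ..., 1006.000 ms; each sorted latency is counted into the first bucket whose mark it does not exceed.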
Computing the latency distribution:
func latencies(latencies []float64) []LatencyDistribution
    pctls := []int{10, 25, 50, 75, 90, 95, 99}
    data := make([]float64, len(pctls))
    lt := float64(len(latencies))
    for i, p := range pctls {
        ip := (float64(p) / 100.0) * lt
        di := int(ip)
        if ip == float64(di) {
            di = di - 1
        }
        if di < 0 {
            di = 0
        }
        data[i] = latencies[di]
    }
    res := make([]LatencyDistribution, len(pctls))
    for i := 0; i < len(pctls); i++ {
        if data[i] > 0 {
            lat := time.Duration(data[i] * float64(time.Second))
            res[i] = LatencyDistribution{Percentage: pctls[i], Latency: lat}
        }
    }
In other words, the sorted latencies are simply sliced at the requested percentiles.
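For example, with the 40 sorted latencies from the sample run below and p = 95: ip = (95 / 100) * 40 = 38, which is a whole number, so di becomes 37 and the reported p95 is the 38th smallest latency.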
If you are interested in the MySQL load-testing tool, give it a try:
https://github.com/xiazemin/mysqlslap-go
Usage:
mysqlslap -Hhost -uuser -ppassword -P3306 -ddatabase -q"select id from deviceattr where name='attr10' or name='attr20' group by id;" -ffilename -c 50 -i 100
Sample output report:
Summary:
  Name:          mysqlslap
  Count:         40
  Total:         1.01 s
  Slowest:       1.00 s
  Fastest:       1.01 s
  Average:       1.00 s
  Requests/sec:  39.76

Response time histogram(ms):
  1003.000 [1]  |∎∎
  1003.300 [0]  |
  1003.600 [0]  |
  1003.900 [0]  |
  1004.200 [24] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  1004.500 [0]  |
  1004.800 [0]  |
  1005.100 [12] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  1005.400 [0]  |
  1005.700 [0]  |
  1006.000 [3]  |∎∎∎∎∎

Latency distribution:
  10 % in 1.00 s
  25 % in 1.00 s
  50 % in 1.00 s
  75 % in 1.00 s
  90 % in 1.00 s
  95 % in 1.01 s
  99 % in 1.01 s

Status code distribution:
  [success] 40 responses
  [failed]  0 responses

