在使用goreplay完成流量录制到es后( mac 上学习k8s系列(52)goreplay流量录制),总有一个疑问,它的具体实现原理是什么?它那么多参数,我应该如何组合使用呢?当一件事觉得很难理解的时候,那就撸下源码,始终要相信任何事物都有一个极其简单的内核。我们还是从main函数开始,代码位于:gor.go,核心的就下面几行

 func main()
plugins = NewPlugins()
emitter := NewEmitter()
go emitter.Start(plugins, Settings.Middleware)

1,定义插件,包括输入监听处理的插件和输出发送的插件。

2,定义事件分发器

3,开始根据插件和middleware分发流量。

so easy 有没有?接着我们看插件的定义,它的代码位于:plugins.go,这里注册了很多常用的插件InputRAW,InputTCP,OutputTCP,InputFile,InputHTTP,OutputHTTP,NewKafkaOutput,NewKafkaInput,如果我们希望自定义,也可以在这里注册

func NewPlugins() *InOutPlugins {
for _, options := range Settings.InputDummy {
plugins.registerPlugin(NewDummyInput, options)
for range Settings.OutputDummy {
plugins.registerPlugin(NewDummyOutput)
if Settings.OutputStdout {
plugins.registerPlugin(NewDummyOutput)
for _, options := range Settings.InputRAW {
plugins.registerPlugin(NewRAWInput, options, Settings.InputRAWConfig)
for _, options := range Settings.InputTCP {
plugins.registerPlugin(NewTCPInput, options, &Settings.InputTCPConfig)
for _, options := range Settings.OutputTCP {
plugins.registerPlugin(NewTCPOutput, options, &Settings.OutputTCPConfig)
for _, options := range Settings.InputFile {
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun)
for _, options := range Settings.InputHTTP {
plugins.registerPlugin(NewHTTPInput, options)
for _, options := range Settings.OutputHTTP {
plugins.registerPlugin(NewHTTPOutput, options, &Settings.OutputHTTPConfig)
if Settings.OutputKafkaConfig.Host != "" && Settings.OutputKafkaConfig.Topic != "" {
plugins.registerPlugin(NewKafkaOutput, "", &Settings.OutputKafkaConfig, &Settings.KafkaTLSConfig)
if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" {
plugins.registerPlugin(NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)

下面具体看下插件注册的实现:

func (plugins *InOutPlugins) registerPlugin(constructor interface{}, options ...interface{}) {
vc := reflect.ValueOf(constructor)
plugin := vc.Call(vo)[0].Interface()
if r, ok := plugin.(PluginReader); ok {
plugins.Inputs = append(plugins.Inputs, r)
if w, ok := plugin.(PluginWriter); ok {
plugins.Outputs = append(plugins.Outputs, w)
plugins.All = append(plugins.All, plugin)

它先通过反射调用插件的构造函数,拿到插件实例(interface{}),然后判断该实例是否实现了PluginReader 和PluginWriter 接口,将插件分成输入插件和输出插件两类,分别存储在Inputs和Outputs切片里(同时所有插件都会存入All)。

// PluginReader is the read side of a plugin. registerPlugin appends any
// plugin that satisfies this interface to Inputs, and CopyMulty calls
// PluginRead in a loop to pull messages from the source.
type PluginReader interface {
// PluginRead returns the next message from the plugin, or an error.
PluginRead() (msg *Message, err error)
}
// PluginWriter is the write side of a plugin. registerPlugin appends any
// plugin that satisfies this interface to Outputs, and CopyMulty passes each
// message it reads to PluginWrite on every writer.
type PluginWriter interface {
// PluginWrite hands msg to the plugin for output.
// NOTE(review): n is presumably the number of bytes written — confirm
// against the implementations.
PluginWrite(msg *Message) (n int, err error)
}

接着看流量转发的实现。代码位于emitter.go:

func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
middleware := NewMiddleware(middlewareCmd)
if err := CopyMulty(middleware, plugins.Outputs...)
for _, in := range plugins.Inputs {
e.Add(1)
go func(in PluginReader) {
defer e.Done()
if err := CopyMulty(in, plugins.Outputs...); err != nil {

它的核心逻辑是将所有的InputPlugin的数据经过middleware处理后转发给所有的OutputPlugin,转发函数是通过CopyMulty 实现的:

func CopyMulty(src PluginReader, writers ...PluginWriter) error {
for {
msg, err := src.PluginRead()
msg.Data = modifier.Rewrite(msg.Data)
dst.PluginWrite(msg)
for _, dst := range writers {
if _, err := dst.PluginWrite(msg)

原理也很简单,读取src.PluginRead()的数据,写入dst.PluginWrite(msg)

以上就是goreplay的核心框架源码,核心原理如下图:

命令行选项参数定义在settings.go的init函数中,比如 --input-raw 的参数定义:

type AppSettings struct
func init() {
flag.Var(&MultiOption{&Settings.InputRAW}, 

接着我们通过一些常用的插件分析下其实现原理:

1,inputHttp:

它的源码位于input_http.go

 func NewHTTPInput(address string) (i *HTTPInput) {
i.listen(address)
func (i *HTTPInput) listen(address string) {
mux.HandleFunc("/", i.handler)
i.listener, err = net.Listen("tcp", address)
err = http.Serve(i.listener, mux)
func (i *HTTPInput) handler(w http.ResponseWriter, r *http.Request) {
buf, _ := httputil.DumpRequestOut(r, true)
i.data <- buf 
func (i *HTTPInput) PluginRead() (*Message, error) {
case buf := <-i.data:
msg.Data = buf
msg.Meta = payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1)

可以看到,它就是一个简单的http服务器,它的handler只做了一件事情,就是将http请求通过 httputil.DumpRequestOut,序列化到buf里面,然后写入channel data里面,PluginRead 接口就是负责消费data这个channel里面的数据,是不是贼简单?

2,outputHttp:

代码位于output_http.go:

func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter
if o.config.ElasticSearch != "" {
o.elasticSearch = new(ESPlugin)
o.elasticSearch.Init(o.config.ElasticSearch)
o.client = NewHTTPClient(o.config)
for i := 0; i < o.config.WorkersMin; i++ {
go o.startWorker()
go o.workerMaster()

httpOutput还支持写入es,自己实现es插件需要实现Init接口;我们还是从PluginWrite接口入手:

func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) {
select {
case <-o.stop:
return 0, ErrorStopped
case o.queue <- msg:
if len(o.queue) > 0 {
if atomic.LoadInt32(&o.activeWorkers) < int32(o.config.WorkersMax) {
go o.startWorker()
atomic.AddInt32(&o.activeWorkers, 1)

它将msg放入queue里面,然后实现了一个协程池:当queue中仍有积压且活跃worker数小于WorkersMax时,会再启动新的worker来处理输出。worker则是不断从queue里面拿消息向外发送请求

func (o *HTTPOutput) startWorker()
case msg := <-o.queue:
o.sendRequest(o.client, msg)

workerMaster则负责回收多余的worker:当活跃worker数大于WorkersMin且queue为空时,通过stopWorker通知关闭一个worker协程

func (o *HTTPOutput) workerMaster()
if atomic.LoadInt32(&o.activeWorkers) > int32(o.config.WorkersMin) && len(o.queue) < 1 {
// close one worker
o.stopWorker <- struct{}{}
atomic.AddInt32(&o.activeWorkers, -1)

发送请求自然很简单了,就是一个http client 请求而已

func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
resp, err := client.Send(msg.Data)
if o.elasticSearch != nil {
o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)

如果输出需要存入es,则调用es插件的ResponseAnalyze 方法进行数据写入。mac 上学习k8s系列(52)goreplay流量录制 中实现的es.go插件,主要就是实现了下面两个方法。

 func (p *ESPlugin) Init(URI string) {
func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time)

3,inputKafka:

input_kafka.go它其实就是实现了一个kafka的消费者

func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput {
con, err = sarama.NewConsumer(strings.Split(config.Host, ","), c)
go func(consumer sarama.PartitionConsumer) {
defer consumer.Close()
for message := range consumer.Messages() {
i.messages <- message
}
}(consumer)
func (i *KafkaInput) PluginRead() (*Message, error) {
case message = <-i.messages:

4,inputTcp:

input_tcp.go 和http一样,仅仅完成了tcp请求的监听:

func (i *TCPInput) listen(address string) {
listener, err := tls.Listen("tcp", address, config)
listener, err := net.Listen("tcp", address)
go func() {
for {
conn, err := i.listener.Accept()
if err == nil {
go i.handleConnection(conn)

5,inputRaw:

input_raw.go这是所有插件里面唯一有点技术含量的东东,也是我们最常用的无侵入流量录制用到的插件:

func NewRAWInput(address string, config RAWInputConfig) (i *RAWInput) {
host, _ports, err := net.SplitHostPort(address)
i.listen(address)
func (i *RAWInput) PluginRead() (*Message, error) {
case msgTCP = <-i.listener.Messages():
msg.Data = msgTCP.Data()
if msgTCP.Direction == tcp.DirIncoming {

它实现了自己的listen方法:

func (i *RAWInput) listen(address string) {
i.listener, err = capture.NewListener(i.host, i.ports, i.config)
errCh := i.listener.ListenBackground(ctx)
<-i.listener.Reading

底层调用了 capture包的NewListener方法,获得监听器。

capture/capture.go

func NewListener(host string, ports []uint16, config PcapOptions) (l *Listener, err error) {
l = &Listener{}
switch config.Engine {
default:
l.Activate = l.activatePcap
case EngineRawSocket:
l.Activate = l.activateRawSocket
case EngineAFPacket:
l.Activate = l.activateAFPacket
case EnginePcapFile:
l.Activate = l.activatePcapFile
return
case EngineVXLAN:
l.Activate = l.activateVxLanSocket
err = l.setInterfaces()

它枚举所有可用的网络设备,然后筛选出我们关心的接口

func (l *Listener) setInterfaces() (err error) {
pifis, err = pcap.FindAllDevs()
ifis, _ := net.Interfaces()
for _, pi := range pifis {
l.Interfaces = append(l.Interfaces, pi)

引用了大名鼎鼎的"github.com/google/gopacket/pcap"包来做数据监听

func (l *Listener) Listen(ctx context.Context) (err error) {
for key, handle := range l.Handles {
go l.readHandle(key, handle)
for key, handle := range l.Handles {
if key == in.Name {
fmt.Println("Activating capture on:", in.Name)
go l.readHandle(key, handle)

然后根据需要来做数据解析

func (l *Listener) readHandle(key string, hndl packetHandle) {
messageParser := tcp.NewMessageParser(l.messages, l.ports, hndl.ips, l.config.Expire, l.config.AllowIncomplete)
data, ci, err := hndl.handler.ReadPacketData()
messageParser.PacketHandler(&tcp.PcapPacket{
Data: data,
LType: linkType,
LTypeLen: linkSize,
Ci: &ci,
})

总结一下:goreplay其实类似一个交换机,它接收inputPlugin的流量,然后分发给outputPlugin,其中inputRaw插件实现了流量的透明录制。