最近做了grpc的tracing相关的工作。这里把业务上用到的组件,包括HTTP、gRPC、Redis、MySQL、Kafka等的tracing接入方案和依赖做一个整理

一些相关材料和背景知识

Peter Bourgon对可观测体系tracing、logging、metric的一个分析:

随便找的中文翻译:

OpenTelemetry 是CNCF 的可观测项目,内容包括一套完整的可观测标准化方案,以及相关的工具集。

Github上OpenTelemetry的golang第三方组件的集合项目,本文中大多数实现方案的依赖都在这个项目中

选型和实现

HTTP

我们项目的技术栈中HTTP服务都是基于Gin实现,对应的,使用的组件包是:

import "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"

...

        server := gin.New()
	server.Use(otelgin.Middleware("my-server"))

gRPC

grpc请求是我们tracing监控的核心内容,做了一些定制,故没有使用第三方包,直接基于http://go.opentelemetry.io/otel包做的实现。但其基本思路和方法与go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc 包相同。即:在gRPC的主被调拦截器中收集整理数据,用context做carrier。

import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

...
	
        s := grpc.NewServer(
		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
	)

Redis

项目的Redis使用的go-redis v8以上版本内置了redisotel包:https://github.com/go-redis/redis/tree/master/extra/redisotel

直接引入并AddHook就行。

go-redis v8以下版本不支持传入ctx,故只能使用v8及以上版本。

import semconv "go.opentelemetry.io/otel/semconv/v1.7.0"

...

	conn := redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: dbConfig.GetString("password"),
		DB:       dbConfig.GetInt("db"),
	})
	conn.AddHook(redisotel.TracingHook{})

MySQL

semconv是基于sqlDriver实现,理论上不影响MySQL包的选型。但是不支持传入ctx的sqlx和gormv1无法支持。故只能使用gorm v2

import semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

...

	driverName, err := otelsql.Register("mysql", semconv.DBSystemMySQL.Value.AsString(),
		otelsql.WithAttributes(
			attribute.KeyValue{Key: "component", Value: attribute.StringValue("mysql")},
			semconv.DBUserKey.String(config.Username),
			semconv.DBNameKey.String(config.DbName),
			attribute.KeyValue{Key: "component", Value: attribute.StringValue("mysql")},
			attribute.KeyValue{Key: "db.ip", Value: attribute.StringValue(config.Host)},
			attribute.KeyValue{Key: "db.port", Value: attribute.IntValue(int(config.Port))},
			attribute.KeyValue{Key: "db.instance", Value: attribute.StringValue(config.DbName)},
		))
	conn, err := sql.Open(driverName, dataSourceName)
...
	db, err = gormV2.Open(mysql.New(mysql.Config{
		Conn: conn,
	}))

Kafka

项目的kafka使用的sarama,对应使用的tracing包是:


kafka的tracing需要支持传入msg头,故需要kafka版本v0.11.0及以上

消费者初始化:

import "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"

...

        consumerGroupHandler := Consumer{}
	handler := otelsarama.WrapConsumerGroupHandler(&consumerGroupHandler)

消费消息:

ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))

生产者初始化:

 	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	producer = otelsarama.WrapAsyncProducer(config, producer)

生产消息:

otel.GetTextMapPropagator().Inject(ctx, otelsarama.NewProducerMessageCarrier(&msg))