现象

某一个周末我们的服务 oom了,一个比较重要的job 没有跑完,需要重跑,以为是偶然,重跑成功,因为是周末没有去定位原因
又一个工作日,它又oom了,重跑成功,持续观察,job 在oom之前竟然占用了30g左右(这里我们的任务一个数据量都在内存中计算,所以这里数据量大一点)

应用使用30g肯定是不正常的,怀疑内存泄漏了,怎么定位内存泄漏呢?

定位

搜了一下网上经常用到的工具是 go 的 pprof 火焰图,自己在本地跑了一下,因为数据量比较少,并没有发现什么,暂时放下了。
后续某个早上在公司工具里面打开了一下,发现有火焰图的工具,打开看了一下一个函数占用了 7224.46mb,占用了 7个g, 而且这个函数是已经跑完了,这个时候定位到那个函数了,和旁边同事说了一下,同事帮忙看了下邮件告警,每个下午都会有任务失败告警(任务失败会进行重试的); 这里怀疑是失败了, channel 没有关闭,导致 消费的go routine 没有回收。

举个例子看下代码:

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
)

func main() {
	readGroup, _ := errgroup.WithContext(context.Background())
	consumeGroup, _ := errgroup.WithContext(context.Background())

	var (
		data = make(chan []int, 10)
	)

	//  3个生产者往里面进行进行生产
	readGroup.Go(func() error {
		for i := 0; i < 3; i++ {
			data <- []int{i}
		}
		return nil
	})

	readGroup.Go(func() error {
		for i := 3; i < 6; i++ {
			data <- []int{i}
		}
		return nil
	})

	readGroup.Go(func() (err error) {
		for i := 6; i < 9; i++ {
			// error
			if i == 7 {
				err = fmt.Errorf("error le")
				return
			}
			data <- []int{i}
		}
		return nil
	})

	// 其中一个生产者遇到error 返回导致 channel 没有关闭,消费者没有退出

	// 1个消费者进行消费

	consumeGroup.Go(func() error {
		for i := range data {
			fmt.Println(i)
		}
		return nil
	})

	if err := readGroup.Wait(); err != nil {
		fmt.Println(err)
		return
	}

	close(data)

	if err := consumeGroup.Wait(); err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("end it")
}

这个case里面,readGroup 遇到error 直接退出了,channel并没有关闭,如果是常驻进程的程序,消费的go routine 并没有回收,就导致了内存泄漏

最简单的关闭修复
将 close 放到最上面的 defer close(data)

不过最好的还是生产者进行关闭,我们可以优化一下代码,把生产者的代码放到一个函数中,这样就可以让生产者去进行关闭的操作了

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
)

func main() {
	var (
		data = make(chan []int, 10)
		err  error

		eg, _ = errgroup.WithContext(context.Background())
	)

	eg.Go(func() (err error) {
		defer close(data)

		err = readGroup(data)

		return
	})

	eg.Go(func() (err error) {
		err = consumeGroup(data)
		return
	})

	err = eg.Wait()
	if err != nil {
		return
	}

	fmt.Println("end it")
}

func consumeGroup(data chan []int) (err error) {
	consumeGroup, _ := errgroup.WithContext(context.Background())

	consumeGroup.Go(func() error {
		for i := range data {
			fmt.Println(i)
		}
		return nil
	})

	if err = consumeGroup.Wait(); err != nil {
		fmt.Println(err)
		return
	}

	return
}

func readGroup(data chan []int) (err error) {
	readGroup, _ := errgroup.WithContext(context.Background())

	//  3个生产者往里面进行进行生产
	readGroup.Go(func() error {
		for i := 0; i < 3; i++ {
			data <- []int{i}
		}
		return nil
	})

	readGroup.Go(func() error {
		for i := 3; i < 6; i++ {
			data <- []int{i}
		}
		return nil
	})

	readGroup.Go(func() (err error) {
		for i := 6; i < 9; i++ {
			// error
			if i == 7 {
				err = fmt.Errorf("error le")
				return
			}
			data <- []int{i}
		}
		return nil
	})

	if err = readGroup.Wait(); err != nil {
		fmt.Println(err)
		return
	}

	return
}

修复

将生产者放在一个 goroutint 里面,最后如果遇到error的话 defer()的时候会把channel给关闭了

The Channel Closing Principle
One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine if the sender is the only sender of the channel.

简单点:就是在生产者中进行channel的关闭

后续讨论和遇到的新问题

拆分代码函数的时候又遇到新的问题了,又一个切片数组我拆分函数的时候,我没有去接受切片函数的返回值,导致了切片发生扩容返回的是一个空切片,并没有修改掉原来的切片。之前以为在golang里面切片是引用类型,会自动改变其中的值最后查了一下,在go 里面都是值传递,可以修改其中的值其实是使用了指针修改了同一块地址中的值所以值发生了变化

总结

使用channel 的时候在生产者中进行关闭,思考一些遇到error的时候channel是否可以正常的关闭
go 中只有值传递,引用传递是修改了同一个指向内存地址中的值

参考文章:

如何优雅地关闭Go channel
https://www.jianshu.com/p/d24dfbb33781


Go语言参数传递是传值还是传引用
https://www.flysnow.org/2018/02/24/golang-function-parameters-passed-by-value