我的服务器运行了一段时间,创建了大约200个连接,并进行了一些计算并关闭了,我发现它占用了大约2.7G内存,并且在服务器运行几天后从未减少。 该程序本身并没有占用太多空间,我通过memstats进行了检查。 通过cat /proc/11686/status | grep -i threads我得到了Threads: 177,所以我认为它占用大量内存的原因是它创建了许多线程。为什么go创建太多线程? 是因为我使用过多的go func()吗? 而且我敢肯定goroutines不会增加,它们会正常退出。

PS

我的程序中有很多代码,所以我排除了细节,只保留主要内容

我的问题是当go创建线程执行某项操作时。 有这么多线程正常吗? 我认为它与代码无关。

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main

import (
   "sanguo/base/log"
   "fmt"
   "runtime"
   "math/rand"
   "time"
   "net"
   "os"
)

type GameServer struct {
    Host   string
}


func (server *GameServer) Start() {
    // load system data
    log.Debug("/*************************SREVER START********************************/")

    tcpAddr, err := net.ResolveTCPAddr("tcp4", server.Host)
    if err != nil {
        log.Error(err.Error())
        os.Exit(-1)
    }
    go func(){
        for{
            select {
            case <-time.After(30*time.Second):
                LookUp("read memstats")
            }
        }
    }()
    listener, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil {
        log.Error(err.Error())
        os.Exit(-1)
    }
    log.Debug("/*************************SERVER SUCC********************************/")
    for {
        conn, err := listener.AcceptTCP()
        if err != nil {
            continue
        }
        log.Debug("Accept a new connection", conn.RemoteAddr())
        go handleClient(conn)
    }
}

func handleClient(conn *net.TCPConn) {
    sess := NewSession(conn)
    sess.Start()
}

func main() {
    rand.Seed(time.Now().Unix())

    runtime.GOMAXPROCS(runtime.NumCPU())

    log.SetLevel(0)

    filew := log.NewFileWriter("log", true)
    err := filew.StartLogger()
    if err != nil {
        fmt.Println("Failed start log",err)
        return
    }

    var server GameServer
    server.Host ="127.0.0.1:9999"
    server.Start()
}

session.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package main

import (
   "io"
   "encoding/binary"
   "encoding/json"
   "github.com/felixge/tcpkeepalive"
   "net"
   "sanguo/base/log"
   "strings"
   "sync"
   "time"
)


type Session struct {

    conn *net.TCPConn //the tcp connection from client

    recvChan      chan *bufferedManager.Token //data from client
    closeNotiChan chan bool   //

    ok   bool
    lock sync.Mutex

}


func NewSession(connection *net.TCPConn) (sess *Session) {
    var client Session

    client.conn = connection

    client.recvChan = make(chan []byte, 1024)
    client.closeNotiChan = make(chan bool)
    client.ok = true

    log.Debug("New Connection", &client)

    kaConn, err := tcpkeepalive.EnableKeepAlive(connection)
    if err != nil {
        log.Debug("EnableKeepAlive err", err)
    } else {
        kaConn.SetKeepAliveIdle(120 * time.Second)
        kaConn.SetKeepAliveCount(4)
        kaConn.SetKeepAliveInterval(5 * time.Second)
    }
    return &client
}


func (sess *Session) Close() {
    sess.lock.Lock()
    if sess.ok {
        sess.ok = false
        close(sess.closeNotiChan)
        sess.conn.Close()
        log.Trace("Sess Close Succ", sess, sess.uid)
    }
    sess.lock.Unlock()
}

func (sess *Session) handleRecv() {
    defer func(){
        if err := recover(); err != nil {
            log.Critical("Panic", err)
        }
        log.Trace("Session Recv Exit", sess, sess.uid)
        sess.Close()
    }()
    ch := sess.recvChan
    header := make([]byte, 2)
    for {
        /**block until recieve len(header)**/
        n, err := io.ReadFull(sess.conn, header)
        if n == 0 && err == io.EOF {
            //Opposite socket is closed
            log.Warn("Socket Read EOF And Close", sess)
            break
        } else if err != nil {
            //Sth wrong with this socket
            log.Warn("Socket Wrong:", err)
            break
        }
        size := binary.LittleEndian.Uint16(header) + 4
        data := make([]byte, size)
        n, err = io.ReadFull(sess.conn, t.Data)
        if n == 0 && err == io.EOF {
            log.Warn("Socket Read EOF And Close", sess)
            break
        } else if err != nil {
            log.Warn("Socket Wrong:", err)
            break
        }
        ch <- data //send data to Client to process
    }
}

func (sess *Session) handleDispatch() {
    defer func(){
        log.Trace("Session Dispatch Exit",  sess, sess.uid)
        sess.Close()
    }()
    for {
        select {
        case msg, _ := <-sess.recvChan:
            log.Debug("msg", msg)
            sess.SendDirectly("helloworldhellowor", 1)

        case <-sess.closeNotiChan:
                return
        }
    }
}

func (sess *Session) Start() {
    defer func() {
        if err := recover(); err != nil {
            log.Critical("Panic", err)
        }
    }()
    go sess.handleRecv()

    sess.handleDispatch()

    close(sess.recvChan)
    log.Warn("Session Start Exit", sess, sess.uid)
}


func (sess *Session) SendDirectly(back interface{}, op int) bool {
    back_json, err := json.Marshal(back)
    if err != nil {
        log.Error("Can't encode json message", err, back)
        return false
    }
    log.Debug(sess.uid,"OUT cmd:", op, string(back_json))
    _, err = sess.conn.Write(back_json)
    if err != nil {
        log.Error("send fail", err)
        return false
    }
    return true
}
  • 您需要向我们展示一些代码。 充其量,我们只能猜测可能是什么原因,而这并不能在问答网站上提供非常好的材料。
  • @buzz:您尚未接受任何答案。 如果没有人能够令人满意地回答您的问题,也许您应该努力改进它们。
  • 如果打印堆栈跟踪,则可以看到所有goroutine在哪里等待,这将有助于推断哪些可能正在消耗整个线程。

使用Go,您可以创建许多goroutine,而不必增加线程数。在您的代码中,运行Go代码的线程数受runtime.NumCPU()的限制。

当goroutine必须执行阻塞调用(例如系统调用或通过cgo调用C库)时,可以创建线程。在这种情况下,运行时调度程序将从其调度池中删除运行goroutine的线程。如果调度池中的线程少于GOMAXPROCS,则将创建一个新线程。

您可以在此处找到有关其工作原理的更多信息:
https://softwareengineering.stackexchange.com/questions/222642/are-go-langs-goroutine-pools-just-green-threads/222694#222694

要了解代码为何生成线程的原因,必须调查导致阻塞系统调用或C调用的所有代码路径。请注意,与网络相关的调用是非阻塞的,因为它们由标准库自动多路复用。但是,如果执行某些磁盘I / O或调用外部库,则会生成线程。

例如,代码中使用的日志记录库可能会执行某些阻塞的I / O,从而导致创建线程(尤其是如果生成的文件托管在速度较慢的设备上)。

  • 非常感谢,这就是我想知道的。 请检查我的代码。 我使用github.comgaryburdredigoredis这个库。
  • 我怀疑redigo会产生阻塞呼叫,除非您不断连接和断开与Redis的连接。 AFAIK,Go标准库中使用的地址解析机制仍然基于cgo。