二、连接池代码如下:
packagemain;import( "time" "sync" "errors" "net" "fmt")//频繁的创建和关闭连接,对系统会造成很大负担//所以我们需要一个池子,里面事先创建好固定数量的连接资源,需要时就取,不需要就放回池中。//但是连接资源有一个特点,我们无法保证连接长时间会有效。//比如,网络原因,人为原因等都会导致连接失效。//所以我们设置一个超时时间,如果连接时间与当前时间相差超过超时时间,那么就关闭连接。//只要类型实现了ConnRes接口中的方法,就认为是一个连接资源类型typeConnRes interface{ Close() error;}//工厂方法,用于创建连接资源typeFactory func() (ConnRes, error)//连接typeConn struct{ conn ConnRes; //连接时间 time time.Time;}//连接池typeConnPool struct{ //互斥锁,保证资源安全 mu sync.Mutex; //通道,保存所有连接资源 conns chan*Conn; //工厂方法,创建连接资源 factory Factory; //判断池是否关闭 closed bool; //连接超时时间 connTimeOut time.Duration;}//创建一个连接资源池funcNewConnPool(factory Factory, cap int, connTimeOut time.Duration) (*ConnPool, error) { ifcap <= 0 { returnnil, errors.New("cap不能小于0"); } ifconnTimeOut <= 0 { returnnil, errors.New("connTimeOut不能小于0"); } cp := &ConnPool{ mu: sync.Mutex{}, conns: make(chan*Conn, cap), factory: factory, closed: false, connTimeOut: connTimeOut, }; fori := 0; i < cap; i++ { //通过工厂方法创建连接资源 connRes, err := cp.factory(); iferr != nil { cp.Close(); returnnil, errors.New("factory出错"); } //将连接资源插入通道中 cp.conns <- &Conn{conn: connRes, time: time.Now()}; } returncp, nil;}//获取连接资源func(cp *ConnPool) Get() (ConnRes, error) { ifcp.closed { returnnil, errors.New("连接池已关闭"); } for{ select{ //从通道中获取连接资源 caseconnRes, ok := <-cp.conns: { if!ok { returnnil, errors.New("连接池已关闭"); } //判断连接中的时间,如果超时,则关闭 //继续获取 iftime.Now().Sub(connRes.time) > cp.connTimeOut { connRes.conn.Close(); continue; } returnconnRes.conn, nil; } default: { //如果无法从通道中获取资源,则重新创建一个资源返回 connRes, err := cp.factory(); iferr != nil { returnnil, err; } returnconnRes, nil; } } }}//连接资源放回池中func(cp *ConnPool) Put(conn ConnRes) error { ifcp.closed { returnerrors.New("连接池已关闭"); } select{ //向通道中加入连接资源 casecp.conns <- &Conn{conn: conn, time: time.Now()}: { returnnil; } default: { //如果无法加入,则关闭连接 conn.Close(); returnerrors.New("连接池已满"); } }}//关闭连接池func(cp *ConnPool) Close() { ifcp.closed { return; } cp.mu.Lock(); cp.closed = true; //关闭通道 close(cp.conns); //循环关闭通道中的连接 forconn := rangecp.conns { conn.conn.Close(); } cp.mu.Unlock();}//返回池中通道的长度func(cp *ConnPool) len() int { returnlen(cp.conns);}funcmain() { cp, _ := NewConnPool(func() (ConnRes, error) { returnnet.Dial("tcp", ":8080"); }, 10, time.Second*10); //获取资源 conn1, _ := cp.Get(); conn2, _ := cp.Get(); //这里连接池中资源大小为8 fmt.Println("cp len : ", cp.len()); conn1.(net.Conn).Write([]byte("hello")); conn2.(net.Conn).Write([]byte("world")); buf := make([]byte, 1024); n, _ := conn1.(net.Conn).Read(buf); fmt.Println("conn1 read : ", string(buf[:n])); n, _ = conn2.(net.Conn).Read(buf); fmt.Println("conn2 read : ", string(buf[:n])); //等待15秒 time.Sleep(time.Second * 15); //我们再从池中获取资源 conn3, _ := cp.Get(); //这里显示为0,因为池中的连接资源都超时了 fmt.Println("cp len : ", cp.len()); conn3.(net.Conn).Write([]byte("test")); n, _ = conn3.(net.Conn).Read(buf); fmt.Println("conn3 read : ", string(buf[:n])); //把三个连接资源放回池中 cp.Put(conn1); cp.Put(conn2); cp.Put(conn3); //这里显示为3 fmt.Println("cp len : ", cp.len()); cp.Close();}
三、8080服务端代码如下:
packagemain;import( "net" "io" "log")funchandler(conn net.Conn) { for{ io.Copy(conn, conn); }}funcmain() { lis, err := net.Listen("tcp", ":8080"); iferr != nil { log.Fatal(err); } for{ conn, err := lis.Accept(); iferr != nil { continue; } gohandler(conn); }}
测试结果如下: