一.连接池的介绍

当一个TCP客户端连接上服务器就给该客户端开辟一个协程去处理相关的业务,这种情况如果在大批量的客户端都同一时刻向服务器发起连接请求超过了服务器端的最大的限制,为了解决就写了这个测试的demo

二.具体的实现

一个客户端的连接我们可以看作是一个任务,可以定义如下数据结构


type Task struct{
    tcpconn net.Conn
    task func(conn net.Conn) error
}

在客户端连接上服务器时就需要创建一个新的任务

func NewTask(arg_task func(conn net.Conn) error,conn net.Conn) *Task{
    t := Task{
        tcpconn:conn,
        task : arg_task,
    }

    return &t
}

这个任务的具体处理函数
func (t *Task)Execute(){
    t.task(t.tcpconn)
}

下面定义连接池的数据结构


type Pool struct{
    EntryChannel chan *Task //客户端连接上服务器时,就将创建的任务加入该channel
    JobsChannel chan *Task //连接池调度的channel
    work_num int //连接池的最大的个数
}

创建一个新的连接池

func NewPool(worker_max_num int) *Pool{
    t := Pool{
        EntryChannel: make(chan *Task,0),
        JobsChannel: make(chan *Task,0),
        work_num : worker_max_num,
    }

    return &t
}

连接池的具体处理函数

func (p *Pool)worker(worker_id int){
    for task := range p.JobsChannel{
        fmt.Println("This is worker ",worker_id)
        task.Execute()
    }
}

让连接池正式运行起来

func (p *Pool)run(){
    for i:=0; i<p.work_num;i++{
        go p.worker(i)
    }

    for task:=range p.EntryChannel{
        p.JobsChannel <- task
    }
}

三.测试的demo

服务端

package main 

import (
    "fmt"
    "net"
)


type Task struct{
    tcpconn net.Conn
    task func(conn net.Conn) error
}

func NewTask(arg_task func(conn net.Conn) error,conn net.Conn) *Task{
    t := Task{
        tcpconn:conn,
        task : arg_task,
    }

    return &t
}

func (t *Task)Execute(){
    t.task(t.tcpconn)
}

type Pool struct{
    EntryChannel chan *Task
    JobsChannel chan *Task
    work_num int
}

func NewPool(worker_max_num int) *Pool{
    t := Pool{
        EntryChannel: make(chan *Task,0),
        JobsChannel: make(chan *Task,0),
        work_num : worker_max_num,
    }

    return &t
}

func (p *Pool)worker(worker_id int){
    for task := range p.JobsChannel{
        fmt.Println("This is worker ",worker_id)
        task.Execute()
    }
}


func (p *Pool)run(){
    for i:=0; i<p.work_num;i++{
        go p.worker(i)
    }

    for task:=range p.EntryChannel{
        p.JobsChannel <- task
    }
}

func main(){
    ln,err := net.Listen("tcp","0.0.0.0:9000")
    if err != nil{
        fmt.Println(err)
        fmt.Println("listen is error!")
        return 
    }

    pool := NewPool(4)

    go func(p *Pool){
        for {
            conn,err := ln.Accept()
            if err != nil{
                fmt.Println("accept is error!")
                return
            }

            TcpHandle(conn,p)
        }
    }(pool)

    pool.run()
}

func TaskHandle(conn net.Conn) error{
    addr := conn.RemoteAddr().String()

    fmt.Println(addr)

    return nil
}

func TcpHandle(conn net.Conn,pool *Pool){
    task := NewTask(TaskHandle,conn)

    pool.EntryChannel <- task
}
 

客户端

    package main
  
   import (
           "net"
   )
   
   func main(){
          for i:=0; i<10000;i++{
                   go func(){
                         net.Dial("tcp","127.0.0.1:9000")
                 }()
          }
  }
 

四.执行效果