Golang语言社区投稿golang高并发基于协程,通道的任务池

Posted Golang语言社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang语言社区投稿golang高并发基于协程,通道的任务池相关的知识,希望对你有一定的参考价值。

要点:
封装了协程模型基于select模型的通道传递;
支持同步和异步添加任务;由于golang无函数指针,任务函数利用了go 反射机制支持可变参的入参
开发者可以在高处理性能前提下,只专注业务开发,往任务池添加任务即可。

实例:
//taskpool.go 
package taskpool

import (
    "reflect"
    "time"
)

type Task struct {
    M_func interface{}
    M_args []interface{}
}

func (task *Task) Run() {
    go func() {
        f := reflect.ValueOf(task.M_func)
        if len(task.M_args) != f.Type().NumIn() {
            return
        }
        in := make([]reflect.Value, len(task.M_args))
        for k, param := range task.M_args {
            in[k] = reflect.ValueOf(param)
        }
        f.Call(in)

    }()
}

type WorkPool struct {
    TaskChannel chan Task
    QuitChan    chan int //终止通道
}

//size 设置缓存大
func (pool *WorkPool) InitPool(size int) {
    pool.TaskChannel = make(chan Task, size)
    pool.QuitChan = make(chan int)
    go func() {
    DONE:
        for {
            select {
            case task := <-pool.TaskChannel:
                task.Run()
            case <-pool.QuitChan:
                break DONE
            }
        }
    }()
}
func (pool *WorkPool) ClosePool() {
    pool.QuitChan <- 1
}

//同步阻塞方式添加任务
func (pool *WorkPool) AddTask(task Task) {
    pool.TaskChannel <- task
}

//非阻塞方式添加任务 time 超时时间 单位毫秒
func (pool *WorkPool) AddTaskSync(task Task, millitime int) bool {
    res := false
    go func(res bool) {
        select {
        case pool.TaskChannel <- task:
            res = true
        case <-time.After(time.Millisecond * time.Duration(millitime)):
            res = false
        }
    }(res)
    return res
}

//test_main.go
package main

import (
    "fmt"
    "ms_lib/ms_taskpool"
    "time"
)

func test(i int, test string) {
    fmt.Println("hahaha", i, test)
}

func main() {
    task_pool := ms_taskpool.WorkPool{}
    task_pool.InitPool(5)
    for i := 0; i < 1000; i++ {
        task := ms_taskpool.Task{M_func: test}
        task.M_args = append(task.M_args, i)
        task.M_args = append(task.M_args, "test")
        task_pool.AddTask(task)
    }
    //task_pool.ClosePool() //可强制主动关闭任务池

    time.Sleep(5 * time.Second)
    fmt.Println("test done!")
}


以上是关于Golang语言社区投稿golang高并发基于协程,通道的任务池的主要内容,如果未能解决你的问题,请参考以下文章