1 常用特性
1.1 go:build
//go:build !windows
//go:build是前缀指令,!windows是逻辑判断的条件。这个指令的作用是在Windows系统外,编译当前源文件。
// +build !windows
// +build是前缀指令,!windows是编译标记。这个指令的作用是告诉编译器只有当编译标记中不包含 windows时,才会编译当前源文件。
综合上述两个指令的作用,只有在非Windows系统下编译这个源文件时,会将其编译进目标可执行文件中。
作用:只有当操作系统为windows,同时arch架构为386时,才编译当前文件
1.2 go:embed
//go:embed指令是Go 1.16版本新增的官方命令,可以用于在可执行文件中嵌入文件。
指令格式有三种形式:
- //go:embed path…
- //go:embed regexp
- //go:embed dir/*.ext
其中:
- path… 是需要嵌入的文件或目录,可以为多个,用空格分隔。
- regexp 是需要嵌入的文件名或目录名的正则表达式。
- dir/*.ext 是需要嵌入的某个目录下特定扩展名的文件。
示例
假设我们有一个文件叫 data.txt,然后我们希望在程序中引用它,通过 //go:embed 指令即可嵌入。
package main
import (
"embed"
"fmt"
)
//go:embed data.txt
var data string
func main() {
fmt.Println(data)
}
在这个示例中,我们使用 //go:embed data.txt 将 data.txt 文件嵌入到了可执行文件中,并将其作为字符串赋值给了 var data string,然后在 main() 函数中输出了 data 变量的值。
1.3 其他
- go:noinline:禁止编译器进行内联,即使启用了 -l 标志也不会内联。
- go:noescape:告诉编译器某个函数或方法没有任何指针可以逃逸到外部,可以更好地进行优化。
- go:linkname:用于跨包调用未导出的函数。
- go:cgo_export_static、//go:cgo_export_dynamic、//go:cgo_import_static、//go:cgo_import_dynamic:用于在Go和C语言之间交互数据。
1.4 插件化开发
Golang官方提供了plugin模块,该模块可以支持插件开发.
目前很多思路都是在开发过程中支持插件话,当主体程序写完后,不能够临时绑定插件.但是本文将带领你进行主体程序自动识别并加载、控制插件调用.
1 基本思路
插件化开发中,一定存在一个主体程序,对其他插件进行控制、处理、调度.
1.1 基本业务
- 我们首先开发一个简单的业务程序,进行两种输出.
- 当时间秒数为奇数的时候,输出hello
- 当时间秒数为偶数的时候,输出world
主体代码,MainFile.go:
package main
import (
"fmt"
"time"
)
// init 函数将于 main 函数之前运行
func init() {
fmt.Println("Process On ==========")
}
func main() {
// time.Now().Second 将会返回当前秒数
nowSecond := time.Now().Second()
doPrint(nowSecond)
fmt.Println("Process Stop ========")
}
// 执行打印操作
func doPrint(nowSecond int) {
if nowSecond%2 == 0 {
printWorld() //偶数
} else {
printHello() //奇数
}
}
func printHello() {
fmt.Println("hello")
}
func printWorld() {
fmt.Println("world")
}
代码有一定的冗余,是为了模拟业务之间的调度
运行代码:
1.2 编写简单插件
插件代码的入口package也要为main
- 设定插件逻辑为当当前秒数为奇数的时候,同时输出当前时间(与hello的判定不是一个时间)
- 插件文件名:HelloPlugin.go
在当前目录下,执行插件生成指令:
注意:
// mac或者linux系统
go build --buildmode=plugin -o HelloPlugin.so HelloPlugin.go
当前目录下就会多出来一个文件HelloPlugin.so,然后,我们让主程序加载该插件
package main
import (
"fmt"
"plugin"
"time"
)
// 定义插件信息
const pluginFile = "HelloPlugin.so"
// 存储插件中将要被调用的方法或变量
var pluginFunc plugin.Symbol
// init 函数将于 main 函数之前运行
func init() {
// 查找插件文件
pluginFile, err := plugin.Open(pluginFile)
if err != nil {
fmt.Println("An error occurred while opening the plug-in")
} else {
// 查找目标函数
targetFunc, err := pluginFile.Lookup("PrintNowTime")
if err != nil {
fmt.Println("An error occurred while search target func")
}
pluginFunc = targetFunc
}
fmt.Println("Process On ==========")
}
func main() {
// time.Now().Second 将会返回当前秒数
nowSecond := time.Now().Second()
doPrint(nowSecond)
fmt.Println("Process Stop ========")
}
func doPrint(nowSecond int) {
if nowSecond%2 == 0 {
printWorld() //偶数
} else {
printHello() //奇数
}
}
func printHello() {
// 执行插件调用
if pluginFunc != nil {
//将存储的信息转换为函数
if targetFunc, ok := pluginFunc.(func()); ok {
targetFunc()
}
}
fmt.Println("hello")
}
func printWorld() {
fmt.Println("world")
}
运行代码:
2 常用包
2.1 标准库
文档地址:http://doc.golang.ltd/
①os模块
1 文件目录相关
//【1】创建文件
file, err := os.Create("file.txt")
//【2】创建目录
err := os.Mkdir("test2", os.ModePerm) //单个目录
err := os.MkdirAll("/a/b/c", os.ModePerm) //层级目录
//【3】删除文件或目录
err := os.Remove("test.txt")
err = os.RemoveAll("test2")
//【4】获取工作目录
dir, err := os.Getwd()
//【5】修改工作目录
err := os.Chdir("d:/")
//【6】读写文件
bytes, err := os.ReadFile("test2.txt")
os.WriteFile("test2.txt", []byte("hello go"), os.ModePerm)
//【7】文件重命名
err := os.Rename("test2.txt", "test3.txt")
//【8】读取目录列表
2 File文件读操作
//【1】打开文件
file, err := os.Open("a.txt) //如果a.txt不存在,则报错[打开的文件为只读]
file, err := os.OpenFile("a1.txt", os.O_RDWR|os.OCREATE, 755) //如果不存在则创建
//【2】循环读取文件
f, _ := os.Open("a.txt")
for {
buf := make([]byte, 3)
n, err := f.Read(buf)
//读到文件末尾
if err == io.EOF {
break
}
fmt.Printf("n:%v\n", n)
fmt.Printf("string(buf):%v\n", string(buf))
}
f.Close()
//【3】从指定位置读取
方法一:
f, _ := os.Open("a.txt")
buf := make([]byte, 4)
//从offset为3的位置开始读取
n, _ := f.ReadAt(buf, 3)
fmt.Println("n=", n)
fmt.Println("string(buf)=", string(buf))
方法二:
file, _ := os.Open("test/a.txt")
defer file.Close()
//从偏移量为3的位置开始读取
file.Seek(3, 0)
buf := make([]byte, 10)
n, _ := file.Read(buf)
fmt.Println("n=", n)
fmt.Println("string(buf)=", string(buf))
//【4】读取目录
dir, _ := os.ReadDir("a/")
for _, v := range dir {
fmt.Printf("v.IsDir():%v\n", v.IsDir())
fmt.Printf("v.Name():%v\n", v.Name())
}
3 File文件写操作
//os.O_TRUNC //覆盖之前的
//os.O_APPEND //追加写
//【1】写入字节
file, _ := os.OpenFile("a.txt", os.O_RDWR|os.O_APPEND, 0775)
file.Write([]byte("hello golang"))
file.Close()
//【2】写入字符串
file.WriteString("hello java")
//【3】从指定位置写
//从file的offset为3的位置开始写
file.WriteAt([]byte("aaa"), 3)
4 进程相关操作
package main
import (
"fmt"
"os"
"time"
)
func main() {
//获取当前正在运行的进程id
fmt.Printf("os.Getpid():%v\n", os.Getpid())
//父id
fmt.Printf("os.Getppid():%v\n", os.Getppid())
//设置新进程的属性
attr := &os.ProcAttr{
//files指定新进程继承的活动文件对象
//前三个分别为:标准输入、标准输出、标准错误输出
Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
//新进程的环境变量
Env: os.Environ(),
}
//开始一个新进程
p, err := os.StartProcess("D:\\Download\\EditPlus\\EditPlus.exe", []string{"D:\\Download\\EditPlus\\EditPlus.exe", "E:\\processDemo.txt"}, attr)
if err != nil {
fmt.Println(err)
}
fmt.Println(p)
fmt.Println("进程ID:", p.Pid)
//通过进程id查找进程
p2, _ := os.FindProcess(p.Pid)
fmt.Println(p2)
//等待5s,执行函数
time.AfterFunc(time.Second*5, func() {
//向进程p发送退出信号【杀死进程】
p.Signal(os.Kill)
})
//等待进程p的退出,返回进程状态
ps, _ := p.Wait()
fmt.Println(ps.String())
}
运行结果:
os.Getpid():19828
os.Getppid():22092
&{20312 372 0 {{0 0} 0 0 0 0}}
进程ID: 20312
&{20312 352 0 {{0 0} 0 0 0 0}}
exit status 1
5 环境变量
//【1】获取和设置
//获取所有环境变量
s := os.Environ()
fmt.Printf("s:%v\n", s)
//获取某个环境变量
s2 := os.Getenv("GOPATH")
fmt.Printf("s2:%v\n", s2)
//获取不存在的环境变量[获取的结果为空,不会报错;如果要看环境变量是否存在;推荐使用LookupEnv]
s2 = os.Getenv("lalala")
//设置环境变量
os.Setenv("env1", "env1Value")
//【2】查找
s3, b := os.LookupEnv("env1")
if b {
fmt.Println("s3=", s3)
}
//【3】清空环境变量,慎用!!!!
//os.Clearenv()
②io包、ioutil包、bufio
1 io包
哪些类型实现了Reader和Writer接口:
- os.File
- string.Reader
- bufio.Reader
- bytes.Buffer
- bytes.Reader
- compress/gzip.Reader/Writer
- encoding/csv.Reader/Writer
简单测试案例:
func main() {
r := strings.NewReader("hello world")
//os.Stdout返回的也是一个Writer
_, err := io.Copy(os.Stdout, r)
if err != nil {
fmt.Println(err)
}
//控制台输出:hello world
}
2 iotuil包
名称 | 作用 |
---|---|
ReadAll | 读取数据,返回读到的字节 |
ReadDir | 读取一个目录,返回目录入口数组[]os.FileInfo |
ReadFile | 读取一个文件,返回文件内容(字节slice) |
WriteFile | 根据文件路径,写入字节slice |
TempDir | 在一个目录中创建指定前缀名的临时目录,返回新临时目录的路径 |
TempFile | 在一个目录中创建指定前缀名的临时文件,返回os.File |
简单案例:
func main() {
fi, _ := ioutil.ReadDir(".")
for _, v := range fi {
if v.IsDir() {
fmt.Println("dir=", v.Name())
} else {
fmt.Println("file=", v.Name())
}
}
}
3 bufio包
涉及到其他语言,如:中文,直接使用rune转换即可
Reader操作:
func main() {
file, _ := os.Open("test/test1/test2/test.csv")
defer file.Close()
br := bufio.NewReader(file)
buffer := make([]byte, 10)
for {
n, err := br.Read(buffer)
if err == io.EOF {
break
} else {
fmt.Println("value=", string(buffer[:n]))
}
}
}
- Writer操作
func main() {
//写入文件的话,需要使用OpenFile,并设置对应写入权限
file, _ := os.OpenFile("test/test1/test2/test.csv", os.O_RDWR, 0777)
defer file.Close()
w := bufio.NewWriter(file)
w.Write([]byte("hahaha~~~"))
w.Flush()
}
- Scanner
func main() {
s := strings.NewReader("ABC DEF KIS")
bs := bufio.NewScanner(s)
//以空格作为分隔符
bs.Split(bufio.ScanWords)
for bs.Scan() {
fmt.Println(bs.Text())
}
}
③path/filepath
- Rel
func Rel(basepath, targpath string) (string, error)
该函数以basepath为基准,返回targpath相对于basepath的相对路径,也就是说如果basepath为/a,targpath为/a/b/c,那么则会返回/b/c,如果两个参数有一个为绝对路径,一个为相对路径,则会返回错误
- Join
func Join(elem ...string) string
Join 函数将多个路径进行连接,并且进行Clean操作,然后返回
- filepath
files := "E:\\data\\test.txt"
paths, fileName := filepath.Split(files)
fmt.Println(paths, fileName) //获取路径中的目录及文件名 E:\data\ test.txt
fmt.Println(filepath.Base(files)) //获取路径中的文件名test.txt
fmt.Println(path.Ext(files)) //获取路径中的文件的后缀 .txt
④archive/zip包
- 压缩
package main
import (
"archive/zip"
"fmt"
"io"
"os"
"path/filepath"
"strings"
)
func compressionDir(baseDir string) (string, error) {
zipFileName := baseDir + ".zip"
// 创建一个新的 zip 文件
zipFile, err := os.Create(zipFileName)
if err != nil {
return "", err
}
defer zipFile.Close()
// 创建一个 zip.Writer
zipWriter := zip.NewWriter(zipFile)
defer zipWriter.Close()
// 遍历目录下的所有文件和子目录
err = filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// 创建一个 zip 文件中的文件或目录
relativePath := strings.TrimPrefix(path, baseDir)
zipPath := strings.TrimLeft(filepath.Join("/", relativePath), "/")
// 如果是目录或空目录,则在 zip 文件中创建一个目录
if info.IsDir() || isEmptyDir(path) {
_, err := zipWriter.Create(zipPath + "/")
if err != nil {
return err
}
} else {
// 如果是文件,则创建一个 zip 文件中的文件
zipFile, err := zipWriter.Create(zipPath)
if err != nil {
return err
}
// 打开原始文件
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// 将原始文件的内容拷贝到 zip 文件中
_, err = io.Copy(zipFile, file)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return "", err
}
return zipFileName, nil
}
// 判断目录是否为空目录
func isEmptyDir(dirPath string) bool {
dir, err := os.Open(dirPath)
if err != nil {
return false
}
defer dir.Close()
_, err = dir.Readdirnames(1)
return err == io.EOF
}
func main() {
// 调用压缩函数
zipFile, err := compressionDir("E:\\Go\\GoPro\\src\\go_code\\gouitest\\test")
if err != nil {
fmt.Println("压缩目录失败:", err)
return
}
fmt.Println("目录压缩成功,压缩文件:", zipFile)
}
运行结果:
- 解压
2.2 并发
①Mutex
- 互斥锁
- 不可重入锁
sync.Mutex
Unlock
现在我们通过sync.Mutex来保证并发访问资源安全
- 如果不使用sync.Mutex
package main
import (
"fmt"
"sync"
"time"
)
var (
i = 100
wg sync.WaitGroup
)
func main() {
for i := 0; i < 50; i++ {
wg.Add(1)
go add()
wg.Add(1)
go sub()
}
//通过waitGroup等待所有任务跑完
wg.Wait()
fmt.Println("main....i=", i)
}
func add() {
time.Sleep(time.Millisecond * 10)
defer wg.Done()
i += 1
fmt.Println("i++, i=", i)
}
func sub() {
time.Sleep(time.Millisecond * 2)
defer wg.Done()
i -= 1
fmt.Println("i--, i=", i)
}
正确结果应该是i=100,因此可以知道结果是错误的,因为我们没有控制并发。接下来,我们通过sync.Mutex互斥锁来控制并发
- 使用sync.Mutex控制并发
package main
import (
"fmt"
"sync"
"time"
)
var (
i = 100
wg sync.WaitGroup
lock sync.Mutex
)
func main() {
for i := 0; i < 50; i++ {
wg.Add(1)
go add()
wg.Add(1)
go sub()
}
//通过waitGroup等待所有任务跑完
wg.Wait()
fmt.Println("main....i=", i)
}
func add() {
defer wg.Done()
//访问共享资源的时候加锁
lock.Lock()
time.Sleep(time.Millisecond * 10)
i += 1
fmt.Println("i++, i=", i)
lock.Unlock()
}
func sub() {
defer wg.Done()
lock.Lock()
time.Sleep(time.Millisecond * 2)
i -= 1
fmt.Println("i--, i=", i)
lock.Unlock()
}
现在,不管我们怎么运行就都可以获取到正确的结果了
sync.Mutex
package main
import "sync"
var (
mutex sync.Mutex
)
func main() {
/*
【1】加锁一次,解锁两次=》报错
mutex.Lock()
mutex.Unlock()
mutex.Unlock()
//fatal error: sync: unlock of unlocked mutex
*/
/*
【2】加锁两次,解锁一次=》报错
mutex.Lock()
mutex.Lock()
mutex.Unlock()
//fatal error: all goroutines are asleep - deadlock!
*/
/*
【3】先连续加锁两次,然后连续解锁两次=》报错 `sync.Mutex是不可重入锁`
mutex.Lock()
mutex.Lock()
mutex.Unlock()
mutex.Unlock()
//fatal error: all goroutines are asleep - deadlock!
*/
// 【4】可行,只有先加锁,然后释放锁之后才能继续加锁
mutex.Lock()
mutex.Unlock()
mutex.Lock()
mutex.Unlock()
}
可重入锁
- 当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,所以叫可重入锁。只要你拥有这把锁,你可以可着劲儿地调用,比如通过递归实现一些算法,调用者不会阻塞或者死锁。
- Mutex 不是可重入的锁。Mutex 的实现中没有记录哪个 goroutine 拥有这把锁。理论上,任何 goroutine 都可以随意地 Unlock 这把锁,所以没办法计算重入条件。
package main
import (
"fmt"
"sync"
"time"
)
var (
mutex sync.Mutex
count int
)
func main() {
go func() {
mutex.Lock()
count++
}()
time.Sleep(time.Second * 1)
mutex.Unlock()
fmt.Println("main.....")
fmt.Println("count=", count)
//main.....
//count= 1
}
可以看到我们通过一个协程(goroutine)加的锁,但是可以在main方法中释放(main方法也可以看做一个特殊的goroutine)中释放,因此可以知道sync.Mutex不会记录哪个goroutine持有这把锁。
②WaitGroup
两个协程之间相互等待,如果没有waitGroup,可能协程里面的任务还没有完成,主程序就退出了,导致所有的协程也退出
package main
import (
"fmt"
"sync"
)
var (
wg sync.WaitGroup
)
func hello(i int) {
defer wg.Done() //wd.Add(-1)
fmt.Println("hello", i)
//wg.Done() //wd.Add(-1)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go hello(i)
}
//等待所有协程中的任务全部完成
wg.Wait()
fmt.Println("main====")
}
③Timer、Ticker
1 Timer
定时器,timer.C本质上是一个管道
package main
import (
"fmt"
"time"
)
func main() {
//[1]time.NewTimer 等待两秒钟
//timer := time.NewTimer(time.Second * 2)
//t := <-timer.C
//fmt.Println(t)
//[2]time.After(time.Second * 2) 等待两秒钟
//time.After(time.Second * 2)
//[3]
timer := time.NewTimer(time.Second * 5)
timer.Reset(time.Second * 6) //重新设置定时器时间
//<-timer.C
timer.Stop() //停止定时器(如果没有timer.C 那么就不会阻塞暂停)
fmt.Println("--")
}
2 Ticker
Timer只执行一次,Ticker可以周期的执行
案例一:
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("start...")
//定时器,每隔5秒执行
ticker := time.NewTicker(time.Second * 5)
for _ = range ticker.C {
fmt.Println("middle...")
}
fmt.Println("end...") //永远不会执行到,因为ticker.C是可以定时器,for中没有break
}
案例二:
package main
import (
"fmt"
"time"
)
func main() {
//定时器,每隔一秒执行
ticker := time.NewTicker(time.Second)
chanInt := make(chan int)
go func() {
//ticker.C触发
for _ = range ticker.C {
select {
case chanInt <- 1:
case chanInt <- 2:
case chanInt <- 3:
}
}
}()
sum := 0
for v := range chanInt {
fmt.Println("接收到:", v)
sum += v
if sum >= 10 {
break
}
}
}
运行结果:
接收到: 2
接收到: 2
接收到: 1
接收到: 3
接收到: 1
接收到: 2
④runtime
让出CPU时间片,重新安排任务
- runtime.Gosched():让出时间片,让其他协程来执行
- runtime.Goexit():直接退出当前协程
- runtime.NumCPU():获取当前系统的cpu核心数
- runtime.GOMAXPROCS(num int):设置当前系统cpu核心数(默认为当前系统最大cpu核心数)
⑤原子变量
在并发操作资源时,我们可以通过两种方式来保证数据正确:
- 加锁
- 原子操作
atomic常见操作有:
- 增减
- 载入 read
- 比较并交换 cas
- 交换
- 存储 write
package main
import (
"fmt"
"sync/atomic"
)
func main() {
//test_add_sub()
//test_load_store()
test_cas()
//除了cas比较并交换,atomic还有比较暴力的`直接交换`,但是这种用法比较少
}
func test_add_sub() {
var i int32 = 100
atomic.AddInt32(&i, 1)
fmt.Println("i=", i)
atomic.AddInt32(&i, -1)
fmt.Println("i=", i)
}
func test_load_store() {
var j int64 = 200
val := atomic.LoadInt64(&j) //read
fmt.Println("val=", val)
atomic.StoreInt64(&j, -100) //write
fmt.Println("j=", j)
}
func test_cas() {
var k int32 = 8
f := atomic.CompareAndSwapInt32(&k, 8, 100)
fmt.Println("flag=", f)
fmt.Println("k=", k)
}
2.3 操作数据库
go get -u github.com/go-sql-driver/mysql
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
)
var db *sql.DB
func initDB() (err error) {
dataSource := "root:200151@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=true"
//不会校验账号密码是否正确
//注意!!不要使用:=,因为我们这里是给全局变量赋值,然后在main函数中使用全局变量db
db, err = sql.Open("mysql", dataSource)
if err != nil {
return err
}
//尝试与数据库建立连接(校验dataSource是否正确)
err = db.Ping()
if err != nil {
return err
}
return nil
}
func main() {
err := initDB()
if err != nil {
fmt.Println("initDB fail, err=", err)
} else {
fmt.Println("连接成功!")
}
//以插入操作为例【其他操作类似】
s := "insert into account (name, money, question) values (?, ?, ?)"
result, err := db.Exec(s, "ziyi", 300.0, "what")
if err != nil {
fmt.Println("insert fail err=", err)
} else {
//insert success {0xc000102000 0xc00009ea00}
fmt.Println("insert success ", result)
}
}
3 常用片段
3.1 控制goroutine数目(控制并发数)
控制并发,防止内存爆掉
简单实现:
通过信号量
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
// 最大并发数
maxConcurrency := 20
// 创建一个信号量
sem := semaphore.NewWeighted(int64(maxConcurrency))
// 创建一个等待组
wg := sync.WaitGroup{}
// 创建一个上下文
ctx := context.TODO()
// 待消费的任务数量
taskCount := 100
// 模拟100个任务
for i := 0; i < taskCount; i++ {
// 获取一个信号量令牌
sem.Acquire(ctx, 1)
// 增加等待组计数
wg.Add(1)
// 启动一个 goroutine 消费任务
go func(taskID int) {
// 在 goroutine 结束时释放信号量令牌
defer sem.Release(1)
// 模拟任务的耗时
time.Sleep(time.Second)
// 打印任务完成信息
fmt.Printf("Task %d completed\n", taskID)
// 减少等待组计数
wg.Done()
}(i)
}
// 等待所有任务完成
wg.Wait()
fmt.Println("All tasks completed")
}
通过单个管道
package main
import (
"fmt"
"sync"
"time"
)
func main() {
maxGoroutines := 500
taskSize := 60
var sem = make(chan struct{}, maxGoroutines)
var wg sync.WaitGroup
for i := 0; i < taskSize; i++ {
wg.Add(1)
go func(id int) {
work(id, sem, &wg)
}(i)
}
wg.Wait() // 等待所有协程执行完毕
close(sem)
}
func work(id int, sem chan struct{}, wg *sync.WaitGroup) {
sem <- struct{}{} // 信号量加一
fmt.Println("id=", id)
time.Sleep(time.Millisecond * 500)
defer func() {
<-sem
wg.Done()
}()
}
通过多个管道:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup, sem chan struct{}) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
// 模拟工作...
time.Sleep(time.Millisecond * 500)
fmt.Printf("Worker %d finished\n", id)
// 释放信号量
<-sem
}
func main() {
// 控制 Goroutine 数量的上限
maxWorkers := 2
// 创建等待组和信号量
var wg sync.WaitGroup
sem := make(chan struct{}, maxWorkers)
// 创建一些任务
tasks := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
// 创建任务通道
taskChan := make(chan int)
// 启动 Goroutine 来执行任务
go func() {
for task := range taskChan {
// 等待空闲的信号量
sem <- struct{}{}
// 增加等待组计数
wg.Add(1)
// 启动 Goroutine
go worker(task, &wg, sem)
}
}()
// 将任务发送到任务通道
for _, task := range tasks {
taskChan <- task
}
// 关闭任务通道
close(taskChan)
// 等待所有 Goroutine 结束
wg.Wait()
}
方式二:通过semaphore信号量
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/marusama/semaphore"
)
func main() {
// 控制并发数的上限
maxWorkers := 10
// 创建信号量
sem := semaphore.New(maxWorkers)
// 创建等待组
var wg sync.WaitGroup
// 创建一些任务
tasks := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
// 遍历任务
for _, task := range tasks {
// 获取信号量
sem.Acquire(context.TODO(), 1)
// 增加等待组计数
wg.Add(1)
// 启动 Goroutine
go worker(task, sem, &wg)
}
// 等待所有 Goroutine 结束
wg.Wait()
}
func worker(id int, sem semaphore.Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
// 模拟工作
time.Sleep(time.Millisecond * 500)
fmt.Printf("Worker %d finished\n", id)
// 释放信号量
sem.Release(1)
}
方式三:
根据任务数灵活创建协程,且不超过所定义的最大协程数
- 通过协程派发task
package main
import (
"context"
"fmt"
"github.com/aobco/log"
"sync"
"sync/atomic"
"time"
)
var (
num int32 = 0
limit int32 = 300
wg = new(sync.WaitGroup)
maxWorkers = 50
numMutex sync.Mutex
)
func main() {
executeTask()
fmt.Println("main=========")
}
func executeTask() {
ctx, cancelFunc := context.WithCancel(context.Background())
//创建任务
taskChan := make(chan int)
go func() {
for {
if num > limit {
cancelFunc()
log.Infof("cancel func....num %v limit %v", num, limit)
break
}
time.Sleep(time.Millisecond * 100) // 在检查取消信号之前添加适当的延迟
}
}()
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func(i int) {
work(i, ctx, taskChan)
wg.Done()
}(i)
}
go assignTask(taskChan)
wg.Wait()
//time.Sleep(time.Second * 10)
//close(taskChan) // 关闭任务通道以通知工作协程退出
fmt.Print("all done\n")
}
func assignTask(taskChan chan int) {
defer close(taskChan)
for i := 0; i < 20; i++ {
taskChan <- i
//time.Sleep(time.Millisecond * 50)
}
}
func work(id int, ctx context.Context, taskChan chan int) {
for {
select {
case <-ctx.Done():
log.Infof("%v received the signal...", id)
// 这里可以执行其他清理操作
return // 确保工作协程退出
case task, ok := <-taskChan:
if !ok {
log.Infof("%v task channel closed...", id)
return // 任务通道被关闭,退出工作协程
}
time.Sleep(time.Millisecond * 200)
numMutex.Lock()
atomic.AddInt32(&num, int32(task))
numMutex.Unlock()
log.Infof("%v is working...task %v", id, task)
}
}
}
教程:https://duoke360.com/tutorial/golang