并发是指在同一时间内可以执行多个任务。包含多线程编程、多进程编程及分布式程序等。本章并发属于多线程编程。
Go语言的并发通过goroutine特性完成。goroutine类似于线程,但是可以根据需要创建多个goroutine并发工作。
9.1轻量级线程(goroutine)——根据需要随时创建的“线程”
goroutine的概念类似于线程,但goroutine由Go程序运行时的调度和管理。Go程序会智能地将goroutine中的任务合理地分配给每个CPU。
Go程序从main包的main()函数开始,在程序启动时,Go程序就会为main()函数创建一个默认的goroutine
1、使用普通函数创建goroutine
Go程序中使用go关键字为一个函数创建一个goroutine。一个函数可以被创建多个goroutine,一个goroutine必定对应一个函数。
(1)格式
为一个普通函数创建goroutine的写法
go 函数名(参数列表)
函数名:要调用的函数名
参数列表:调用函数需要传入的参数
使用go关键字创建goroutine时,被调用函数的返回值被忽略。
(2)例子
package main
import (
"fmt"
"time"
)
func running() {
var times int
//构建一个无限循环
for{
times++
fmt.Println("tick:",times)
//延时1秒
time.Sleep(time.Second)
}
}
func main() {
//并发执行程序
go running()
//接受命令行输入,不做任何事情
var input string
fmt.Scanln(&input)
}
代码执行后,命令行会不断的输出tick,同时可以用fmt.Scanln()接受用户输入。
代码的执行顺序
2、使用匿名函数创建goroutine
(1)使用匿名函数创建goroutine
格式
go func(参数列表){
函数体
}(调用参数列表)
参数列表:函数体内的参数变量列表
函数体:匿名函数的代码
调用参数列表:启动goroutine时,需要向匿名函数传递的调用参数。
(2)使用匿名函数创建goroutine的例子
package main
import (
"fmt"
"time"
)
func main(){
go func() {
var times int
for{
times++
fmt.Println("tick",times)
time.Sleep(time.Second)
}
}()
var input string
fmt.Scanln(&input)
}
由于匿名函数没有参数,因此调用参数列表为空。
3、调整并发的运行性能(GOMAXPROCS)
Go中通过runtime.GOMAXPROCS()函数维护线程池中线程和CPU核心数量的对应关系。
格式:
runtime.GOMAXPROCS(逻辑CPU数量)
这里的逻辑CPU数量可以有如下几种数值:
<1: 不修改任何数值
=1: 单核心执行
>1: 多核心并发执行
可以使用runtime.NumCPU()查询CPU数量。
4、理解并发和并行
并发:把任务在不同的时间点交给处理器进行处理。在同一时间点,任务并不会同时运行。
并行:把每一个任务分配给每一个处理器独立完成。在同一时间点,任务一定是同时运行。
9.2通道(channel)——在多个goroutine间通信的管道
channel是队列一样的结构。
1、通道的特性
在任何时候,同时只能有一个goroutine访问通道进行发送和获取数据。goroutine间通过通道就可以通信了。
通道像传送带或队列,总是遵循先入先出的规则,保证收发数据的顺序。
2、声明通道类型
通道本身需要一个类型进行修饰。通道的元素类型就是在其内部传输的数据类型
声明如下:
var 通道变量 chan 通道类型
通道类型:通道内的数据类型
通道变量:保存通道的变量
3、创建通道
通道是引用类型,需要使用make进行创建,格式如下:
通道实例:=make(chan 数据类型)
数据类型:通道内传输的元素类型
通道实例:通过make创建的通道句柄
ch1 := make(chan int)//创建一个整型类型的通道
ch2 := make(chan interface{})//创建一个空接口类型的通道,可以存放任意格式
type Equip struct{/*一些字段*/}
ch2 := make(chan *Equip)//创建Equip指针类型的通道,可以存放*Equip
4、使用通道发送数据
(1)通道发送数据的格式
通道变量<-值
通道变量:通过make创建好的通道实例
值:可以是变量、常量、表达式或者函数返回值等。
(2)通道发送数据的例子
//创建一个空接口通道
ch:=make(chan interface{})
//将0放入通道中
ch<-0
//将hello字符串放入通道中
ch<-"hello"
(3)发送将持续阻塞知道数据被接收
把数据往通道中发送时,如果接收方一直没有接收,那么发送操作将持续阻塞。
package main
func main() {
//创建一个整型通道
ch:=make(chan int)
//尝试将0通过通道发送
ch <- 0
}
结果报错
fatal error: all goroutines are asleep - deadlock!
由于没有形成channel发送接收对应的代码,所以一直阻塞在此处。
5、使用通道接收数据
通道接收的特性:
i 通道的收发操作在不同的两个goroutine间进行。
ii 接收将持续阻塞知道发送方发送数据。
iii 每次接收一个数据。
通道的数据接收写法:
(1)阻塞接收数据
data:=<-ch
(2)非阻塞接收数据
data,ok:=<-ch
data表示接收到的数据,未接收到数据时,data为通道类型的零值。
ok表示是否接收到数据。
非阻塞接收方式可能造成高CPU占用,因此使用非常少。可以配合select和计时器channel实现接收超时检测。
(3)接收任意数据,忽略收到的数据
格式: <-ch
执行该语句时会发生阻塞,知道接收到数据,但接收到的数据会被忽略。
使用通道做并发同步的写法:
package main
import "fmt"
func main() {
//构建一个通道
ch:=make(chan int)
//开启一个并发匿名函数
go func() {
fmt.Println("start goroutine")
//通过通道通知main的goroutine
ch <- 0
fmt.Println("exit goroutine")
}()
fmt.Println("wait goroutine")
//等待匿名goroutine
<-ch
fmt.Println("all done")
}
(4)循环接收
格式
for data := range ch{
}
使用for从通道中接收数据
package main
import (
"time"
"fmt"
)
func main() {
//构建一个通道
ch:=make(chan int)
//开启一个匿名函数
go func() {
//从3循环到0
for i:=3;i>=0;i--{
//发送3到0之间的数值
ch<-i
//每次发送完时等待
time.Sleep(time.Second)
}
}()
//遍历接收通道数据
for data:=range ch{
//打印通道数据
fmt.Println(data)
//当遇到数据0时,退出接收循环
if data == 0{
break
}
}
}
6、示例:并发打印
package main
import "fmt"
func printer(c chan int) {
//开始无限循环等待数据
for{
//从channel中获取一个数据
data:=<-c
//将0视为数据结束
if data==0{
break
}
//打印数据
fmt.Println(data)
}
//通知main已经结束循环
c<-0
}
func main() {
//创建一个channel
ch:=make(chan int)
//并发执行printer,传入channel
go printer(ch)
for i:=1;i<=10;i++{
//将数据通过channel投送给printer
ch<-i
}
//通知并发的printer结束循环
ch<-0
//等待printer结束
<-ch
}
7、单向通道——通道中的单行道
Go的通道可以在声明时约束其操作方向,如只发送或只接收。这种被约束方向的通道被称为单向通道。
(1)单向通道的声明格式
var 通道实例 chan<- 元素类型 //只能发送通道
var 通道实例 <-chan 元素类型 //只能发送通道
元素类型:通道包含的元素类型
通道实例:声明的通道变量
(2)单向通道的例子
ch:=make(chan int)
//声明一个只能发送的通道类型,并赋值为ch
var chSendOnly chan<- int = ch
//声明一个只能接收的通道类型,并赋值为ch
var chRecvOnly <-chan int = ch
8、带缓冲的通道
为通道增加一个有限大小的存储空间形成带缓冲通道。带缓冲通道在发送时无需等待接收方接收数据即可完成发送过程,并不会发生阻塞,只有当存储空间满时才会发送阻塞。
(1)创建带缓冲通道
通道实例:=make(chan 通道类型,缓冲大小)
package main
import "fmt"
func main(){
//创建一个3个元素缓冲大小的整型通道
ch:=make(chan int,3)
//查看当前通道的大小
fmt.Println(len(ch))
//发送3个整型元素到通道
ch<-0
ch<-1
ch<-2
//查看当前通道的大小
}
(2)阻塞条件
i 带缓冲通道被填满,尝试再次发送数据时发生阻塞。
ii 带缓冲通道为空时,尝试再次接收数据时发生阻塞。
9、通道的多路复用——同时处理接收和发送多个通道的数据
多路复用通常表示在一个信道上传输多路信号或数据流的过程和技术。
Go语言中提供了select关键字,可以同时响应多个通道的操作。select的每个case都对应一个通道的收发过程。当收发完成时,就会触发case中响应的语句。格式如下:
select{
case 操作1:
响应操作1
case 操作2:
响应操作2
...
default:
没有操作情况
}
select多路复用中可以接收的样式
10、示例:模拟远程过程调用(RPC)
package main
import (
"time"
"errors"
"fmt"
)
//模拟RPC客户端的请求和接收消息封装
func RPCClient(ch chan string,req string)(string,error) {
//向服务器发送请求
ch<-req
//等待服务器返回
select {
case ack:=<-ch: //接收到服务器返回数据
return ack,nil
case <-time.After(time.Second): //超时
return "",errors.New("Time out")
}
}
//模拟RPC服务器端接收客户端请求和响应
func RPCServer(ch chan string) {
for{
//接收客户端数据
data:=<-ch
//打印接收到的数据
fmt.Println("Server received:",data)
//模拟超时
//time.Sleep(time.Second*2)
//向客户端反馈已收到
ch<-"roger"
}
}
func main() {
//创建一个无缓冲字符串通道
ch:=make(chan string)
//并发执行服务器逻辑
go RPCServer(ch)
//客户端请求数据和接收数据
recv,err:=RPCClient(ch,"hi")
if err!=nil{
//发生错误打印
fmt.Println(err)
}else{
//正常接收数据
fmt.Println("client received",recv)
}
}
11、示例:使用通道响应计时器的事件
Go语言中的time包提供了计时器的封装。由于Go语言中的通道和goroutine的设计,定时任务可以在goroutine中通过同步的方式完成,也可以通过goroutine中异步回调完成。
(1)一段事件之后(time.After)
延迟回调
package main
import (
"fmt"
"time"
)
func main() {
//声明一个退出用的通道
exit:=make(chan int)
//打印开始
fmt.Println("start")
//过1秒后,调整匿名函数
time.AfterFunc(time.Second, func() {
//1秒后,打印结果
fmt.Println("one second after")
//通知main()的goroutine已经结束
exit<-0
})
//等待结束
<-exit
}
(2)定点计时
package main
import (
"time"
"fmt"
)
func main() {
//创建一个打点器,每500毫秒触发一次
ticker:=time.NewTicker(time.Millisecond*500)
//创建一个计时器,2秒后触发
stopper:=time.NewTimer(time.Second*2)
//声明计数变量
var i int
//不断检查通道情况
for{
//多路复用通道
select {
case <-stopper.C: //计时器到时了
fmt.Println("stop")
goto StopHere
case <-ticker.C: //打点器触发了
//记录触发了多少次
i++
fmt.Println("tick",i)
}
}
//退出的标签,使用goto跳转
StopHere:
fmt.Println("done")
}
12、关闭通道后继续使用通道
通道是一个引用对象,会被自动垃圾回收,但是通道也可以主动关闭。
(1)格式
close(ch)
(2)给被关闭的通道发送数据将会触发panic
被关闭的通道不会被置为nil。
package main
import "fmt"
func main(){
//创建一个整型通道
ch:=make(chan int)
//关闭通道
close(ch)
//打印通道的指针,容量和长度
fmt.Printf("ptr:%p cap:%d len:%d\n",ch,cap(ch),len(ch))
//给关闭的通道发送数据
ch<-1
}
执行程序,出现以下错误
panic: send on closed channel
(3)从已关闭的通道接收数据时将不会发生阻塞
从已经关闭的通道接收数据或者正在接收数据时,将会接收到通道类型的零值,然后停止阻塞并返回。
package main
import "fmt"
func main() {
//创建一个整型带两个缓冲的通道
ch:=make(chan int,2)
//给通道放两个数据
ch<-1
ch<-2
//关闭通道, 此时带缓冲通道的数据不会被释放,通道也没有消失
close(ch)
//遍历缓冲所有数据,且多遍历1个
for i:=0;i<cap(ch)+1;i++{
//从通道中取出数据
v,ok:=<-ch
//打印出数据状态
fmt.Println(v,ok)
}
}
在通道关闭后,即便通道没有数据,在获取时也不会发生阻塞,但此时取出数据会失败。
9.3示例:Telent回音服务器——TCP服务器的基本结构
回音服务器的代码分为4个部分,分别是接受连接、会话处理、Telent命令处理和程序入口。
(1)接收连接
package main
import (
"net"
"fmt"
)
//服务逻辑,传入地址和退出的通道
func server(address string,exitChan chan int) {
//根据给定地址进行侦听
l,err:=net.Listen("tcp",address)
//如果侦听发生错误,打印错误并退出
if err!=nil{
fmt.Println(err)
exitChan<-1
}
//打印侦听地址,表示侦听成功
fmt.Println("Listen:"+address)
//延迟关闭侦听器
defer l.Close()
//循环侦听
for{
//新连接没有到来时,Accept是阻塞的
conn,err:=l.Accept()
//发生任何的侦听错误,打印错误并退出服务器
if err!=nil{
fmt.Println(err.Error())
continue
}
//根据连接开启会话,每个连接生成一个会话,这个过程需要并行执行
go handleSession(conn,exitChan)
}
}
(2)会话处理
package main
import (
"net"
"fmt"
"bufio"
"strings"
)
//连接的会话逻辑
func handleSession(conn net.Conn,exitChan chan int) {
fmt.Println("Session started:")
//创建一个网络连接数据的读取器
reader:=bufio.NewReader(conn)
//接收数据的循环
for{
//读取字符串,直到碰到回车返回
str,err:=reader.ReadString('\n')
//数据读取正确
if err==nil{
//去掉字符串尾部的回车
str=strings.TrimSpace(str)
//处理Telnet指令
if !processTelnetCommand(str,exitChan){
conn.Close()
break
}
//Echo逻辑,发什么数据,原样返回
conn.Write([]byte(str+"\r\n"))
}else{
//发生错误
fmt.Println("Session closed")
conn.Close()
break
}
}
}
(3)Telnet命令处理
package main
import (
"strings"
"fmt"
)
func processTelnetCommand(str string, exitChan chan int)bool {
//@close指令表示终止本次会话
if strings.HasPrefix(str,"@close"){
fmt.Println("Session close")
//告诉外部需要断开连接
return false
//@shutdown指令表示终止服务器进程
}else if strings.HasPrefix(str,"@shutdown"){
fmt.Println("Server shutdown")
//往通道中写入0,阻塞等待接收方处理
exitChan<-0
//告诉外部需要断开连接
return false
}
//打印输入的字符串
fmt.Println(str)
return true
}
(4)程序入口
package main
import "os"
func main() {
//创建一个程序结束码的通道
exitChan:=make(chan int)
//将服务器并发运行
go server("127.0.0.1:7001",exitChan)
//通道阻塞,等待接收数据
code:=<-exitChan
//标记程序返回值并退出
os.Exit(code)
}
编译所有代码,命令行提示
listen:127.0.0.1:7001
此时Socket监听成功。在操作系统命令行中输入:
telnet 127.0.0.1 7001
尝试连接本地7001端口。接下来进入服务器测试的流程。
(5)测试输入字符串
操作系统命令行输入
hello
服务器下显示
Session started:
hello
(6)测试关闭会话
命令行输入
@close
服务器下显示
Session close
(7)测试关闭服务器
命令行输入
@shutdown
服务器显示
Server shutdown
此时服务器自动关闭。
9.4同步——保证并发环境下数据访问的正确性
Go程序可以使用通道进行多个goroutine间的数据交换,但这仅仅是数据同步中的一种方法。通道内部的实现依然使用了各种锁,因此优雅代码的代价是性能。在某些轻量级的场合,原子访问(atomic包),互斥锁(sync.Mutex)以及等待组(sync.WaitGroup)能最大程度满足需求。
1、竞态检测——检测代码在并发环境下可能出现的问题
在多线程并发运行的程序竞争访问和修改同一块资源时,会发生竞态问题。
竞态检测
package main
import (
"sync/atomic"
"fmt"
)
var(
//序列号
seq int64
)
//序列号生成器
func GetID()int64 {
//尝试原子的增加序列号
return atomic.AddInt64(&seq,1)
}
func main() {
//生成10个并发序列号
for i:=0;i<10 ;i++ {
go GetID()
}
fmt.Println(GetID())
}
2、互斥锁(sync.Mutex)——保证同时只有一个goroutine
互斥锁是一种常用的控制共享资源访问的方法。
package main
import (
"sync"
"fmt"
)
var(
//逻辑中使用的某个变量
count int
//与变量对应的使用互斥锁
countGuard sync.Mutex
)
func GetCount() int {
//锁定
countGuard.Lock()
//在函数退出时解除锁定
defer countGuard.Unlock()
return count
}
func setCount(c int) {
countGuard.Lock()
count=c
countGuard.Unlock()
}
func main() {
//可以进行并发安全的设置
setCount(1)
//可以进行并发安全的获取
fmt.Println(GetCount())
}
一般情况下,建议互斥锁的粒度设置越小越好。
3、读写互斥锁(sync.RWMutex)——在读比写多的环境下比互斥锁更高效
//sync.RWMutex 读比写多高效
var(
//逻辑中某变量
count int
//与变量对应的互斥锁
countGuard sync.RWMutex
)
func GetCount() int {
countGuard.Lock()
defer countGuard.Unlock()
return count
}
4、等待组(sync.WaitGroup)——保证在并发环境中完成指定数量的任务
等待组的方法
等待组
package main
import (
"sync"
"net/http"
"fmt"
)
func main() {
//声明一个等待组
var wg sync.WaitGroup
//准备一系列的网站地址
var urls=[]string{
"http://www.github.com/",
"http://www.qiniu.com/",
"http://www.golangtc.com/",
}
//遍历这些地址
for _,url:=range urls{
//每个任务开始时,将等待组增加1
wg.Add(1)
//开启一个并发
go func(url string) {
//使用defer,表示函数完成时将等待组值减1
defer wg.Done()
//使用http访问提供的地址
_,err:=http.Get(url)
//访问完后,打印地址和可能发生的错误
fmt.Println(url,err)
}(url)
}
//等待所有的任务完成
wg.Wait()
fmt.Println("over")
}