基础数据类型
整型
int8int16int32int6uint8uint16uint32uint64
Unicoderuneint32byteuint8
常量
iota 常量生成器
常量声明可以使用iota常量生成器初始化,它用于生成一组以相似规则初始化的常量,但是不用每行都写一遍初始化表达式。在一个const声明语句中,在第一个声明的常量所在的行,iota将会被置为0,然后在每一个有常量声明的行加一。
下面是来自time包的例子,它首先定义了一个Weekday命名类型,然后为一周的每天定义了一个常量,从周日0开始。在其它编程语言中,这种类型一般被称为枚举类型。
type Weekday int
const (
Sunday Weekday = iota
Monday
Tuesday
Wednesday
Thursday
Friday
Saturday
)
周日将对应0,周一为1,如此等等。
我们也可以在复杂的常量表达式中使用iota,下面是来自net包的例子,用于给一个无符号整数的最低5bit的每个bit指定一个名字:
type Flags uint
const (
FlagUp Flags = 1 << iota // is up
FlagBroadcast // supports broadcast access capability
FlagLoopback // is a loopback interface
FlagPointToPoint // belongs to a point-to-point link
FlagMulticast // supports multicast access capability
)
1 << iota
复合数据类型
数组
数组是一个由固定长度的特定类型元素组成的序列,一个数组可以由零个或多个元素组成。因为数组的长度是固定的,因此在Go语言中很少直接使用数组。和数组对应的类型是Slice(切片),它是可以增长和收缩的动态序列,slice功能也更灵活,但是要理解slice工作原理的话需要先理解数组。
默认情况下,数组的每个元素都被初始化为元素类型对应的零值,对于数字类型来说就是0。我们也可以使用数组字面值语法用一组值来初始化数组:
var q [3]int = [3]int{1, 2, 3}
var r [3]int = [3]int{1, 2}
fmt.Println(r[2]) // "0"
在数组字面值中,如果在数组的长度位置出现的是“...”省略号,则表示数组的长度是根据初始化值的个数来计算。因此,上面q数组的定义可以简化为
q := [...]int{1, 2, 3}
fmt.Printf("%T\n", q) // "[3]int"
切片
[]TT
内置的make函数创建一个指定元素类型、长度和容量的slice。容量部分可以省略,在这种情况下,容量将等于长度
make([]T, len)
make([]T, len, cap) // same as make([]T, cap)[:len]
在底层,make创建了一个匿名的数组变量,然后返回一个slice;只有通过返回的slice才能引用底层匿名的数组变量。在第一种语句中,slice是整个数组的view。在第二个语句中,slice只引用了底层数组的前len个元素,但是容量将包含整个的数组。额外的元素是留给未来的增长用的。
函数
Deferred函数
当执行到该条语句时,函数和参数表达式得到计算,但直到包含该defer语句的函数执行完毕时,defer后的函数才会被执行,不论包含defer语句的函数是通过return正常结束,还是由于panic导致的异常结束
func main() {
f(3)
}
func f(x int) {
fmt.Printf("f(%d)\n", x+0/x) // panics if x == 0
defer fmt.Printf("defer %d\n", x)
f(x - 1)
}
上例中的运行输出如下
f(3)
f(2)
f(1)
defer 1
defer 2
defer 3
当f(0)被调用时,发生panic异常,之前被延迟执行的3个fmt.Printf被调用。程序中断执行后,panic信息和堆栈信息会被输出(下面是简化的输出)
panic: runtime error: integer divide by zero
main.f(0)
src/gopl.io/ch5/defer1/defer.go:14
main.f(1)
src/gopl.io/ch5/defer1/defer.go:16
main.f(2)
src/gopl.io/ch5/defer1/defer.go:16
main.f(3)
src/gopl.io/ch5/defer1/defer.go:16
main.main()
src/gopl.io/ch5/defer1/defer.go:10
Notice
方法
基于指针对象的方法
在每一个合法的方法调用表达式中,也就是下面三种情况里的任意一种情况都是可以的
- 要么接收器的实际参数和其形式参数是相同的类型,比如两者都是类型T或者都是类型*T
Point{1, 2}.Distance(q) // Point
pptr.ScaleBy(2) // *Point
- 或者接收器实参是类型T,但接收器形参是类型*T,这种情况下编译器会隐式地为我们取变量的地址
p.ScaleBy(2) // implicit (&p)
- 或者接收器实参是类型*T,形参是类型T。编译器会隐式地为我们解引用,取到指针指向的实际变量
pptr.Distance(q) // implicit (*pptr)
如果命名类型T(译注:用type xxx定义的类型)的所有方法都是用T类型自己来做接收器(而不是*T),那么拷贝这种类型的实例就是安全的;调用他的任何一个方法也就会产生一个值的拷贝。比如time.Duration的这个类型,在调用其方法时就会被全部拷贝一份,包括在作为参数传入函数的时候。但是如果一个方法使用指针作为接收器,你需要避免对其进行拷贝,因为这样可能会破坏掉该类型内部的不变性。比如你对bytes.Buffer对象进行了拷贝,那么可能会引起原始对象和拷贝对象只是别名而已,实际上它们指向的对象是一样的。紧接着对拷贝后的变量进行修改可能会有让你有意外的结果。
总结 其实有两点
- 不管你的method的receiver是指针类型还是非指针类型,都是可以通过指针/非指针类型进行调用的,编译器会帮你做类型转换
- 在声明一个method的receiver该是指针还是非指针类型时,你需要考虑两方面的因素,第一方面是这个对象本身是不是特别大,如果声明为非指针变量时,调用会产生一次拷贝;第二方面是如果你用指针类型作为receiver,那么你一定要注意,这种指针类型指向的始终是一块内存地址,就算你对其进行了拷贝。熟悉C或者C++的人这里应该很快能明白。
通过嵌入结构体来扩展类型
type Point struct{ X, Y float64 }
type ColoredPoint struct {
Point
Color color.RGBA
}
//!-decl
func (p Point) Distance(q Point) float64 {
dX := q.X - p.X
dY := q.Y - p.Y
return math.Sqrt(dX*dX + dY*dY)
}
func (p *Point) ScaleBy(factor float64) {
p.X *= factor
p.Y *= factor
}
func main() {
//!+main
red := color.RGBA{255, 0, 0, 255}
blue := color.RGBA{0, 0, 255, 255}
var p = ColoredPoint{Point{1, 1}, red}
var q = ColoredPoint{Point{5, 4}, blue}
fmt.Println(p.Distance(q.Point)) // "5"
p.ScaleBy(2)
q.ScaleBy(2)
fmt.Println(p.Distance(q.Point)) // "10"
//!-main
}
在类型中内嵌的匿名字段也可能是一个命名类型的指针,这种情况下字段和方法会被间接地引入到当前的类型中(译注:访问需要通过该指针指向的对象去取)。添加这一层间接关系让我们可以共享通用的结构并动态地改变对象之间的关系。下面这个ColoredPoint的声明内嵌了一个*Point的指针。
type ColoredPoint struct {
*Point
Color color.RGBA
}
p := ColoredPoint{&Point{1, 1}, red}
q := ColoredPoint{&Point{5, 4}, blue}
fmt.Println(p.Distance(*q.Point)) // "5"
q.Point = p.Point // p and q now share the same Point
p.ScaleBy(2)
fmt.Println(*p.Point, *q.Point) // "{2 2} {2 2}"
一个struct类型也可能会有多个匿名字段。我们将ColoredPoint定义为下面这样
type ColoredPoint struct {
Point
color.RGBA
}
然后这种类型的值便会拥有Point和RGBA类型的所有方法,以及直接定义在ColoredPoint中的方法。当编译器解析一个选择器到方法时,比如p.ScaleBy,它会首先去找直接定义在这个类型里的ScaleBy方法,然后找被ColoredPoint的内嵌字段们引入的方法,然后去找Point和RGBA的内嵌字段引入的方法,然后一直递归向下找。如果选择器有二义性的话编译器会报错,比如你在同一级里有两个同名的方法。
方法值和方法表达式
方法值
p.Distance()p.Distance叫作“选择器”方法“值”Point.Distance
p := Point{1, 2}
q := Point{4, 6}
distanceFromP := p.Distance // method value
fmt.Println(distanceFromP(q)) // "5"
var origin Point // {0, 0}
fmt.Println(distanceFromP(origin)) // "2.23606797749979", sqrt(5)
scaleP := p.ScaleBy // method value
scaleP(2) // p becomes (2, 4)
scaleP(3) // then (6, 12)
scaleP(10) // then (60, 120)
方法表达式
p.DistanceTT.f(*T).f函数“值”
p := Point{1, 2}
q := Point{4, 6}
distance := Point.Distance // method expression
fmt.Println(distance(p, q)) // "5"
fmt.Printf("%T\n", distance) // "func(Point, Point) float64"
scale := (*Point).ScaleBy
scale(&p, 2)
fmt.Println(p) // "{2 4}"
fmt.Printf("%T\n", scale) // "func(*Point, float64)"
// 译注:这个Distance实际上是指定了Point对象为接收器的一个方法func (p Point) Distance(),
// 但通过Point.Distance得到的函数需要比实际的Distance方法多一个参数,
// 即其需要用第一个额外参数指定接收器,后面排列Distance方法的参数。
// 看起来本书中函数和方法的区别是指有没有接收器,而不像其他语言那样是指有没有返回值。
封装
一个对象的变量或者方法如果对调用方是不可见的话,一般就被定义为“封装”。封装有时候也被叫做信息隐藏,同时也是面向对象编程最关键的一个方面。
大写首字母的标识符会从定义它们的包中被导出,小写字母的则不会struct一个类型的方法
这也就是前面的小节中IntSet被定义为struct类型的原因,尽管它只有一个字段
type IntSet struct {
words []uint64
}
最小的封装单元是package一个struct类型的字段对同一个包的所有代码都有可见性,无论你的代码是写在一个函数还是一个方法里
封装提供了三方面的优点
- 首先,因为调用方不能直接修改对象的变量值,其只需要关注少量的语句并且只要弄懂少量变量的可能的值即可
- 隐藏实现的细节,可以防止调用方依赖那些可能变化的具体实现,这样使设计包的程序员在不破坏对外的api情况下能得到更大的自由
bytes.Buffer
type Buffer struct {
buf []byte
initial [64]byte
/* ... */
}
// Grow expands the buffer's capacity, if necessary,
// to guarantee space for another n bytes. [...]
func (b *Buffer) Grow(n int) {
if b.buf == nil {
b.buf = b.initial[:0] // use preallocated space initially
}
if len(b.buf)+n > cap(b.buf) {
buf := make([]byte, b.Len(), 2*cap(b.buf) + n)
copy(buf, b.buf)
b.buf = buf
}
}
- 是阻止了外部调用方对对象内部的值任意地进行修改。因为对象内部变量只可以被同一个包内的函数修改,所以包的作者可以让这些函数确保对象内部的一些值的不变性
接口
实现接口的条件
*os.Fileio.ReaderWriterCloserReadWriter*bytes.BufferReaderWriterReadWriterCloserClose*bytes.Bufferio.Writer*os.Filesio.ReadWriter
接口指定的规则非常简单:表达一个类型属于某个接口只要这个类型实现这个接口
var w io.Writer
w = os.Stdout // OK: *os.File has Write method
w = new(bytes.Buffer) // OK: *bytes.Buffer has Write method
w = time.Second // compile error: time.Duration lacks Write method
var rwc io.ReadWriteCloser
rwc = os.Stdout // OK: *os.File has Read, Write, Close methods
rwc = new(bytes.Buffer) // compile error: *bytes.Buffer lacks Close method
这个规则甚至适用于等式右边本身也是一个接口类型
w = rwc // OK: io.ReadWriteCloser has Write method
rwc = w // compile error: io.Writer lacks Close method
ReadWritereadWriteCloserWriterReadWriterReadWriteCloserWriter
值接收者和指针接收者实现接口的区别
- 使用值接收者实现接口之后,不管是dog结构体还是结构体指针dog类型的变量都可以赋值给该接口变量。因为Go语言中有对指针类型变量求值的语法糖,dog指针fugui内部会自动求值fugui
- 使用指针接收者实现的接口,只能使用指针类型的变量赋值给该接口变量
type Mover interface {
move()
}
type dog struct {}
func (d *dog) move() {
fmt.Println("狗会动")
}
func main() {
var x Mover
var wangcai = dog{} // 旺财是dog类型
x = wangcai // x不可以接收dog类型
var fugui = &dog{} // 富贵是*dog类型
x = fugui // x可以接收*dog类型
}
接口值
接口值,由两个部分组成,一个具体的类型和那个类型的值
下面4个语句中,变量w得到了3个不同的值。(开始和最后的值是相同的)
var w io.Writer
w = os.Stdout
w = new(bytes.Buffer)
w = nil
接口比较
接口值可以使用==和!=来进行比较
然而,如果两个接口值的动态类型相同,但是这个动态类型是不可比较的(比如切片),将它们进行比较就会失败并且panic
var x interface{} = []int{1, 2, 3}
fmt.Println(x == x) // panic: comparing uncomparable type []int
考虑到这点,接口类型是非常与众不同的。其它类型要么是安全的可比较类型(如基本类型和指针)要么是完全不可比较的类型(如切片,映射类型,和函数),但是在比较接口值或者包含了接口值的聚合类型时,我们必须要意识到潜在的panic。同样的风险也存在于使用接口作为map的键或者switch的操作数。只能比较你非常确定它们的动态值是可比较类型的接口值。
警告:一个包含nil指针的接口不是nil接口
一个不包含任何值的nil接口值和一个刚好包含nil指针的接口值是不同的。这个细微区别产生了一个容易绊倒每个Go程序员的陷阱
思考下面的程序。当debug变量设置为true时,main函数会将f函数的输出收集到一个bytes.Buffer类型中。
onst debug = true
func main() {
var buf *bytes.Buffer
if debug {
buf = new(bytes.Buffer) // enable collection of output
}
f(buf) // NOTE: subtly incorrect!
if debug {
// ...use buf...
}
}
// If out is non-nil, output will be written to it.
func f(out io.Writer) {
// ...do something...
if out != nil {
out.Write([]byte("done!\n"))
}
}
debugfalse
if out != nil {
out.Write([]byte("done!\n")) // panic: nil pointer dereference
}
*bytes.Buffer*bytes.Buffer
(*bytes.Buffer).Writeos.Filebytes.Buffer
*bytes.Buffer(*bytes.Buffer).Writeio.Writer
var buf io.Writer
if debug {
buf = new(bytes.Buffer) // enable collection of output
}
f(buf) // OK
类型断言
类型断言是一个使用在接口值上的操作。语法上它看起来像x.(T)被称为断言类型,这里x表示一个接口的类型和T表示一个类型。一个类型断言检查它操作对象的动态类型是否和断言的类型匹配。
断言类型T
这里有两种可能。
- 第一种,如果断言的类型T是一个具体类型,然后类型断言检查x的动态类型是否和T相同。如果这个检查成功了,类型断言的结果是x的动态值,当然它的类型是T。换句话说,具体类型的类型断言从它的操作对象中获得具体的值。如果检查失败,接下来这个操作会抛出panic。例如:
var w io.Writer
w = os.Stdout
f := w.(*os.File) // success: f == os.Stdout
c := w.(*bytes.Buffer) // panic: interface holds *os.File, not *bytes.Buffer
- 第二种,如果相反地断言的类型T是一个接口类型,然后类型断言检查是否x的动态类型满足T。如果这个检查成功了,动态值没有获取到;这个结果仍然是一个有相同动态类型和值部分的接口值,但是结果为类型T。换句话说,对一个接口类型的类型断言改变了类型的表述方式,改变了可以获取的方法集合(通常更大),但是它保留了接口值内部的动态类型和值的部分。
var w io.Writer
w = os.Stdout
rw := w.(io.ReadWriter) // success: *os.File has both Read and Write
w = new(ByteCounter)
rw = w.(io.ReadWriter) // panic: *ByteCounter has no Read method
w和rw都持有os.Stdout,因此它们都有一个动态类型*os.File,但是变量w是一个io.Writer类型,只对外公开了文件的Write方法,而rw变量还公开了它的Read方法。
nil接口值
如果断言操作的对象是一个nil接口值,那么不论被断言的类型是什么这个类型断言都会失败。我们几乎不需要对一个更少限制性的接口类型(更少的方法集合)做断言,因为它表现的就像是赋值操作一样,除了对于nil接口值的情况。
w = rw // io.ReadWriter is assignable to io.Writer
w = rw.(io.Writer) // fails only if rw == nil
不引发panic的类型断言
经常地,对一个接口值的动态类型我们是不确定的,并且我们更愿意去检验它是否是一些特定的类型。如果类型断言出现在一个预期有两个结果的赋值操作中,例如如下的定义,这个操作不会在失败的时候发生panic,但是替代地返回一个额外的第二个结果,这个结果是一个标识成功与否的布尔值:
var w io.Writer = os.Stdout
f, ok := w.(*os.File) // success: ok, f == os.Stdout
b, ok := w.(*bytes.Buffer) // failure: !ok, b == nil
第二个结果通常赋值给一个命名为ok的变量。如果这个操作失败了,那么ok就是false值,第一个结果等于被断言类型的零值,在这个例子中就是一个nil的*bytes.Buffer类型
这个ok结果经常立即用于决定程序下面做什么。if语句的扩展格式让这个变的很简洁:
if f, ok := w.(*os.File); ok {
// ...use f...
}
类型分支
// switch x.(type){...}
switch x := x.(type) {
case nil: // ...
case int, uint: // ...
case bool: // ...
case string: // ...
default: // ...
}
这里我们已经将新的变量也命名为x;和类型断言一样,重用变量名是很常见的。和一个switch语句相似地,一个类型分支隐式的创建了一个词法块,因此新变量x的定义不会和外面块中的x变量冲突。每一个case也会隐式的创建一个单独的词法块
通道 Channels
goroutinechannelgoroutinegoroutinechannelchannelschan int
通道创建
make
ch := make(chan int) // ch has type 'chan int'
和map类似,channel也对应一个make创建的底层数据结构的引用
两个相同类型的channel可以使用==运算符比较
通道操作
一个channel有发送和接受两个主要操作,都是通信行为
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
通道缓存
make无缓存的channelchannel容量大于零带缓存的channel
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
向缓存Channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
cap函数
fmt.Println(cap(ch)) // "3"
len函数因为在并发程序中该信息会随着接收操作而失效,但是它对某些故障诊断和性能优化会有帮助
fmt.Println(len(ch)) // "2"
Go语言新手有时候会将一个带缓存的channel当作同一个goroutine中的队列使用,虽然语法看似简单,但实际上这是一个错误。Channel和goroutine的调度器机制是紧密相连的,如果没有其他goroutine从channel接收,发送者——或许是整个程序——将会面临永远阻塞的风险。如果你只是需要一个简单的队列,使用slice就可以了
基于select的多路复用
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
上面是select语句的一般形式。和switch语句稍微有点相似,也会有几个case和最后的default选择分支。每一个case代表一个通信操作(在某个channel上进行发送或者接收),并且会包含一些语句组成的一个语句块。一个接收表达式可能只包含接收表达式自身(译注:不把接收到的值赋值给变量什么的),就像上面的第一个case,或者包含在一个简短的变量声明中,像第二个case里一样;第二种形式让你能够引用接收到的值。
一个没有任何case的select语句写作select{},会永远地等待下去
ch这个channel的buffer大小是1,所以会交替的为空或为满
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x) // "0" "2" "4" "6" "8"
case ch <- i:
}
}
反复地做这样的操作叫做“轮询channel”
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}
channel的零值是nil。也许会让你觉得比较奇怪,nil的channel有时候也是有一些用处的。因为对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。
并发的退出
为了能够达到我们退出goroutine的目的,我们需要更靠谱的策略,来通过一个channel把消息广播出去,这样goroutine们能够看到这条事件消息,并且在事件完成之后,可以知道这件事已经发生过了。
回忆一下我们关闭了一个channel并且被消费掉了所有已发送的值,操作channel之后的代码可以立即被执行,并且会产生零值。我们可以将这个机制扩展一下,来作为我们的广播机制:不要向channel发送值,而是用关闭一个channel来进行广播
eg: DEMO
package main
// The du3 variant traverses all directories in parallel.
// It uses a concurrency-limiting counting semaphore
// to avoid opening too many files at once.
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
)
var vFlag = flag.Bool("v", false, "show verbose progress messages")
//!+
func main() {
// ...determine roots...
//!-
flag.Parse()
// Determine the initial directories.
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
//!+
// Traverse each root of the file tree in parallel.
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
//!-
// Print the results periodically.
var tick <-chan time.Time
if *vFlag {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
//!+
// ...select loop...
}
//!-
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
//!+walkDir
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
//!-walkDir
//!+sema
// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // acquire token
defer func() { <-sema }() // release token
// ...
//!-sema
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
- 首先,我们创建一个退出的channel,不需要向这个channel发送任何值,但其所在的闭包内要写明程序需要退出。我们同时还定义了一个工具函数,cancelled,这个函数在被调用的时候会轮询退出状态。
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
- 我们创建一个从标准输入流中读取内容的goroutine,这是一个比较典型的连接到终端的程序。每当有输入被读到(比如用户按了回车键),这个goroutine就会把取消消息通过关闭done的channel广播出去。
// Cancel traversal when input is detected.
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done)
}()
- 现在我们需要使我们的goroutine来对取消进行响应。在main goroutine中,我们添加了select的第三个case语句,尝试从done channel中接收内容。如果这个case被满足的话,在select到的时候即会返回,但在结束之前我们需要把fileSizes channel中的内容“排”空,在channel被关闭之前,舍弃掉所有值。这样可以保证对walkDir的调用不要被向fileSizes发送信息阻塞住,可以正确地完成。
for {
select {
case <-done:
// Drain fileSizes to allow existing goroutines to finish.
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
// ...
}
}
- walkDir这个goroutine一启动就会轮询取消状态,如果取消状态被设置的话会直接返回,并且不做额外的事情。这样我们将所有在取消事件之后创建的goroutine改变为无操作
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
// ...
}
}
在walkDir函数的循环中我们对取消状态进行轮询可以带来明显的益处,可以避免在取消事件发生时还去创建goroutine。取消本身是有一些代价的;想要快速的响应需要对程序逻辑进行侵入式的修改。确保在取消发生之后不要有代价太大的操作可能会需要修改你代码里的很多地方,但是在一些重要的地方去检查取消事件也确实能带来很大的好处。
- 对这个程序的一个简单的性能分析可以揭示瓶颈在dirents函数中获取一个信号量。下面的select可以让这种操作可以被取消,并且可以将取消时的延迟从几百毫秒降低到几十毫秒。
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // acquire token
case <-done:
return nil // cancelled
}
defer func() { <-sema }() // release token
// ...read directory...
}
现在当取消发生时,所有后台的goroutine都会迅速停止并且主函数会返回。当然,当主函数返回时,一个程序会退出,而我们又无法在主函数退出的时候确认其已经释放了所有的资源(译注:因为程序都退出了,你的代码都没法执行了)
这里有一个方便的窍门我们可以一用:取代掉直接从主函数返回,我们调用一个panic,然后runtime会把每一个goroutine的栈dump下来。如果main goroutine是唯一一个剩下的goroutine的话,他会清理掉自己的一切资源。但是如果还有其它的goroutine没有退出,他们可能没办法被正确地取消掉,也有可能被取消但是取消操作会很花时间;所以这里的一个调研还是很有必要的。我们用panic来获取到足够的信息来验证我们上面的判断,看看最终到底是什么样的情况。
基于共享变量的并发
竞争条件
一个好的经验法则是根本就没有什么所谓的良性数据竞争。所以我们一定要避免数据竞争,那么在我们的程序中要如何做到呢?
有三种方式可以避免数据竞争
- 第一种方法是不要去写变量
考虑一下下面的map,会被“懒”填充,也就是说在每个key被第一次请求到的时候才会去填值。如果Icon是被顺序调用的话,这个程序会工作很正常,但如果Icon被并发调用,那么对于这个map来说就会存在数据竞争。
var icons = make(map[string]image.Image)
func loadIcon(name string) image.Image
// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
icon, ok := icons[name]
if !ok {
icon = loadIcon(name)
icons[name] = icon
}
return icon
}
反之,如果我们在创建goroutine之前的初始化阶段,就初始化了map中的所有条目并且再也不去修改它们,那么任意数量的goroutine并发访问Icon都是安全的,因为每一个goroutine都只是去读取而已。
var icons = map[string]image.Image{
"spades.png": loadIcon("spades.png"),
"hearts.png": loadIcon("hearts.png"),
"diamonds.png": loadIcon("diamonds.png"),
"clubs.png": loadIcon("clubs.png"),
}
// Concurrency-safe.
func Icon(name string) image.Image { return icons[name] }
上面的例子里icons变量在包初始化阶段就已经被赋值了,包的初始化是在程序main函数开始执行之前就完成了的。只要初始化完成了,icons就再也不会被修改。数据结构如果从不被修改或是不变量则是并发安全的,无需进行同步。不过显然,如果update操作是必要的,我们就没法用这种方法,比如说银行账户
- 第二种避免数据竞争的方法是,避免从多个goroutine访问变量。
这也就是Go的口头禅“不要使用共享数据来通信;使用通信来共享数据”
银行的例子
// Package bank provides a concurrency-safe bank with one account.
package bank
var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance
func Deposit(amount int) { deposits <- amount }
func Balance() int { return <-balances }
func teller() {
var balance int // balance is confined to teller goroutine
for {
select {
case amount := <-deposits:
balance += amount
case balances <- balance:
}
}
}
func init() {
go teller() // start the monitor goroutine
}
即使当一个变量无法在其整个生命周期内被绑定到一个独立的goroutine,绑定依然是并发问题的一个解决方案
如果流水线的每一个阶段都能够避免在将变量传送到下一阶段后再去访问它,那么对这个变量的所有访问就是线性的串行绑定
下面的例子中,Cakes会被严格地顺序访问,先是baker gorouine,然后是icer gorouine
type Cake struct{ state string }
func baker(cooked chan<- *Cake) {
for {
cake := new(Cake)
cake.state = "cooked"
cooked <- cake // baker never touches this cake again
}
}
func icer(iced chan<- *Cake, cooked <-chan *Cake) {
for cake := range cooked {
cake.state = "iced"
iced <- cake // icer never touches this cake again
}
}
- 第三种避免数据竞争的方法是允许很多goroutine去访问变量,但是在同一个时刻最多只有一个goroutine在访问。这种方式被称为“互斥”
sync.Mutex互斥锁
二元信号量(binary semaphore)
var (
sema = make(chan struct{}, 1) // a binary semaphore guarding balance
balance int
)
func Deposit(amount int) {
sema <- struct{}{} // acquire token
balance = balance + amount
<-sema // release token
}
func Balance() int {
sema <- struct{}{} // acquire token
b := balance
<-sema // release token
return b
}
syncMutex类型
import "sync"
var (
mu sync.Mutex // guards balance
balance int
)
func Deposit(amount int) {
mu.Lock()
balance = balance + amount
mu.Unlock()
}
func Balance() int {
mu.Lock()
b := balance
mu.Unlock()
return b
}
sync.RWMutex读写锁
这种锁叫作“多读单写”锁(multiple readers, single writer lock),Go语言提供的这样的锁是sync.RWMutex
var mu sync.RWMutex
var balance int
func Balance() int {
mu.RLock() // readers lock
defer mu.RUnlock()
return balance
}
RWMutex只有当获得锁的大部分goroutine都是读操作,而锁在竞争条件下,也就是说,goroutine们必须等待才能获取到锁的时候,RWMutex才是最能带来好处的。RWMutex需要更复杂的内部记录,所以会让它比一般的无竞争锁的mutex慢一些
内存同步
考虑一下下面代码片段的可能输出
var x, y int
go func() {
x = 1 // A1
fmt.Print("y:", y, " ") // A2
}()
go func() {
y = 1 // B1
fmt.Print("x:", x, " ") // B2
}()
我们可能希望它能够打印出下面这四种结果中的一种,相当于几种不同的交错执行时的情况
y:0 x:1
x:0 y:1
x:1 y:1
y:1 x:1
然而实际运行时还是有些情况让我们有点惊讶
x:0 y:0
y:0 x:0
那么这两种情况要怎么解释呢?
在一个独立的goroutine中,每一个语句的执行顺序是可以被保证的,也就是说goroutine内顺序是连贯的。但是在不使用channel且不使用mutex这样的显式同步操作时,我们就没法保证事件在不同的goroutine中看到的执行顺序是一致的了。尽管goroutine A中一定需要观察到x=1执行成功之后才会去读取y,但它没法确保自己观察得到goroutine B中对y的写入,所以A还可能会打印出y的一个旧版的值。
尽管去理解并发的一种尝试是去将其运行理解为不同goroutine语句的交错执行,但看看上面的例子,这已经不是现代的编译器和cpu的工作方式了。因为赋值和打印指向不同的变量,编译器可能会断定两条语句的顺序不会影响执行结果,并且会交换两个语句的执行顺序。如果两个goroutine在不同的CPU上执行,每一个核心有自己的缓存,这样一个goroutine的写入对于其它goroutine的Print,在主存同步之前就是不可见的了。
所有并发的问题都可以用一致的、简单的既定的模式来规避。所以可能的话,将变量限定在goroutine内部;如果是多个goroutine都需要访问的变量,使用互斥条件来访问。
示例: 并发的非阻塞缓存
本节中我们会做一个无阻塞的缓存,这种工具可以帮助我们来解决现实世界中并发程序出现但没有现成的库可以解决的问题。这个问题叫作缓存(memoizing)函数, 也就是说,我们需要缓存函数的返回结果,这样在对函数进行调用的时候,我们就只需要一次计算,之后只要返回计算的结果就可以了。我们的解决方案会是并发安全且会避免对整个缓存加锁而导致所有操作都去争一个锁的设计
我们将使用下面的httpGetBody函数作为我们需要缓存的函数的一个样例。这个函数会去进行HTTP GET请求并且获取http响应body。对这个函数的调用本身开销是比较大的,所以我们尽量避免在不必要的时候反复调用。
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
下面是我们要设计的cache的一个“草稿”:
package memo
import "sync"
// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)
type result struct {
value interface{}
err error
}
//!+
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]*entry
}
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// This is the first request for this key.
// This goroutine becomes responsible for computing
// the value and broadcasting the ready condition.
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // broadcast ready condition
} else {
// This is a repeat request for this key.
memo.mu.Unlock()
<-e.ready // wait for ready condition
}
return e.res.value, e.res.err
}
现在Get函数包括下面这些步骤了:获取互斥锁来保护共享变量cache map,查询map中是否存在指定条目,如果没有找到那么分配空间插入一个新条目,释放互斥锁。如果存在条目的话且其值没有写入完成(也就是有其它的goroutine在调用f这个慢函数)时,goroutine必须等待值ready之后才能读到条目的结果。而想知道是否ready的话,可以直接从ready channel中读取,由于这个读取操作在channel关闭之前一直是阻塞。
如果没有条目的话,需要向map中插入一个没有准备好的条目,当前正在调用的goroutine就需要负责调用慢函数、更新条目以及向其它所有goroutine广播条目已经ready可读的消息了。
条目中的e.res.value和e.res.err变量是在多个goroutine之间共享的。创建条目的goroutine同时也会设置条目的值,其它goroutine在收到"ready"的广播消息之后立刻会去读取条目的值。尽管会被多个goroutine同时访问,但却并不需要互斥锁。ready channel的关闭一定会发生在其它goroutine接收到广播事件之前,因此第一个goroutine对这些变量的写操作是一定发生在这些读操作之前的。不会发生数据竞争。
这样并发、不重复、无阻塞的cache就完成了。
互斥量来保护多个goroutine调用Get时的共享map变量把map变量限制在一个单独的monitor goroutine的方案
package memo
//!+Func
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
// A result is the result of calling a Func.
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
//!-Func
//!+get
// A request is a message requesting that the Func be applied to key.
type request struct {
key string
response chan<- result // the client wants a single result
}
type Memo struct{ requests chan request }
// New returns a memoization of f. Clients must subsequently call Close.
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)}
go memo.server(f)
return memo
}
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
//!-get
//!+monitor
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests {
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // call f(key)
}
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// Evaluate the function.
e.res.value, e.res.err = f(key)
// Broadcast the ready condition.
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// Wait for the ready condition.
<-e.ready
// Send the result to the client.
response <- e.res
}
//!-monitor
然而Memo类型现在包含了一个叫做requests的channel,Get的调用方用这个channel来和monitor goroutine来通信。requests channel中的元素类型是request。Get的调用方会把这个结构中的两组key都填充好,实际上用这两个变量来对函数进行缓存的。另一个叫response的channel会被拿来发送响应结果。这个channel只会传回一个单独的值。
monitor goroutine
monitor goroutine(*Memo).server
和基于互斥量的版本类似
紧接着对同一个key的请求会发现map中已经有了存在的条目,然后会等待结果变为ready,并将结果从response发送给客户端的goroutien。上述工作是用(*entry).deliver来完成的。对call和deliver方法的调用必须让它们在自己的goroutine中进行以确保monitor goroutines不会因此而被阻塞住而没法处理新的请求。
这个例子说明我们无论用上锁,还是通信来建立并发程序都是可行的。
上面的两种方案并不好说特定情境下哪种更好,不过了解他们还是有价值的。有时候从一种方式切换到另一种可以使你的代码更为简洁。
- memo_test
package memo_test
import (
"testing"
"gopl.io/ch9/memo4"
"gopl.io/ch9/memotest"
)
var httpGetBody = memotest.HTTPGetBody
func Test(t *testing.T) {
m := memo.New(httpGetBody)
memotest.Sequential(t, m)
}
func TestConcurrent(t *testing.T) {
m := memo.New(httpGetBody)
memotest.Concurrent(t, m)
}
- Sequential func
func Sequential(t *testing.T, m M) {
//!+seq
for url := range incomingURLs() {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
continue
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
}
//!-seq
}
- Concurrent func
func Concurrent(t *testing.T, m M) {
//!+conc
var n sync.WaitGroup
for url := range incomingURLs() {
n.Add(1)
go func(url string) {
defer n.Done()
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
return
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
}(url)
}
n.Wait()
//!-conc
}