并发编程

golang中的并发,是函数相互独立运行的能力,goroutines是并发运行的函数。golang提供了goroutines作为并发处理的一种方式。

创建一个协程非常简单,就是在一个任务函数前面添加一个go关键字:

go task()
package main

import (
	"fmt"
	"time"
)

func show(name string) {
	for i := 0; i < 5; i++ {
		fmt.Printf("name: %v\n", name)
		time.Sleep(time.Millisecond * 100)
	}
}

func main() {

	go show("Python") //开启一个go协程
	go show("Golang") //开启另外一个go协程
	time.Sleep(time.Millisecond * 2000)
	print("hello world!\n") //默认情况下:主线程结束后其他线程也会结束
}

运行结果

name: Golang
name: Python
name: Python
name: Golang
name: Golang
name: Python
name: Python
name: Golang
name: Python
name: Golang
hello world!

并发编程之channel通道

go提供了一种称为通道的机制,用于在goroutine之间共享数据。当goroutine执行并发活动时,需要在goroutine之间共享数据资源,channel通道充当goroutine之间的管道并提供一种机制来保证同步交换。

需要在声明通道时指定数据类型,我们可以共享内置,命名,结构和引用类型的值和指针。数据在通道上传递:在任何给定的时间只有一个goroutine可以访问数据项,因此按照设计不会发生数据竞争。

根据数据交换的行为,有两种通道类型:无缓冲通道缓冲通道。无缓存通道用于执行goroutine之间的同步信号,而缓冲通道用于执行异步信号。无缓冲通道保证在发送和接收发生的瞬间执行两个goroutine之间的交换。缓冲通道没有这样的保证。

通道由make函数创建,该函数指定chan关键字和通道的元素类型。

语法

chan_ubuffered:=make(chan int) //整型无缓冲通道
chan_buffered:=make(chan int, 10) //整型有缓冲通道

将值发送给通道使用==<-==运算符

c:=make(chan string)
c <- "hello"

从通道中取值

var name string
name <- c

无缓冲通道

在无缓冲通道中,在接收到任何值之前没有能力保存它。在这种类型的通道中,发送和接收的goroutine在任何发送或接收操作完成之前的同一时刻都是准备就绪的。如果两个goroutine没有在同一时刻准备好,则通道会让其中已准备好的一方等待另一方,直到两个都准备就绪。同步是通道上发送和接收之间交互的基础。没有另一个就不可能发生。

缓冲通道

在缓冲通道中,有能力在接收到一个或多个值之前保存它们。在这种类型的通道中,不强制goroutine在同一时刻准备好执行发送和接收。当发送或接收阻塞时也有不同的条件。只有当通道中没有要接收的值时,接收才会阻塞仅当没有可用缓冲区来放置正在发送的值时,发送才会阻塞。

package main

import (
	"fmt"
	"time"
)

// 将值传入通道
func send(c *chan string, val string) {
	*c <- val
}

// 将值从通道中取出
func get(c *chan string) *string {
	val := <-*c
	fmt.Printf("val: %v\n", val)
	return &val
}

func main() {

	c := make(chan string) //无缓冲通道
	defer close(c)

	go send(&c, "hello")
	go get(&c)
	go send(&c, "world")
	go get(&c)

	time.Sleep(time.Millisecond * 1000)
	fmt.Printf("main end...")
}

运行结果

val: hello
val: world
main end...

WaitGroup实现同步

在golang中,如果除了主协程以外还有其他的协程,当主协程结束的时候其他协程也会一起结束,不管它们有没有执行完毕,即默认是协程守护的。使用WaitGroup可以使用协程之间的同步,(个人感觉类似于python中的join方法)。

package main

import (
	"fmt"
)

func show_msg(msg int) {
	fmt.Printf("msg: %d\n", msg)
}

func main() {
	for i := 0; i < 10; i++ {
		go show_msg(i)
	}

	print("end main...")
}

运行结果

end main...msg: 3
msg: 0
msg: 1
msg: 2
msg: 6
msg: 4
msg: 9

可以看到上面的代码的每次运行结果都可能不一样,因为每一个show_msg都用一个协程来启动,它们和主协程一样,但是只要主协程执行结束,不管它们有没有被执行,都会被结束。

使用WaitGroup后

package main

import (
	"fmt"
	"sync"
)

var mg sync.WaitGroup

func show_msg(msg int) {
	fmt.Printf("msg: %d\n", msg)
	defer mg.Add(-1) //任务完成就减一
}

func main() {

	for i := 0; i < 10; i++ {
		mg.Add(1) //添加任务就加一
		go show_msg(i)
	}
	// Counter is 0, no need to wait.
	mg.Wait() //如果WaitGroup的计数为0就停止等待
	print("end main...")
}

运行结果

msg: 9
msg: 0
msg: 1
msg: 2
msg: 3
msg: 4
msg: 5
msg: 6
msg: 7
msg: 8
end main...

在使用WaitGroup后,通过WaitGroup来进行计数,每添加一个协程,计数加一(Add(1));每完成一个协程,计数减一(Add(-1)),只有当计数为0时Wait才不会等待。

并发编程之runtime包

runtime包中包含了一些协程管理相关的API。

runtime.Gosched()

功能:让出处理机,给其他的协程去使用。

// Gosched yields the processor, allowing other goroutines to run. It does not

// suspend the current goroutine, so execution resumes automatically.

package main

import (
	"fmt"
	"runtime"
)

func show_msg(msg int) {
	fmt.Printf("msg: %v\n", msg)

}

func main() {
	go show_msg(100)

	runtime.Gosched() //在主协程中使用的

	print("main end...")
}

运行结果

msg: 100
main end...

在上面的代码中,我们可以看到在主协程中调用了Gosched方法,因此当主协程和show_msg协程抢占处理机时,如果show_msg协程先抢到执行,则执行结果如上;如果主协程先抢到处理机,则它会主动让出处理机,此时show_msg会拿到处理机,因此执行结果还是如上。

runtime.Goexit()

功能:退出协程

package main

import (
	"fmt"
	"runtime"
)

func show_msg(msg int) {
	for i := 0; i < 5; i++ {
		if i == 3 { //i等于3时直接结束协程
			runtime.Goexit()
		}
		fmt.Printf("msg: %v-%v\n", msg, i)

	}

}

func main() {
	go show_msg(100)
	runtime.Gosched()
	print("main end...")
}

运行结果

msg: 100-0
msg: 100-1
msg: 100-2
main end...

Mutex实现同步

除了channel实现同步之外,还可以使用Mutex互斥锁实现同步。

package main

import (
	"fmt"
	"sync"
)

var n int = 100
var lock sync.Mutex
var wg sync.WaitGroup

func add() {
	lock.Lock() //加锁
	n += 1
	lock.Unlock() //解锁
	wg.Add(-1)
}

func sub() {
	lock.Lock()
	n -= 1
	lock.Unlock()
	wg.Add(-1) //完成协程任务,计数减一
}

func main() {

	//当多个协程共同处理某一个数据时,如果不进行加锁,就会出现数据混乱的情况,例如:多个协程对一个数值进行加一和减一的操作
	//因此在多协程中,我们可以对数据进行的操作加锁,让同一时刻只能有一个协程对数据进行操作
	for i := 0; i < 100000; i++ {
		wg.Add(1) //新增协程任务,计数加一
		go add()
		wg.Add(1)
		go sub()
	}
	wg.Wait() //主协程等待子协程执行完毕
	fmt.Printf("n: %v\n", n)

}

运行结果

n: 100

并发编程之select

  1. select是go中的一个控制结构,类似于switch语句,用于处理异步IO操作。select会监听case语句中的channel的读写操作,当case中channel读写操作为非阻塞状态时,会触发相应的动作。

    select中case语句必须是一个channel操作(读或者写)

    select中default子句总是可以运行的

  2. 如果有多个case都可运行,select会随机公平地选出一个执行,其他不会执行。

  3. 如果没有可运行的case语句,且有default语句,那么就会执行default语句。

  4. 如果没有可运行的case语句,且没有default语句,select将会阻塞。

package main

import (
	"fmt"
	"time"
)

var chanName = make(chan string, 0)
var chanAge = make(chan int, 0)

func main() {

	go func() {
		for i := 0; i < 5; i++ {
			name := "Tom"
			chanName <- name
			chanAge <- i
		}
	}()

	for {
		select {
		case name := <-chanName: //读chanName
			fmt.Printf("name: %v\n", name)
		case age := <-chanAge:
			fmt.Printf("age: %v\n", age)

		default:
			print("default...\n")

		}
		time.Sleep(time.Second)
	}

}

运行结果

default...
name: Tom
age: 0
name: Tom
age: 1
name: Tom
age: 2
name: Tom
age: 3
name: Tom
age: 4
default...
default...
default...
default...

并发编程是Timer

Timer顾名思义,就是计时器的意思,可以实现一些定时操作,内部也是通过channel实现的。

package main

import (
	"fmt"
	"time"
)

func main() {

	t1 := time.NewTimer(time.Second) //等待1秒钟
	fmt.Printf("time.Now(): %v\n", time.Now())
	res := <-t1.C //一直处于阻塞状态,直到到达等待时间
	fmt.Printf("res: %v\n", res)
	fmt.Printf("time.Now(): %v\n", time.Now())

	time.After(time.Second) //一秒后,After其实对NewTimer进行了一次封装
	fmt.Printf("time.Now(): %v\n", time.Now())

	//停止计时器
	t2 := time.NewTimer(time.Second * 2)

	go func() {
		print("执行到了这里...\n")
		<-t2.C
		print("匿名函数被执行...\n")
	}()

	t2.Stop() //如果调用了stop,上面的协程<-t2.C以后的语句都不会被执行
	time.Sleep(time.Second * 3)

	//修改计时时间
	t3 := time.NewTimer(time.Second)
	fmt.Printf("time.Now(): %v\n", time.Now())
	t3.Reset(time.Second * 2) //修改计时为2两秒
	<-t3.C
	fmt.Printf("time.Now(): %v\n", time.Now())

	fmt.Printf("main end...\n")
}

运行结果

time.Now(): 2022-10-05 15:04:46.380136682 +0800 CST m=+0.000102410
res: 2022-10-05 15:04:47.380331775 +0800 CST m=+1.000297497
time.Now(): 2022-10-05 15:04:47.380414771 +0800 CST m=+1.000380500
time.Now(): 2022-10-05 15:04:47.380453944 +0800 CST m=+1.000419671
执行到了这里...
time.Now(): 2022-10-05 15:04:50.381683725 +0800 CST m=+4.001649449
time.Now(): 2022-10-05 15:04:52.38280595 +0800 CST m=+6.002771675
main end...

并发编程之Ticker

Timer是到达时间后执行一次,而Ticker是周期性的执行,是一个定时器。

package main

import (
	"fmt"
	"time"
)

func main() {

	// 每隔一秒打印一次当前时间
	t1 := time.NewTicker(time.Second)
	n := 0
	for _ = range t1.C {
		fmt.Printf("time.Now(): %v\n", time.Now())
		n += 1
		if n > 5 {
			t1.Stop()
			break
		}
	}

}

运行结果

time.Now(): 2022-10-05 15:20:32.53228156 +0800 CST m=+8.000943370
time.Now(): 2022-10-05 15:20:33.532483936 +0800 CST m=+9.001145744
time.Now(): 2022-10-05 15:20:34.531494225 +0800 CST m=+10.000156042
time.Now(): 2022-10-05 15:20:35.532310152 +0800 CST m=+11.000971959
time.Now(): 2022-10-05 15:20:36.532446657 +0800 CST m=+12.001108468
time.Now(): 2022-10-05 15:20:37.531670839 +0800 CST m=+13.000332645
package main

import (
	"fmt"
	"time"
)

func main() {


	t2 := time.NewTicker(time.Second)
	chanInt := make(chan int)
	go func() {
		//每秒钟向chanInt随机写一个数据
		for _ = range t2.C {
			select {
			case chanInt <- 1:
			case chanInt <- 2:
			case chanInt <- 3:
			case chanInt <- 4:
			case chanInt <- 5:

			}
		}

	}()
	sum := 0
	for v := range chanInt {
		sum += v
		fmt.Printf("v: %v\n", v)
		if sum >= 10 {
			close(chanInt)
			break
		}
	}

	print("main end...\n")

}

运行结果

v: 1
v: 4
v: 1
v: 5
main end...

并发编程之原子操作

当多个协程操作同一个数据时,如果不加锁就可能会出现数据混乱的情况,除了使用mutex锁来解决这个问题外,golang还提供了原子操作这一概念来解决这个问题。

atomic提供的原子操作能够确保任一时刻只有一个goroutine对变量进行操作,善用atomic能够避程序中出现大量的锁操作。

atomic常见的操作有:

  • 增减:Add
  • 加载(读):Load
  • 存储(写):Store
  • 比较和交换:cas
  • 交换:swap

增减操作

// AddInt32 atomically adds delta to *addr and returns the new value.
func AddInt32(addr *int32, delta int32) (new int32)

// AddUint32 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)).
// In particular, to decrement x, do AddUint32(&x, ^uint32(0)).
func AddUint32(addr *uint32, delta uint32) (new uint32)

// AddInt64 atomically adds delta to *addr and returns the new value.
func AddInt64(addr *int64, delta int64) (new int64)

// AddUint64 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
func AddUint64(addr *uint64, delta uint64) (new uint64)

// AddUintptr atomically adds delta to *addr and returns the new value.
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

加载操作

// LoadInt32 atomically loads *addr.
func LoadInt32(addr *int32) (val int32)

// LoadInt64 atomically loads *addr.
func LoadInt64(addr *int64) (val int64)

// LoadUint32 atomically loads *addr.
func LoadUint32(addr *uint32) (val uint32)

// LoadUint64 atomically loads *addr.
func LoadUint64(addr *uint64) (val uint64)

// LoadUintptr atomically loads *addr.
func LoadUintptr(addr *uintptr) (val uintptr)

// LoadPointer atomically loads *addr.
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

存储操作

// StoreInt32 atomically stores val into *addr.
func StoreInt32(addr *int32, val int32)

// StoreInt64 atomically stores val into *addr.
func StoreInt64(addr *int64, val int64)

// StoreUint32 atomically stores val into *addr.
func StoreUint32(addr *uint32, val uint32)

// StoreUint64 atomically stores val into *addr.
func StoreUint64(addr *uint64, val uint64)

// StoreUintptr atomically stores val into *addr.
func StoreUintptr(addr *uintptr, val uintptr)

// StorePointer atomically stores val into *addr.
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

比较和交换操作

// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value.
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value.
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value.
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

交换操作

// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
func SwapInt32(addr *int32, new int32) (old int32)

// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
func SwapInt64(addr *int64, new int64) (old int64)

// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
func SwapUint32(addr *uint32, new uint32) (old uint32)

// SwapUint64 atomically stores new into *addr and returns the previous *addr value.
func SwapUint64(addr *uint64, new uint64) (old uint64)

// SwapUintptr atomically stores new into *addr and returns the previous *addr value.
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

// SwapPointer atomically stores new into *addr and returns the previous *addr value.
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var N int32 = 100
var wg sync.WaitGroup

func add() {
	atomic.AddInt32(&N, 1) //加一操作
	defer wg.Add(-1)
}

func sub() {
	atomic.AddInt32(&N, -1) //减一操作
	defer wg.Add(-1)
}
func main() {
	for i := 0; i < 100000; i++ {
		go add()
		wg.Add(1)
		go sub()
		wg.Add(1)
	}

	wg.Wait()
	fmt.Printf("N: %v\n", N)
}

运行结果

N: 100
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {

	var i int32 = 100
	fmt.Printf("i: %v\n", i)
	atomic.AddInt32(&i, 100) //加100
	fmt.Printf("i: %v\n", i)
	atomic.AddInt32(&i, -50) //减50
	fmt.Printf("i: %v\n", i)

	fmt.Printf("atomic.LoadInt32(&i): %v\n", atomic.LoadInt32(&i)) //读取i的值
	atomic.StoreInt32(&i, 1000)                                    //修改i的值为1000
	fmt.Printf("i: %v\n", i)

	//cas 其他操作的根基
	success := atomic.CompareAndSwapInt32(&i, i, 666) //修改i为666
	if success {

		fmt.Printf("修改成功!i: %v\n", i)
	}

	//swap 直接交换 比较暴力
	b := atomic.CompareAndSwapInt32(&i, i, 888)
	if b {
		fmt.Printf("修改成功!i: %v\n", i)
	}

}

运行结果

i: 100
i: 200
i: 150
atomic.LoadInt32(&i): 150
i: 1000
修改成功!i: 666
修改成功!i: 888