go同步机制有:1、channel,着重并发问题中的数据流动,把流动的数据放到channel中,就能使用channel解决这个并发;2、Sync.Mutex,拥有Lock、Unlock两个方法,主要实现思想体现在Lock函数中;3、Sync.waitGroup;4、Sync.Once;5、Sync.context;6、Sync.pool;7、atomic包,针对变量进行操作。
本教程操作环境:windows7系统、GO 1.18版本、Dell G3电脑。
Golang的提供的同步机制有sync模块下的Mutex、WaitGroup以及语言自身提供的chan等。
1.channel
概述
Golang以如此明显的方式告诉我们:。
优点:channel的核心是数据流动,关注到并发问题中的数据流动,把流动的数据放到channel中,就能使用channel解决这个并发【相关推荐:Go视频教程、编程教学】
问题,而且使用channel是线程安全的并且不会有数据冲突,比锁好用多了
缺点:不太适应同步太复杂的场景,比如多协程的同步等待问题,而且存在死锁问题 ,channel死锁问题:死锁问题链接
分类
channel类型:无缓冲和缓冲类型
channel有两种形式的,一种是无缓冲的,一个线程向这个channel发送了消息后,会阻塞当前的这个线程,知道其他线程去接收这个channel的消息。无缓冲的形式如下:
intChan := make(chan int)
带缓冲的channel,是可以指定缓冲的消息数量,当消息数量小于指定值时,不会出现阻塞,超过之后才会阻塞,需要等待其他线程去接收channel处理,带缓冲的形式如下:
//3为缓冲数量
intChan := make(chan int, 3)
举例
type Person struct {
Name string
Age uint8
Address Addr
}
type Addr struct {
city string
district string
}
/*
测试channel传输复杂的Struct数据
*/
func testTranslateStruct() {
personChan := make(chan Person, 1)
person := Person{"xiaoming", 10, Addr{"shenzhen", "longgang"}}
personChan <- person
person.Address = Addr{"guangzhou", "huadu"}
fmt.Printf("src person : %+v \n", person)
newPerson := <-personChan
fmt.Printf("new person : %+v \n", newPerson)
}
在实际应用过程中,等待channel 结束信号的过程可能不是无期限的,一般会伴随一个timer,超时时间如下面所示:
/*
检查channel读写超时,并做超时的处理
*/
func testTimeout() {
g := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case v := <-g:
fmt.Println(v)
case <-time.After(time.Second * time.Duration(3)):
quit <- true
fmt.Println("超时,通知主线程退出")
return
}
}
}()
for i := 0; i < 3; i++ {
g <- i
}
<-quit
fmt.Println("收到退出通知,主线程退出")
}
2.Sync.Mutex
Mutex拥有Lock、Unlock两个方法,主要的实现思想都体现在Lock函数中。
Lock执行时,分三种情况:
-
无冲突 通过CAS操作把当前状态设置为加锁状态;
-
有冲突 开始自旋,并等待锁释放,如果其他Goroutine在这段时间内释放了该锁, 直接获得该锁;如果没有释放,进入3;
-
有冲突,且已经过了自旋阶段 通过调用semacquire函数来让当前Goroutine进入等待状态。
无冲突时是最简单的情况;有冲突时,首先进行自旋,是从效率方面考虑的, 因为大多数的Mutex保护的代码段都很短,经过短暂的自旋就可以获得;如果自旋等待无果,就只好通过信号量来让当前 Goroutine进入等待了。
3. Sync.waitGroup
Channel在某些同步场景下,使用略显复杂,不管是使用多个channel还是使用channel数组,如下:
func coordinateWithChan() {
sign := make(chan struct{}, 2)
num := int32(0)
fmt.Printf("The number: %d [with chan struct{}]\n", num)
max := int32(10)
go addNum(&num, 1, max, func() {
sign <- struct{}{}
})
go addNum(&num, 2, max, func() {
sign <- struct{}{}
})
<-sign
<-sign
}
所以Sync.waitGroup 就显得更为优雅,Sync.waitGroup 用来等待一组goroutines的结束,在主Goroutine里声明,并且设置要等待的goroutine的个数,每个goroutine执行完成之后调用 Done,最后在主Goroutines 里Wait即可。类似于JAVA中的CountDownLatch或者循环屏障,并且Sync.waitGroup可以被重复使用,提供了如下API:
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
但是Sync.waitGroup的使用需要遵循一些规则,避免抛出Panic:
a. 错误调用Done方法, 导致waitGroup内部计数值出现负数的情况
b. 错误的调用Add方法,在waitGroup内部计数值到达0的时候,Add方法被调用,导致应该被唤起的goroutine没有被唤起,就开始了新的一轮计数周期
所以在调用的时候,就要遵循一下原则:
先统一Add,再并发Done,最后Wait
4. Sync.Once
Sync.once实现方式是内部包含一个int32位的标志,用来判断方式是否被执行过,标志值更改的时机为方法执行完之后,当有多个goroutine进行调用的时候,使用double-check方式进行验证,首先在在没有同步方式的情况下,进行标志值的判定,为0则竞争获取mutex锁,进入临界区内,此时会在此进行标志值的判断,确保方法真的被执行一次。double-check第一次是为了更快的进行判断,但是存在错误的情况,第二次check是为了正确的确定标志值此时的状态。
使用:
func main() {
var once sync.Once
onceBody := func() {
time.Sleep(3e9)
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
j := i
go func(int) {
once.Do(onceBody)
fmt.Println(j)
done <- true
}(j)
}
//给一部分时间保证能够输出完整【方法一】
//for i := 0; i < 10; i++ {
// <-done
//}
//给一部分时间保证能够输出完整【方法二】
<-done
time.Sleep(3e9)
}
5. Sync.context
场景
当需要进行多批次的计算任务同步,或者需要一对多的协作流程的时候
使用举例
func coordinateWithContext() {
total := 12
var num int32
fmt.Printf("The number: %d [with context.Context]\n", num)
cxt, cancelFunc := context.WithCancel(context.Background())
for i := 1; i <= total; i++ {
go addNum(&num, i, func() {
if atomic.LoadInt32(&num) == int32(total) {
cancelFunc()
}
})
}
<-cxt.Done()
fmt.Println("End.")
}
注意事项
a.如何生成自己的context
通过WithCancel、WithDeadline、WithTimeout和WithValue四个方法从context.Background中派生出自己的子context
注意context.background这个上下文根节点仅仅是一个最基本的支点,它不提供任何额外的功能,也就是说,它既不可以被撤销(cancel),也不能携带任何数据,在使用是必须通过以上4种方法派生出自己的context
b.子context是会继承父context的值
c.撤销消息的传播
撤销消息会按照深度遍历的方式传播给子context(注意因为多routine调用的原因,最终的撤销顺序可能不会是深度遍历的顺序)
,在遍历的过程中,通过WithCancel、WithDeadline、WithTimeout派生的context会被撤销,但是通过WithValue方法派生的context不会被撤销
6. Sync.pool
7.atomic包,针对变量进行操作
我们调用sync/atomic中的几个函数可以对几种简单的类型进行原子操作。这些类型包括int32,int64,uint32,uint64,uintptr,unsafe.Pointer,共6个。这些函数的原子操作共有5种:增或减,比较并交换、载入、存储和交换它们提供了不同的功能,切使用的场景也有区别。
增或减
顾名思义,原子增或减即可实现对被操作值的增大或减少。因此该操作只能操作数值类型。
被用于进行增或减的原子操作都是以“Add”为前缀,并后面跟针对具体类型的名称。
//方法源码
func AddUint32(addr *uint32, delta uint32) (new uint32)
增
栗子:(在原来的基础上加n)
atomic.AddUint32(&addr,n)
减
栗子:(在原来的基础上加n(n为负数))
atomic.AddUint32(*addr,uint32(int32(n)))
//或
atomic.AddUint32(&addr,^uint32(-n-1))
比较并交换
比较并交换----Compare And Swap 简称CAS
他是假设被操作的值未曾被改变(即与旧值相等),并一旦确定这个假设的真实性就立即进行值替换
如果想安全的并发一些类型的值,我们总是应该优先使用CAS
//方法源码
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
栗子:(如果addr和old相同,就用new代替addr)
ok:=atomic.CompareAndSwapInt32(&addr,old,new)
载入
如果一个写操作未完成,有一个读操作就已经发生了,这样读操作使很糟糕的。
为了原子的读取某个值sync/atomic代码包同样为我们提供了一系列的函数。这些函数都以"Load"为前缀,意为载入。
//方法源码
func LoadInt32(addr *int32) (val int32)
栗子
fun addValue(delta int32){
for{
v:=atomic.LoadInt32(&addr)
if atomic.CompareAndSwapInt32(&v,addr,(delta+v)){
break;
}
}
}
存储
与读操作对应的是写入操作,sync/atomic也提供了与原子的值载入函数相对应的原子的值存储函数。这些函数的名称均以“Store”为前缀
在原子的存储某个值的过程中,任何cpu都不会进行针对进行同一个值的读或写操作。如果我们把所有针对此值的写操作都改为原子操作,那么就不会出现针对此值的读操作读操作因被并发的进行而读到修改了一半的情况。
原子操作总会成功,因为他不必关心被操作值的旧值是什么。
//方法源码
func StoreInt32(addr *int32, val int32)
栗子
atomic.StoreInt32(被操作值的指针,新值)
atomic.StoreInt32(&value,newaddr)
交换
原子交换操作,这类函数的名称都以“Swap”为前缀。
与CAS不同,交换操作直接赋予新值,不管旧值。
会返回旧值
//方法源码
func SwapInt32(addr *int32, new int32) (old int32)
栗子
atomic.SwapInt32(被操作值的指针,新值)(返回旧值)
oldval:=atomic.StoreInt32(&value,newaddr)
扩展知识:Sync包简述
1. 什么是Sync包?
Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication.
Values containing the types defined in this package should not be copied.
这句话大意是说:
Sync包同步提供基本的同步原语,如互斥锁。 除了Once和WaitGroup类型之外,大多数类型都是供低级库例程使用的。 通过Channel和沟通可以更好地完成更高级别的同步。并且此包中的值在使用过后不要拷贝。
从描述中可以看到的是,golang 并不推荐这个包中的大多数并发控制方法,但还是提供了相关方法,主要原因是golang中提倡以共享内存的方式来通信:
不要以共享内存的方式来通信,作为替代,我们应该以通信的手段来共享内存
共享内存的方式使得多线程中的通信变得简单,但是在并发的安全性控制上将变得异常繁琐。
正确性不是我们唯一想要的,我们想要的还有系统的可伸缩性,以及可理解性,我觉得这点非常重要,比如现在广泛使用的Raft算法。
2. 包中的Type
包中主要有: Locker, Cond, Map, Mutex, Once, Pool,
RWMutex, WaitGroup
type Locker interface {
Lock()
Unlock()
}
type Cond struct {
// L is held while observing or changing the condition
L Locker
}
3. 什么是锁,为什么需要锁?
锁是sync包中的核心,他主要有两个方法,加锁和解锁。
在单线程运行的时候程序是顺序执行的,程序对数据的访问也是:
读取 => 一顿操作(加减乘除之类的) => 写回原地址
但是一旦程序中进行了并发编程,也就是说,某一个函数可能同时被不同的线程执行的时候,以时间为维度会发生以下情况:
可以看到的是,A地址的数字被执行了两次自增,若A=5,我们在执行完成后预期的A值是7,但是在这种情况下我们得到的A却是6,bug了~
还有很多类似的并发错误,所以才有锁的引入。若是我们在线程2读取A的值的时候对A进行加锁,让线程2等待,线程1执行完成之后在执行线程2,这样就能够保证数据的正确性。但是正确性不是我们唯一想要的。
4 写更优雅的代码
在很多语言中我们经常为了保证数据安全正确,会在并发的时候对数据加锁
Lock()
doSomething()
Unlock()
Golang在此包中也提供了相关的锁,但是标明了"most are intended for use by low-level library routines" 所以我这里只对 Once and WaitGroup types做简述。
5.Once 对象
Once 是一个可以被多次调用但是只执行一次,若每次调用Do时传入参数f不同,但是只有第一个才会被执行。
func (o *Once) Do(f func())
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
如果你执行这段代码会发现,虽然调用了10次,但是只执行了1次。BTW:这个东西可以用来写单例。
6. WaitGroup
下面是个官方的例子:
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
7. 简述
Golang中高级的并发可以通过channel来实现,这是golang所倡导的,但是go也提供了锁等先关操作。
更多编程相关知识,请访问:编程视频!!
本文转载自【PHP中文网】,希望能给您带来帮助,苟日新、日日新、又日新,生命不息,学习不止。