Golang 高效实践之并发实践
我是码客 2022-11-05
// Receiving from an unbuffered channel that has no sender.
unbuffered := make(chan int)

a := <- unbuffered // blocks: no goroutine is sending on the channel

// Blocking behaviour of an unbuffered channel. Each numbered case is an
// independent illustration, not a sequence meant to run top to bottom.
unbuffered := make(chan int)

// 1) Blocks: no goroutine is sending yet.
a := <-unbuffered

// 2) Blocks: no goroutine is ready to receive.
unbuffered <- 1

// 3) Synchronizes: the send below completes once the goroutine receives.
go func() { <-unbuffered }()

unbuffered <- 1
// Blocking behaviour of a buffered channel with room for one element. Each
// numbered case is an independent illustration.
buffered := make(chan int, 1)

// 4) Blocks: the buffer is empty, so there is nothing to receive.
a := <-buffered

// 5) Does not block: the buffer has a free slot.
buffered <- 1

// 6) Blocks: the buffer is already full.
buffered <- 2
// Receiving from a closed channel never blocks.
c := make(chan int)

close(c)

fmt.Println(<-c) // receives and prints the zero value of the channel's element type; for int that is 0
// TryReceive performs a non-blocking receive on c.
//
// When a receive completes, ok is true, data holds the received value and
// more reports whether the value came from a send (false means c is closed
// and drained, in which case data is 0). When nothing is ready, TryReceive
// returns immediately with data == 0, more == true and ok == false.
func TryReceive(c <-chan int) (data int, more, ok bool) {
	select {
	case data, more = <-c:
		return data, more, true
	default:
		// Nothing to receive right now; the channel may still deliver later.
		return 0, true, false
	}
}
// TryReceiveWithTimeout waits up to duration for a receive on c.
//
// When a receive completes in time, ok is true, data holds the received value
// and more reports whether the value came from a send (false means c is
// closed and drained, in which case data is 0). When duration elapses first,
// it returns data == 0, more == true and ok == false.
func TryReceiveWithTimeout(c <-chan int, duration time.Duration) (data int, more, ok bool) {
	// An explicit timer (rather than time.After) can be stopped as soon as
	// the receive wins, instead of lingering until it fires.
	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case data, more = <-c:
		return data, more, true
	case <-timer.C:
		return 0, true, false
	}
}
// Command main is a deliberately racy example: one goroutine reads the
// package-level counter i while the main goroutine writes it, with no
// synchronization between them. Running it with `go run -race` reports the
// data race.
package main

import (
	"fmt"
	"runtime"
	"time"
)

// i is shared between the printing goroutine and main without any
// synchronization; that is the bug being demonstrated.
var i int64 = 0

func main() {
	runtime.GOMAXPROCS(2)
	go func() {
		for {
			fmt.Println("i is", i) // unsynchronized read
			time.Sleep(time.Second)
		}
	}()

	for {
		i += 1 // unsynchronized write: races with the read above
	}
}
==================

WARNING: DATA RACE

Read at 0x00000121e848 by goroutine :

  main.main.func1()

      /Users/saas/src/awesomeProject/datarace/main.go: +0x3e

Previous write at 0x00000121e848 by main goroutine:

  main.main()

      /Users/saas/src/awesomeProject/datarace/main.go: +0x7b

Goroutine  (running) created at:

  main.main()

      /Users/saas/src/awesomeProject/datarace/main.go: +0x4f

==================
// Command main increments the package-level counter i in the main goroutine
// while a second goroutine prints it once per second. Every access to i
// happens with the mutex held, so the read and the write never overlap.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// i is the shared counter; it is only touched while m (declared in main) is
// locked.
var i int64 = 0

func main() {
	runtime.GOMAXPROCS(2)
	var m sync.Mutex
	go func() {
		for {
			m.Lock()
			fmt.Println("i is", i)
			m.Unlock()
			// Sleep outside the critical section so the writer is not
			// blocked for the whole second.
			time.Sleep(time.Second)
		}
	}()

	for {
		m.Lock()
		i += 1
		m.Unlock()
	}
}
// Command main increments the package-level counter i in the main goroutine
// and hands each new value to a printing goroutine over a channel, so the
// printer never reads i directly.
package main

import (
	"fmt"
	"runtime"
	"time"
)

// i is read and written only by the main goroutine; other goroutines see its
// values through the channel created in main.
var i int64 = 0

func main() {
	runtime.GOMAXPROCS(2)
	c := make(chan int64)
	go func() {
		for {
			fmt.Println("i is", <-c)
			time.Sleep(time.Second)
		}
	}()

	for {
		i += 1
		// c is unbuffered, so this send waits until the printer is ready
		// to receive again.
		c <- i
	}
}
// Spinlock is a mutual-exclusion lock that busy-waits on an atomic lock word,
// yielding the processor between attempts.
//
// state must point at a valid int32 before Lock or Unlock is called; the zero
// value Spinlock{} has a nil state and is not usable. Use NewSpinlock to get
// a ready-to-use lock.
type Spinlock struct {
	// state points at the lock word: free (0) while unlocked, 42 while held.
	state *int32
}

// free is the value of the lock word while nobody holds the lock.
const free = int32(0)

// NewSpinlock returns an unlocked Spinlock with its lock word allocated.
func NewSpinlock() *Spinlock {
	return &Spinlock{state: new(int32)}
}

// Lock acquires the lock, spinning until it becomes free.
func (l *Spinlock) Lock() {
	// If state equals free (0), atomically set it to 42 (any non-zero value
	// would do); otherwise someone else holds the lock, so try again.
	for !atomic.CompareAndSwapInt32(l.state, free, 42) {
		runtime.Gosched() // yield the CPU so the holder can make progress
	}
}

// Unlock releases the lock by resetting the lock word to free.
func (l *Spinlock) Unlock() {
	// Every access to state must be atomic, including this store.
	atomic.StoreInt32(l.state, free)
}
func restore(repos []string) error {
errChan := make(chan error, )
sem := make(chan int, ) // four jobs at once
var wg sync.WaitGroup
wg.Add(len(repos))
for _, repo := range repos {
sem <-
go func() {
defer func() {
wg.Done()
<- sem
}()
if err := fetch(repo); err != nil {
errChan <- err
}
}()
}
wg.Wait()
close(sem)
close(errChan)
return <- errChan
}
func restore(repos []string) error {
errChan := make(chan error, )
sem := make(chan int, ) // four jobs at once
var wg sync.WaitGroup
wg.Add(len(repos))
for _, repo := range repos {
go worker(repo, sem, &wg, errChan)
}
wg.Wait()
close(errChan)
return <- errChan
} Func worker(repo string, sem chan int, wg *sync.WaitGroup, errChan chan err) {
defer wg.Done()
sem <-
if err := fetch(repo); err != nil {
select {
case errChan <- err:
// we are the first worker to fail
default:
// some other failure has already happened, drop this one
}
}
<- sem
}