我们上文中介绍了GO语言封装了所有的系统调用,因此凡是被阻塞的操作,都能被调度器感知,通过阅读goroutine的代码位置在GOPATH\src\runtime\proc.go中,其中retake函数的功能是实现抢占式调度,调度逻辑如下,

首先如果目前正在运行的goroutine被阻塞那么即可直接让度CPU,不需要抢占。

如果正在运行的goroutine距上次调度的时间超过了一定的阈值,那么就调用preemptone函数,强制使当前处于运行态的goroutine出让CPU并加入全局就绪态的队列中。然后schedule()调度就绪态队列中的另外一个 G进行运行态。

如果没有包含上两种情况的goroutine则不进行操作,具体代码如下:

gfunc retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}


通过阅读代码我们可以知道goroutine的调度机制并不是基于优先级的,各个goroutine之间的优先级并没有区别,只要主goroutine被执行的时间够长,其实并无需主动调用sleep等函数让其陷入阻塞态也能使其让出CPU,因此很多书上所言说sleep等阻塞操作是必须的,其实并不准确。
比如下面我们在main函数中加入一个足够长的循环体,也能使子goroutine有被执行的机会。

package main

import (
	"fmt"
	//"time"
)

func running() {
	var times int
	for { // 构建一个无限循环
		times++
		fmt.Println("this is runing function's tick", times) //从笔者的机器上看runing函数中的ticks累加到1500左右
	}
}
func main() {

	go running() //通过普通函数调用goroutine
	go func() {  //通过匿名函数调用goroutine
		var times int 
		for {
			times++
			fmt.Println("this is anonymous function's tick ", times) //从笔者的机器上看runing函数中的ticks累加到1500左右
		}
	}()
	
	for i := 0; i < 1000; i++ {//从笔者的机器上此循环长度设置在1000左右即可使子goroutine有被执行的机会

		fmt.Println("this is main gorourtine's tick ", i) 
	}

}

那么讲到这里,我们再回到之前的任务,就是让GO语言并发的帮忙你清理策略,具体代码如下:

package main

import ( //
	"fmt" // fmt 包使用函数实现 I/O 格式化(类似于 C 的 printf 和 scanf 的函数), 格式化参数源自C,但更简单
	"io/ioutil"

	"sync"
	"time"
)

//var wg sync.WaitGroup

//var mapGuard sync.Mutex

func Product(path string, ch chan<- string) {
	//fmt.Println(path)
	ch <- path
	fs, _ := ioutil.ReadDir(path)

	for _, file := range fs {
		if file.IsDir() {
			//fmt.Println(path + file.Name())
			go Product(path+"/"+file.Name(), ch)
			//ch <- path + "/" + file.Name()

		}

	}

}

func Consumer(fileNameSizeMap sync.Map, exFileList []string, ch <-chan string) {

	for {

		path := <-ch

		fs, _ := ioutil.ReadDir(path)
		for _, file := range fs {
			//mapGuard.Lock()

			//fileNameSizeMap.Range(func(k, v interface{}) bool {

			//	fmt.Println("iterate:", k, v)
			//	return true
			//})
			if file.Size() > 1000000 {
				fileSize, _ := fileNameSizeMap.Load(file.Name())

				if fileSize == file.Size() {
					fmt.Println(path + "/" + file.Name())

					exFileList = append(exFileList, path+file.Name())
				} else {
					//fmt.Println(file.Name())
					fileNameSizeMap.Store(file.Name(), file.Size())
				}
			}
			//mapGuard.Unlock()
		}

	}
}

func main() {
	//方式一
	//fileNameSizeMap := make(map[string]int64, 10000)
	var fileNameSizeMap sync.Map
	exFileList := make([]string, 100, 1000)
	//wg.Add(1)
	ch := make(chan string, 5)

	go Product("D:/test", ch)
	go Consumer(fileNameSizeMap, exFileList, ch)
	//wg.Wait()
	time.Sleep(time.Second * 5) //在本例中使用休眠的方式来阻塞main函数创建的主goroutine

}

下节课呢,我们来介绍一下如何使用信号量的方式来进行进程的同步与流程控制。