- 创建调度程序,接收任务并将任务存储起来
- 执行调度任务,通过一定的调度算法将任务调度到合适的 worker 中执行
- 创建指定数量的 worker,完成实际任务的处理
- 创建数据处理协程,对爬取到的数据进行进一步处理
scheduler/scheduler.go
package engine
import (
"github.com/funbinary/go_example/example/crawler/13/collect"
"go.uber.org/zap"
)
type ScheduleEngine struct {
requestCh chan *collect.Request //负责接收请求
workerCh chan *collect.Request //负责分配任务给 worker
WorkCount int //为执行任务的数量,可以灵活地去配置。
Fetcher collect.Fetcher
Logger *zap.Logger
out chan collect.ParseResult 负责处理爬取后的数据,完成下一步的存储操作。schedule 函数会创建调度程序,负责的是调度的核心逻辑。
Seeds []*collect.Request
}
func (s *ScheduleEngine) Run() {
s.requestCh = make(chan *collect.Request)
s.workerCh = make(chan *collect.Request)
s.out = make(chan collect.ParseResult)
go s.Schedule()
// 创建指定数量的 worker,完成实际任务的处理
// 其中
for i := 0; i < s.WorkCount; i++ {
go s.CreateWork()
}
s.HandleResult()
}
func (s *ScheduleEngine) Schedule() {
// workerCh
var reqQueue = s.Seeds
go func() {
for {
var req *collect.Request
var ch chan *collect.Request
//如果任务队列 reqQueue 大于 0,意味着有爬虫任务,这时我们获取队列中第一个任务,并将其剔除出队列。
if len(reqQueue) > 0 {
req = reqQueue[0]
reqQueue = reqQueue[1:]
ch = s.workerCh
}
select {
case r := <-s.requestCh:
// 接收来自外界的请求,并将请求存储到 reqQueue 队列中
reqQueue = append(reqQueue, r)
case ch <- req:
// ch <- req 会将任务发送到 workerCh 通道中,等待 worker 接收。
}
}
}()
}
func (s *ScheduleEngine) CreateWork() {
for {
// 接收到调度器分配的任务;
r := <-s.workerCh
// 访问服务器
body, err := s.Fetcher.Get(r)
if err != nil {
s.Logger.Error("can't fetch ",
zap.Error(err),
)
continue
}
//解析服务器返回的数据
result := r.ParseFunc(body, r)
// 将返回的数据发送到 out 通道中,方便后续的处理。
s.out <- result
}
}
func (s *ScheduleEngine) HandleResult() {
for {
select {
// 接收所有 worker 解析后的数据
case result := <-s.out:
// 要进一步爬取的 Requests 列表将全部发送回 s.requestCh 通道
for _, req := range result.Requesrts {
s.requestCh <- req
}
//包含了我们实际希望得到的结果,所以我们先用日志把结果打印出来
for _, item := range result.Items {
// todo: store
s.Logger.Sugar().Info("get result", item)
}
}
}
}
main.go
package main
import (
"fmt"
"github.com/funbinary/go_example/example/crawler/13/collect"
"github.com/funbinary/go_example/example/crawler/13/engine"
"github.com/funbinary/go_example/example/crawler/13/log"
"github.com/funbinary/go_example/example/crawler/13/parse/doubangroup"
"github.com/funbinary/go_example/example/crawler/13/proxy"
"go.uber.org/zap/zapcore"
"time"
)
// xpath
func main() {
plugin := log.NewStdoutPlugin(zapcore.InfoLevel)
logger := log.NewLogger(plugin)
logger.Info("log init end")
proxyURLs := []string{"http://127.0.0.1:10809", "http://127.0.0.1:10809"}
p, err := proxy.RoundRobinProxySwitcher(proxyURLs...)
if err != nil {
logger.Error("RoundRobinProxySwitcher failed")
}
// douban cookie
var seeds []*collect.Request
for i := 0; i <= 0; i += 25 {
str := fmt.Sprintf("https://www.douban.com/group/szsh/discussion?start=%d", i)
seeds = append(seeds, &collect.Request{
Url: str,
WaitTime: 1 * time.Second,
Cookie: "bid=-UXUw--yL5g; dbcl2=\"214281202:q0BBm9YC2Yg\"; __yadk_uid=jigAbrEOKiwgbAaLUt0G3yPsvehXcvrs; push_noty_num=0; push_doumail_num=0; __utmz=30149280.1665849857.1.1.utmcsr=accounts.douban.com|utmccn=(referral)|utmcmd=referral|utmcct=/; __utmv=30149280.21428; ck=SAvm; _pk_ref.100001.8cb4=%5B%22%22%2C%22%22%2C1665925405%2C%22https%3A%2F%2Faccounts.douban.com%2F%22%5D; _pk_ses.100001.8cb4=*; __utma=30149280.2072705865.1665849857.1665849857.1665925407.2; __utmc=30149280; __utmt=1; __utmb=30149280.23.5.1665925419338; _pk_id.100001.8cb4=fc1581490bf2b70c.1665849856.2.1665925421.1665849856.",
ParseFunc: doubangroup.ParseURL,
})
}
var f collect.Fetcher = &collect.BrowserFetch{
Timeout: 3000 * time.Millisecond,
Logger: logger,
Proxy: p,
}
s := engine.ScheduleEngine{
WorkCount: 5,
Logger: logger,
Fetcher: f,
Seeds: seeds,
}
s.Run()
}
通道
特性
package main
import (
"fmt"
)
func main() {
var ch chan *int
go func() {
<-ch
}()
select {
case ch <- nil:
fmt.Println("it's time")
}
}
//fatal error: all goroutines are asleep - deadlock!
函数选项模式
// 基本调度器
func NewBaseSchedule() *Schedule {
return &Schedule{
WorkCount: 1,
Fetcher:baseFetch,
}
}
// 多worker调度器
func NewMultiWorkSchedule(workCount int) *Schedule {
return &Schedule{
WorkCount: workCount,
Fetcher:baseFetch,
}
}
// 代理调度器
func NewProxySchedule(proxy string) *Schedule {
return &Schedule{
WorkCount: 1,
Fetcher:proxyFetch(proxy),
}
}
type Config struct {
WorkCount int
Fetcher collect.Fetcher
Logger *zap.Logger
Seeds []*collect.Request
}
func NewSchedule(c *Config) *Schedule {
var s = &Schedule{}
if c.Seeds != nil {
s.Seeds = c.Seeds
}
if c.Fetcher != nil {
s.Fetcher = c.Fetcher
}
if c.Logger != nil {
s.Logger = c.Logger
}
...
return s
}
- 我们要对 schedule 结构进行改造,把可以配置的参数放入到options 结构中:
type Schedule struct {
requestCh chan *collect.Request
workerCh chan *collect.Request
out chan collect.ParseResult
options
}
type options struct {
WorkCount int
Fetcher collect.Fetcher
Logger *zap.Logger
Seeds []*collect.Request
}
- 我们需要书写一系列的闭包函数,这些函数的返回值是一个参数为 options 的函数:
type Option func(opts *options)
func WithLogger(logger *zap.Logger) Option {
return func(opts *options) {
opts.Logger = logger
}
}
func WithFetcher(fetcher collect.Fetcher) Option {
return func(opts *options) {
opts.Fetcher = fetcher
}
}
func WithWorkCount(workCount int) Option {
return func(opts *options) {
opts.WorkCount = workCount
}
}
func WithSeeds(seed []*collect.Request) Option {
return func(opts *options) {
opts.Seeds = seed
}
}
- 创建一个生成 schedule 的新函数,函数参数为 Option 的可变参数列表。defaultOptions 为默认的 Option,代表默认的参数列表,然后循环遍历可变函数参数列表并执行。
func NewSchedule(opts ...Option) *Schedule {
options := defaultOptions
for _, opt := range opts {
opt(&options)
}
s := &Schedule{}
s.options = options
return s
}
- 在 main 函数中调用 NewSchedule。让我们来看看函数式选项模式的效果:
func main(){
s := engine.NewSchedule(
engine.WithFetcher(f),
engine.WithLogger(logger),
engine.WithWorkCount(5),
engine.WithSeeds(seeds),
)
s.Run()
}
函数式选项模式的好处
- API 具有可扩展性,高度可配置化,新增参数不会破坏现有代码;
- 参数列表非常简洁,并且可以使用默认的参数;
- option 函数使参数的含义非常清晰,易于开发者理解和使用;
- 如果将 options 结构中的参数设置为小写,还可以限制这些参数的权限,防止这些参数在 package 外部使用。
通道的实现并没有想象中复杂。它利用互斥锁实现了并发安全,只不过 Go 运行时为我们屏蔽了底层的细节。通道包括两种类型,一种是无缓冲的通道,另一种是带缓冲区的通道。通道的结构如下:
可以看到,通道中包含了数据的类型、大小、数量,堵塞协程队列,以及用于缓存区的诸多字段。
无缓冲区的通道
通道需要有多个协程分别完成读和写的功能,这样才能保证数据传输是顺畅的。对于无缓冲区的通道来说,如果有一个协程正在将数据写入通道,但是当前没有协程读取数据,那么写入协程将立即陷入到休眠状态。写入协程堵塞之前协程会被封装到 sudog 结构中,并存储到写入的堵塞队列 sendq 中,之后协程陷入休眠。
之前我们介绍过,协程的堵塞是位于用户态的,协程切换时,运行时会保存当前协程的状态、并调用 gopark 函数切换到 g0 完成新一轮的调度。如果之后有协程读取数据,那么读取协程会立即读取 sendq 队列中第一个等待的协程,并将该协程对应的元素拷贝到读取协程中,同时调用 goready 唤醒写入协程,将写入协程放入到可运行队列中等待被调度器调度。
带缓冲区的通道
而对于带缓冲区的通道来说,假设缓存队列的数量为 N,那么如果写入的数据量不大于 N,写入协程就不会陷入到休眠状态,所有数据都会存储在缓冲队列中。
缓冲队列可以在一定程度上削峰填谷,加快处理速度。但是如果写入速度始终大于读取数据,那么缓冲区迟早也有写满的时候,到时候仍然会陷入堵塞,只是延迟了问题的暴露并带来内存的浪费。因此缓冲区的容量不可以过大,我们可以根据实际情况给出一个经验值。例如上面的爬虫案例中,我们就可以给接收任务的 requestCh 通道加上缓存区,先将缓存区设置为 500,这样就不会频繁堵塞住调度器了。
对于有缓存的通道,存储在 buf 中的数据虽然是线性的数组,但是这些数组和序号 recvx、recvq 模拟了一个环形队列。recvx 可以定位到 buf 是从哪个位置读取的通道中的元素,而 sendx 则能够找到写入时放入 buf 的位置,这样做主要是为了再次利用已经使用过的空间。从 recvx 到 sendx 的距离 qcount 就是通道队列中的元素数量。
Select 机制的底层原理
原由: 受到通道特性的限制,如果单个通道被堵塞,协程就无法继续执行了。
那有没有一种机制可以像网络中的多路复用一样,监听多个通道,使后续处理协程能够及时地运行?
其实就和网络中把 select 用于 Socket 的多路复用机制一样,Go 中也可以用 select 语句实现这样的多路复用机制。
select 语句中的每一个 case 都对应着一个待处理的读取或写入通道。
举个简单实用的例子,下面的程序如果 800 毫秒之后也接受不到通道 c 中的数据,定时器 time.After 就会接收到数据,从而打印 timeout。
select {
case <-c:
fmt.Println("random 01")
case <-time.After(800 * time.Millisecond):
fmt.Println("timeout")
}
Select 底层调用了 selectgo 函数,它的工作可以分为三个部分:
-
第一部分涉及到遍历。
- selectgo 首先循环查找当前准备就绪的通道,如果能够找到,则正常进行后续处理。
在具体的实现方式上,由于 select 内部的 scases 数组存储了所有需要管理的通道,所以很容易想到循环遍历 scases ,最终找到可用的通道。 - 不过这可能导致一个问题,那就是如果前面的通道始终有数据,后面的通道将永远得不到执行的机会。
为了解决这一问题,Go 语言为 select 加入了随机性,会利用洗牌算法随机打散数组顺序,保证了所有通道都有执行的机会。
- selectgo 首先循环查找当前准备就绪的通道,如果能够找到,则正常进行后续处理。
-
第二部分涉及到协程的休眠。如果 select 找不到准备就绪的通道,这时和单个协程的堵塞一样,它会将当前协程放入到所有通道的等待队列中,并陷入到休眠状态。
-
第三部分涉及到协程的唤醒。如果有任意一个通道准备就绪,当前的协程将会被唤醒,并到准备就绪的 case 处继续执行。
要注意的一点是,最后 selectgo 会将 sudog 结构体从其他通道的等待队列中移出,因为当前协程已经能够正常运行,不再需要被其他通道唤醒了。
在我们的课程中,schedule 函数其实有一个 bug,您能看出来吗?你觉得可以用什么方式找出这样的 Bug?
会丢失发给worker的任务。 case r := <-s.requestCh:的情况下,如果req不是nil,应该把req再添加到reqQueue头部
「此文章为4月Day4学习笔记,内容来源于极客时间《Go分布式爬虫实战》,强烈推荐该课程!/推荐该课程」