前言
在上一篇文章《Golang实现简单爬虫框架(4)——队列实现并发任务调度》中,我们使用用队列实现了任务调度,接下来首先对两种并发方式做一个同构,使代码统一。然后添加数据存储模块。
注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载项目源代码查看
1、项目重构
(1)并发引擎
workerchannelworkerchannelWorkerChan()workerchannelworkerchannelConfigMasterWorkerChan
在项目文件concurrent.go中我们定义一个任务调度器Scheduler,如下:
// 任务调度器
type Scheduler interface {
Submit(request Request) // 提交任务
ConfigMasterWorkerChan(chan Request)
WorkerReady(w chan Request)
Run()
}
SubmitConfigMasterWorkerChanconcurrent.go
createworkerWorkerReadySchedulerReadyNotifiercreateworkerReadyNotifier
修改后的任务调度如下:
type Scheduler interface {
ReadyNotifier
Submit(request Request) // 提交任务
WorkerChan() chan Request
Run()
}
type ReadyNotifier interface {
WorkerReady(chan Request)
}
此时创建goroutine修改如下:
// 创建 goroutine
for i := 0; i < e.WorkerCount; i++ {
//任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定
createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
}
修改后的concurrent.go文件如下:
package engine
import (
"log"
)
// 并发引擎
type ConcurrendEngine struct {
Scheduler Scheduler
WorkerCount int
}
// 任务调度器
type Scheduler interface {
ReadyNotifier
Submit(request Request) // 提交任务
WorkerChan() chan Request
Run()
}
type ReadyNotifier interface {
WorkerReady(chan Request)
}
func (e *ConcurrendEngine) Run(seeds ...Request) {
out := make(chan ParseResult)
e.Scheduler.Run()
// 创建 goruntine
for i := 0; i < e.WorkerCount; i++ {
// 任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定
createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
}
// engine把请求任务提交给 Scheduler
for _, request := range seeds {
e.Scheduler.Submit(request)
}
itemCount := 0
for {
// 接受 Worker 的解析结果
result := <-out
for _, item := range result.Items {
log.Printf("Got item: #%d: %v\n", itemCount, item)
itemCount++
}
// 然后把 Worker 解析出的 Request 送给 Scheduler
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
}
}
func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) {
go func() {
for {
ready.WorkerReady(in) // 告诉调度器任务空闲
request := <-in
result, err := worker(request)
if err != nil {
continue
}
out <- result
}
}()
}
(2)简单并发调度器
scheduler/simple.go
package scheduler
import "crawler/engine"
type SimpleScheduler struct {
workerChan chan engine.Request
}
func (s *SimpleScheduler) WorkerChan() chan engine.Request {
// 此时所有 worker 共用同一个 channel,直接返回即可
return s.workerChan
}
func (s *SimpleScheduler) WorkerReady(w chan engine.Request) {
}
func (s *SimpleScheduler) Run() {
// 创建出 workchannel
s.workerChan = make(chan engine.Request)
}
func (s *SimpleScheduler) Submit(request engine.Request) {
// send request down to worker chan
go func() {
s.workerChan <- request
}()
}
(3)队列实现调度器
scheduler/queued.go
WorkerChan()
package scheduler
import "crawler/engine"
// 使用队列来调度任务
type QueuedScheduler struct {
requestChan chan engine.Request
workerChan chan chan engine.Request
}
func (s *QueuedScheduler) WorkerChan() chan engine.Request {
// 对于队列实现来讲,每个 worker 共用一个 channel
return make(chan engine.Request)
}
// 提交请求任务到 requestChan
func (s *QueuedScheduler) Submit(request engine.Request) {
s.requestChan <- request
}
// 告诉外界有一个 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
s.workerChan <- w
}
func (s *QueuedScheduler) Run() {
s.workerChan = make(chan chan engine.Request)
s.requestChan = make(chan engine.Request)
go func() {
// 创建请求队列和工作队列
var requestQ []engine.Request
var workerQ []chan engine.Request
for {
var activeWorker chan engine.Request
var activeRequest engine.Request
if len(requestQ) > 0 && len(workerQ) > 0 {
activeWorker = workerQ[0]
activeRequest = requestQ[0]
}
select {
case r := <-s.requestChan: // 当 requestChan 收到数据
requestQ = append(requestQ, r)
case w := <-s.workerChan: // 当 workerChan 收到数据
workerQ = append(workerQ, w)
case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务
requestQ = requestQ[1:]
workerQ = workerQ[1:]
}
}
}()
}
(4)main函数
经过上述同构,在main函数中如需切换不同调度器,只需要相应的配置即可。
package main
import (
"crawler/engine"
"crawler/scheduler"
"crawler/zhenai/parser"
)
func main() {
e := engine.ConcurrendEngine{
//Scheduler: &scheduler.QueuedScheduler{}, // 队列实现调度器
Scheduler: &scheduler.SimpleScheduler{}, // 简单并发调度
WorkerCount: 50,
}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
ParseFunc: parser.ParseCityList,
})
}
2、数据存储
(1)Mgo的介绍安装
爬取到的数据不能仅仅在控制台打印出来,所以我们还要给爬虫添加数据存储模块。我们本次选择使用mongodb来存储我们的数据。
首先我们要安装mgo,打开终端,输入下面代码完成安装
go get gopkg.in/mgo.v2
mgo基本操作都很简单,有数据库操作经验都可以很快上手。
(2)爬虫引擎与数据格式
channelconcurrent.goItemChan
爬取到数据需要把数据发送到数据存储模块,
package engine
// 并发引擎
type ConcurrendEngine struct {
Scheduler Scheduler // 任务调度器
WorkerCount int // 并发任务数量
ItemChan chan Item // 数据保存 channel
}
// ...
for {
// 接受 Worker 的解析结果
result := <-out
for _, item := range result.Items {
// 当抓取一组数据后,进行保存
go func(item2 Item) {
e.ItemChan <- item2
}(item)
}
// ...
}
// ...
engine/types.go
package engine
// 请求结构
type Request struct {
Url string // 请求地址
ParseFunc func([]byte) ParseResult
}
// 解析结果结构
type ParseResult struct {
Requests []Request // 解析出的请求
Items []Item // 解析出的内容
}
// 解析出的用户数据格式
type Item struct {
Url string // 个人信息Url地址
Type string // table
Id string // Id
Payload interface{} // 详细信息
}
func NilParseFun([]byte) ParseResult {
return ParseResult{}
}
(3)存储模块的实现
在根目录下创建persist文件夹,然后创建itemsaver.go文件
// persist/itemsaver.go
package persist
import (
"context"
"crawler/engine"
"errors"
"gopkg.in/mgo.v2"
"gopkg.in/olivere/elastic.v5"
"log"
)
func ItemSaver(index string) (chan engine.Item, error) {
// mongodb connect
session, err := mgo.Dial("localhost:27017")
if err != nil {
panic(err)
}
out := make(chan engine.Item)
go func() {
itemCount := 0
for {
// 接收到发送的 item
item := <-out
log.Printf("Item Saver: got item #%d: %v\n",
itemCount, item)
itemCount++
// Save data in mongodb
err := mongo_save(session, index, item)
if err != nil {
// if have err, ignore it
log.Printf("Item Saver: error, saving item %v: %v",
item, err)
}
}
}()
return out, nil
}
// 使用 MongoDB 保存数据
func mongo_save(session *mgo.Session, dbName string, item engine.Item) error {
if item.Type == "" {
return errors.New("must supply Type")
}
c := session.DB(dbName).C(item.Type) // 选择要操作的数据库与集合
err := c.Insert(item) // 插入数据
if err != nil {
log.Fatal(err)
}
return nil
}
(4)存储测试文件
我们把一条数据存入mongodb,然后再取出来,比对读出的数据和写入的数据是否相同
// persist/itemsaver_test.gp
package persist
import (
"crawler/engine"
"crawler/model"
"encoding/json"
"fmt"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"log"
"testing"
)
func TestMongoSave(t *testing.T) {
// mongodb connect
session, err := mgo.Dial("localhost:27017")
if err != nil {
panic(err)
}
expected := engine.Item{
Url: "http://album.zhenai.com/u/1946858930",
Type: "zhenai",
Id: "1946858930",
Payload: model.Profile{
Name: "為你垨候",
Gender: "女士",
Age: 40,
Height: 163,
Weight: 54,
Income: "5-8千",
Marriage: "未婚",
Address: "佛山顺德区",
},
}
// 保存数据
err = mongo_save(session, "crawler", expected)
if err != nil {
panic(err)
}
c := session.DB("crawler").C("zhenai")
var result engine.Item
// 查询数据
err = c.Find(bson.M{"id": "1946858930"}).One(&result)
// result 为 Json 类型
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload)
}
(5)parser模块
parse/profile.go
// ...
result := engine.ParseResult{
Items: []engine.Item{
{
Url: url,
Type: "zhenai",
Id: extractString([]byte(url), idUrlRe),
Payload: profile,
},
},
}
// ...
(6)main函数
package main
import (
"crawler/engine"
"crawler/persist"
"crawler/scheduler"
"crawler/zhenai/parser"
)
func main() {
itemChan, err := persist.ItemSaver()
if err != nil {
panic(err)
}
e := engine.ConcurrendEngine{
//Scheduler: &scheduler.QueuedScheduler{},
Scheduler: &scheduler.SimpleScheduler{},
WorkerCount: 100,
ItemChan: itemChan,
}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
ParseFunc: parser.ParseCityList,
})
}
运行项目,打开mongodb可视化工具,可以看到爬取了54410条数据
3、总结
我们首先把两种并发方式做一个同构,使代码统一,直接在main函数中使用不同的配置就可以切换调度器,简单方便。然后使用Mgo驱动操作数据,添加到mongodb中。内容有点多,很多代码没有完整的展示出来,希望大家可以下载项目源代码,回滚到对应提交记录查看,效果会更好。 别无所求,只求随手给个star
下篇博客中我们会再当前博客的基础上添加数据展示功能
如果想获取Google工程师深度讲解go语言视频资源的,可以在评论区留下邮箱。
如果觉得文章还可以,劳烦大人随手点个赞。。。