├── README.md
├── citys.json        280 city URLs that seed the crawl
├── go.mod
├── go.sum
├── local.json        database configuration
├── main.go
├── parser
│   └── parser.go     the parsing logic for each page
└── scheduler
    ├── scheduler.go  the scheduler
    └── types.go
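The snippets below use several types from scheduler/types.go that are not shown. Here is a minimal sketch of what they plausibly look like, inferred from how NewScheduler, Start, and main use them; the ParseFunc content type, the Item shape, and the Store type are assumptions, and import paths for the project-internal configor and backends packages are omitted:

package scheduler

// Request pairs a URL with the function that parses its response.
// The signature is taken from the call r.ParseFunc(content, s, r) in Start;
// []byte for content is an assumption.
type Request struct {
    Url       string
    ParseFunc func(content []byte, s *Scheduler, r *Request)
}

// Item is whatever a parser extracts for storage; a generic map is one
// plausible shape, since ConsumerPipline only prints and saves it.
type Item map[string]interface{}

// Scheduler owns the two channels and the storage backend. The field names
// match the composite literal in NewScheduler; the concrete Store type is
// assumed from backends.NewMysqlClient.
type Scheduler struct {
    Config       *configor.Config
    RequestQueue chan Request
    PiplineQueue chan Item
    Store        *backends.MysqlClient
}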
func NewScheduler(config *configor.Config) *Scheduler {
    key := "local_spider"
    // Look up the MySQL settings under from.mysql.local_spider in local.json
    // and build the storage backend from them.
    fromConfs, _ := config.Get("from")
    fm := fromConfs.(map[string]interface{})
    mysqlConfig := fm["mysql"].(map[string]interface{})[key]
    mc := backends.NewMysqlConfig(mysqlConfig)
    store := backends.NewMysqlClient(mc)
    // Unbuffered channels: producers block until a consumer is ready.
    requestQueue := make(chan Request)
    piplineQueue := make(chan Item)
    s := &Scheduler{
        Config:       config,
        RequestQueue: requestQueue,
        PiplineQueue: piplineQueue,
        Store:        store,
    }
    return s
}
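NewScheduler does not validate the configuration: if the nesting it expects is missing, the type assertions panic. Inferred from those assertions, local.json must contain a from.mysql.local_spider object. The field names inside that object are illustrative assumptions; only the nesting is implied by the code:

{
  "from": {
    "mysql": {
      "local_spider": {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "secret",
        "database": "spider"
      }
    }
  }
}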
func (s *Scheduler) Start(r *Request) {
    // Fetch the page; on failure Crawl returns nil content and the request
    // is silently dropped.
    content, _ := s.Crawl(r.Url)
    if content == nil {
        return
    }
    r.ParseFunc(content, s, r)
}
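Crawl itself is not part of the excerpt. A minimal sketch with the same shape as the call site in Start, assuming a plain net/http GET with no retries or rate limiting (the real implementation may well add both; imports: net/http, io, fmt):

// Crawl fetches url and returns the response body, or a nil body on error.
func (s *Scheduler) Crawl(url string) ([]byte, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("crawl %s: unexpected status %d", url, resp.StatusCode)
    }
    return io.ReadAll(resp.Body)
}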
func (s *Scheduler) BindRequest() {
    // Listen on the request queue; each incoming request is fetched,
    // parsed, and stored in its own goroutine.
    for {
        request := <-s.RequestQueue
        go s.Start(&request)
    }
}
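Note that BindRequest starts one goroutine per request with no upper bound; with 280 cities × 25 pages, that can mean up to 7000 concurrent fetches. A bounded variant using a counting-semaphore channel is one common fix; this is a sketch, not part of the original code, and maxWorkers is a hypothetical parameter:

// BindRequestBounded behaves like BindRequest but caps in-flight fetches.
func (s *Scheduler) BindRequestBounded(maxWorkers int) {
    sem := make(chan struct{}, maxWorkers) // counting semaphore
    for {
        request := <-s.RequestQueue
        sem <- struct{}{} // blocks while maxWorkers fetches are in flight
        go func(r Request) {
            defer func() { <-sem }()
            s.Start(&r)
        }(request)
    }
}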
// ConsumerPipline drains the pipeline queue and writes each item to storage.
func (s *Scheduler) ConsumerPipline() {
    for {
        item := <-s.PiplineQueue
        fmt.Println(item)
        s.SaveItem(&item)
    }
}
// Run starts the scheduler with the given seed requests.
func (s *Scheduler) Run(startURLs ...Request) {
    defer s.Store.Close()
    // Listen on the request queue.
    go s.BindRequest()
    // Push the seed requests onto the queue.
    for _, request := range startURLs {
        s.RequestQueue <- request
    }
    // Consume the pipeline channel and write items to the database;
    // this blocks, keeping the scheduler alive.
    s.ConsumerPipline()
}
func main() {
    // Parse the JSON config file at /data/JsonConfigFiles/local.json.
    configPath := "/data/JsonConfigFiles"
    env := "local"
    config := configor.NewConfig(configPath, env)
    file := "citys.json"
    // Load citys.json and build the paginated URLs to crawl for each city.
    data, _ := ParseJsonFile(file)
    urls := data["citys"].([]interface{})
    startURLs := make([]scheduler.Request, 0)
    for _, url := range urls {
        cityBase := url.(string)
        // Each city listing has 25 pages: p1/ through p25/.
        for i := 1; i < 26; i++ {
            cityURL := cityBase + fmt.Sprintf("p%d/", i)
            startURLs = append(startURLs, scheduler.Request{
                Url:       cityURL,
                ParseFunc: parser.ParseCity,
            })
        }
    }
    s := scheduler.NewScheduler(config)
    s.Run(startURLs...)
}
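parser.ParseCity is referenced in main but not shown. Its signature has to match Request.ParseFunc; here is a skeleton of what a parse function looks like in this design, where the extraction logic is a placeholder and only the signature and the channel usage are implied by the scheduler code (ParseDetail in the trailing comment is hypothetical):

package parser

// ParseCity extracts listings from one city page and pushes them onto the
// scheduler's pipeline channel for storage.
func ParseCity(content []byte, s *scheduler.Scheduler, r *scheduler.Request) {
    // ... run HTML extraction over content here (placeholder) ...
    item := scheduler.Item{"source_url": r.Url}
    s.PiplineQueue <- item
    // To crawl deeper, enqueue a follow-up request the same way:
    // s.RequestQueue <- scheduler.Request{Url: detailURL, ParseFunc: ParseDetail}
}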