├── README.md
├── citys.json 280个url
├── go.mod
├── go.sum
├── local.json 数据库配置
├── main.go
├── parser
│   └── parser.go 处理逻辑的函数
└── scheduler
├── scheduler.go 调度器
└── types.go

func NewScheduler(config *configor.Config) *Scheduler{
 key := "local_spider"
 from_confs,_ := config.Get("from")
 fm := from_confs.(map[string]interface{})
 mysql_config := fm["mysql"].(map[string]interface{})[key]
 mc := backends.NewMysqlConfig(mysql_config)
 store := backends.NewMysqlClient(mc)
 requestQueue := make(chan Request,0)
 piplineQueue := make(chan Item,0)
 s := &Scheduler{
  Config:config,
  RequestQueue:requestQueue,
  PiplineQueue:piplineQueue,
  Store:store,
 }
 return s

}

func (s *Scheduler)Start(r *Request){
 content,_ := s.Crawl(r.Url)
 if content == nil{
  return 
 }
 r.ParseFunc(content,s,r)

}

func (s *Scheduler)BindRequest(){
 //监听请求队列 得到新请求进来则开始请求网址解析入库
 for {
  request := <- s.RequestQueue
  go s.Start(&request)

 }
}

//入库
func (s *Scheduler)ConsumerPipline(){
 for {
  item := <- s.PiplineQueue
  fmt.Println(item)
  s.SaveItem(&item)

 }
}

//启动函数
func (s *Scheduler) Run(start_urls ...Request){
 defer s.Store.Close()
 //监听请求队列
 go s.BindRequest()
 //将初始请求扔到队列
 for _,request := range start_urls{
  s.RequestQueue <- request
 }
 //读取pipline通道 入库
 s.ConsumerPipline()
}

func main(){
 //解析json配置文件 读取/data/JsonConfigFiles/local.json
 config_path := "/data/JsonConfigFiles"
 env := "local"
 config := configor.NewConfig(config_path,env)
 file := "citys.json"
 //加载json文件 生成每个城市需要抓取的url
 data,_ := ParseJsonFile(file)
 urls := data["citys"].([]interface{})
 start_urls := make([]scheduler.Request,0)
 for _,url := range urls{
  _url := url.(string)
  for i :=1;i<26;i++{
   city_url := _url + fmt.Sprintf("p%s/",strconv.Itoa(i))
   start_urls = append(start_urls,scheduler.Request{
    Url: city_url,
    ParseFunc: parser.ParseCity,
   })

  }
  
 }
 s := scheduler.NewScheduler(config)
 s.Run(start_urls...)
}