简介
dolphinscheduler是一个可视化DAG工作流任务调度平台,在大数据领域做任务调用非常流行
提供了类似azkaban工作流调度,比azkaban更强的可视化DAG,支持大数据领域flink,spark,shell,python,java,scala,http等各种类型任务
官网传送门: https://dolphinscheduler.apache.org/zh-cn/
自动化
为什么需要自动化任务处理,当你的dolphinscheduler有几百上千个任务,管理是非常耗时的,如果每个任务都配置邮件告警,那一有问题整天都在救火
此时就需要任务结果监控和任务重跑来解决 失败任务和任务自动重跑,避免浪费过多时间在维护dolphinscheduler任务上
使用
在调用api之前需要为用户申请token,按图操作
dolphinscheduler提供类似swagge接口UI工具,访问doc地址访问
http://ip:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
例子
该demo还是使用了http请求包(HttpRequest),json数据搜索包(go-jmespath)
任务结果检查
填坑说明
- 日期处理: 使用了%20转译空格,使用Sprintf方法拼接字符串
- 多种数据类型: 使用interface{}来支持int,string等多种数据类型
- 数据转换1: 将byte数据转成json格式,方便搜索
- 数据转换2: 将interface{}数据转成字符串切片,方便使用
该方法可以做成周期性任务运行,将失败的job查出来,后续是要告警通知,还是根据job名称查出对应id进行重跑任务
package main import ( "encoding/json" "fmt" "github.com/jmespath/go-jmespath" "github.com/kirinlabs/HttpRequest" "time" ) var ( url = "http://ip:12345/dolphinscheduler" token = "xxxxxxx" req *HttpRequest.Request ) func init() { req = HttpRequest.NewRequest().Debug(true).SetTimeout(time.Second*5). SetHeaders(map[string]string{ "token":token, }) } func main() { //testConn() jobCheck() } func jobCheck() { //获取日期 today := time.Now().Format("2006-01-02") tomorrow := time.Now().AddDate(0, 0, +1).Format("2006-01-02") //拼接日期 %20是空格的转译 fmt.Println(fmt.Sprintf("%v%v",today,"%2000:00:00")) fmt.Println(fmt.Sprintf("%v%v",tomorrow,"%2000:00:00")) //需要检查的项目名称 projects := []string{"jdOrder","jdPlay"} //需要检查的时间段 页码是int类型,日期是string类型 m := make(map[string]interface{}) m["pageNo"] = 1 m["pageSize"] = 22 m["stateType"] = "FAILURE" m["startDate"] = fmt.Sprintf("%v%v",today,"%2000:00:00") m["endDate"] = fmt.Sprintf("%v%v",tomorrow,"%2000:00:00") for _, project := range projects { resp, _ := req.Get(url+"/projects/"+project+"/task-instance/list-paging",m) if resp.StatusCode() != 200 { fmt.Println("job检查状态码不符期望: ",resp.StatusCode()) return } fmt.Println("resp",resp) //将返回数据从byte转成json格式 body, _ := resp.Body() var i interface{} var s []string _ = json.Unmarshal(body, &i) //搜索出需要的字段对应数据 processInstanceNames, _ := jmespath.Search("data.totalList[*].processInstanceName", i) //将interface转成[]string for _,v := range processInstanceNames.([]interface{}) { s = append(s,v.(string)) } //打印出结果 for _,v := range s { fmt.Println(v) } } }
测试连接
如果上小节任务跑不成功,可以先运行该方法,测试连接正确性
func testConn() { resp, _ := req.Get(url + "/projects/query-project-list") fmt.Println("resp",resp) body, _ := resp.Body() var i interface{} _ = json.Unmarshal(body, &i) fmt.Println("i",i) }
重跑任务
重跑任务其实就是再次启动任务,直接调用start_job既可
项目名称和ID需要通过该接口获取,这个是固定的
http://ip:12345/dolphinscheduler/projects/monitor/process/list-paging
调用示例: startJob("ads_jd_order",678)
func startJob(projectName string,projectId int) { m := make(map[string]interface{}) m["failureStrategy"] = "CONTINUE" m["warningGroupId"] = 0 m["warningType"] = "NONE" m["runMode"] = "RUN_MODE_SERIAL" m["processInstancePriority"] = "MEDIUM" m["workerGroup"] = "default" m["processDefinitionId"] = projectId resp, _ := req.JSON().Post(url+"projects/" + projectName+"/executors/start-process-instance",m) if resp.StatusCode() != 200 { fmt.Println("job开始状态码不符期望: ",resp.StatusCode()) return } }