简介
dolphinscheduler是一个可视化DAG工作流任务调度平台,在大数据领域做任务调用非常流行
提供了类似azkaban工作流调度,比azkaban更强的可视化DAG,支持大数据领域flink,spark,shell,python,java,scala,http等各种类型任务
官网传送门: https://dolphinscheduler.apache.org/zh-cn/
自动化
为什么需要自动化任务处理,当你的dolphinscheduler有几百上千个任务,管理是非常耗时的,如果每个任务都配置邮件告警,那一有问题整天都在救火
此时就需要任务结果监控和任务重跑来解决 失败任务和任务自动重跑,避免浪费过多时间在维护dolphinscheduler任务上
使用
在调用api之前需要为用户申请token,按图操作
dolphinscheduler提供类似swagge接口UI工具,访问doc地址访问
http://ip:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
例子
该demo还是使用了http请求包(HttpRequest),json数据搜索包(go-jmespath)
任务结果检查
填坑说明
- 日期处理: 使用了%20转译空格,使用Sprintf方法拼接字符串
- 多种数据类型: 使用interface{}来支持int,string等多种数据类型
- 数据转换1: 将byte数据转成json格式,方便搜索
- 数据转换2: 将interface{}数据转成字符串切片,方便使用
该方法可以做成周期性任务运行,将失败的job查出来,后续是要告警通知,还是根据job名称查出对应id进行重跑任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package main import ( "encoding/json" "fmt" "github.com/jmespath/go-jmespath" "github.com/kirinlabs/HttpRequest" "time" ) var ( url = "http://ip:12345/dolphinscheduler" token = "xxxxxxx" req *HttpRequest.Request ) func init() { req = HttpRequest.NewRequest().Debug( true ).SetTimeout(time.Second* 5 ). SetHeaders( map [ string ] string { "token" :token, }) } func main() { //testConn() jobCheck() } func jobCheck() { //获取日期 today := time.Now().Format( "2006-01-02" ) tomorrow := time.Now().AddDate( 0 , 0 , + 1 ).Format( "2006-01-02" ) //拼接日期 %20是空格的转译 fmt. Println (fmt.Sprintf( "%v%v" ,today, "%2000:00:00" )) fmt. Println (fmt.Sprintf( "%v%v" ,tomorrow, "%2000:00:00" )) //需要检查的项目名称 projects := [] string { "jdOrder" , "jdPlay" } //需要检查的时间段 页码是int类型,日期是string类型 m := make ( map [ string ] interface {}) m[ "pageNo" ] = 1 m[ "pageSize" ] = 22 m[ "stateType" ] = "FAILURE" m[ "startDate" ] = fmt.Sprintf( "%v%v" ,today, "%2000:00:00" ) m[ "endDate" ] = fmt.Sprintf( "%v%v" ,tomorrow, "%2000:00:00" ) for _, project := range projects { resp, _ := req.Get(url+ "/projects/" +project+ "/task-instance/list-paging" ,m) if resp.StatusCode() != 200 { fmt. Println ( "job检查状态码不符期望: " ,resp.StatusCode()) return } fmt. Println ( "resp" ,resp) //将返回数据从byte转成json格式 body, _ := resp.Body() var i interface {} var s [] string _ = json.Unmarshal(body, &i) //搜索出需要的字段对应数据 processInstanceNames, _ := jmespath.Search( "data.totalList[*].processInstanceName" , i) //将interface转成[]string for _,v := range processInstanceNames.([] interface {}) { s = append (s,v.( string )) } //打印出结果 for _,v := range s { fmt. Println (v) } } } |
测试连接
如果上小节任务跑不成功,可以先运行该方法,测试连接正确性
1
2
3
4
5
6
7
8
|
func testConn() { resp, _ := req.Get(url + "/projects/query-project-list" ) fmt. Println ( "resp" ,resp) body, _ := resp.Body() var i interface {} _ = json.Unmarshal(body, &i) fmt. Println ( "i" ,i) } |
重跑任务
重跑任务其实就是再次启动任务,直接调用start_job既可
项目名称和ID需要通过该接口获取,这个是固定的
http://ip:12345/dolphinscheduler/projects/monitor/process/list-paging
调用示例: startJob("ads_jd_order",678)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func startJob(projectName string ,projectId int ) { m := make ( map [ string ] interface {}) m[ "failureStrategy" ] = "CONTINUE" m[ "warningGroupId" ] = 0 m[ "warningType" ] = "NONE" m[ "runMode" ] = "RUN_MODE_SERIAL" m[ "processInstancePriority" ] = "MEDIUM" m[ "workerGroup" ] = "default" m[ "processDefinitionId" ] = projectId resp, _ := req.JSON().Post(url+ "projects/" + projectName+ "/executors/start-process-instance" ,m) if resp.StatusCode() != 200 { fmt. Println ( "job开始状态码不符期望: " ,resp.StatusCode()) return } } |
小结
dolphinscheduler api调用有文档,不太复杂,但网上资料较少,需要自行摸索,以上就是Go语言dolphinscheduler任务调度处理的详细内容,更多关于Go语言dolphinscheduler任务调度的资料请关注服务器之家其它相关文章!
原文链接:https://juejin.cn/post/7035286827703468063