Fetching Prometheus monitoring data
Fetching Prometheus target data
Call http://<prometheus.address>/api/v1/targets and parse the response.
    import requests


    def getTargetsStatus(address):
        url = address + '/api/v1/targets'
        response = requests.request('GET', url)
        if response.status_code == 200:
            targets = response.json()['data']['activeTargets']
            aliveNum, totalNum = 0, 0
            downList = []
            # Count every target; anything whose health is not 'up' is reported as down
            for target in targets:
                totalNum += 1
                if target['health'] == 'up':
                    aliveNum += 1
                else:
                    downList.append(target['labels']['instance'])
            print('-----------------------TargetsStatus--------------------------')
            print(str(aliveNum) + ' in ' + str(totalNum) + ' Targets are alive !!!')
            print('--------------------------------------------------------------')
            for down in downList:
                # ANSI escape codes: bold red instance name
                print('\033[31m\033[1m' + down + '\033[0m' + ' down !!!')
            print('-----------------------TargetsStatus--------------------------')
        else:
            print('\033[31m\033[1m' + 'Get targets status failed!' + '\033[0m')
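Separating the counting from the HTTP call makes the logic easy to check without a running server. The sketch below is only an illustration: summarize_targets is a hypothetical helper name, and the sample payload is hand-written to mimic the fields that getTargetsStatus reads (data.activeTargets, health, labels.instance); it is not real Prometheus output.

```python
def summarize_targets(payload):
    """Return (alive_count, total_count, down_instances) for a targets payload."""
    targets = payload.get('data', {}).get('activeTargets', [])
    # As in getTargetsStatus, any health other than 'up' counts as down.
    down = [t['labels']['instance'] for t in targets if t.get('health') != 'up']
    return len(targets) - len(down), len(targets), down


# Hand-written sample payload (assumption), shaped like /api/v1/targets.
sample = {
    'status': 'success',
    'data': {
        'activeTargets': [
            {'labels': {'instance': '10.0.0.1:9100'}, 'health': 'up'},
            {'labels': {'instance': '10.0.0.2:9100'}, 'health': 'down'},
            {'labels': {'instance': '10.0.0.3:9100'}, 'health': 'unknown'},
        ]
    },
}

alive, total, down = summarize_targets(sample)
print('%d in %d targets are alive; down: %s' % (alive, total, ', '.join(down)))
```

With a live server, the same helper could be fed requests.get(url).json() instead of the sample.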
Fetching Prometheus monitoring information (cpu, mem, disks)
Call http://<prometheus.address>/api/v1/query?query=<expr> and parse the response, where expr is a Prometheus query expression.
    import json
    import time

    import requests

    # Dicts tracking cpu, mem and disk usage that is currently over threshold
    diskUsageDict = {}
    cpuUsageDict = {}
    memUsageDict = {}
    # Collection interval, in seconds
    monitorInterval = 5
    # How long usage must stay over threshold before it is reported, in seconds
    diskAlertTime = 5
    cpuAlertTime = 300
    memAlertTime = 300
    # Alert thresholds, in %
    diskThreshold = 80
    cpuThreshold = 60
    memThreshold = 70


    def queryUsage(address, expr):
        url = address + '/api/v1/query'
        try:
            # Pass expr through params so characters such as '>=' are URL-encoded
            return json.loads(requests.get(url=url, params={'query': expr}).content.decode('utf8', 'ignore'))
        except Exception as e:
            print(e)
            return {}


    def orderUsageDict(usageDict, currentTime, monitorInterval):
        '''
        :param usageDict: resource usage dict
        :param currentTime: timestamp of the current sample
        :param monitorInterval: monitoring interval
        :return:
        :description: drop entries that did not stay over the threshold continuously
        '''
        for key in list(usageDict.keys()):
            if currentTime - usageDict[key][1] >= monitorInterval:
                usageDict.pop(key)


    def getCurrentUsageGreater(address, record, threshold, usageDict, monitorInterval):
        '''
        :param address: Prometheus address
        :param record: Prometheus rules record
        :param threshold: threshold
        :param usageDict: resource usage dict
        :param monitorInterval: monitoring interval
        :return:
        :description: collect the series whose usage is at or above the threshold
        '''
        expr = record + '>=' + str(threshold)
        usage = queryUsage(address=address, expr=expr)
        # Fall back to the local clock so stale entries are still dropped when
        # nothing is over threshold (assumes clocks are roughly in sync)
        currentTime = time.time()
        if 'data' in usage and usage['data']['result']:
            for metric in usage['data']['result']:
                instance = metric['metric']['instance']
                if record == 'node:fs_usage:ratio' or record == 'node:fs_root_usage:ratio':
                    metricLabel = instance + ':' + metric['metric']['mountpoint']
                else:
                    metricLabel = instance
                utctime = metric['value'][0]
                value = metric['value'][1]
                describe = record.split(':')[1]
                # Tuple layout: (first seen over threshold, last seen, description, latest value)
                if metricLabel not in usageDict:
                    usageDict[metricLabel] = (utctime, utctime, describe, value)
                else:
                    startTime = usageDict.get(metricLabel)[0]
                    usageDict[metricLabel] = (startTime, utctime, describe, value)
                currentTime = utctime
        orderUsageDict(usageDict=usageDict, currentTime=currentTime, monitorInterval=monitorInterval)


    def printUsageDict(usageDict, alertTime):
        '''
        :param usageDict: resource usage dict
        :param alertTime: alert duration
        :return:
        :description: print the entries that have stayed over threshold for at least alertTime
        '''
        for key, value in usageDict.items():
            deltaT = value[1] - value[0]
            if deltaT >= alertTime:
                print(key + ' ----- ' + value[2] + '\033[31m\033[1m ' + str(value[3]) +
                      '\033[0m ----- lasted for\033[31m\033[1m %.2f \033[0mseconds' % deltaT)


    def monitorUsageGreater(address):
        '''
        :param address: Prometheus address
        :return:
        :description: monitor continuously and print the results
        '''
        while True:
            getCurrentUsageGreater(address, 'node:fs_usage:ratio', diskThreshold, diskUsageDict, monitorInterval)
            printUsageDict(diskUsageDict, alertTime=diskAlertTime)
            # Memory is checked against memThreshold and CPU against cpuThreshold
            getCurrentUsageGreater(address, 'node:memory_usage:ratio', memThreshold, memUsageDict, monitorInterval)
            printUsageDict(memUsageDict, alertTime=memAlertTime)
            getCurrentUsageGreater(address, 'node:cpu_usage:ratio', cpuThreshold, cpuUsageDict, monitorInterval)
            printUsageDict(cpuUsageDict, alertTime=cpuAlertTime)
            time.sleep(monitorInterval)
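The bookkeeping in getCurrentUsageGreater and orderUsageDict is easier to follow with fixed timestamps. The following is a simplified stand-in, not the script's own code: track is a hypothetical helper that records one over-threshold sample and prunes after every sample, and the labels, timestamps and values are invented.

```python
def track(usage, label, ts, value, interval):
    """Record one over-threshold sample, then drop labels not seen within interval."""
    start = usage[label][0] if label in usage else ts
    usage[label] = (start, ts, value)  # (first seen, last seen, latest value)
    for key in list(usage):
        if ts - usage[key][1] >= interval:
            usage.pop(key)


usage = {}
# node-a is over threshold at t=0, 5 and 10; node-b only at t=0.
track(usage, 'node-a', 0, '91', interval=5)
track(usage, 'node-b', 0, '85', interval=5)
track(usage, 'node-a', 5, '92', interval=5)   # node-b was not refreshed, so it is dropped here
track(usage, 'node-a', 10, '93', interval=5)

for label, (start, last, value) in usage.items():
    print(label, 'over threshold for', last - start, 'seconds, latest value', value)
```

Only node-a survives, and last minus first seen is the duration that printUsageDict compares against the alert time.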
A few usage details, such as building the URI and converting the HttpEntity to a UTF-8 string before parsing it into a JSON object, are explained in the code comments.
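    // Imports assume Apache HttpClient 4.x and fastjson, which match the class names used here.
    import java.io.IOException;
    import java.net.URISyntaxException;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.utils.URIBuilder;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;
    import com.alibaba.fastjson.JSONObject;

    // The statements below belong inside a method.
    String paramValue = "http_requests_total";
    // HTTP client
    CloseableHttpClient httpClient = HttpClients.createDefault();
    // The parameter can contain special characters, so it cannot be concatenated into a
    // plain String (that raises an Illegal Character error); build it with URIBuilder.
    URIBuilder uri = null;
    HttpGet get = null;
    try {
        // For a single parameter, add it with addParameter(param, value).
        // For several parameters, use uri.addParameters(List<NameValuePair>) (not needed here).
        // Replace ip and port with your Prometheus ip and port. Define paramValue yourself, e.g. http_requests_total
        uri = new URIBuilder("http://ip:port/api/v1/query");
        uri.addParameter("query", paramValue);
        // uri is now http://ip:port/api/v1/query?query=http_requests_total
        get = new HttpGet(uri.build());
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }
    JSONObject jsonObject = null;
    CloseableHttpResponse response = null;
    try {
        // Execute the request, convert the entity to a String and parse it; jsonObject is then ready to use.
        response = httpClient.execute(get);
        String resStr = EntityUtils.toString(response.getEntity(), "UTF-8");
        jsonObject = JSONObject.parseObject(resStr);
    } catch (IOException e) {
        e.printStackTrace();
    }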
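Reading data from Prometheus with PromQL
The requirement was to read data from Prometheus in Python, process it, and store it in MySQL. This part covers how Python can query Prometheus data with PromQL through the official HTTP API.
The official API endpoint is: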
    http://ip:port/api/v1/query?query=
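An example:

    import json
    import urllib.parse
    import urllib.request

    query = 'count(node_cpu_seconds_total{job="%s",mode="idle"})' % (s)  # s: job label value
    html = urllib.request.urlopen('http://ip:port/api/v1/query?' + urllib.parse.urlencode({'query': query}))
    result = json.loads(html.read().decode("utf-8"))  # not named json, which would shadow the module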
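The response is JSON. The exact structure is left for the reader to inspect. An instant query returns each result under value, a pair whose first element is the timestamp and whose second element is the query result.
A range vector is returned under values, a list of such pairs, which is just as easy to read.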
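The difference between value and values can be sketched as follows. The two responses are hand-written samples that follow the shape described above, not output from a real server, and samples is a hypothetical helper name.

```python
import json

# Hand-written sample responses (assumption): one instant vector, one range matrix.
instant = json.loads('''{"status": "success", "data": {"resultType": "vector", "result": [
  {"metric": {"instance": "10.0.0.1:9100"}, "value": [1635724800, "4"]}]}}''')
ranged = json.loads('''{"status": "success", "data": {"resultType": "matrix", "result": [
  {"metric": {"instance": "10.0.0.1:9100"},
   "values": [[1635724800, "0.12"], [1635724815, "0.15"]]}]}}''')


def samples(response):
    """Yield (instance, timestamp, float_value) for vector and matrix results."""
    for series in response['data']['result']:
        instance = series['metric'].get('instance', '')
        # Instant queries carry one pair under "value"; range queries a list under "values".
        pairs = [series['value']] if 'value' in series else series['values']
        for ts, raw in pairs:
            yield instance, ts, float(raw)  # sample values arrive as strings


print(list(samples(instant)))
print(list(samples(ranged)))
```

Converting the string to float at this point keeps the later processing and the MySQL insert free of type handling.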
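Another requirement was to query historical data, such as the previous day or the first week of the month. The following API can be used for that:

    http://ip:port/api/v1/query_range?query=avg(1 - avg(rate(node_cpu_seconds_total{job="%s",mode="idle"}[5m]))by(instance))&start='+start+'&end='+end+'&step=15s

Here start is the beginning of the time range, end is the end of the time range, and step is the step size, i.e. the spacing between returned data points.
The format of start and end is: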
2021-11-01T00:00:00Z
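Because the query contains braces, quotes and spaces, it is safer to percent-encode the parameters than to concatenate them by hand. A minimal sketch, assuming the placeholder address http://ip:port from above and a made-up job label; build_range_url is a hypothetical helper name.

```python
from urllib.parse import parse_qs, urlencode


def build_range_url(base, query, start, end, step):
    """Return a query_range URL with every parameter percent-encoded."""
    params = {'query': query, 'start': start, 'end': end, 'step': step}
    return base + '/api/v1/query_range?' + urlencode(params)


job = 'node'  # placeholder job label (assumption)
query = 'avg(1 - avg(rate(node_cpu_seconds_total{job="%s",mode="idle"}[5m]))by(instance))' % job
url = build_range_url('http://ip:port', query,
                      '2021-11-01T00:00:00Z', '2021-11-08T00:00:00Z', '15s')
print(url)

# Decoding the query string again returns the original parameters unchanged.
decoded = parse_qs(url.split('?', 1)[1])
print(decoded['query'][0] == query)
```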
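The start and end values can be computed as follows.
To get the data for the first week of each month, use the range from midnight on the 1st to midnight on the 8th: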
    import datetime

    now = datetime.datetime.now()
    start = datetime.datetime(now.year, now.month, 1)
    end = datetime.datetime(now.year, now.month, 8)
    # Format conversion: yyyy-mm-ddThh:MM:ssZ
    start_trans = "T".join(str(start).split(" ")) + "Z"
    end_trans = "T".join(str(end).split(" ")) + "Z"
Getting the time range for the past week:

    import datetime

    now_time = datetime.datetime.now()
    one_week_ago_time = now_time + datetime.timedelta(days=-7)
    # %f adds microseconds; the slices below drop the fractional part again
    now = now_time.strftime("%Y-%m-%dT%H:%M:%S.%f")
    one_week_ago = one_week_ago_time.strftime("%Y-%m-%dT%H:%M:%S.%f")
    n = now[0:len(now) - 7] + "Z"
    one_week = one_week_ago[0:len(one_week_ago) - 7] + "Z"
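One caveat: the trailing Z marks the timestamp as UTC, while datetime.datetime.now() returns local time, so on a machine outside UTC the range is shifted by the local offset. If that matters, a variant along these lines may be closer to what is wanted; it is a sketch rather than part of the original script, and the variable names are my own.

```python
import datetime

# Take the current time in UTC so the "Z" suffix is accurate.
now_utc = datetime.datetime.now(datetime.timezone.utc)
week_ago_utc = now_utc - datetime.timedelta(days=7)

# strftime with a seconds-resolution format avoids slicing off the fraction by hand.
fmt = "%Y-%m-%dT%H:%M:%SZ"
end_str = now_utc.strftime(fmt)
start_str = week_ago_utc.strftime(fmt)
print(start_str, end_str)
```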
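If the requested time range is too long, the response contains too many data points and the request fails. In that case, increase step or split the range into several day-long requests.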
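As far as I know, Prometheus rejects a range query that would return more than 11,000 points per series, and a week at a 15s step is 40,320 points, while one day is 5,760. Splitting the range by day can be sketched like this; split_range is a hypothetical helper, and each pair would be formatted and passed as start and end of a separate query_range call.

```python
import datetime


def split_range(start, end, chunk=datetime.timedelta(days=1)):
    """Yield (chunk_start, chunk_end) pairs that cover start..end in order."""
    cursor = start
    while cursor < end:
        nxt = min(cursor + chunk, end)  # the last chunk may be shorter
        yield cursor, nxt
        cursor = nxt


fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.datetime(2021, 11, 1)
end = datetime.datetime(2021, 11, 8)
chunks = list(split_range(start, end))
for s, e in chunks:
    print(s.strftime(fmt), e.strftime(fmt))
```

Adjacent chunks share a boundary timestamp, so the sample at each boundary may appear in both responses and should be de-duplicated before it is stored.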
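The key is to understand these two APIs; everything else is a minor detail.
The above is based on personal experience and is offered as a reference.
Original link: https://blog.csdn.net/u014305062/article/details/98636139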