To optimize slow interfaces, we need to capture the endpoints whose response time exceeds 2s. SkyWalking raises alarms for every endpoint that exceeds 2s, so the script below extracts the endpoint addresses that triggered 2s alarms over the past week.
# -*- coding: utf-8 -*-
'''
@Time : 2023/4/20 16:50
@Author : Celeste
@File : zq_req_2.py
'''
import csv
import json

import requests

# Fetch one page of 2s-alarm data from the SkyWalking GraphQL API (paged)
def get_alarm_api(current_page):
    headers = {"Content-Type": "application/json"}
    post_param = {
        "query": "query queryAlarms($keyword: String, $scope: Scope, $duration:Duration!, $tags:[AlarmTag], $paging: Pagination!) {\n getAlarm(keyword: $keyword, scope: $scope, duration: $duration, paging: $paging, tags: $tags) {\n items: msgs {\n key: id\n message\n startTime\n scope\n tags {\n key\n value\n }\n events {\n uuid\n source {\n service serviceInstance endpoint\n }\n name\n type\n message\n parameters {\n key\n value\n }\n startTime\n endTime\n }\n }\n }}",
        "variables": {
            "duration": {
                "start": "2023-04-20 09",
                "end": "2023-04-25 09",
                "step": "HOUR"
            },
            "paging": {
                "pageNum": current_page,
                "pageSize": 2
            }
        }
    }
    return_data = requests.post("http://××.×.×.××:8080/graphql", headers=headers, data=json.dumps(post_param))
    resp_data = return_data.json()
    return resp_data
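For reference, the parsed response follows the shape of the GraphQL query above. The sketch below is illustrative only; the alarm message text is a made-up example (SkyWalking's default alarm text reads similarly but is configurable):

# Illustrative shape of resp_data (values are assumptions, not captured output)
sample_resp = {
    "data": {
        "getAlarm": {
            "items": [{
                "key": "1234",
                "message": "Response time of endpoint /order/list is more than 2000ms in 3 minutes of last 10 minutes",
                "startTime": 1681977000000,
                "tags": [],
                "events": []
            }]
        }
    }
}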
# Process the returned data and extract the endpoint addresses
def get_api_list(resp_data):
    api_list = []
    for mesg in resp_data['data']['getAlarm']['items']:
        # The alarm message embeds the endpoint path as a whitespace-separated token
        for m_api in mesg['message'].split():
            if m_api.startswith("/"):
                api_list.append(m_api)
    # Deduplicate the endpoints found on this page
    page_api = set(api_list)
    return page_api
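With the illustrative sample_resp above, the extraction can be sanity-checked:

print(get_api_list(sample_resp))  # -> {'/order/list'}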
The step above deduplicates the endpoint addresses within a single page;
this step fetches all 30 pages of alarm data and deduplicates the endpoint addresses across the whole set.
# Call the two functions above page by page and merge the results
def page_api_list():
    pageNum = 30
    page_apis = []
    for current_page in range(1, pageNum + 1):
        resp_data = get_alarm_api(current_page)
        page_api = get_api_list(resp_data)
        page_apis += page_api
    # Deduplicate across all pages
    qc_page_apis = set(page_apis)
    return qc_page_apis
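Since get_api_list already returns a set per page, the intermediate list can be dropped; a minimal equivalent sketch (page_api_list_v2 is a hypothetical name, not part of the original script):

def page_api_list_v2(pageNum=30):
    # Union each page's set of endpoints directly instead of accumulating a list
    apis = set()
    for current_page in range(1, pageNum + 1):
        apis |= get_api_list(get_alarm_api(current_page))
    return apis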
# Write the scraped endpoints to a CSV file
def scrape_csv(qc_page_apis):
    # Open in append mode ('a+'); note the header row is appended on every run.
    # newline='' prevents blank rows on Windows.
    with open('api.csv', 'a+', encoding="utf-8", newline='') as csv_obj:
        writer = csv.writer(csv_obj)
        # Write the header row
        writer.writerow(["api address"])
        for i in list(qc_page_apis):
            # Write one endpoint per row
            print("========== writing api: %s ==========" % i)
            writer.writerow([i])
    print("finished")
if __name__ == '__main__':
    data = page_api_list()
    scrape_csv(data)
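To spot-check the result, api.csv can be read back with the standard library:

# Quick sanity check: print every row written to api.csv
import csv

with open('api.csv', encoding='utf-8', newline='') as f:
    for row in csv.reader(f):
        print(row)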