公司的内部代理总是出现问题,所以写了这个脚本进行代理检测
#!/usr/bin/python3
# coding=utf-8
import telnetlib
import threading
import json
import requests
from multiprocessing.dummy import Pool as ThreadPool
def msg(text):
# 钉钉报警使用
headers = {'Content-Type': 'application/json;charset=utf-8'}
api_url = "https://oapi.dingtalk.com/robot/send"
json_text = {
"msgtype": "text",
"text": {
"content": text
},
"at": {
"atMobiles": [
"11"
],
"isAtAll": False
}
}
print(requests.post(api_url, json.dumps(json_text), headers=headers).content)
def proxy():
with open("/root/work/proxy_check/proxyip.txt", 'r', encoding="utf-8") as f:
for line in f:
line = line.strip().split(' ')
# ip_list_queue.put(line)
ip_arg.append(line)
def check(ip,ip_port):
try:
telnetlib.Telnet(ip, port=ip_port, timeout=5)
#requests.get('http://ip.chinaz.com/',timeout=5, proxies={ "http": "http://"+ ip + ":" + ip_port }) # 用于判断爬取网站的代理是否被封禁
except:
print("%s 连接失败" % ip)
warn_list.append(ip)
else:
pass
print("%s 连接成功" % ip)
warn_list=[]
ip_arg=[]
proxy()
pool = ThreadPool(30)
# pool.map(check,ip_arg) # 只支持传入一个参数
pool.starmap(check,ip_arg)
pool.close()
pool.join()
if warn_list:
msg("%s 端口不通" % warn_list)
#print("%s 端口不通或网页返回状态码异常请检查" % warn_list)
else:
#print("正常")
msg("正常")
文本格式为这种:
使用shell处理成下面这种格式的ip+port就可以
xxx.xxx.xxx.146 59723
xxx.xxx.xxx.147 59724
xxx.xxx.xxx.148 59725
xxx.xxx.xxx.149 59726
xxx.xxx.xxx.150 59727