步骤详情:
1 定时任务 每天下午9点执行
简易功能代码如下:
schedule.every().day.at("16:00").do(job)
2 查询订单失败数据
3 截取有效信息后查询订单明细数据,并加上新的规则,协商好的code、msg
4 推送失败时,发送邮件
其他细节:
关闭命令行python脚本也会定时执行(生成日志文件到 ItemList_yu_gbk_0214.log),命令如下:
nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log
收到邮件效果
服务器上文件
总的功能代码如下
#!/usr/bin/python
# -*-coding:utf8 -*-
# nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log
from io import DEFAULT_BUFFER_SIZE
from time import sleep
import traceback
import pymysql
import smtplib
import zipfile
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart # 使用MIMEMultipart来标示这个邮件是多个部分组成的
from email.mime.base import MIMEBase
from email.mime.text import MIMEText # 定义邮件内容
from email.utils import formataddr
import os,sys,multitesting
import time
import shutil
import datetime
import calendar
import schedule
import mimetypes
import pandas as pd
from pathlib import Path
item_list_dict = {}
item_dict = {}
PAUSE_TIME_LIST = [['0:10', '7:00'], ['12:00', '13:30']]
#---init Time---
begintime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
def isPause():
_now_time = datetime.datetime.now()
for _time in PAUSE_TIME_LIST:
begin_time_str = _time[0]
end_time_str = _time[1]
begin_time = datetime.datetime.strptime(begin_time_str, '%H:%M')
end_time = datetime.datetime.strptime(end_time_str, '%H:%M')
_now_time_str = _now_time.strftime('%H:%M')
now_time = datetime.datetime.strptime(_now_time_str, '%H:%M')
if (now_time >= begin_time) and (now_time <= end_time):
return True
return False
def getAlltestID(_yhs, _begin_time, _end_time):
sql = "SELECT * FROM dertest_test_info where yhstx_nsrtest = '{}' and create_time > '{}' and create_time <= '{}' and dertest_status ='0'".format(
_yhs, _begin_time, _end_time)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
if rows is None:
return ''
if len(rows) == 0:
return ''
# print("test:" + str(len(rows)))
return rows
# '?????,(0:?????;1:?????;2:??????;3:??????;)'
def getdertestIDBytestID(_PID):
sql = "SELECT * FROM dertest_intext_info WHERE dertest_protext_info_id = '" + str(
_PID) + "'"
# print(sql)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
if len(rows) == 0:
return ''
else:
print("getdertestIDBytestID:" + str(len(rows)))
return rows
def getdertestInfoBytestID(_PID):
sql = "SELECT ifnull(ghf_yh,''),ifnull(ghf_zh,''),ifnull(ghf_dz,''),ifnull(ghf_dh,''),ifnull(nsrmc,''),ifnull(nsrtest,''),ifnull(bz,''),id FROM dertest_info WHERE test_id = '" + str(_PID) + "'"
# print(sql)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
if len(rows) == 0:
return ''
else:
print("getGhfBankBytestID:" + str(len(rows)))
return rows
def getSQDHBytestID(_obr_id):
sql = "SELECT sqdh FROM dertest_batest_request WHERE id = '" + str(_obr_id) + "'"
# print(sql)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
row = cursor.fetchone()
if row is None:
return ""
return str(row[0])
def getIsMail(_dertest_id):
sql = "SELECT * FROM dertest_intest_info_ext WHERE dertest_id ='"+_dertest_id+"'"
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
row = cursor.fetchone()
if row is None:
return "非邮寄"
else:
print("getIsMail:" + str(len(row)))
return "邮寄"
def getItemsBydertestID(_OID,_sqdh,_ddh,_fpdm, _fphm, _yhs_mc, _yhs, _ghf_mc, _ghf_nsrtest, _ghf_bank, _ghf_address,_kprq,_kpr,_kplx,_fpbeizhu,_kpzt,_qingdanbzhi,_zuofeibz,_dingdanlx,_youjiFangshi):
sql = "SELECT * FROM dertest_item_info WHERE dertest_info_id = '" + str(_OID) + "'"
global mycat_db
global item_list_dict
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
# print("item:" + str(len(rows)))
for _item in rows:
if isBadItem(_item):
continue
itemInfo = _sqdh + ','
itemInfo +=str(_item[0]).split("*")[2]+','
itemInfo +=str(_item[0]).split("*")[1]+','
itemInfo += _youjiFangshi
writeItemInfo(_yhs,itemInfo)
def makeTitle(_yhs):
itemInfo = "申号,据号,银行账号,地址电话合计金额(含税),合计金额(不含税),合计税额,税率,订单类型,商品名称,简称,邮式"
writeItemInfo(_yhs, itemInfo)
# 'xmmc,ggxh,xmdw,xmdj,xmsl,xmje'
def isBadItem(_item):
# if _item[3] == '1090505010000000000':
# return False
# if _item[3] == '1090414010000000000':
# return False
# if _item[3] == '1030204020000000000':
# return False
return False
def makeALLItemsToList(_yhs, my_pid_list):
# print("----------------------------????????????????-----------------------")
makeTitle(_yhs)
for _pid in my_pid_list:
dertestid_list = getdertestIDBytestID(_pid[0])
if dertestid_list == '':
continue
ghf_info = getdertestInfoBytestID(_pid[0])
dertest_id = str(ghf_info[0][7])
youjifangshi = getIsMail(dertest_id)
qingdanbz = str(dertestid_list[0][-2])
OID = dertestid_list[0][0]
obr_id = str(_pid[3])
if qingdanbz == "1":
qingdanbz = "有"
if qingdanbz == "0":
qingdanbz = "无"
if (dertestid_list == ''):
continue
for _dertestid in dertestid_list:
while (isPause() == True):
sleep(60)
print('Sleeping........')
continue
getItemsBydertestID(OID,rere,ddrh,saf,sdf,nsr_mc,_yhs, dgf,gd,yui,gd,vgh,
iiy,a,gd,kpzt,gd,zfbz,h,fgh)
def writeItemInfo(yhs_mc,infos):
temp_s = infos
# path = yhs_mc+".csv"
path ='/home/tom/data/yu/'+yhs_mc+".csv"
f = open(path, "a+", encoding="gbk")
f.write("%s" % (temp_s) + "\n")
f.close()
def getyhsList():
file = open("yhs_list.txt")
# file = open("C:\\Users\\test\\Desktop\\111\\yhs_list.txt")
yhs_list = file.readlines()
return yhs_list
def countAllItems(_yhs_list, _begin_time, _end_time):
global item_dict
global item_list_dict
yhs_index = 0
for _yhs in _yhs_list:
yhs_index += 1
if os.path.exists(_yhs.strip('\n')+".csv"):
os.remove(_yhs.strip('\n')+".csv")
test_list = getAlltestID(_yhs.strip('\n'), _begin_time, _end_time)
makeALLItemsToList(_yhs.strip('\n'), test_list)
def makeDBTOConnection():
MYCAT_HOST = "101.72.237.88"
MYCAT_PORT = 8066
MYCAT_USER = "txds"
MYCAT_PASSWORD = "2erOxFSAyrIOeLKJODSA3g6k"
MYCAT_DATABASE = "txds_sales_dertest"
toDBcon = pymysql.connect(
host=MYCAT_HOST, # IP??MySQL??????????IP???
port=MYCAT_PORT, # ???????3306???????????
user=MYCAT_USER, # ??????????
password=MYCAT_PASSWORD, # ???????????
database=MYCAT_DATABASE, # ???????????
charset='utf8' # ????????????'utf-8'
)
return toDBcon
def makeDBTestConnection():
MYCAT_HOST = "102.14.234.89"
MYCAT_PORT = 3306
MYCAT_USER = "txds"
MYCAT_PASSWORD = "txds@123"
MYCAT_DATABASE = "txds_sales_dertest_hbq"
toDBcon = pymysql.connect(
host=MYCAT_HOST, # IP??MySQL??????????IP???
port=MYCAT_PORT, # ???????3306???????????
user=MYCAT_USER, # ??????????
password=MYCAT_PASSWORD, # ???????????
database=MYCAT_DATABASE, # ???????????
charset='utf8' # ????????????'utf-8'
)
return toDBcon
def sendtxtmail():
filepath = '/home/tom/data/yuData/腾讯读书每日发数明细汇总'+datetime.datetime.now().strftime("%Y-%m-%d")+".zip" #压缩后的文件名
smtp_server = "smtp.mxhichina.com" # 发送邮箱服务器
username = 'yang@163.com'
password = 'IPMS*5873957'
sender = 'yang@163.com' # 发送者的邮箱
receivers = ["gao@163.com","guo@163.com",
"guo@.163.cn","wang@.163.cn"]
EMAIL_FROM_NAME = '税组' # 自定义发件人名称
# time = datetime.datetime.today().strftime("%m-%d %H:%M")
msg = MIMEMultipart()
# 邮件正文
msg.attach(MIMEText(" \r\n 腾讯读书每日发数明细汇总,结果请查看附件。",'plain','utf-8')) # 文本内容换行\r\n
msg['From'] = formataddr(pair=(EMAIL_FROM_NAME, sender)) # 自定义发件人的名称
msg['To'] = ";".join(receivers) # 发送给多个好友
# subject = "{}腾讯读书每日发数明细汇总".format(time)
subject = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")+"腾讯读书发数明细汇总"
msg['Subject'] = subject
data = open(filepath, 'rb')
ctype, encoding = mimetypes.guess_type(filepath)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
file_msg = MIMEBase(maintype, subtype)
file_msg.set_payload(data.read())
data.close()
encoders.encode_base64(file_msg) # 把附件编码
file_msg.add_header('Content-Disposition', 'attachment', filename="腾讯读书每日发数明细汇总"+datetime.datetime.now().strftime("%Y-%m-%d")+".zip") # 修改邮件头
msg.attach(file_msg)
try:
server = smtplib.SMTP(smtp_server)
server.login(username,password)
server.sendmail(sender,receivers,msg.as_string())
server.quit()
print("=================================== 邮件发送成功 =================================== %s" % begintime)
print("删除压缩包文件路径:", filepath)
# 发送成功后删除压缩包文件
if os.path.exists(filepath):
os.remove(filepath)
print("=================================== 压缩包删除成功 =================================== %s" % begintime)
shutil.rmtree('/home/tom/data/yu')
os.mkdir('/home/tom/data/yu')
print("=================================== 压缩包删除成功 =================================== %s" % begintime)
except Exception as err:
print("=================================== 邮件发送失败 =================================== %s" % begintime)
print(err)
def batest_zip(start_dir,zip_file):
# start_dir要压缩的文件路径
# zip_file 输出zip文件的路径
zip_file = zip_file + '.zip'
z = zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED)
print(z)
for path, dirname, file_name in os.walk(start_dir):
# print("文件夹根路径:", path)
fpath = path.replace(start_dir, '') # 去除根路径名称
# print("--去除根路径:", fpath)
fpath = fpath and fpath + os.sep # 在原fpath加上\
# print("****去除根路径+\ :", fpath)
for filename in file_name: # 逐个循环读取文档名称
# print('--', fpath+filename)
# fpath + filename完整构成每个文档的去根绝对路径
# s = os.path.join(path, filename) # 补齐全部的绝对路径
# print('*-*',s)
z.write(os.path.join(path, filename), fpath + filename) # 实现在输出路径的Zip压缩操作
z.close()
return zip_file
# 定时任务调用的函数
def job():
print("=================================== 汇总数据开始 =================================== %s" % begintime)
global mycat_db
mycat_db = makeDBTOConnection()
yhs_list = getyhsList()
# 1汇总数据
countAllItems(yhs_list, datetime.datetime.now().strftime("%Y-%m")+"-01 00:00:00", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("=================================== 汇总数据结束 =================================== %s" % begintime)
# 2压缩文件
print("=================================== 压缩文件开始 =================================== %s" % begintime)
# start_dir要压缩的文件路径
# zip_file 输出zip文件的路径
batest_zip("/home/tom/data/yu","/home/tom/data/yuData/腾讯读书每日发数明细汇总"+datetime.datetime.now().strftime("%Y-%m-%d"))
print("=================================== 压缩文件结束 =================================== %s" % begintime)
time.sleep(1)
# 3发送邮件并删除文件
print("=================================== 邮件开始发送 =================================== %s" % begintime)
sendtxtmail()
return 'main func over'
# 设置定时任务启动的时间 ,每天16:00 启动
schedule.every().day.at("16:00").do(job)
while True:
schedule.run_pending()