在日常办公中,经常免不了和Excel打交道,每次手工处理数据,稍微不细心点。数据可能就出错了。而且重复的任务又会占据大量的工作时间。那有没有办法可以解决这些问题呢?
今天介绍一种方法,可以解决日常工作的重复工作,节省时间,从而优化自己工作。今天说的就是通过Python语言的selenium+numpy+js+pandas库。完成自动化办公。
首先,需要通过Python+Selenium+Js 封装一个函数get,用于在公司管理平台下载各类所需的数据表(如果数据是直接以excel提供,则忽略此步骤)。自动化办公目录图片如下:
封装的数据获取的get函数:
from selenium import webdriver
import time,random
import json,os
from selenium.webdriver.support.select import Select
import cv2
'''封装函数,自动登录 //*[@id="img"]验证码xpath'''
class cook:
def __init__(self):
self.driver = webdriver.Chrome()
self.driver.implicitly_wait(120)
try:
self.cook_login()
time.sleep(3)
if self.driver.title != '需求管理平台':
print('cookie login fail')
'''弹出框处理#send_keys(Keys.ENTER)'''
self.driver.switch_to.alert.accept()
except:
time.sleep(2)
self.driver.delete_all_cookies()
self.driver.get('url')
self.login()
self.cookies_write()
#cookies写入,从而每次调用可以跳过验证码
def cookies_write(self):
dictCookies = self.driver.get_cookies()
jsonCookies = json.dumps(dictCookies)
os.remove('C:/Users/lenovo/Downloads/cookies4.json')
time.sleep(2)
with open('C:/Users/lenovo/Downloads/cookies4.json', 'w') as f:
f.write(jsonCookies)
print(type(jsonCookies),jsonCookies)
print('收集完成cookie')
'''
:param 账号密码
'''
def login(self,user='账号',pwd='密码'):
time.sleep(1.5)
self.driver.find_element_by_id('j_username').send_keys(user)
time.sleep(1)
self.driver.find_element_by_id('j_password').send_keys(pwd)
# img_yzm = self.driver.find_element_by_xpath('//*[@id="img"]')
# img_yzm.screenshot(r'D:\360MoveData\Users\lenovo\Music\Desktop\yzm.jpg')
# time.sleep(1)
# image1 = cv2.imread(r'D:\360MoveData\Users\lenovo\Music\Desktop\yzm.jpg')
# cv2.imshow('image', image1)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
code = input('\n请输入验证码:')
self.driver.find_element_by_id('code').send_keys(code)
log_in = self.driver.find_element_by_xpath("//input[@value='登录']")
log_in.click()
while self.driver.title == '首页登录':
self.login()
def cook_login(self):
with open('C:/Users/lenovo/Downloads/cookies4.json', 'r') as f:
#, encoding='utf-8')
listCookies = json.loads(f.read())
self.driver.get('http://iimp.sh.cmcc/rip/login.do')
self.driver.delete_all_cookies()
for i in listCookies:
self.driver.add_cookie(i)
self.driver.get('http://iimp.sh.cmcc/rip/chaxun.do')
self.cookies_write()
''':return 网址dict'''
#返回各类数据的一个字典,从而可以通过链接进行下载
def back(self):
report_list = []
for link in self.driver.find_elements_by_xpath("//*[@id='sf-menu2']/li/a"):
herf = link.get_attribute('href')
text = link.get_attribute('text')
lis1 = [text, herf]
report_list.append(lis1)
report_list = dict(report_list)
return report_list
'''更改时间
:param form H5标签的id
:param t 更改的时间'''
def alter_time(self,form, t):
js = "document.getElementById('%s').removeAttribute('readonly');" % form
self.driver.execute_script(js)
js_value = "document.getElementById('%s').value='%s'" % (form, t)
self.driver.execute_script(js_value)
'''导出报表函数'''
def report(self,name,start,end):
report_list = self.back()
if name == '政企业务需求报表':
self.driver.get(report_list['政企业务需求报表'])
self.driver.find_element_by_xpath(
'//*[@id="bd"]/div[1]/div[2]/table/tbody/tr/td/table/tbody/tr/td/input').click()
elif name == '政企KPI跟踪报表':
self.driver.get(report_list['政企KPI跟踪报表'])
self.driver.find_element_by_xpath(
'//*[@id="bd"]/div[1]/div[2]/table/tbody/tr/td/table/tbody/tr/td/input').click()
time.sleep(3)
elif name == '需求确认时间查询报表':
self.driver.get(report_list['需求确认时间查询报表'])
self.alter_time('confirmTimeStart', start)
self.alter_time('confirmTimeEnd', end)
self.driver.find_element_by_xpath('// *[ @ id = "export"]').click()
elif name == 'PATCH计划':
self.driver.get(report_list['PATCH计划'])
self.alter_time('planTimeStart', start)
self.alter_time('planTimeEnd', end)
self.driver.find_element_by_xpath(
'//*[@id="bd"]/div/div[2]/table/tbody/tr/td/table/tbody/tr[10]/td/input[2]').click()
elif name == '业支厂商工时报表':
self.driver.get(report_list['业支厂商工时报表'])
self.alter_time('planTimeStart', start)
self.alter_time('planTimeEnd', end)
self.driver.find_element_by_xpath(
'//*[@id="bd"]/div/div[2]/table/tbody/tr/td/table/tbody/tr[2]/td/input[2]').click()
elif name == '测试进度报表':
self.driver.get(report_list['测试进度'])
self.alter_time('planTimeStart', start)
self.alter_time('planTimeEnd', end)
time.sleep(1)
self.driver.find_element_by_xpath('//*[@id="testPerson"]').clear()
self.driver.find_element_by_xpath(
'//*[@id="bd"]/div/form/div[1]/table/tbody/tr/td/table/tbody/tr[3]/td/input[2]').click()
elif name == '缺陷工单修复时长报表':
self.driver.get(report_list['缺陷工单修复时长报表'])
self.alter_time('planOnlineTimeStart', start)
self.alter_time('planOnlineTimeEnd', end)
self.driver.find_element_by_xpath(
'//*[@id="bd"]/div/div[2]/table/tbody/tr/td/table/tbody/tr[5]/td/input[2]').click()
elif name == '需求查询':
self.driver.get(report_list['需求查询'])
self.alter_time('time', start)
self.alter_time('time1', end)
'''下拉选框选择'''
Select(self.driver.find_element_by_xpath("//select[@id='remandType']")).select_by_visible_text('业务支撑')
time.sleep(2)
Select(self.driver.find_element_by_xpath("//select[@id='secondSelect']")).select_by_visible_text('集团下发')
self.driver.find_element_by_xpath("//*[@name='importBtn']").click()
elif name == '敏捷工时统计报表':
self.driver.get(report_list['敏捷工时统计报表'])
self.alter_time('planTimeStart', start)
self.alter_time('planTimeEnd', end)
self.driver.find_element_by_xpath('//*[@id="bd"]/div[1]/div[2]/table/tbody/tr/td/table/tbody/tr[4]/td/input[2]').click()
else:
print('发生错误,请检查判断条件')
def quit(self):
self.driver.quit()
准备工作完成后,然后利用Pandas对表格进行处理,处理数据的步骤:读取、排序、筛选、去重、计算时间差、透视表、交叉表,删除数据、时间修改以及excel的函数:计算首次上线、计算上线次数、计算最后一次上线等等。
以一个每日任务代码为例,代码如下:
'''政企KPI任务报表'''
from control.common.get import cook
from control.common import demand
import time, os, datetime
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
# 每次记得更新时间
start = '2020-01-01'
end = '2021-01-31'
'''政企任务的报表下载'''
report1 = cook()
report1.report('PATCH计划', start, end)
report1.report('政企业务需求报表', start, end)
report1.report('政企KPI跟踪报表', start, end)
time.sleep(3)
#定义需要剔除考核的工单列表
zq_data = [政企工单列表]
sj_data = [数据工单列表]
#创建,处理后数据的导出路径
t1 = demand.ltime(0)
t1path = r'D:\数据统计\12.政企KPI日报/' + t1
if not os.path.exists(t1path):
os.makedirs(t1path)
os.chdir(t1path)
def clean():
time.sleep(2)
#数据读取
data1 = pd.read_excel(r'C:\Users\lenovo\Downloads\政企业务需求报表.xls')
data2 = pd.read_excel(r'C:\Users\lenovo\Downloads\PATCH计划2.xls')
df1 = data1.copy()
df1.index = df1['工单编号']
#数据排序和计算
df1.sort_values(by=['工单编号', '上线时间'], inplace=True, ascending=True)
df1['外部工时'][df1['是否重复'] == '是'] = df1[df1['是否重复'] == '是']['外部工时'] / 2
'''删除指定的工单'''
for i in zq_data:
demand.drop_data(df1, i)
'''修改时间'''
df1.loc[工单编号1, '上线时间'] = '2018-04-24'
df1.loc[工单编号2, '上线时间'] = '2018-03-30'
df1.loc[工单编号3, '创建时间'] = '2018-06-12'
df1.loc[工单编号4, '创建时间'] = '2018-06-12'
df1.loc[工单编号5, '创建时间'] = '2018-06-28'
#更改值为时间格式
df1['需求确认'] = pd.to_datetime(df1['需求确认']) # ,format='%Y-%m-%d')
df1['到达时间'] = pd.to_datetime(df1['到达时间'])
df1['上线时间'] = pd.to_datetime(df1['上线时间'])
lst = df1[df1['到达时间'].isnull()].工单编号.unique().tolist()
print(lst)
#替换指定工单时间
for i in lst:
df1.loc[i]['到达时间'] = df1.loc[i]['需求确认'] - datetime.timedelta(2)
df1.loc['政企-2018-17184', '到达时间'] = pd.to_datetime('2018-12-08')
# 修改时间为年-月-日 去除时分秒
df1['需求确认'] = df1['需求确认'].apply(lambda x: x.strftime('%Y-%m-%d'))
df1['到达时间'] = df1['到达时间'].apply(lambda x: x.strftime('%Y-%m-%d'))
df1['上线时间'] = df1['上线时间'].apply(lambda x: x.strftime('%Y-%m-%d'))
df2 = data2.copy()
df2.index = df2['工单编号']
df2['计划上线时间'] = pd.to_datetime(df2['计划上线时间'])
df2.sort_values(by='计划上线时间', inplace=True, ascending=True)
#计算最后一次上线工单
df2 = df2.groupby('工单编号').last()
df2['计划上线时间'] = df2['计划上线时间'].apply(lambda x: x.strftime('%Y-%m-%d'))
# 筛选出指定部门的工单
df2 = df2[df2['提出部门'].str.contains('数据业务中心') | df2['提出部门'].str.contains('产品运营支撑中心') | df2['提出部门'].str.contains('客户响应与产品运营中心')]
# 筛选出指定需求负责人的工单,后期用字典实现筛选
df2 = df2[df2['需求负责人'].str.contains('name1') | df2['需求负责人'].str.contains('name2') |
df2['需求负责人'].str.contains('name3') | df2['需求负责人'].str.contains('name4') |
df2['需求负责人'].str.contains('name5') | df2['需求负责人'].str.contains('name6') |
df2['需求负责人'].str.contains('name7') | df2['需求负责人'].str.contains('name8') |
df2['需求负责人'].str.contains('name9') | df2['需求负责人'].str.contains('name10') |
df2['需求负责人'].str.contains('name11') | df2['需求负责人'].str.contains('name12')]
# 删除指定的行
for i in sj_data:
try:
demand.drop_data(df2, i)
except:
print('无法删除的行:%s' % i)
df2['工单编号'] = df2.index
data3 = pd.read_excel(r'D:\360MoveData\Users\lenovo\Music\Desktop\svn\政企KPI日报.xlsx')
data3.index = data3['工单编号']
a = data3[data3.index.str.contains('数据')][['需求确认', '到达时间']]
a['需求确认'] = pd.to_datetime(a['需求确认'])
a['到达时间'] = pd.to_datetime(a['到达时间'])
a['需求确认'] = a['需求确认'].apply(lambda x: x.strftime('%Y-%m-%d'))
a['到达时间'] = a['到达时间'].apply(lambda x: x.strftime('%Y-%m-%d'))
#两张数据表合并
df2 = pd.merge(df2, a, left_index=True, right_index=True, how='left')
concat2 = df2[['工单编号', '工单名称', '工单简介', '创建人', '需求负责人', '创建时间', '需求确认', '计划上线时间', '到达时间']]
#修改列名
concat2.rename(columns={'计划上线时间': '上线时间', '工单简介': '简介'}, inplace=True)
df = pd.concat([df1, concat2])
#数据排序,后期用insert插入方式实现
df = df[['工单编号', '工单名称', '简介', '创建人', '需求负责人', '创建时间', '需求确认', '上线时间', '到达时间',
'期望上线时间', '需求期望上线时间', '到达总控时间', '一级总控拆分', '来源部门', '工单大类', '工单类型',
'一级业务标签', '二级业务标签', '一级指标库标签', '二级指标库标签', '三级指标库标签', '上线次数', '外部工时',
'是否重复']]
print('数据已经处理完成,正在导出中……')
os.chdir(r'D:\360MoveData\Users\lenovo\Music\Desktop')
# 写入Excel,并导出
shet = pd.ExcelWriter('政企KPI日报.xlsx')
df.to_excel(shet, sheet_name='政企KPI报表', index=False)
df1.to_excel(shet, sheet_name='政企需求表')
df2.to_excel(shet, sheet_name='patch计划表')
shet.close()
print('本次政企据共%i条' % len(df1))
print('本次数据共%i条' % len(df2))
print('本次政企日报数据共%i条' % len(df))
print('\n在' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + '分,导出处理后数据')
通过自动化,以前每次一小时左右完成的数据处理任务,现在只需要几分钟就可以完成。而且自动化处理的数据准确性高,防止手工操作可能由于一些原因导致数据的不准确性。自动化省出来工作时间,从而可以用于继续封装其他任务,提高整体效率。从而进入一个良性循环,低成本,高回报。