背景
seatable 是一款可扩展性很强的在线图表软件,特别是它的本地部署功能,脚本结合丰富的api,完全可以实现超越excel的数据协作体验。
以我的工作实际应用为例,大部分的数据和协作都是在jira上完成的,但是jira的数据展示维度与页面加载效率着实一般。于是有了下方应用实践。
数据应用展示
版本排期数据
版本维度的任务分组合集
bug数据跟踪与报告
必要的环境
我是以内网部署的开发者版本为例的,所以对于数据库、admin账号等资源可以随意拿捏,不过目前除了依赖官方原版的admin接口权限外,没有任何崴脚的骚操作。
这里建议本地运行脚本,不必受限于云端python支持的库。
- python 3+
- seatable 2.5.0
- jira 8.0
核心脚本
获取jira数据并同步到指定表
本脚本仅核心脚本方法示例,并不完整,无法直接运行哦。
# coding=utf8
import requests, json, re, time
from logzero import logger
from seatable_api import Base
from datetime import datetime
from syncUserFromOa import syncUser
JIRA_DOMAIN = 'http://jira.xxx.com'
server_url = 'http://seatable.xxxx.com'
api_token = 'xxxxxxxxxx'
base = Base(api_token, server_url)
base.auth()
class JiraDataSync:
....
# 封装请求方法
def request_center(self, method, path, data=None):
url = '{}{}'.format(JIRA_DOMAIN, path)
if method == 'POST':
if data:
data = json.dumps(data)
try:
res = self.session.post(url, data=data, headers=self.headers)
resp = res.json()
return resp
except Exception as e:
logger.error(str(e))
if method == 'GET':
try:
res = self.session.get(url, headers=self.headers)
resp = res.json()
return resp
except Exception as e:
logger.error(str(e))
return None
# 登录jira
def loginJira(self):
path = '/rest/auth/latest/session'
data = {
'username': self.username,
'password': self.password,
}
resp = self.request_center('POST', path, data)
if resp and 'loginInfo' in resp.keys() and resp['loginInfo']:
logger.info('{} 登录成功!'.format(self.username))
else:
raise ('{} 登录失败!'.format(self.username))
# 切分大列表数据
def list_split(self, items, n):
return [items[i:i + n] for i in range(0, len(items), n)]
# 获取版本号信息
def get_project_version(self, projectIdOrKey):
path = f'/rest/api/2/project/{projectIdOrKey}/versions'
resp = self.request_center('GET', path)
if not resp:
logger.info('获取版本信息失败')
# 过滤掉已发布的数据,暂时不用。
# unreleased_list = [item for item in resp if not item['released']]
data_list = sorted(resp, key=lambda x: x['id'], reverse=True)
version_list = []
for item in data_list:
title, test_start_time, p_time, release_time = '', '', '', ''
if not self.is_after_limit_time(item):
continue
if not self.is_name_in_limit_tag(item):
continue
if 'description' in item.keys():
try:
description = item['description']
title = description.split("//")[0]
# 我们的版本排期在描述里有固定的格式,可以直接解析获取。
test_start_time = re.search(r'提测:(.*?),', description).group(1)
p_time = re.search(r'P版:(.*?),', description).group(1)
release_time = re.search(r'发布:(.*?)】', description).group(1)
except Exception as e:
logger.error(str(e))
# logger.info('error:', item)
jira_id = item['self'].split('/')[-1]
link = '{}/projects/{}/versions/{}'.format(JIRA_DOMAIN, projectIdOrKey, jira_id)
item['title'] = title
item['test_start_time'] = test_start_time
item['p_time'] = p_time
item['release_time'] = release_time
item['status'] = '已发布' if item['released'] else '未发布'
item['version_type'] = self.defind_version_type(item['name'])
item['link'] = link
version_list.append(item)
return version_list
# 列表转字典
def list_to_dict(self, list_data, key, sub_key=None):
new_dict = {}
for item in list_data:
if key in item.keys():
dict_key = f'{item[key]}_{item[sub_key]}' if sub_key and sub_key in item.keys() else item[key]
new_dict[dict_key] = item
else:
logger.warning(f'error version data: {str(item)}')
return new_dict
# 拼接企业邮箱获取用户在seatable中的数据
def get_user_info(self, user_key):
user_mail = f"{user_key}@xxxx.com"
user_is_exited, user_system_id = self.sync.assert_user_exits_callback_info(user_mail)
if user_is_exited:
return user_system_id
return None
def get_version_table_rows(self):
table_rows = base.list_rows(self.version_table_name)
dict_version_rows = self.list_to_dict(table_rows, 'name')
return dict_version_rows
def update_version_link_tester(self):
table_rows = base.list_rows(self.version_table_name)
update_list = []
for row in table_rows:
if row['task_user']:
tester = []
for user in row['task_user']:
if user and user not in tester:
tester.append(user)
update_list.append({
'row_id': row['_id'],
'row': {
'excu_tester': tester
}
})
split_rows = self.list_split(update_list, 100)
for split_data in split_rows:
time.sleep(1)
base.batch_update_rows(self.version_table_name, rows_data=split_data)
def update_task_table(self, version_issues):
table_rows = base.list_rows(self.task_table_name)
if table_rows:
dict_rows = self.list_to_dict(table_rows, 'key', 'fetch_version')
add_task_list = []
update_task_list = []
delete_task_list = []
new_key_list = []
for item in version_issues:
key_sub_key = '{}_{}'.format(item['key'], item['fetch_version'])
if key_sub_key in dict_rows.keys():
update_task_list.append({
"row_id": dict_rows[key_sub_key]['_id'],
"row": item
})
else:
add_task_list.append(item)
new_key_list.append(key_sub_key)
for old_task_table_key in dict_rows:
if old_task_table_key not in new_key_list:
delete_task_list.append(old_task_table_key)
if delete_task_list:
base.batch_delete_rows(self.task_table_name, delete_task_list)
logger.info('批量删除任务信息: {} 条'.format(len(delete_task_list)))
if add_task_list:
split_list_data = self.list_split(add_task_list, 50)
for split_data in split_list_data:
time.sleep(1)
base.batch_append_rows(self.task_table_name, split_data)
logger.info('批量新增任务信息: {} 条'.format(len(add_task_list)))
if update_task_list:
split_list_data = self.list_split(update_task_list, 50)
for split_data in split_list_data:
time.sleep(1)
base.batch_update_rows(self.task_table_name, split_data)
logger.info('批量更新任务信息: {} 条'.format(len(update_task_list)))
else:
split_list_data = self.list_split(version_issues, 50)
for split_data in split_list_data:
time.sleep(1)
base.batch_append_rows(self.task_table_name, split_data)
logger.info('批量新增任务信息: {} 条'.format(len(version_issues)))
# 循环遍历数据
def sync_sub_task(self):
project_keys = tuple(self.projects)
user_dict = self.sync.get_group_users(self.sync_sub_group_id)
startAt = 0
maxResults = 100
all_sub_task_list = []
has_next = True
while has_next:
logger.info(f'获取起始位置{startAt}')
sub_task_issues, has_next, startAt = self.get_sub_tasks(project_keys, user_dict, startAt, maxResults)
all_sub_task_list += sub_task_issues
self.update_sub_task_table(all_sub_task_list)
def main(self):
self.loginJira()
all_version_issues = []
for project_key in self.projects:
version_list = self.get_project_version(project_key)
self.update_version_table(version_list)
dict_version_rows = self.get_version_table_rows()
for version_name, version_row in dict_version_rows.items():
version_issues = self.get_jira_search_list(project_key, version_row)
if version_issues:
all_version_issues += version_issues
all_version_issues = sorted(all_version_issues, key=lambda x: x['jira_id'], reverse=False)
self.update_task_table(all_version_issues)
self.update_task_table_version_link()
self.update_version_link_tester()
self.sync_sub_task()
if __name__ == '__main__':
username = 'xxxxx' # jira 用户名
password = 'xxxxx' # jira 用户密码
limit_start_date = '2021-01-01' # jira数据获取时间限制
version_tags = ['2021', '2022']
projects = ['xxx', 'xxxxx'] # jira 项目key
sync_sub_group_id = 'xxx' # seatable中目标用户的分组id
seatable_admin_username = 'xxxx@xxx.com' # seatable管理账号
seatable_admin_password = 'xxxxx' # seatable管理密码
JiraDataSync(username, password, limit_start_date, version_tags, projects, seatable_admin_username,
seatable_admin_password, sync_sub_group_id)
自定义数据同步
seatable上支持设置脚本每天运行一次,或者手动立刻执行。但是我们数据需要一定实时性,但又不需要过于高频更新,所以自己基于 schedule
撸了套定时任务系统。
from logzero import logger
from data.mock_data import *
from common.scheduler_tools import execute_task
def init_check_module_scheduler_task():
logger.info('启动检查定时任务')
for task_item in task_list:
logger.info("启动定时任务:{} | {}".format(task_item['id'], task_item['name']))
execute_task(task_item['id'], task_item['cron'])
if __name__ == '__main__':
init_check_module_scheduler_task()
支持基于cron表达式的定时任务执行,灵活高效。