基于jira数据的seatable的本地脚本开发与任务管理实践

背景

seatable 是一款可扩展性很强的在线图表软件,特别是它的本地部署功能,脚本结合丰富的api,完全可以实现超越excel的数据协作体验。

以我的工作实际应用为例,大部分的数据和协作都是在jira上完成的,但是jira的数据展示维度与页面加载效率着实一般。于是有了下方应用实践。

数据应用展示

版本排期数据

1640090391569.png

版本维度的任务分组合集

1640091248914.png

bug数据跟踪与报告

1640091253115.png

必要的环境

我是以内网部署的开发者版本为例的,所以对于数据库、admin账号等资源可以随意拿捏,不过目前除了依赖官方原版的admin接口权限外,没有任何崴脚的骚操作。
这里建议本地运行脚本,不必受限于云端python支持的库。

  • python 3+
  • seatable 2.5.0
  • jira 8.0

核心脚本

获取jira数据并同步到指定表

本脚本仅核心脚本方法示例,并不完整,无法直接运行哦。

# coding=utf8
import requests, json, re, time
from logzero import logger
from seatable_api import Base
from datetime import datetime
from syncUserFromOa import syncUser

JIRA_DOMAIN = 'http://jira.xxx.com'

server_url = 'http://seatable.xxxx.com'
api_token = 'xxxxxxxxxx'

base = Base(api_token, server_url)
base.auth()


class JiraDataSync:
    ....
    # 封装请求方法
    def request_center(self, method, path, data=None):
        url = '{}{}'.format(JIRA_DOMAIN, path)
        if method == 'POST':
            if data:
                data = json.dumps(data)
            try:
                res = self.session.post(url, data=data, headers=self.headers)
                resp = res.json()
                return resp
            except Exception as e:
                logger.error(str(e))
        if method == 'GET':
            try:
                res = self.session.get(url, headers=self.headers)
                resp = res.json()
                return resp
            except Exception as e:
                logger.error(str(e))
        return None
    # 登录jira
    def loginJira(self):
        path = '/rest/auth/latest/session'
        data = {
            'username': self.username,
            'password': self.password,
        }
        resp = self.request_center('POST', path, data)
        if resp and 'loginInfo' in resp.keys() and resp['loginInfo']:
            logger.info('{} 登录成功!'.format(self.username))
        else:
            raise ('{} 登录失败!'.format(self.username))
    
    # 切分大列表数据
    def list_split(self, items, n):
        return [items[i:i + n] for i in range(0, len(items), n)]

    # 获取版本号信息
    def get_project_version(self, projectIdOrKey):
        path = f'/rest/api/2/project/{projectIdOrKey}/versions'
        resp = self.request_center('GET', path)
        if not resp:
            logger.info('获取版本信息失败')
        # 过滤掉已发布的数据,暂时不用。
        # unreleased_list = [item for item in resp if not item['released']]
        data_list = sorted(resp, key=lambda x: x['id'], reverse=True)
        version_list = []
        for item in data_list:
            title, test_start_time, p_time, release_time = '', '', '', ''
            if not self.is_after_limit_time(item):
                continue
            if not self.is_name_in_limit_tag(item):
                continue
            if 'description' in item.keys():
                try:
                    description = item['description']
                    title = description.split("//")[0]
                    # 我们的版本排期在描述里有固定的格式,可以直接解析获取。
                    test_start_time = re.search(r'提测:(.*?),', description).group(1)
                    p_time = re.search(r'P版:(.*?),', description).group(1)
                    release_time = re.search(r'发布:(.*?)】', description).group(1)
                except Exception as e:
                    logger.error(str(e))
                    # logger.info('error:', item)
            jira_id = item['self'].split('/')[-1]
            link = '{}/projects/{}/versions/{}'.format(JIRA_DOMAIN, projectIdOrKey, jira_id)
            item['title'] = title
            item['test_start_time'] = test_start_time
            item['p_time'] = p_time
            item['release_time'] = release_time
            item['status'] = '已发布' if item['released'] else '未发布'
            item['version_type'] = self.defind_version_type(item['name'])
            item['link'] = link
            version_list.append(item)
        return version_list
    
    # 列表转字典
    def list_to_dict(self, list_data, key, sub_key=None):
        new_dict = {}
        for item in list_data:
            if key in item.keys():
                dict_key = f'{item[key]}_{item[sub_key]}' if sub_key and sub_key in item.keys() else item[key]
                new_dict[dict_key] = item
            else:
                logger.warning(f'error version data: {str(item)}')
        return new_dict

    # 拼接企业邮箱获取用户在seatable中的数据
    def get_user_info(self, user_key):
        user_mail = f"{user_key}@xxxx.com"
        user_is_exited, user_system_id = self.sync.assert_user_exits_callback_info(user_mail)
        if user_is_exited:
            return user_system_id
        return None

    def get_version_table_rows(self):
        table_rows = base.list_rows(self.version_table_name)
        dict_version_rows = self.list_to_dict(table_rows, 'name')
        return dict_version_rows

    def update_version_link_tester(self):
        table_rows = base.list_rows(self.version_table_name)
        update_list = []
        for row in table_rows:
            if row['task_user']:
                tester = []
                for user in row['task_user']:
                    if user and user not in tester:
                        tester.append(user)
                update_list.append({
                    'row_id': row['_id'],
                    'row': {
                        'excu_tester': tester
                    }
                })
        split_rows = self.list_split(update_list, 100)
        for split_data in split_rows:
            time.sleep(1)
            base.batch_update_rows(self.version_table_name, rows_data=split_data)

    def update_task_table(self, version_issues):
        table_rows = base.list_rows(self.task_table_name)
        if table_rows:
            dict_rows = self.list_to_dict(table_rows, 'key', 'fetch_version')
            add_task_list = []
            update_task_list = []
            delete_task_list = []
            new_key_list = []
            for item in version_issues:
                key_sub_key = '{}_{}'.format(item['key'], item['fetch_version'])
                if key_sub_key in dict_rows.keys():
                    update_task_list.append({
                        "row_id": dict_rows[key_sub_key]['_id'],
                        "row": item
                    })
                else:
                    add_task_list.append(item)
                new_key_list.append(key_sub_key)
            for old_task_table_key in dict_rows:
                if old_task_table_key not in new_key_list:
                    delete_task_list.append(old_task_table_key)
            if delete_task_list:
                base.batch_delete_rows(self.task_table_name, delete_task_list)
                logger.info('批量删除任务信息: {} 条'.format(len(delete_task_list)))
            if add_task_list:
                split_list_data = self.list_split(add_task_list, 50)
                for split_data in split_list_data:
                    time.sleep(1)
                    base.batch_append_rows(self.task_table_name, split_data)
                logger.info('批量新增任务信息: {} 条'.format(len(add_task_list)))
            if update_task_list:
                split_list_data = self.list_split(update_task_list, 50)
                for split_data in split_list_data:
                    time.sleep(1)
                    base.batch_update_rows(self.task_table_name, split_data)
                logger.info('批量更新任务信息: {} 条'.format(len(update_task_list)))
        else:
            split_list_data = self.list_split(version_issues, 50)
            for split_data in split_list_data:
                time.sleep(1)
                base.batch_append_rows(self.task_table_name, split_data)
            logger.info('批量新增任务信息: {} 条'.format(len(version_issues)))
  
    # 循环遍历数据
    def sync_sub_task(self):
        project_keys = tuple(self.projects)
        user_dict = self.sync.get_group_users(self.sync_sub_group_id)

        startAt = 0
        maxResults = 100
        all_sub_task_list = []
        has_next = True

        while has_next:
            logger.info(f'获取起始位置{startAt}')
            sub_task_issues, has_next, startAt = self.get_sub_tasks(project_keys, user_dict, startAt, maxResults)
            all_sub_task_list += sub_task_issues

        self.update_sub_task_table(all_sub_task_list)

    def main(self):
        self.loginJira()
        all_version_issues = []
        for project_key in self.projects:
            version_list = self.get_project_version(project_key)
            self.update_version_table(version_list)
            dict_version_rows = self.get_version_table_rows()
            for version_name, version_row in dict_version_rows.items():
                version_issues = self.get_jira_search_list(project_key, version_row)
                if version_issues:
                    all_version_issues += version_issues
        all_version_issues = sorted(all_version_issues, key=lambda x: x['jira_id'], reverse=False)
        self.update_task_table(all_version_issues)
        self.update_task_table_version_link()
        self.update_version_link_tester()
        self.sync_sub_task()


if __name__ == '__main__':
    username = 'xxxxx'  # jira 用户名
    password = 'xxxxx'  # jira 用户密码
    limit_start_date = '2021-01-01'   # jira数据获取时间限制
    version_tags = ['2021', '2022']
    projects = ['xxx', 'xxxxx']   # jira 项目key
    sync_sub_group_id = 'xxx'  # seatable中目标用户的分组id
    seatable_admin_username = 'xxxx@xxx.com'  # seatable管理账号
    seatable_admin_password = 'xxxxx'   # seatable管理密码
    JiraDataSync(username, password, limit_start_date, version_tags, projects, seatable_admin_username,
                 seatable_admin_password, sync_sub_group_id)

自定义数据同步

seatable上支持设置脚本每天运行一次,或者手动立刻执行。但是我们数据需要一定实时性,但又不需要过于高频更新,所以自己基于 schedule 撸了套定时任务系统。

from logzero import logger
from data.mock_data import *
from common.scheduler_tools import execute_task

def init_check_module_scheduler_task():
  logger.info('启动检查定时任务')
  for task_item in task_list:
    logger.info("启动定时任务:{} | {}".format(task_item['id'], task_item['name']))
    execute_task(task_item['id'], task_item['cron'])

if __name__ == '__main__':
  init_check_module_scheduler_task()

支持基于cron表达式的定时任务执行,灵活高效。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容