基于jira数据的seatable的本地脚本开发与任务管理实践

背景

seatable 是一款可扩展性很强的在线图表软件,特别是它的本地部署功能,脚本结合丰富的api,完全可以实现超越excel的数据协作体验。

以我的工作实际应用为例,大部分的数据和协作都是在jira上完成的,但是jira的数据展示维度与页面加载效率着实一般。于是有了下方应用实践。

数据应用展示

版本排期数据

1640090391569.png

版本维度的任务分组合集

1640091248914.png

bug数据跟踪与报告

1640091253115.png

必要的环境

我是以内网部署的开发者版本为例的,所以对于数据库、admin账号等资源可以随意拿捏,不过目前除了依赖官方原版的admin接口权限外,没有任何崴脚的骚操作。
这里建议本地运行脚本,不必受限于云端python支持的库。

  • python 3+
  • seatable 2.5.0
  • jira 8.0

核心脚本

获取jira数据并同步到指定表

本脚本仅核心脚本方法示例,并不完整,无法直接运行哦。

# coding=utf8
import requests, json, re, time
from logzero import logger
from seatable_api import Base
from datetime import datetime
from syncUserFromOa import syncUser

JIRA_DOMAIN = 'http://jira.xxx.com'

server_url = 'http://seatable.xxxx.com'
api_token = 'xxxxxxxxxx'

base = Base(api_token, server_url)
base.auth()


class JiraDataSync:
    ....
    # 封装请求方法
    def request_center(self, method, path, data=None):
        url = '{}{}'.format(JIRA_DOMAIN, path)
        if method == 'POST':
            if data:
                data = json.dumps(data)
            try:
                res = self.session.post(url, data=data, headers=self.headers)
                resp = res.json()
                return resp
            except Exception as e:
                logger.error(str(e))
        if method == 'GET':
            try:
                res = self.session.get(url, headers=self.headers)
                resp = res.json()
                return resp
            except Exception as e:
                logger.error(str(e))
        return None
    # 登录jira
    def loginJira(self):
        path = '/rest/auth/latest/session'
        data = {
            'username': self.username,
            'password': self.password,
        }
        resp = self.request_center('POST', path, data)
        if resp and 'loginInfo' in resp.keys() and resp['loginInfo']:
            logger.info('{} 登录成功!'.format(self.username))
        else:
            raise ('{} 登录失败!'.format(self.username))
    
    # 切分大列表数据
    def list_split(self, items, n):
        return [items[i:i + n] for i in range(0, len(items), n)]

    # 获取版本号信息
    def get_project_version(self, projectIdOrKey):
        path = f'/rest/api/2/project/{projectIdOrKey}/versions'
        resp = self.request_center('GET', path)
        if not resp:
            logger.info('获取版本信息失败')
        # 过滤掉已发布的数据,暂时不用。
        # unreleased_list = [item for item in resp if not item['released']]
        data_list = sorted(resp, key=lambda x: x['id'], reverse=True)
        version_list = []
        for item in data_list:
            title, test_start_time, p_time, release_time = '', '', '', ''
            if not self.is_after_limit_time(item):
                continue
            if not self.is_name_in_limit_tag(item):
                continue
            if 'description' in item.keys():
                try:
                    description = item['description']
                    title = description.split("//")[0]
                    # 我们的版本排期在描述里有固定的格式,可以直接解析获取。
                    test_start_time = re.search(r'提测:(.*?),', description).group(1)
                    p_time = re.search(r'P版:(.*?),', description).group(1)
                    release_time = re.search(r'发布:(.*?)】', description).group(1)
                except Exception as e:
                    logger.error(str(e))
                    # logger.info('error:', item)
            jira_id = item['self'].split('/')[-1]
            link = '{}/projects/{}/versions/{}'.format(JIRA_DOMAIN, projectIdOrKey, jira_id)
            item['title'] = title
            item['test_start_time'] = test_start_time
            item['p_time'] = p_time
            item['release_time'] = release_time
            item['status'] = '已发布' if item['released'] else '未发布'
            item['version_type'] = self.defind_version_type(item['name'])
            item['link'] = link
            version_list.append(item)
        return version_list
    
    # 列表转字典
    def list_to_dict(self, list_data, key, sub_key=None):
        new_dict = {}
        for item in list_data:
            if key in item.keys():
                dict_key = f'{item[key]}_{item[sub_key]}' if sub_key and sub_key in item.keys() else item[key]
                new_dict[dict_key] = item
            else:
                logger.warning(f'error version data: {str(item)}')
        return new_dict

    # 拼接企业邮箱获取用户在seatable中的数据
    def get_user_info(self, user_key):
        user_mail = f"{user_key}@xxxx.com"
        user_is_exited, user_system_id = self.sync.assert_user_exits_callback_info(user_mail)
        if user_is_exited:
            return user_system_id
        return None

    def get_version_table_rows(self):
        table_rows = base.list_rows(self.version_table_name)
        dict_version_rows = self.list_to_dict(table_rows, 'name')
        return dict_version_rows

    def update_version_link_tester(self):
        table_rows = base.list_rows(self.version_table_name)
        update_list = []
        for row in table_rows:
            if row['task_user']:
                tester = []
                for user in row['task_user']:
                    if user and user not in tester:
                        tester.append(user)
                update_list.append({
                    'row_id': row['_id'],
                    'row': {
                        'excu_tester': tester
                    }
                })
        split_rows = self.list_split(update_list, 100)
        for split_data in split_rows:
            time.sleep(1)
            base.batch_update_rows(self.version_table_name, rows_data=split_data)

    def update_task_table(self, version_issues):
        table_rows = base.list_rows(self.task_table_name)
        if table_rows:
            dict_rows = self.list_to_dict(table_rows, 'key', 'fetch_version')
            add_task_list = []
            update_task_list = []
            delete_task_list = []
            new_key_list = []
            for item in version_issues:
                key_sub_key = '{}_{}'.format(item['key'], item['fetch_version'])
                if key_sub_key in dict_rows.keys():
                    update_task_list.append({
                        "row_id": dict_rows[key_sub_key]['_id'],
                        "row": item
                    })
                else:
                    add_task_list.append(item)
                new_key_list.append(key_sub_key)
            for old_task_table_key in dict_rows:
                if old_task_table_key not in new_key_list:
                    delete_task_list.append(old_task_table_key)
            if delete_task_list:
                base.batch_delete_rows(self.task_table_name, delete_task_list)
                logger.info('批量删除任务信息: {} 条'.format(len(delete_task_list)))
            if add_task_list:
                split_list_data = self.list_split(add_task_list, 50)
                for split_data in split_list_data:
                    time.sleep(1)
                    base.batch_append_rows(self.task_table_name, split_data)
                logger.info('批量新增任务信息: {} 条'.format(len(add_task_list)))
            if update_task_list:
                split_list_data = self.list_split(update_task_list, 50)
                for split_data in split_list_data:
                    time.sleep(1)
                    base.batch_update_rows(self.task_table_name, split_data)
                logger.info('批量更新任务信息: {} 条'.format(len(update_task_list)))
        else:
            split_list_data = self.list_split(version_issues, 50)
            for split_data in split_list_data:
                time.sleep(1)
                base.batch_append_rows(self.task_table_name, split_data)
            logger.info('批量新增任务信息: {} 条'.format(len(version_issues)))
  
    # 循环遍历数据
    def sync_sub_task(self):
        project_keys = tuple(self.projects)
        user_dict = self.sync.get_group_users(self.sync_sub_group_id)

        startAt = 0
        maxResults = 100
        all_sub_task_list = []
        has_next = True

        while has_next:
            logger.info(f'获取起始位置{startAt}')
            sub_task_issues, has_next, startAt = self.get_sub_tasks(project_keys, user_dict, startAt, maxResults)
            all_sub_task_list += sub_task_issues

        self.update_sub_task_table(all_sub_task_list)

    def main(self):
        self.loginJira()
        all_version_issues = []
        for project_key in self.projects:
            version_list = self.get_project_version(project_key)
            self.update_version_table(version_list)
            dict_version_rows = self.get_version_table_rows()
            for version_name, version_row in dict_version_rows.items():
                version_issues = self.get_jira_search_list(project_key, version_row)
                if version_issues:
                    all_version_issues += version_issues
        all_version_issues = sorted(all_version_issues, key=lambda x: x['jira_id'], reverse=False)
        self.update_task_table(all_version_issues)
        self.update_task_table_version_link()
        self.update_version_link_tester()
        self.sync_sub_task()


if __name__ == '__main__':
    username = 'xxxxx'  # jira 用户名
    password = 'xxxxx'  # jira 用户密码
    limit_start_date = '2021-01-01'   # jira数据获取时间限制
    version_tags = ['2021', '2022']
    projects = ['xxx', 'xxxxx']   # jira 项目key
    sync_sub_group_id = 'xxx'  # seatable中目标用户的分组id
    seatable_admin_username = 'xxxx@xxx.com'  # seatable管理账号
    seatable_admin_password = 'xxxxx'   # seatable管理密码
    JiraDataSync(username, password, limit_start_date, version_tags, projects, seatable_admin_username,
                 seatable_admin_password, sync_sub_group_id)

自定义数据同步

seatable上支持设置脚本每天运行一次,或者手动立刻执行。但是我们数据需要一定实时性,但又不需要过于高频更新,所以自己基于 schedule 撸了套定时任务系统。

from logzero import logger
from data.mock_data import *
from common.scheduler_tools import execute_task

def init_check_module_scheduler_task():
  logger.info('启动检查定时任务')
  for task_item in task_list:
    logger.info("启动定时任务:{} | {}".format(task_item['id'], task_item['name']))
    execute_task(task_item['id'], task_item['cron'])

if __name__ == '__main__':
  init_check_module_scheduler_task()

支持基于cron表达式的定时任务执行,灵活高效。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容