python+tapd+企微机器人+jenkins实现工单机器人

需求背景:

目前业务线上问题登记在tapd的项目中,客服提单后,各产研无法感知到有新工单进来。为了提高产研对线上问题的感知度、提高工单收敛速度。开发了实时机器人每5分钟拉取数据推送到产研群里,开发了超时机器人每日下班前推送还未解决的工单。

技术栈:

python +mysql+tapd+企微机器人+jenkins

实现思路:

一、读取tapd数据

二、组装企微机器人消息体

参考企微官方-群机器人配置api
https://developer.work.weixin.qq.com/document/path/91770

群机器人

三、企微群中新建机器人,拿到webhookurl

新建群机器人

四、python脚本调用企微机器人发送接口推送消息

超时机器人效果图
实时机器人效果图

五、持续集成到jenkins定时执行推送

调用click模块封装好,编写shell脚本调用,jenkins中构建添加shell脚本就行。

实时机器人示例:python xx/tapd_remind_robot.py -rtu https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=x -rtn 请处理xx人抓紧处理新工单! -t xx产品中心 -g xx 

完整代码
-初学python写的很臭很长勿喷

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# @Time    : 2021/5/14 17:06
# @Author  : yanfa
# @File    : tapd_remind_robot.py
# TAPD工单统计机器人-超时、实时

import logging
import click
import pymysql
from sshtunnel import SSHTunnelForwarder
import requests
import json

# ssh
SSH_HOST = "xxx"
SSH_PORT = 22
SSH_USERNAME = "xxxx"
SSH_PASSWORD = "xxxx"
REMOTE_BIND_HOST = "xxx"
REMOTE_BIND_PORT = xxxx

# mysql
MYSQL_HOST = "127.0.0.1"
MYSQL_PORT = ""
MYSQL_USER = "xxx"
MYSQL_PASSWD = "xxx"
MYSQL_DB = "xxx"  # tadp主库
MYSQL_DB_01 = "xxx"  # 获取用户userid
MYSQL_CHARSET = "utf8"

# 定义工单链接前半部分
project_path = "https://www.tapd.cn/xxx/bugtrace/bugs/view?bug_id="


class Tapd_Remind_Robot():
    # 初始化必传的url
    def __init__(self, url):
        self.url = url

    # SSH跳板机链接MySQL数据库-tadp主库
    def connect_mysql(self):
        server = SSHTunnelForwarder(ssh_address_or_host=(SSH_HOST, SSH_PORT),
                                    ssh_username=SSH_USERNAME,
                                    ssh_password=SSH_PASSWORD,
                                    remote_bind_address=(REMOTE_BIND_HOST, REMOTE_BIND_PORT))
        server.start()
        conn = pymysql.connect(host='127.0.0.1',
                               port=server.local_bind_port,
                               user=MYSQL_USER,
                               passwd=MYSQL_PASSWD,
                               db=MYSQL_DB,
                               charset=MYSQL_CHARSET)
        return conn

    # SSH跳板机链接MySQL数据库-获取用户userid
    def connect_mysql01(self):
        server = SSHTunnelForwarder(ssh_address_or_host=(SSH_HOST, SSH_PORT),
                                    ssh_username=SSH_USERNAME,
                                    ssh_password=SSH_PASSWORD,
                                    remote_bind_address=(REMOTE_BIND_HOST, REMOTE_BIND_PORT))
        server.start()
        conn = pymysql.connect(host='127.0.0.1',
                               port=server.local_bind_port,
                               user=MYSQL_USER,
                               passwd=MYSQL_PASSWD,
                               db=MYSQL_DB_01,
                               charset=MYSQL_CHARSET)
        return conn

    # 执行SQL命令-tapd库
    def execute_sql(self, sql):
        db = self.connect_mysql()
        cursor = db.cursor()

        try:
            cursor.execute(sql)
            results = cursor.fetchall()
            return results
        except:
            print("Error")

        db.close()

    # 执行SQL命令-db_user_center库
    def execute_sql01(self, sql):
        db = self.connect_mysql01()
        cursor = db.cursor()

        try:
            cursor.execute(sql)
            results = cursor.fetchall()
            return results
        except:
            print("Error")

        db.close()

    # 推送消息
    def post_bot(self, url, data):
        headers = {'Content-Type': 'application/json'}
        response = requests.post(url=url, headers=headers, data=json.dumps(data))
        # 后续可能有用
        return response

    # 定义tapd_name转化为userid方法
    def find_userid(self, pending_list_01):
        # 不以;结尾特殊的元素末尾添加;
        list0 = []
        for j in pending_list_01:
            if j.endswith(';'):
                list0.append(j)
            else:
                j = j + ';'
                list0.append(j)
        # print("处理不以;结尾的元素后:", list0)
        # 第一次去重
        pending_list = list(set(list0))
        # print("第一次去重后的列表为:", pending_list)

        # 定义一个字符串存储拼接的用户信息
        list1 = ''
        for i in pending_list:
            list1 += i
        # print("拼接用户信息后的列表:", list1)

        # 去除字符最后的字符";"
        list2 = list1.strip(';')
        # print("字符串去除尾部标点为:", list2)

        # 切割
        list3 = list2.split(";")
        # print("字符串分割后:", list3)

        # 第二次去重
        list4 = list(set(list3))
        # print("第二次去重后的列表为:", list4)

        # 强转成元祖后再强转为str拼接在sql
        list5 = str(tuple(list4))
        # print("列表转为元祖再转为str:",list5)

        # 特殊处理元祖只有单个元素的','
        if len(list4) == 1:
            list6 = []
            for i in list5:
                list6.append(i)
            # print("遍历字符串:", list6)

            # 删除倒数第二个元素','
            del list6[-2]
            # print('处理,后:', list6)

            # 重新合并
            list7 = "".join(list6)
            # print('重新合并:', str(list7))

            # 拼接sql
            userid_list = "SELECT a.userid FROM t_admin_user a where a.tapd_name in" + list7 + ";"
            print("查询userid的sql为:", userid_list)
        else:
            # 拼接sql
            userid_list = "SELECT a.userid FROM t_admin_user a where a.tapd_name in" + list5 + ";"
            print("查询userid的sql为:", userid_list)
        userid_list01 = self.execute_sql01(userid_list)
        # print("处理人userid元祖为:", userid_list01)

        # 定义一个列表存储遍历元祖的值
        list8 = []
        for e in userid_list01:
            list8.append(e[0])  # 只查询子元祖第一个值userid
        print('最终处理人列表为:', list8)
        return list8

    # 定义发送消息主方法
    def send_msg_time_out(self, timeout_notice, title, group_new):
        # 定义查询语句-有截止时间,取【当前时间 - 截止时间】>=0 算超时
        # 定义被删除的部分
        deleted = """('xxx')"""
        sql01 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 0;"
        sql02 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=xxxAND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND  a.date >=3 AND a.date <5;"
        sql03 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=xxxAND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 5 AND a.date <7;"
        sql04 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=xxxAND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 7;"

        # 定义查询语句-无截止时间,取【当前时间 - 创建时间】>=3 算超时
        sql05 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题'     AND SUBSTR(a.id,1,12)=xxx AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 3 AND a.date <5;"
        sql06 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题'     AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 5 AND a.date <7;"
        sql07 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题'     AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 7;"
        sql08 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题'     AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 3;"
        sql09 = "SELECT * FROM((SELECT a.id, a.title AS '工单标题', a.created AS '创建时间', a.current_owner AS '处理人', a.custom_field_two AS '所属小组', a.date AS '超时天数' FROM (SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE()) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + "AND a.deadline != '0000-00-00 00:00:00' AND a.id NOT IN('xxx','xx','xx') AND a.date >= 0 ) UNION (SELECT a.id, a.title AS '工单标题', a.created AS '创建时间', a.current_owner AS '处理人', a.custom_field_two AS '所属小组', a.date AS '超时天数' FROM (SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d' ), CURDATE( ) ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline = '0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 3 )) AS e ORDER BY e.`超时天数` desc;"
        # print("sql01为:%s\nsql02为:%s\nsql03为:%s\nsql04为:%s\nsql05为:%s\nsql06为:%s\nsql06为:%s\nsql08为:%s\nsql09为:%s"%(sql01,sql02,sql03,sql04,sql05,sql06,sql07,sql08,sql09))

        # 超时定义:
        # 如果填写了截止时间或者截止时间不等于00: 00:00 - ---则取当前时间 - 截止时间 >= 0算超时
        # 否则没填写截止时间,则用当前时间 - 创建时间 >= 3才算超时
        # 获取超时工单总数/各天数,数据库返回一个元祖且元素也是元祖,通过下标取2层值

        # 执行后相加处理
        bug_count = self.execute_sql(sql01)[0][0] + self.execute_sql(sql08)[0][0]
        bug_03 = self.execute_sql(sql02)[0][0] + self.execute_sql(sql05)[0][0]
        bug_05 = self.execute_sql(sql03)[0][0] + self.execute_sql(sql06)[0][0]
        bug_07 = self.execute_sql(sql04)[0][0] + self.execute_sql(sql07)[0][0]
        bug_all = self.execute_sql(sql09)
        print("全部超时个数为:", bug_count)
        # print("超时3天个数为:", bug_03)
        # print("超时5天个数为:", bug_05)
        # print("超时7天个数为:", bug_07)
        # print("sql09为:",sql09)
        # print("bug_all为:", bug_all)

        # 推送的报文 %d格式化数值,%s格式化字符,多个值用%(a,b,c)
        # 定义汇总部分
        head = """>【%s】\n超时工单数量汇总:<font color = "warning">%d单</font>\n>超时三天: <font color = "comment">%d单</font>\n>超时五天: <font color = "comment">%d单</font>\n>超时七天: <font color = "comment">%d单</font>""" % (
            title, bug_count, bug_03, bug_05, bug_07)

        # 定义详细清单-压缩前
        body = ''
        pending_list_01 = []
        for i in bug_all:
            # 将处理人遍历存储至新列表pending_list_01
            pending_list_01.append(i[3])
            body01 = """\n\n>工单标题:<font color="comment">%s</font>\n>工单链接:[%s%s](%s%s)\n>创建时间:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>所属小组:<font color="comment">%s</font>\n>超时天数:<font color="warning">%s天</font>""" % (
                i[1], project_path, i[0], project_path, i[0], i[2], i[3], i[4], i[5])
            # 前后加空格换行,+=遍历叠加
            body += '\n\n' + body01 + '\n\n'
        # print("处理人列表为:", pending_list_01)
        # 定义请求报文,将2部分字符串拼接
        markdown = {
            "msgtype": "markdown",
            "markdown": {
                "content": head + body
            }
        }

        # 定义详细清单-压缩后(因为接口content字节数不能大于4096,所以需要压缩)
        body_short = ''
        pending_list_01 = []
        for i in bug_all:
            # 将处理人遍历存储至新列表pending_list_01
            pending_list_01.append(i[3])
            body_short_01 = """\n\n>工单标题:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>超时天数:<font color="warning">%s天</font>""" % (
                i[1], i[3], i[5])
            # 前后加空格换行,+=遍历叠加
            body_short += '\n\n' + body_short_01 + '\n\n'
        # print("处理人列表为:", pending_list_01)
        # 定义请求报文,将2部分字符串拼接
        markdown_short = {
            "msgtype": "markdown",
            "markdown": {
                "content": head + body_short
            }
        }

        # 获取content字节数,注意采用 UTF-8 编码方式,先
        # 使用 encode() 方法,将字符串进行编码后再获取它的字节数
        count_byte = len((head + body).encode())
        short_count_byte = len((head + body_short).encode())
        print("压缩前content字节数为:", count_byte)
        print("压缩后content字节数为:", short_count_byte)

        # 添加判断,超时工单数为0且大于4096字节则不执行
        if bug_count != 0:
            if count_byte <= 4096:
                # print("超时工单汇总明细为:",markdown)
                resp01 = self.post_bot(url=self.url, data=markdown)
                # print("超时工单汇总明细-响应为:", resp01.text)
                assert resp01.text.__contains__("""errmsg":"ok""")
                print("超时工单汇总明细-发送成功")

                # 定义@处理人的消息
                notice = {
                    "msgtype": "text",
                    "text": {
                        "content": timeout_notice,
                        "mentioned_list": self.find_userid(pending_list_01)  # 调用用户userid转化方法
                    }
                }
                resp02 = self.post_bot(url=self.url, data=notice)
                print("通知处理人消息为:", notice)
                # print("通知处理人消息-响应为", resp02.text)
                assert resp02.text.__contains__("""errmsg":"ok""")
                print("通知处理人-发送成功")
            elif short_count_byte <= 4096:
                # print("超时工单汇总明细为:", markdown_short)
                resp01 = self.post_bot(self.url, markdown_short)
                # print("超时工单汇总明细-响应为:", resp01.text)
                assert resp01.text.__contains__("""errmsg":"ok""")
                print("超时工单汇总明细-发送成功")

                # 定义@处理人的消息
                notice = {
                    "msgtype": "text",
                    "text": {
                        "content": timeout_notice,
                        "mentioned_list": self.find_userid(pending_list_01)  # 调用用户userid转化方法
                    }
                }
                print("通知处理人消息为:", notice)
                resp02 = self.post_bot(self.url, notice)
                # print("通知处理人消息-响应为:", resp02.text)
                assert resp02.text.__contains__("""errmsg":"ok""")
                print("通知处理人-发送成功")
            else:
                # print("压缩前-超时工单汇总明细为:",markdown)
                # print("压缩后-超时工单汇总明细为:", markdown_short)
                head = """>\n【%s】\n超时工单数量汇总:<font color = "warning">%d单</font>\n>超时三天: <font color = "comment">%d单</font>\n>超时五天: <font color = "comment">%d单</font>\n>超时七天: <font color = "comment">%d单</font>\n\n""" % (
                    title, bug_count, bug_03, bug_05, bug_07)
                body_short = ''
                top = '\n【超时工单Top10】\n'
                pending_list_01 = []
                # 压缩后还是大于4098则只取top10
                for i in bug_all[:10]:
                    # 将处理人遍历存储至新列表pending_list_01
                    pending_list_01.append(i[3])
                    body_short_01 = """>工单标题:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>超时天数:<font color="warning">%s天</font>""" % (
                        i[1], i[3], i[5])
                    # 前后加空格换行,+=遍历叠加
                    body_short += '\n\n' + body_short_01 + '\n\n'
                print("前10处理人列表为:", pending_list_01)
                short_count_byte = len((head + top + body_short).encode())
                print('压缩后前10条长度为:', short_count_byte)

                # 定义请求报文,将2部分字符串拼接
                markdown_short = {
                    "msgtype": "markdown",
                    "markdown": {
                        "content": head + top + body_short
                    }
                }

                # 定义@处理人的消息
                notice = {
                    "msgtype": "text",
                    "text": {
                        "content": timeout_notice,
                        "mentioned_list": self.find_userid(pending_list_01)  # 调用用户userid转化方法
                    }
                }

                resp01 = self.post_bot(self.url, markdown_short)
                # print("超时工单汇总明细-响应为:", resp01.text)
                assert resp01.text.__contains__("""errmsg":"ok""")
                print("超时工单汇总明细-发送成功")

                print("通知处理人消息为:", notice)
                resp02 = self.post_bot(self.url, notice)
                # print("通知处理人消息-响应为:", resp02.text)
                assert resp02.text.__contains__("""errmsg":"ok""")
                print("通知处理人-发送成功")
                # logging.error("字节数为%s,markdown内容最长不超过4096个字节" % short_count_byte)
        else:
            logging.error("暂无超时记录")

    def send_msg_real_time(self, realtime_notice, title, group_new):
        # 定义查询语句
        realtime_count_sql = "SELECT COUNT(0) from(SELECT min(b.created),b.bug_id FROM t_tapd_bug_order a, t_tapd_bug_order_change b WHERE a.id = b.bug_id AND b.bug_id IN(SELECT b.bug_id FROM t_tapd_bug_order_change b WHERE b.created >= DATE_SUB( now(), INTERVAL 15 MINUTE ) AND SUBSTR( b.bug_id, 1, 12 ) = xxx AND b.old_value = 'feedback' AND b.new_value = 'unconfirmed') AND DATE_FORMAT( a.created, '%Y-%m-%d' ) = date_sub( curdate( ), INTERVAL 0 DAY ) AND a.module = '技术问题' AND SUBSTR( a.id, 1, 12 ) = xxx AND a.`status` = 'unconfirmed' AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " GROUP BY b.bug_id ORDER BY b.bug_id) AS c;"
        realtime_list_sql = "SELECT a.id, a.title AS '工单标题', a.custom_field_four AS '店铺', a.bugtype AS '类型', a.`status` AS '英文状态',( CASE WHEN a.`status` = 'feedback' THEN '待核实' WHEN a.`status` = 'acknowledged' THEN '已核实' WHEN a.`status` = 'unconfirmed' THEN '待评估' END) AS '状态', a.current_owner AS '处理人', a.custom_field_two AS '所属小组' FROM t_tapd_bug_order a, t_tapd_bug_order_change b WHERE a.id = b.bug_id AND b.created IN ( SELECT min( b.created ) FROM t_tapd_bug_order a, t_tapd_bug_order_change b WHERE a.id = b.bug_id AND b.bug_id IN ( SELECT DISTINCT b.bug_id FROM t_tapd_bug_order_change b WHERE b.created >= DATE_SUB( now( ), INTERVAL 15 MINUTE ) AND SUBSTR( b.bug_id, 1, 12 ) = xxx AND b.old_value = 'feedback' AND b.new_value = 'unconfirmed' ) AND DATE_FORMAT( a.created, '%Y-%m-%d' ) = date_sub( curdate( ), INTERVAL 0 DAY ) AND a.module = '技术问题' AND SUBSTR( a.id, 1, 12 ) = xxx AND a.`status` = 'unconfirmed' AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " GROUP BY b.bug_id ORDER BY b.bug_id );"

        print("第一个sql为:%s\n第二个sql为:%s" % (realtime_count_sql, realtime_list_sql))

        # 执行查询语句
        realtime_bug_count = self.execute_sql(realtime_count_sql)[0][0]
        realtime_bug_detail = self.execute_sql(realtime_list_sql)
        print("新增工单数为:", realtime_bug_count)

        # 推送的报文 %d格式化数值,%s格式化字符,多个值用%(a,b,c)
        # 定义汇总部分
        head = """>【%s】\n当前新增技术工单数:<font color = "warning">%d单</font>""" % (title, realtime_bug_count)

        # 定义详细清单-压缩前
        body = ''
        pending_list_01 = []
        for i in realtime_bug_detail:
            pending_list_01.append(i[6])
            # 链接需要[链接名称](http://www....)
            body01 = """\n\n>工单标题:<font color="comment">%s</font>\n>工单链接:[%s%s](%s%s)\n>店铺:<font color="comment">%s</font>\n>类型:<font color="comment">%s</font>\n>状态:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>所属小组:<font color="comment">%s</font>""" % (
                i[1], project_path, i[0], project_path, i[0], i[2], i[3], i[5], i[6], i[7])

            # 前后加空格换行,+=遍历叠加
            body += '\n\n' + body01 + '\n\n'

        # print("处理人列表为:", pending_list_01)

        # 定义请求报文,将2部分字符串拼接
        markdown = {
            "msgtype": "markdown",
            "markdown": {
                "content": head + body
            }
        }

        # 定义详细清单-压缩后
        body_short = ''
        pending_list_01 = []
        for i in realtime_bug_detail:
            pending_list_01.append(i[6])
            # 链接需要[链接名称](http://www....)
            body_short_01 = """\n\n>工单标题:<font color="comment">%s</font>\n>工单链接:[%s%s](%s%s)\n>处理人:<font color="comment">%s</font>""" % (
                i[1], project_path, i[0], project_path, i[0], i[6])

            # 前后加空格换行,+=遍历叠加
            body_short += '\n\n' + body_short_01 + '\n\n'

        markdown_short = {
            "msgtype": "markdown",
            "markdown": {
                "content": head + body_short
            }
        }

        # 获取content字节数,注意采用 UTF-8 编码方式,先
        # 使用 encode() 方法,将字符串进行编码后再获取它的字节数
        count_byte = len((head + body).encode())
        short_count_byte = len((head + body_short).encode())
        # print("content字节数为:", count_byte)

        # 添加判断,实时工单数为0且大于4096字节则不执行
        if realtime_bug_count != 0:
            # 定义@处理人的消息
            notice = {
                "msgtype": "text",
                "text": {
                    "content": realtime_notice,
                    "mentioned_list": self.find_userid(pending_list_01)  # 调用用户userid转化方法
                }
            }
            if count_byte <= 4096:
                # print("新工单汇总明细为:",markdown)
                resp01 = self.post_bot(self.url, markdown)
                # print("新工单汇总明细-响应为:", resp01.text)
                assert resp01.text.__contains__("""errmsg":"ok""")
                print("新工单汇总明细-发送成功")

                print("通知处理人消息为:", notice)
                resp02 = self.post_bot(self.url, notice)
                # print("通知处理人消息-响应为:", resp02.text)
                assert resp02.text.__contains__("""errmsg":"ok""")
                print("通知处理人-发送成功")
            elif short_count_byte <= 4096:
                # print("新工单汇总明细为:", markdown_short)
                resp01 = self.post_bot(self.url, markdown_short)
                # print("新工单汇总明细-响应为:", resp01.text)
                assert resp01.text.__contains__("""errmsg":"ok""")
                print("新工单汇总明细-发送成功")

                print("通知处理人消息为:", notice)
                resp02 = self.post_bot(self.url, notice)
                # print("通知处理人消息-响应为:", resp02.text)
                assert resp02.text.__contains__("""errmsg":"ok""")
                print("通知处理人-发送成功")
            else:
                logging.error("字节数为%s,markdown内容最长不超过4096个字节" % short_count_byte)
        else:
            logging.error("暂无新记录")

    # 定义产研自动转化分组方法
    def group_transform(self, group):
        group_list=['xx','xx','xx']
        if group in group_list:
            group = "'" + group + "产品中心'"
        else:
            logging.error("请填写正确的产研名称")
        return group

@click.command()
@click.option('-rtu', '--realtime_url', required=False, help='定义实时工单机器人webhook,为url格式,如 https://qyapi.weixin.qq.com')
@click.option('-rtn', '--realtime_notice', required=False, default='请处理人抓紧处理新工单!', type=str, help='定义实时工单处理人的提醒文案')
@click.option('-tou', '--timeout_url', required=False, help='定义超时工单机器人webhook,为url格式,如 https://qyapi.weixin.qq.com')
@click.option('-ton', '--timeout_notice', required=False, default='请处理人抓紧处理超时工单!', type=str, help='定义超时工单处理人的提醒文案')
@click.option('-t', '--title', required=False, default='数据产品中心', type=str, help='定义标题,如【数据产品中心】')
@click.option('-g', '--group', required=False, default='数据', type=str,
              help="定义处理人所在产研,如基础或者9,映射关系为:1-直播 2-营销 3-xxxx 4-xxx ")
# 定义机器人运行方法
# 实时机器人示例:python test/py_interface_automation/tapd_remind_robot.py -rtu https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=x -rtn 请处理xx人抓紧处理新工单! -t xx产品中心 -g xx
# 超时机器人示例:python test/py_interface_automation/tapd_remind_robot.py -tou https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx -ton 请处理人抓紧处理超时工单!-t xx产品中心 -g xx
def run_remind(timeout_url, realtime_url, timeout_notice='', realtime_notice='', title='', group=''):
    # 添加判断超时机器人url对象存在且实时机器人url对象不存在
    if timeout_url is not None and realtime_url is None:
        print("url为:%s\n提醒信息为:%s\n标题为:%s\n处理分组前:%s" % (timeout_url, timeout_notice, title, group))
        # 实例化
        trr = Tapd_Remind_Robot(timeout_url)
        # 调用发送超时工单提醒方法
        group_new = trr.group_transform(group)
        print("处理分组后:", group_new)
        trr.send_msg_time_out(timeout_notice, title, group_new)
    elif realtime_url is not None and timeout_url is None:
        print("url为:%s\n提醒信息为:%s\n标题为:%s\n处理分组前:%s" % (realtime_url, realtime_notice, title, group))
        # 实例化
        trr = Tapd_Remind_Robot(realtime_url)
        # 调用发送实时工单提醒方法
        group_new = trr.group_transform(group)
        print("处理分组后:", group_new)
        trr.send_msg_real_time(realtime_notice, title, group_new)
    else:
        logging.error("请录入正确的url")


if __name__ == '__main__':
    # 调用主运行方法
    run_remind()```





©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容