需求背景:
目前业务线上问题登记在tapd的项目中,客服提单后,各产研无法感知到有新工单进来。为了提高产研对线上问题的感知度、提高工单收敛速度。开发了实时机器人每5分钟拉取数据推送到产研群里,开发了超时机器人每日下班前推送还未解决的工单。
技术栈:
python +mysql+tapd+企微机器人+jenkins
实现思路:
一、读取tapd数据
-
1:调用tapd官方api,直接读取项目下的各类信息
https://www.tapd.cn/help/show#1120003271001000086
2:利用方法1读取后定时写入mysql数据库-本次使用方法2
二、组装企微机器人消息体
参考企微官方-群机器人配置api
https://developer.work.weixin.qq.com/document/path/91770
三、企微群中新建机器人,拿到webhookurl
四、python脚本调用企微机器人发送接口推送消息
五、持续集成到jenkins定时执行推送
调用click模块封装好,编写shell脚本调用,jenkins中构建添加shell脚本就行。
实时机器人示例:python xx/tapd_remind_robot.py -rtu https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=x -rtn 请处理xx人抓紧处理新工单! -t xx产品中心 -g xx
完整代码:
-初学python写的很臭很长勿喷
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# @Time : 2021/5/14 17:06
# @Author : yanfa
# @File : tapd_remind_robot.py
# TAPD工单统计机器人-超时、实时
import logging
import click
import pymysql
from sshtunnel import SSHTunnelForwarder
import requests
import json
# ssh
SSH_HOST = "xxx"
SSH_PORT = 22
SSH_USERNAME = "xxxx"
SSH_PASSWORD = "xxxx"
REMOTE_BIND_HOST = "xxx"
REMOTE_BIND_PORT = xxxx
# mysql
MYSQL_HOST = "127.0.0.1"
MYSQL_PORT = ""
MYSQL_USER = "xxx"
MYSQL_PASSWD = "xxx"
MYSQL_DB = "xxx" # tadp主库
MYSQL_DB_01 = "xxx" # 获取用户userid
MYSQL_CHARSET = "utf8"
# 定义工单链接前半部分
project_path = "https://www.tapd.cn/xxx/bugtrace/bugs/view?bug_id="
class Tapd_Remind_Robot():
# 初始化必传的url
def __init__(self, url):
self.url = url
# SSH跳板机链接MySQL数据库-tadp主库
def connect_mysql(self):
server = SSHTunnelForwarder(ssh_address_or_host=(SSH_HOST, SSH_PORT),
ssh_username=SSH_USERNAME,
ssh_password=SSH_PASSWORD,
remote_bind_address=(REMOTE_BIND_HOST, REMOTE_BIND_PORT))
server.start()
conn = pymysql.connect(host='127.0.0.1',
port=server.local_bind_port,
user=MYSQL_USER,
passwd=MYSQL_PASSWD,
db=MYSQL_DB,
charset=MYSQL_CHARSET)
return conn
# SSH跳板机链接MySQL数据库-获取用户userid
def connect_mysql01(self):
server = SSHTunnelForwarder(ssh_address_or_host=(SSH_HOST, SSH_PORT),
ssh_username=SSH_USERNAME,
ssh_password=SSH_PASSWORD,
remote_bind_address=(REMOTE_BIND_HOST, REMOTE_BIND_PORT))
server.start()
conn = pymysql.connect(host='127.0.0.1',
port=server.local_bind_port,
user=MYSQL_USER,
passwd=MYSQL_PASSWD,
db=MYSQL_DB_01,
charset=MYSQL_CHARSET)
return conn
# 执行SQL命令-tapd库
def execute_sql(self, sql):
db = self.connect_mysql()
cursor = db.cursor()
try:
cursor.execute(sql)
results = cursor.fetchall()
return results
except:
print("Error")
db.close()
# 执行SQL命令-db_user_center库
def execute_sql01(self, sql):
db = self.connect_mysql01()
cursor = db.cursor()
try:
cursor.execute(sql)
results = cursor.fetchall()
return results
except:
print("Error")
db.close()
# 推送消息
def post_bot(self, url, data):
headers = {'Content-Type': 'application/json'}
response = requests.post(url=url, headers=headers, data=json.dumps(data))
# 后续可能有用
return response
# 定义tapd_name转化为userid方法
def find_userid(self, pending_list_01):
# 不以;结尾特殊的元素末尾添加;
list0 = []
for j in pending_list_01:
if j.endswith(';'):
list0.append(j)
else:
j = j + ';'
list0.append(j)
# print("处理不以;结尾的元素后:", list0)
# 第一次去重
pending_list = list(set(list0))
# print("第一次去重后的列表为:", pending_list)
# 定义一个字符串存储拼接的用户信息
list1 = ''
for i in pending_list:
list1 += i
# print("拼接用户信息后的列表:", list1)
# 去除字符最后的字符";"
list2 = list1.strip(';')
# print("字符串去除尾部标点为:", list2)
# 切割
list3 = list2.split(";")
# print("字符串分割后:", list3)
# 第二次去重
list4 = list(set(list3))
# print("第二次去重后的列表为:", list4)
# 强转成元祖后再强转为str拼接在sql
list5 = str(tuple(list4))
# print("列表转为元祖再转为str:",list5)
# 特殊处理元祖只有单个元素的','
if len(list4) == 1:
list6 = []
for i in list5:
list6.append(i)
# print("遍历字符串:", list6)
# 删除倒数第二个元素','
del list6[-2]
# print('处理,后:', list6)
# 重新合并
list7 = "".join(list6)
# print('重新合并:', str(list7))
# 拼接sql
userid_list = "SELECT a.userid FROM t_admin_user a where a.tapd_name in" + list7 + ";"
print("查询userid的sql为:", userid_list)
else:
# 拼接sql
userid_list = "SELECT a.userid FROM t_admin_user a where a.tapd_name in" + list5 + ";"
print("查询userid的sql为:", userid_list)
userid_list01 = self.execute_sql01(userid_list)
# print("处理人userid元祖为:", userid_list01)
# 定义一个列表存储遍历元祖的值
list8 = []
for e in userid_list01:
list8.append(e[0]) # 只查询子元祖第一个值userid
print('最终处理人列表为:', list8)
return list8
# 定义发送消息主方法
def send_msg_time_out(self, timeout_notice, title, group_new):
# 定义查询语句-有截止时间,取【当前时间 - 截止时间】>=0 算超时
# 定义被删除的部分
deleted = """('xxx')"""
sql01 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 0;"
sql02 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=xxxAND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >=3 AND a.date <5;"
sql03 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=xxxAND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 5 AND a.date <7;"
sql04 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=xxxAND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline !='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 7;"
# 定义查询语句-无截止时间,取【当前时间 - 创建时间】>=3 算超时
sql05 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=xxx AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 3 AND a.date <5;"
sql06 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 5 AND a.date <7;"
sql07 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 7;"
sql08 = "SELECT count(0) FROM(SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d'), CURDATE() ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline ='0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 3;"
sql09 = "SELECT * FROM((SELECT a.id, a.title AS '工单标题', a.created AS '创建时间', a.current_owner AS '处理人', a.custom_field_two AS '所属小组', a.date AS '超时天数' FROM (SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( deadline, '%Y-%m-%d'), CURDATE()) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + "AND a.deadline != '0000-00-00 00:00:00' AND a.id NOT IN('xxx','xx','xx') AND a.date >= 0 ) UNION (SELECT a.id, a.title AS '工单标题', a.created AS '创建时间', a.current_owner AS '处理人', a.custom_field_two AS '所属小组', a.date AS '超时天数' FROM (SELECT *, TIMESTAMPDIFF( DAY, DATE_FORMAT( created, '%Y-%m-%d' ), CURDATE( ) ) AS date FROM t_tapd_bug_order ) AS a WHERE a.module = '技术问题' AND SUBSTR(a.id,1,12)=114249027600 AND a.`status` IN ( 'in_progress', 'unconfirmed') AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " AND a.deadline = '0000-00-00 00:00:00' AND a.id NOT IN " + deleted + " AND a.date >= 3 )) AS e ORDER BY e.`超时天数` desc;"
# print("sql01为:%s\nsql02为:%s\nsql03为:%s\nsql04为:%s\nsql05为:%s\nsql06为:%s\nsql06为:%s\nsql08为:%s\nsql09为:%s"%(sql01,sql02,sql03,sql04,sql05,sql06,sql07,sql08,sql09))
# 超时定义:
# 如果填写了截止时间或者截止时间不等于00: 00:00 - ---则取当前时间 - 截止时间 >= 0算超时
# 否则没填写截止时间,则用当前时间 - 创建时间 >= 3才算超时
# 获取超时工单总数/各天数,数据库返回一个元祖且元素也是元祖,通过下标取2层值
# 执行后相加处理
bug_count = self.execute_sql(sql01)[0][0] + self.execute_sql(sql08)[0][0]
bug_03 = self.execute_sql(sql02)[0][0] + self.execute_sql(sql05)[0][0]
bug_05 = self.execute_sql(sql03)[0][0] + self.execute_sql(sql06)[0][0]
bug_07 = self.execute_sql(sql04)[0][0] + self.execute_sql(sql07)[0][0]
bug_all = self.execute_sql(sql09)
print("全部超时个数为:", bug_count)
# print("超时3天个数为:", bug_03)
# print("超时5天个数为:", bug_05)
# print("超时7天个数为:", bug_07)
# print("sql09为:",sql09)
# print("bug_all为:", bug_all)
# 推送的报文 %d格式化数值,%s格式化字符,多个值用%(a,b,c)
# 定义汇总部分
head = """>【%s】\n超时工单数量汇总:<font color = "warning">%d单</font>\n>超时三天: <font color = "comment">%d单</font>\n>超时五天: <font color = "comment">%d单</font>\n>超时七天: <font color = "comment">%d单</font>""" % (
title, bug_count, bug_03, bug_05, bug_07)
# 定义详细清单-压缩前
body = ''
pending_list_01 = []
for i in bug_all:
# 将处理人遍历存储至新列表pending_list_01
pending_list_01.append(i[3])
body01 = """\n\n>工单标题:<font color="comment">%s</font>\n>工单链接:[%s%s](%s%s)\n>创建时间:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>所属小组:<font color="comment">%s</font>\n>超时天数:<font color="warning">%s天</font>""" % (
i[1], project_path, i[0], project_path, i[0], i[2], i[3], i[4], i[5])
# 前后加空格换行,+=遍历叠加
body += '\n\n' + body01 + '\n\n'
# print("处理人列表为:", pending_list_01)
# 定义请求报文,将2部分字符串拼接
markdown = {
"msgtype": "markdown",
"markdown": {
"content": head + body
}
}
# 定义详细清单-压缩后(因为接口content字节数不能大于4096,所以需要压缩)
body_short = ''
pending_list_01 = []
for i in bug_all:
# 将处理人遍历存储至新列表pending_list_01
pending_list_01.append(i[3])
body_short_01 = """\n\n>工单标题:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>超时天数:<font color="warning">%s天</font>""" % (
i[1], i[3], i[5])
# 前后加空格换行,+=遍历叠加
body_short += '\n\n' + body_short_01 + '\n\n'
# print("处理人列表为:", pending_list_01)
# 定义请求报文,将2部分字符串拼接
markdown_short = {
"msgtype": "markdown",
"markdown": {
"content": head + body_short
}
}
# 获取content字节数,注意采用 UTF-8 编码方式,先
# 使用 encode() 方法,将字符串进行编码后再获取它的字节数
count_byte = len((head + body).encode())
short_count_byte = len((head + body_short).encode())
print("压缩前content字节数为:", count_byte)
print("压缩后content字节数为:", short_count_byte)
# 添加判断,超时工单数为0且大于4096字节则不执行
if bug_count != 0:
if count_byte <= 4096:
# print("超时工单汇总明细为:",markdown)
resp01 = self.post_bot(url=self.url, data=markdown)
# print("超时工单汇总明细-响应为:", resp01.text)
assert resp01.text.__contains__("""errmsg":"ok""")
print("超时工单汇总明细-发送成功")
# 定义@处理人的消息
notice = {
"msgtype": "text",
"text": {
"content": timeout_notice,
"mentioned_list": self.find_userid(pending_list_01) # 调用用户userid转化方法
}
}
resp02 = self.post_bot(url=self.url, data=notice)
print("通知处理人消息为:", notice)
# print("通知处理人消息-响应为", resp02.text)
assert resp02.text.__contains__("""errmsg":"ok""")
print("通知处理人-发送成功")
elif short_count_byte <= 4096:
# print("超时工单汇总明细为:", markdown_short)
resp01 = self.post_bot(self.url, markdown_short)
# print("超时工单汇总明细-响应为:", resp01.text)
assert resp01.text.__contains__("""errmsg":"ok""")
print("超时工单汇总明细-发送成功")
# 定义@处理人的消息
notice = {
"msgtype": "text",
"text": {
"content": timeout_notice,
"mentioned_list": self.find_userid(pending_list_01) # 调用用户userid转化方法
}
}
print("通知处理人消息为:", notice)
resp02 = self.post_bot(self.url, notice)
# print("通知处理人消息-响应为:", resp02.text)
assert resp02.text.__contains__("""errmsg":"ok""")
print("通知处理人-发送成功")
else:
# print("压缩前-超时工单汇总明细为:",markdown)
# print("压缩后-超时工单汇总明细为:", markdown_short)
head = """>\n【%s】\n超时工单数量汇总:<font color = "warning">%d单</font>\n>超时三天: <font color = "comment">%d单</font>\n>超时五天: <font color = "comment">%d单</font>\n>超时七天: <font color = "comment">%d单</font>\n\n""" % (
title, bug_count, bug_03, bug_05, bug_07)
body_short = ''
top = '\n【超时工单Top10】\n'
pending_list_01 = []
# 压缩后还是大于4098则只取top10
for i in bug_all[:10]:
# 将处理人遍历存储至新列表pending_list_01
pending_list_01.append(i[3])
body_short_01 = """>工单标题:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>超时天数:<font color="warning">%s天</font>""" % (
i[1], i[3], i[5])
# 前后加空格换行,+=遍历叠加
body_short += '\n\n' + body_short_01 + '\n\n'
print("前10处理人列表为:", pending_list_01)
short_count_byte = len((head + top + body_short).encode())
print('压缩后前10条长度为:', short_count_byte)
# 定义请求报文,将2部分字符串拼接
markdown_short = {
"msgtype": "markdown",
"markdown": {
"content": head + top + body_short
}
}
# 定义@处理人的消息
notice = {
"msgtype": "text",
"text": {
"content": timeout_notice,
"mentioned_list": self.find_userid(pending_list_01) # 调用用户userid转化方法
}
}
resp01 = self.post_bot(self.url, markdown_short)
# print("超时工单汇总明细-响应为:", resp01.text)
assert resp01.text.__contains__("""errmsg":"ok""")
print("超时工单汇总明细-发送成功")
print("通知处理人消息为:", notice)
resp02 = self.post_bot(self.url, notice)
# print("通知处理人消息-响应为:", resp02.text)
assert resp02.text.__contains__("""errmsg":"ok""")
print("通知处理人-发送成功")
# logging.error("字节数为%s,markdown内容最长不超过4096个字节" % short_count_byte)
else:
logging.error("暂无超时记录")
def send_msg_real_time(self, realtime_notice, title, group_new):
# 定义查询语句
realtime_count_sql = "SELECT COUNT(0) from(SELECT min(b.created),b.bug_id FROM t_tapd_bug_order a, t_tapd_bug_order_change b WHERE a.id = b.bug_id AND b.bug_id IN(SELECT b.bug_id FROM t_tapd_bug_order_change b WHERE b.created >= DATE_SUB( now(), INTERVAL 15 MINUTE ) AND SUBSTR( b.bug_id, 1, 12 ) = xxx AND b.old_value = 'feedback' AND b.new_value = 'unconfirmed') AND DATE_FORMAT( a.created, '%Y-%m-%d' ) = date_sub( curdate( ), INTERVAL 0 DAY ) AND a.module = '技术问题' AND SUBSTR( a.id, 1, 12 ) = xxx AND a.`status` = 'unconfirmed' AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " GROUP BY b.bug_id ORDER BY b.bug_id) AS c;"
realtime_list_sql = "SELECT a.id, a.title AS '工单标题', a.custom_field_four AS '店铺', a.bugtype AS '类型', a.`status` AS '英文状态',( CASE WHEN a.`status` = 'feedback' THEN '待核实' WHEN a.`status` = 'acknowledged' THEN '已核实' WHEN a.`status` = 'unconfirmed' THEN '待评估' END) AS '状态', a.current_owner AS '处理人', a.custom_field_two AS '所属小组' FROM t_tapd_bug_order a, t_tapd_bug_order_change b WHERE a.id = b.bug_id AND b.created IN ( SELECT min( b.created ) FROM t_tapd_bug_order a, t_tapd_bug_order_change b WHERE a.id = b.bug_id AND b.bug_id IN ( SELECT DISTINCT b.bug_id FROM t_tapd_bug_order_change b WHERE b.created >= DATE_SUB( now( ), INTERVAL 15 MINUTE ) AND SUBSTR( b.bug_id, 1, 12 ) = xxx AND b.old_value = 'feedback' AND b.new_value = 'unconfirmed' ) AND DATE_FORMAT( a.created, '%Y-%m-%d' ) = date_sub( curdate( ), INTERVAL 0 DAY ) AND a.module = '技术问题' AND SUBSTR( a.id, 1, 12 ) = xxx AND a.`status` = 'unconfirmed' AND SUBSTR(a.custom_field_two,1,6) = " + group_new + " GROUP BY b.bug_id ORDER BY b.bug_id );"
print("第一个sql为:%s\n第二个sql为:%s" % (realtime_count_sql, realtime_list_sql))
# 执行查询语句
realtime_bug_count = self.execute_sql(realtime_count_sql)[0][0]
realtime_bug_detail = self.execute_sql(realtime_list_sql)
print("新增工单数为:", realtime_bug_count)
# 推送的报文 %d格式化数值,%s格式化字符,多个值用%(a,b,c)
# 定义汇总部分
head = """>【%s】\n当前新增技术工单数:<font color = "warning">%d单</font>""" % (title, realtime_bug_count)
# 定义详细清单-压缩前
body = ''
pending_list_01 = []
for i in realtime_bug_detail:
pending_list_01.append(i[6])
# 链接需要[链接名称](http://www....)
body01 = """\n\n>工单标题:<font color="comment">%s</font>\n>工单链接:[%s%s](%s%s)\n>店铺:<font color="comment">%s</font>\n>类型:<font color="comment">%s</font>\n>状态:<font color="comment">%s</font>\n>处理人:<font color="comment">%s</font>\n>所属小组:<font color="comment">%s</font>""" % (
i[1], project_path, i[0], project_path, i[0], i[2], i[3], i[5], i[6], i[7])
# 前后加空格换行,+=遍历叠加
body += '\n\n' + body01 + '\n\n'
# print("处理人列表为:", pending_list_01)
# 定义请求报文,将2部分字符串拼接
markdown = {
"msgtype": "markdown",
"markdown": {
"content": head + body
}
}
# 定义详细清单-压缩后
body_short = ''
pending_list_01 = []
for i in realtime_bug_detail:
pending_list_01.append(i[6])
# 链接需要[链接名称](http://www....)
body_short_01 = """\n\n>工单标题:<font color="comment">%s</font>\n>工单链接:[%s%s](%s%s)\n>处理人:<font color="comment">%s</font>""" % (
i[1], project_path, i[0], project_path, i[0], i[6])
# 前后加空格换行,+=遍历叠加
body_short += '\n\n' + body_short_01 + '\n\n'
markdown_short = {
"msgtype": "markdown",
"markdown": {
"content": head + body_short
}
}
# 获取content字节数,注意采用 UTF-8 编码方式,先
# 使用 encode() 方法,将字符串进行编码后再获取它的字节数
count_byte = len((head + body).encode())
short_count_byte = len((head + body_short).encode())
# print("content字节数为:", count_byte)
# 添加判断,实时工单数为0且大于4096字节则不执行
if realtime_bug_count != 0:
# 定义@处理人的消息
notice = {
"msgtype": "text",
"text": {
"content": realtime_notice,
"mentioned_list": self.find_userid(pending_list_01) # 调用用户userid转化方法
}
}
if count_byte <= 4096:
# print("新工单汇总明细为:",markdown)
resp01 = self.post_bot(self.url, markdown)
# print("新工单汇总明细-响应为:", resp01.text)
assert resp01.text.__contains__("""errmsg":"ok""")
print("新工单汇总明细-发送成功")
print("通知处理人消息为:", notice)
resp02 = self.post_bot(self.url, notice)
# print("通知处理人消息-响应为:", resp02.text)
assert resp02.text.__contains__("""errmsg":"ok""")
print("通知处理人-发送成功")
elif short_count_byte <= 4096:
# print("新工单汇总明细为:", markdown_short)
resp01 = self.post_bot(self.url, markdown_short)
# print("新工单汇总明细-响应为:", resp01.text)
assert resp01.text.__contains__("""errmsg":"ok""")
print("新工单汇总明细-发送成功")
print("通知处理人消息为:", notice)
resp02 = self.post_bot(self.url, notice)
# print("通知处理人消息-响应为:", resp02.text)
assert resp02.text.__contains__("""errmsg":"ok""")
print("通知处理人-发送成功")
else:
logging.error("字节数为%s,markdown内容最长不超过4096个字节" % short_count_byte)
else:
logging.error("暂无新记录")
# 定义产研自动转化分组方法
def group_transform(self, group):
group_list=['xx','xx','xx']
if group in group_list:
group = "'" + group + "产品中心'"
else:
logging.error("请填写正确的产研名称")
return group
@click.command()
@click.option('-rtu', '--realtime_url', required=False, help='定义实时工单机器人webhook,为url格式,如 https://qyapi.weixin.qq.com')
@click.option('-rtn', '--realtime_notice', required=False, default='请处理人抓紧处理新工单!', type=str, help='定义实时工单处理人的提醒文案')
@click.option('-tou', '--timeout_url', required=False, help='定义超时工单机器人webhook,为url格式,如 https://qyapi.weixin.qq.com')
@click.option('-ton', '--timeout_notice', required=False, default='请处理人抓紧处理超时工单!', type=str, help='定义超时工单处理人的提醒文案')
@click.option('-t', '--title', required=False, default='数据产品中心', type=str, help='定义标题,如【数据产品中心】')
@click.option('-g', '--group', required=False, default='数据', type=str,
help="定义处理人所在产研,如基础或者9,映射关系为:1-直播 2-营销 3-xxxx 4-xxx ")
# 定义机器人运行方法
# 实时机器人示例:python test/py_interface_automation/tapd_remind_robot.py -rtu https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=x -rtn 请处理xx人抓紧处理新工单! -t xx产品中心 -g xx
# 超时机器人示例:python test/py_interface_automation/tapd_remind_robot.py -tou https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx -ton 请处理人抓紧处理超时工单!-t xx产品中心 -g xx
def run_remind(timeout_url, realtime_url, timeout_notice='', realtime_notice='', title='', group=''):
# 添加判断超时机器人url对象存在且实时机器人url对象不存在
if timeout_url is not None and realtime_url is None:
print("url为:%s\n提醒信息为:%s\n标题为:%s\n处理分组前:%s" % (timeout_url, timeout_notice, title, group))
# 实例化
trr = Tapd_Remind_Robot(timeout_url)
# 调用发送超时工单提醒方法
group_new = trr.group_transform(group)
print("处理分组后:", group_new)
trr.send_msg_time_out(timeout_notice, title, group_new)
elif realtime_url is not None and timeout_url is None:
print("url为:%s\n提醒信息为:%s\n标题为:%s\n处理分组前:%s" % (realtime_url, realtime_notice, title, group))
# 实例化
trr = Tapd_Remind_Robot(realtime_url)
# 调用发送实时工单提醒方法
group_new = trr.group_transform(group)
print("处理分组后:", group_new)
trr.send_msg_real_time(realtime_notice, title, group_new)
else:
logging.error("请录入正确的url")
if __name__ == '__main__':
# 调用主运行方法
run_remind()```