全文共6162字,阅读大约需要10分钟
最近在群里看到一个好玩的消息推送场景,如下图所示,原理是在微信或者企业微信通过调用官方的接口实现每日定时推送消息。今天就带大家来研究下它是怎么实现的。
整个代码会分几个部分来讲解
日志:为了实时监测程序的运行状态,及后期问题排查
天气API详解:会讲述如何调用免费的天气API接口
Python日期处理:Python中日期转换及日期天数的计算
完整的消息推送
1.日志
Python日志记录的代码,可在任何场景下复用,它能够实时监测程序的运行状态,轻松解决测试和问题排查的难题。
注意:log_home
需要改为自己本地路径
_tb_nm = '微信每日推送'
_tb_nm_cn = "微信每日推送"
_service_code = _tb_nm
# 日志目录
log_home = '/home/xusl/log/wx'
# 日志level
log_level = logging.INFO
# 日志打印到控制台
log_to_console = True
log_config = {
'version': 1,
'formatters': {
'generic': {
'format': '%(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s',
},
'simple': {
'format': '%(asctime)s %(levelname)-5.5s %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'generic',
},
'file': {
'class': 'logging.FileHandler',
'filename': os.path.join(log_home, 'excel_to_data.log'),
'encoding': 'utf-8',
'formatter': 'generic',
},
},
'root': {
'level': log_level,
'handlers': ['console', 'file', ] if log_to_console else ['file', ],
}
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
2.天气API详解
在这里提供一个网站 天气API说明,感谢作者提供了8个天气接口,响应效率高,可以达到不限制次数。关键是免费的,JSON两种方式返回。
接口返回的天气指数数据很全面,如:温度、最高温度、最低温度、风、天气、空气质量指数。
参数只有一个,就是cityId。
比如上海市的cityId是101020100,获取天气的API接口就是http://t.weather.sojson.com/api/weather/city/101020100
访问这个地址,返回的数据如下:
因为返回的结果有近15天的天气,所以要对结果做一个遍历,取出今天对应的天气信息。同时天气更新时间为每天的:3点,8点,13点,19点,所以建议不要凌晨去获取,加上CDN有1个小时的缓存,建议4点,9点,14点,20点后获取。
注意:因为我们的程序是每日推送一次,所以没有对天气结果进行缓存处理,但如果你的程序需要频繁调用天气接口,为了减少对方的CDN加速费用,一定要在代码里加入缓存,API接口是每8小时更新一次,机制是CDN缓存8小时更新一次。
城市数据请在百度网盘下载:
链接: https://pan.baidu.com/s/1JFAwnH2MRLc5OD3hsJZwGQ 提取码: u8sk
3.Python日期处理
考虑到程序中有日期转字符串,字符串转日期,日期相减,所以写了几个方法供大家参考,同时兼顾了国历和农历生日信息的获取,具体如下
import datetime
from time import localtime
def get_now_datetime():
"""
获取当前日期
:return: datetime now
"""
return datetime.datetime.now()
def get_datetime_str(d_date=None, pattern='%Y-%m-%d'):
"""
获取指定日期 字符格式
:param d_date:
:param pattern:
:return:
"""
if not d_date:
d_date = get_now_datetime()
return datetime.datetime.strftime(d_date, pattern)
def parse_str2date(s_date, pattern='%Y-%m-%d'):
"""
将字符串转换为日期格式
:param s_date:
:param pattern:
:return:
"""
return datetime.datetime.strptime(s_date, pattern)
def get_birthday(config, year, today_dt):
"""
获取距离下次生日的时间
:return:
"""
logger.info('获取距离下次生日的时间...................')
birthday = config["birth_day"] # 获取生日日期
birthday_year = birthday.split("-")[0] # 2023 or r2023
# 将str日期转换为日期型
# d_birthday = datetime.datetime.strptime(birthday, "%Y-%m-%d")
# 判断是否为农历生日
if birthday_year[0] == "r":
# 获取农历生日的今年对应的月和日
try:
r_mouth = int(birthday.split("-")[1])
r_day = int(birthday.split("-")[2])
nl_birthday = ZhDate(year, r_mouth, r_day).to_datetime().date()
except TypeError:
logger.error("请检查生日的日子是否在今年存在")
# 调用系统命令行执行 pause 命令,目的是在控制台窗口显示 "请按任意键继续. . ." 的提示信息,并等待用户按下任意键后继续执行程序
# os.system("pause")
sys.exit(1) # 异常退出
birthday_month = nl_birthday.month
birthday_day = nl_birthday.day
# 今年生日
year_date = datetime.date(int(year), birthday_month, birthday_day)
else:
# 获取国历生日的今年对应月和日
birthday_month = int(birthday.split("-")[1])
birthday_day = int(birthday.split("-")[2])
# 获取国历生日的今年对应月和日
year_date = datetime.date(int(year), birthday_month, birthday_day)
# 计算生日年份,如果还没过,按当年减,如果过了需要+1
year_date = get_datetime_str(year_date)
if today_dt > year_date:
if birthday_year[0] == "r":
r_mouth = int(birthday.split("-")[1])
r_day = int(birthday.split("-")[2])
# 获取农历明年生日的月和日
r_last_birthday = ZhDate((int(year) + 1), r_mouth, r_day).to_datetime().date()
birth_date = datetime.date((int(year) + 1), r_last_birthday.month, r_last_birthday.day)
print(type(birth_date))
else:
# 获取国历明年生日的月和日
birth_date = datetime.date((int(year) + 1), birthday_month, birthday_day)
str_birth_date = get_datetime_str(birth_date)
birth_day = (datetime.datetime.strptime(str_birth_date, "%Y-%m-%d").date() - datetime.datetime.strptime(today_dt, "%Y-%m-%d").date()).days
elif today_dt == year_date:
birth_day = 0
else:
birth_day = (datetime.datetime.strptime(year_date, "%Y-%m-%d").date() - datetime.datetime.strptime(today_dt, "%Y-%m-%d").date()).days
if birth_day == 0:
birthday_data = "生日快乐,祝福你无事绊心弦,所念皆如愿。"
else:
birthday_data = "平安喜乐,得偿所愿。"
return birth_day, birthday_data
*注:生日当天的文案可根据自己的风格修改😂
4.完整的消息推送脚本
整个消息推送脚本分两个文件,分别是配置信息和python脚本,开始之前我们要准备一下配置信息。
-
申请相关的信息
app_id
、app_secret
、template_id
、user
去微信申请,微信公众平台接口测试账号申请:https://mp.weixin.qq.com/debug/cgi-bin/sandbox?t=sandbox/login,
-
如下图所示,登录后会有对应的
app_id
、app_secret
-
然后扫码关注,就能看到对应的微信号信息,这里的微信号是放在配置文件的
user
的
-
接着是最重要的一步,写入下方的模板信息,模板ID即为我们
template_id
✌美好的一天开始啦(´⊙ω⊙`)
☕今天是:{{date.DATA}}
⏩下面开始为你播报{{city_nm.DATA}}的天气
⛅今天的天气:{{weather.DATA}}
☀最高:{{max_temperature.DATA}}
❄最低:{{min_temperature.DATA}}
❤❤❤❤❤:{{glowing_terms.DATA}}
⏳距离宝贝的生日:{{birth_day.DATA}} 天
⭐⭐⭐:{{birthday_data.DATA}}
- 最后修改我们配置信息
config.txt
和wx_message.py
。
config.txt
{
# 微信公众号配置
"app_id": "xxx",
"app_secret": "xxx",
"template_id": "xxx",
"user": ["xxx"], # 接收消息的微信号,多个微信用英文逗号间隔,例如["wx1", "wx2"]
# 信息配置
"city_id": "101020100",
"city_nm": "上海市",
"birth_day":"2023-08-22", # 生日若为农历在最前面加上r即可
}
wx_message.py
# Created on 2024/1/19
# @title: '微信公众号发送消息'
# @author: Xusl
import logging.config
import random
import datetime
import sys, json
import os
import requests
from time import localtime
from requests import get, post
from zhdate import ZhDate
_tb_nm = '微信每日推送'
_tb_nm_cn = "微信每日推送"
_service_code = _tb_nm
# 日志目录
log_home = '/home/xusl/log/wx'
# 日志level
log_level = logging.INFO
# 日志打印到控制台
log_to_console = True
log_config = {
'version': 1,
'formatters': {
'generic': {
'format': '%(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s',
},
'simple': {
'format': '%(asctime)s %(levelname)-5.5s %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'generic',
},
'file': {
'class': 'logging.FileHandler',
'filename': os.path.join(log_home, _tb_nm + '.log'),
'encoding': 'utf-8',
'formatter': 'generic',
},
},
'root': {
'level': log_level,
'handlers': ['console', 'file', ] if log_to_console else ['file', ],
}
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(_tb_nm)
# 每日一言
lines = [
"会好,迟早。",
"生命几许,遵从自己,别赶路,感受路。",
"去爱具体的生活。",
"拐个弯,与生活和解,得失都随意。",
"不要预知明天的烦恼。",
"后来重闻往事如耳旁过风,不慌不忙。",
"勇敢的人先享受世界。",
"玫瑰不用长高,晚霞自会俯腰,爱意随风奔跑,温柔漫过山腰。",
"春风得意马蹄疾,一日看尽长安花。",
"你若决定灿烂,山无遮,海无拦。",
"中途下车的人很多,你不必耿耿于怀。",
"内心丰盈者,独行也如众。",
"你记得花,花就不怕枯萎。",
"春日不迟,相逢终有时。",
"日升月落总有黎明。",
"有人等烟雨,有人怪雨急。",
"等风来,不如追风去。",
"真诚永远可贵。",
"喜乐有分享,共度日月长。",
"在过程中追逐意义。"
]
def get_color():
# 获取随机颜色
get_colors = lambda n: list(map(lambda i: "#" + "%06x" % random.randint(0, 0xFFFFFF), range(n)))
color_list = get_colors(100)
return random.choice(color_list)
def get_access_token(config):
logger.info('获取access_token...................')
# appId
app_id = config["app_id"]
# appSecret
app_secret = config["app_secret"]
post_url = ("https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={}&secret={}"
.format(app_id, app_secret))
try:
access_token = get(post_url).json()['access_token']
except KeyError:
logging.error("获取access_token失败,请检查app_id和app_secret是否正确")
sys.exit(1)
return access_token
def get_now_datetime():
"""
获取当前日期
:return: datetime now
"""
return datetime.datetime.now()
def get_datetime_str(d_date=None, pattern='%Y-%m-%d'):
"""
获取指定日期 字符格式
:param d_date:
:param pattern:
:return:
"""
if not d_date:
d_date = get_now_datetime()
return datetime.datetime.strftime(d_date, pattern)
def parse_str2date(s_date, pattern='%Y-%m-%d'):
"""
将字符串转换为日期格式
:param s_date:
:param pattern:
:return:
"""
return datetime.datetime.strptime(s_date, pattern)
def get_weather_info(config, today_dt):
"""
获取城市当日天气
:return:
"""
logger.info('获取天气...................')
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'
}
city_id = config["city_id"]
region_url = "http://t.weather.sojson.com/api/weather/city/{}".format(city_id)
response = get(region_url, headers=headers).json()
if response["status"] == 200:
forecast = response["data"]["forecast"]
for item in forecast:
if item["ymd"] == today_dt:
return True, item
else:
logging.error("天气信息获取失败,请检查天气API是否正常")
return False, response["status"]
def get_birthday(config, year, today_dt):
"""
获取距离下次生日的时间
:return:
"""
logger.info('获取距离下次生日的时间...................')
birthday = config["birth_day"] # 获取生日日期
birthday_year = birthday.split("-")[0] # 2023 or r2023
# 将str日期转换为日期型
# d_birthday = datetime.datetime.strptime(birthday, "%Y-%m-%d")
# 判断是否为农历生日
if birthday_year[0] == "r":
# 获取农历生日的今年对应的月和日
try:
r_mouth = int(birthday.split("-")[1])
r_day = int(birthday.split("-")[2])
nl_birthday = ZhDate(year, r_mouth, r_day).to_datetime().date()
except TypeError:
logger.error("请检查生日的日子是否在今年存在")
# 调用系统命令行执行 pause 命令,目的是在控制台窗口显示 "请按任意键继续. . ." 的提示信息,并等待用户按下任意键后继续执行程序
# os.system("pause")
sys.exit(1) # 异常退出
birthday_month = nl_birthday.month
birthday_day = nl_birthday.day
# 今年生日
year_date = datetime.date(int(year), birthday_month, birthday_day)
else:
# 获取国历生日的今年对应月和日
birthday_month = int(birthday.split("-")[1])
birthday_day = int(birthday.split("-")[2])
# 获取国历生日的今年对应月和日
year_date = datetime.date(int(year), birthday_month, birthday_day)
# 计算生日年份,如果还没过,按当年减,如果过了需要+1
year_date = get_datetime_str(year_date)
if today_dt > year_date:
if birthday_year[0] == "r":
r_mouth = int(birthday.split("-")[1])
r_day = int(birthday.split("-")[2])
# 获取农历明年生日的月和日
r_last_birthday = ZhDate((int(year) + 1), r_mouth, r_day).to_datetime().date()
birth_date = datetime.date((int(year) + 1), r_last_birthday.month, r_last_birthday.day)
print(type(birth_date))
else:
# 获取国历明年生日的月和日
birth_date = datetime.date((int(year) + 1), birthday_month, birthday_day)
str_birth_date = get_datetime_str(birth_date)
birth_day = (datetime.datetime.strptime(str_birth_date, "%Y-%m-%d").date() - datetime.datetime.strptime(today_dt, "%Y-%m-%d").date()).days
elif today_dt == year_date:
birth_day = 0
else:
birth_day = (datetime.datetime.strptime(year_date, "%Y-%m-%d").date() - datetime.datetime.strptime(today_dt, "%Y-%m-%d").date()).days
if birth_day == 0:
birthday_data = "生日快乐,祝福你无事绊心弦,所念皆如愿。"
else:
birthday_data = "平安喜乐,得偿所愿。"
return birth_day, birthday_data
def get_image_url():
url = "https://cn.bing.com/HPImageArchive.aspx?format=js&idx=0&n=1"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
'Content-type': 'application/x-www-form-urlencoded'
}
r = requests.get(url, headers=headers, verify=False)
r.encoding = 'UTF-8-sig'
image_url = "https://cn.bing.com" + json.loads(r.text)["images"][0]["url"]
return image_url
def send_message(to_user, access_token, template_id, result, city_nm, birth_day, birthday_data):
"""
发送微信通知
:param to_user:
:param access_token:
:param template_id:
:param result:
:param city_nm:
:param birth_day:
:param birthday_data:
:return:
"""
logger.info('发送微信通知...................')
weather = result["type"] # 天气
max_temperature = result["high"] # 高温
min_temperature = result["low"] # 低温
glowing_terms = random.choice(lines) # 每日一言
url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={}".format(access_token)
week_list = ["星期日", "星期一", "星期二", "星期三", "星期四", "星期五", "星期六"]
year = localtime().tm_year # 年 2024
month = localtime().tm_mon # 月 1
day = localtime().tm_mday # 日 19
today = datetime.date(year=year, month=month, day=day)
logging.info('today:%s ' % today)
week = week_list[today.isoweekday() % 7]
logging.info('week:%s ' % week)
logging.info('城市:{},天气:{},高温:{},低温:{}'.format(city_nm, weather, max_temperature, min_temperature))
data = {
"touser": to_user,
"template_id": template_id,
"url": "http://weixin.qq.com/download",
"topcolor": "#FF0000",
"data": {
"date": {
"value": "{} {}".format(today, week),
"color": get_color()
},
"city_nm": {
"value": city_nm,
"color": get_color()
},
"weather": {
"value": weather,
"color": get_color()
},
"max_temperature": {
"value": max_temperature,
"color": get_color()
},
"min_temperature": {
"value": min_temperature,
"color": get_color()
},
"glowing_terms": {
"value": glowing_terms,
"color": get_color()
},
"birth_day": {
"value": birth_day,
"color": get_color()
},
"birthday_data": {
"value": birthday_data,
"color": get_color()
}
}
}
# 推送消息
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'
}
response = post(url, headers=headers, json=data).json()
if response["errcode"] == 40037:
logger.error("推送消息失败,请检查模板id是否正确")
elif response["errcode"] == 40036:
logger.error("推送消息失败,请检查模板id是否为空")
elif response["errcode"] == 40003:
logger.error("推送消息失败,请检查微信号是否正确")
elif response["errcode"] == 0:
logger.info("推送消息成功")
else:
logger.info(response)
if __name__ == '__main__':
today_dt = get_datetime_str() # 获取当日日期
t_year = today_dt.split("-")[0] # 当年
try:
with open("config.txt", encoding="utf-8") as f:
config = eval(f.read())
access_token = get_access_token(config)
birth_day, birthday_data = get_birthday(config, t_year, today_dt) # 生日祝福语
city_nm = config["city_nm"] # 城市名
# 获取城市当日天气
flag, result = get_weather_info(config, today_dt)
template_id = config["template_id"] # 模板ID
# 接收的用户
to_user = config["user"] # 用户里诶奥
if flag is True:
logging.info(f'天气获取成功 {result}: {str(result)}')
for user in to_user:
send_message(user, access_token, template_id, result, city_nm, birth_day, birthday_data)
else:
logging.error(f'异常 {result}: {str(result)}')
except FileNotFoundError:
logging.error("推送消息失败,请检查config.txt文件是否与程序位于同一路径")
5.结尾
整个程序尽可能在代码里体现了备注信息,仍然有几点美中不足
每日一言部分太少,解决方案可以优化成获取相关网站的数据保证每天不是重复的,或者可以直接扩大词库lines。
-
抬头部分不能自定义修改,最早的想法是改成自己的公众号,每日定时推送,研究发现公众号不能自定义模板,只能从官方的模板里挑选,局限性就太大了。
解决方法可以把代码移植至企业微信,这样抬头支持自定义,换汤不换药,唯一需要更改的就是申请注册企业微信,同时更换为企业微信相关配置信息,如果时间允许,我尽量再出一版企业微信的教程。(ง •̀_•́)ง
最后的定时任务就不再过多详解了,直接使用服务器的
crontab
即可
最后的最后,希望单身的朋友有双向暗恋,早日追到心选,早日心动。希望不单身的朋友彼此珍惜,和对象长久。希望所有人都能拥有爱,所有人都能被爱拥有。