前面有文章介绍过使用 django-crontab 和 apscheduler 模块管理Django本身应用的定时任务。
注意这里说的是Django本身应用的定时任务,也就是说定时任务本身是Django应用中的对应的函数功能。
这里也大概总结下 django-crontab 和 apscheduler 优缺点供大家参考
1、 Django crontab 的优点:
集成度高:Django crontab 是 Django 框架的一部分,与 Django 应用无缝集成,使用起来非常方便。
简单易用:Django crontab 提供了简洁的语法和接口,可以轻松定义和管理定时任务。
2、Django crontab 的缺点:
- 依赖 Django:Django crontab 需要依赖 Django 框架,如果你的项目不是基于 Django,就无法使用该库。
- 单进程模型:Django crontab 默认使用单进程模型,即每个定时任务在单独的进程中执行,可能会对系统资源造成一定压力。
3、APScheduler 的优点:
独立性:APScheduler 是一个独立的任务调度库,可以与任何 Python 项目集成,不依赖于特定的框架。
灵活性:APScheduler 提供了多种调度器和触发器的选择,可以根据不同的需求选择最合适的调度策略。
多进程支持:APScheduler 支持多进程模型,可以通过配置来控制任务的并发执行,提高任务的执行效率。
4、APScheduler 的缺点:
- 配置复杂:相比 Django crontab,APScheduler 的配置相对复杂一些,需要更多的配置项来定义定时任务。
背景介绍
不同于 django-crontab 和 apscheduler 管理Django应用本身的定时任务。
这里介绍的是类似Java的 xxl-job 管理不同Java项目的Task定时任务。
通过调用ansible api 来实现管理分发crontab任务到Linux指定单台或者多台主机。
简单理解就是 ansible cron 模块的可视化界面版本
技术栈
Django及其信号signal机制
Django celery异步
Django logging 日志
Ansible API 这里推荐使用 ansible-runner
实现逻辑
新增crontab模型,然后注册到Django Admin后台,新增、修改、删除(逻辑删除)时触发 post_save
信号,然后在信号中触发celery
任务Task,在Task中调用 ansible-runner的playbook接口推进 crontab 记录到远程主机
1、模型设计
from django.db import models
from django.contrib.auth import get_user_model
User = get_user_model()
# Create your models here.
class CronJob(models.Model):
name = models.CharField(max_length=128, verbose_name="任务名称")
# 远程主机的用户,也就是crontab任务配置在那个用户下
system_user = models.CharField(max_length=16, default="root", verbose_name="系统用户")
command = models.CharField(max_length=512, verbose_name="命令")
# 多个主机逗号分隔
execute_host = models.CharField(max_length=512, verbose_name="执行主机")
# 可以为空,为空就是给指定system_user下创建crontab,不为空则在 /etc/cron.d/下新增 cron_file只能名称的crontab
# Will not manage /etc/crontab via cron_file, see documentation
# https://docs.ansible.com/ansible/latest/collections/ansible/builtin/cron_module.html
cron_file = models.CharField(max_length=512, default="job-from-devops", verbose_name="cron文件名")
# True 代表 present, False 代表 absent
status = models.BooleanField(default=True, verbose_name="是有有效")
# crontab任务的时间规则
weekday = models.CharField(max_length=16, default="*", verbose_name="crontab周")
month = models.CharField(max_length=16, default="*", verbose_name="crontab月")
day = models.CharField(max_length=16, default="*", verbose_name="crontab天")
hour = models.CharField(max_length=16, default="*", verbose_name="crontab小时")
minute = models.CharField(max_length=16, default="*", verbose_name="crontab分钟")
# 逻辑删除
is_deleted = models.BooleanField(default=False, verbose_name="是否删除")
# 扩展信息
creator = models.ForeignKey(User, on_delete=models.SET_DEFAULT, default="admin", verbose_name="创建者")
create_time = models.DateTimeField(auto_now_add=True, verbose_name="创建时间")
update_time = models.DateTimeField(auto_now=True, verbose_name="更新时间")
def __str__(self):
return self.name
class Meta:
verbose_name = 'CronJob'
verbose_name_plural = verbose_name
2、celery task执行ansible-playbook
先给出核心代码
@shared_task
def task_cron(id, update_time):
# 获取crontab 记录
cron_obj = Job.objects.get(id=id)
if cron_obj.status:
state = 'present'
else:
state = 'absent'
cron_obj.command = mark_safe(cron_obj.command)
# 渲染 Ansible playbook 配置文件模板
adhoc_template = 'jobs/adhoc_crontab_template.yml'
ansible_config = render_to_string(adhoc_template, {'cron_obj': cron_obj, 'state': state})
# 将 Ansible 配置文件保存到临时文件
ansible_config_file = '/tmp/ansible/xxx.yml'
with open(ansible_config_file, 'w') as f:
f.write(ansible_config)
# 使用 Ansible Runner 运行 Ansible playbook
os.environ['ANSIBLE_REMOTE_USER'] = getattr(settings, 'ANSIBLE_REMOTE_USER', 'xxxx')
os.environ['ANSIBLE_PRIVATE_KEY_FILE'] = getattr(settings, 'ANSIBLE_PRIVATE_KEY_FILE', '/home/xxxx/.ssh/id_rsa')
private_data_dir = getattr(settings, 'ANSIBLE_PRIVATE_DATA_DIR', '/tmp/ansible')
inventory_file = getattr(settings, 'ANSIBLE_INVENTORY_FILE', '/etc/ansible/inventory')
runner = ansible_runner.run_async(
private_data_dir=private_data_dir,
inventory=inventory_file,
playbook=ansible_config_file
)
return runner[1].stats
这里重点解读下该断代码
1)、函数参数额多了个 update_time 但是实际未用,目的是每次更新都是新的celery task (函数名+参数)
,不然会出现更新了模型但是celery task不执行的情况
2)、使用mark_safe
函数是为了确保crontab command 中如果有 重定向符时不被转义
3)、根据模型记录使用render_to_string转化 ansible 模板为具体的 playbook
4)、通过 os.environ 设定 ansible的系统变量
5)、关于 ansible_runner的使用和介绍可以参考
Python使用ansible-runner模块实现ansible调用学习
3、配置celery和signal
这里就细讲了,感兴趣的可以阅读之前写的相关文档
django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logger)
4、配置logging
这里给出配置的logging 模板
# 在日志级别中,有以下几个常见的级别(从低到高):'DEBUG'、'INFO'、'WARNING'、'ERROR' 和 'CRITICAL'。
LOGGING = {
'version': 1,
'disable_existing_loggers': False,#此选项开启表示禁用部分日志,不建议设置为True
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(message)s'#日志格式
},
'simple': {
'format': '%(levelname)s %(message)s'
},
},
'filters': {
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',#过滤器,只有当setting的DEBUG = True时生效
},
},
'handlers': {
'console': {
'level': 'DEBUG',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'file': {#重点配置部分
'level': 'INFO',
'class': 'logging.FileHandler',
'filename': 'logs/job.log',#日志保存文件
'formatter': 'verbose'#日志格式,与上边的设置对应选择
}
},
'loggers': {
'jobapp': {#日志记录器
'handlers': ['file'],
'level': 'INFO',
'propagate': True,
}
},
}
项目中 signal.py 中使用logging
from django.db.models.signals import post_save
from django.dispatch import receiver
from jobs.models import Job
from jobs.tasks import task_cron
import logging
# logger = logging.getLogger(__name__)
logger = logging.getLogger('jobapp')
# 监听User模型创建
# Todo: 特殊日志接入到表中,做页面结果展示
@receiver(post_save, sender=Job)
def create_user_profile(sender, instance, created, **kwargs):
if created:
print("Job created", instance.name, instance.create_time)
logger.info(f"Job created {instance.name}; {instance.create_time}")
else:
print("job update", instance.name, instance.update_time)
logger.info(f"job update { instance.name} ; {instance.update_time}")
task_cron.delay(instance.id, instance.update_time)
print(f"after signal execute task_cron {instance.id}")
这里关于crontab 的更变是记录到日志中的。如果想记录到一个日志表中,可以单独定义一个 日志模型,然后通过信号获取到 sender的信息 写入到日志表中去
这里有个问题,就是如果是使用Django自带的后台管理系统,那么
request
中是获取不到当前用户的,这样不管是记录操作的方式,都无法记录“是谁” 进行的操作,无法审计。
关于Django自带Admin后台如何获取当前用户,可以参考之前的文章
实际的代码演示就不操作了,把源码分享给大家,然后自己实际跑一遍更有收获,有问题可以微信公众号联系我
源码地址 dj_cronjobs
具体如何使用参考源码中的 Readme.md 即可。
如果觉得文章对你有用,请不吝点赞 和 关注个人公众号(搜索 全栈运维
或者 DailyJobOps
)