python gpu任务及时调度

在跑深度模型的时候,经常需要和课题组其他同学共用服务器或者多个对比模型需要依次训练,这时候可能需要频繁去查看当前gpu是否满足需求。所以想写一个gpu任务及时调用的脚本,实现目标有三个:

  • 周期性监控gpu显存状态,若满足任务运行条件则运行任务
  • 若任务成功放到gpu上,则邮件通知代码开始执行

周期性监控gpu显存状态

读取gpu显存状态

python中使用pynvml库查询。

#python2
pip install nvidia-ml-py2
#python3
pip install nvidia-ml-py3

读取gpu显存

import pynvml
pynvml.nvmlInit()  # 初始化
handle=pynvml.nvmlDeviceGetHandleByIndex(0)  # 这里参数表示查询的索引
meminfo = pynvml.nvmlDeviceGetMemoryInfo(handle)
print("totol mem: %0.3f G, used mem: %0.3f G, free mem: %0.3f G"%(meminfo.total/1e9, meminfo.used/1e9, meminfo.free/1e9))
deviceCount = pynvml.nvmlDeviceGetCount()
deviceName = pynvml.nvmlDeviceGetName(handle)
print('device count: %d device name: %s'%(deviceCount, deviceName))
pynvml.nvmlShutdown()  # 关闭

周期性执行任务

python 周期性任务执行方式大概有四种,参考Python3-定时任务四种实现方式, 这里使用定时框架: APScheduler实现。
安装apscheduler

pip install apscheduler

调度器使用:

import time
from apscheduler.scheduler.blocking import BlockingScheduler

def func(arg1, arg2):
  pass

def dojob():
  scheduler = BlockingScheduler()  # 创建任务调度器
  scheduler.add_job(func, args=(a1, a2), "interval", seconds=2, id="test_job1)  # 添加任务
  scheduler.start()  # 周期性调度开始

dojob()

所以周期性查询gpu显存只需要将查询任务放入scheduler调用列表中。

定时发送邮件

我们希望任务开始执行时发送提醒邮件,显然只需实现发送邮件服务即可。简单三步,用 Python 发邮件

import smtplib
from email.mime.text import MIMEText
mail_host = 'smtp.163.com'  # 163邮箱的域名
mail_user='casia_zhouzongwei'   # 邮箱用户名
mail_pass = '**********'   # 邮箱密码(有可能是开始SMTP时提供的授权码)
sender='casia_zhouzongwei@163.com'  # 发送邮箱,和上面用户名一致
receivers=['****@163.com']   # 要接收邮件的邮箱
message=MIMEText('context', 'plain', 'utf-8')  # 发送内容,格式,字体
message['Subject']='Title' # 邮件主题
message['From'] = sender
message['To'] = receivers[0] 
try:
    smtpObj = smtplib.SMTP() 
    #连接到服务器
    smtpObj.connect(mail_host,25)
    #登录到服务器
    smtpObj.login(mail_user,mail_pass) 
    #发送
    smtpObj.sendmail(
        sender,receivers,message.as_string()) 
    #退出
    smtpObj.quit() 
    print('success')
except smtplib.SMTPException as e:
    print('error',e) #打印错误

消息提醒的话,显然富文本就足够了,因此后面发送图片和附件用到时再说。

代码

# --*-- coding:utf-8 --*--
import os
import time
import pynvml
import smtplib
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def send_message_by_email(mes):
    mail_host = 'smtp.163.com'
    mail_user = 'casia_zhouzongwei'
    mail_pass = "*******"
    sender = 'casia_zhouzongwei@163.com'
    receivers=['cumtzhouzongwei@163.com']

    message = MIMEText(mes, 'plain', 'utf-8')
    message['Subject'] = "New Task has been move to GPUs"
    message['From'] = sender
    message['To'] = receivers[0]
    try:
        smtpObj = smtplib.SMTP() 
        #连接到服务器
        smtpObj.connect(mail_host,25)
        #登录到服务器
        smtpObj.login(mail_user,mail_pass) 
        #发送
        smtpObj.sendmail(
            sender,receivers,message.as_string()) 
        #退出
        smtpObj.quit() 
        print('state been sent successfully.')
    except smtplib.SMTPException as e:
        print('email error',e) #打印错误

def func(pre_commands, command_file, mem_required=8, gpu_num_need=2):
    """
    :param pre_commands:    任务执行前的处理,比如环境和路径的设置等
    :param command_file:    最终需要执行的指令文件 "*.sh"
    :param men_required:    每张卡最低需要的显存数, 单位G
    :param gpu_num_need:    每个任务需要的gpu卡数
    """
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    # print('do func  {} time :'.format(command_file),ts)

    pynvml.nvmlInit()
    deviceCount = pynvml.nvmlDeviceGetCount()  # 当前机器gpu个数
    available_device = []

    if os.path.exists('tmp.txt'):
        with open('tmp.txt', 'r') as f:
            lines = f.readlines()
            cidx = int(lines[0].strip())
    else:
        cidx = 0
    
    if cidx < len(command_file):
        for i in range(deviceCount):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)       # 第i张卡
            meminfo = pynvml.nvmlDeviceGetMemoryInfo(handle)    # 读取信息
            freeMem = meminfo.free/1e9                          # 该张卡还剩多少显存
            if freeMem >= mem_required:            # 该张卡可用
                available_device.append(str(i))
            if len(available_device) == gpu_num_need:   # 满足条件,执行
                os.system('{} && CUDA_VISIBLE_DEVICES={} sh {}'.format(pre_commands, ','.join(available_device), command_file[cidx]))
                message = 'task id %d : %s is executed!'%(cidx, command_file[cidx])+' the gpu indices used are %s'%(','.join(available_device))
                send_message_by_email(message)
                with open('tmp.txt','w') as f:
                    f.write(str(cidx+1))
    else:
        print('all tasks are done. please kill this subprocess! time :'.format(command_file),ts)


def dojob():
    with open('tmp.txt','w') as f:
        f.write(str(0))
    pre_commands="cd ~/Documents/subdir "  # 此处修改
    commands = ['job1.sh', 'job2.sh']  # 此处修改
    scheduler = BlockingScheduler()
    job1 = scheduler.add_job(func, 'interval', seconds=10, id='job1', args=(pre_commands, commands,))
    scheduler.start()


dojob()

目前只邮件通知任务执行,可以很容易的通过附件的形式把结果也同步发送。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容