在实际生产环境中,避免不了有很多后台运行的任务和定时任务,对任务状态的监控与及时告警可以尽量减少程序出错时对用户造成的影响。针对常见的两种任务类型:定时任务、守护进程内批处理任务,利用 Node.js child_process 实现了任务状态的监控、重启与邮件告警。
思路
现在的互联网已经不是单机作战的时代了,分布式部署是非常常见的方式,一个项目中的任务可能运行在多台服务器上,我们的监控平台要做到重启某个任务就需要知道任务运行的具体服务器,针对这一个问题我们需要获取到任务与服务器关系的确切信息,所以每台运行任务的服务器需要在启动任务时向任务状态管理平台注册自己的信息。
任务状态的维护依赖于任务运行服务器的心跳上报,每个任务设置一个超时时间,在任务启动时向任务状态管理平台发送开始运行信号,在任务运行结束后向管理平台发送运行完成信号。任务管理平台根据任务设置的超时时间,在超时后仍然没有接收到任务完成信号则判定任务失败,将任务失败信号发送回任务运行的服务器。再有任务运行服务器自行处理,如重启任务或者结束任务等。
根据以上的逻辑,实际需要就是在任务运行的服务器实现一个任务调度功能与 HTTP 服务器用来监听管理平台发送的信号;在管理平台这边实现任务服务器信息注册、任务状态监管与超时告警。文字表述比较晦涩,具体流程可以参考一下的流程图。
实现代码
后续会把关键信息从代码中抽离出来放到配置文件中,然后放到 GitHub 上,暂时以贴代码的形式简单展示一下。
// 任务运行服务器调度系统
'use strict';
// 内建模块
const fork = require('child_process').fork;
const path = require('path');
// 第三方模块
const _ = require('lodash');
const CronJob = require('cron').CronJob;
const bodyParser = require('body-parser');
const express = require('express');
const request = require('request');
const uuid = require('uuid');
class TaskStatusManagementClient {
/**
* 初始化 TaskStatusClient
* @param {Object} taskClientConfig 服务器配置信息
*/
constructor(taskClientConfig) {
this.taskClientConfig = taskClientConfig;
this.taskHomePath = taskClientConfig.taskHomePath;
this.childsInstance = {};
this.crontabJobsInstance = {};
}
/**
* start TaskStatusClient
*/
start() {
this._process();
}
_process() {
let self = this;
// 根据服务器配置信息启动所有任务
for(let taskConfig of self.taskClientConfig.tasks) {
switch(taskConfig.type) {
case 'daemon': {
self._daemonTaskHandler(taskConfig);
break;
}
case 'crontab': {
if(taskConfig.crontabRule) {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
}
// 在程序退出时结束所有子进程任务
process.on('exit', function (code) {
for(let child in self.childsInstance) {
if(self.childsInstance.hasOwnProperty(child)) {
self.childsInstance[child].kill('SIGHUP');
}
}
});
// 启动 HTTP 服务器,监听任务状态监控平台发回的信号
let app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
app.post('/', function (req, res) {
let body;
try {
body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;
} catch (error) {
return res.status(400).json({ message: 'invalid json' });
}
res.status(200).json({ message: 'ok' });
// 收到任务状态监控平台发回的信号后重启任务
let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });
let taskIdentifier = '';
// daemon 类型的任务在 childsInstance 中的 key 是任务名
// crontab 类型的任务在 childsInstance 中的 key 是任务名 + 任务 ID
switch (taskConfig.type) {
case 'daemon': {
taskIdentifier = taskConfig.name;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.daemonTaskHandler) {
self.handlers.daemonTaskHandler(taskConfig);
} else {
self._daemonTaskHandler(taskConfig);
}
break;
}
case 'crontab': {
taskIdentifier = taskConfig.name + taskConfig.id;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.crontabTaskHandler) {
self.handlers.crontabTaskHandler(taskConfig);
} else {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
});
app.listen(self.taskClientConfig.server.port, function () {
console.log('server start at: ' + self.taskClientConfig.server.port);
self._registerServer(self.taskClientConfig, function (error, result) {
if (error) {
console.log(error);
}
});
});
}
_daemonTaskHandler(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
expires: taskConfig.timeout
};
let child = fork(path.join(self.taskHomePath,taskConfig.file));
self.childsInstance[taskInfo.name] = child;
child.on('message', function (message) {
switch (message.signal) {
case 'start': {
taskInfo.id = message.id;
self._startTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
break;
}
case 'end': {
taskInfo.id = message.id;
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error)
}
});
break;
}
default: {
console.log('unknow signal');
break;
}
}
});
}
_crontabTaskHanlder(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
id: uuid.v4(),
expires: taskConfig.timeout
};
self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {
self._startTask(taskInfo, function (error) {
if(error) {
console.log(error);
} else {
let child = fork(path.join(self.taskHomePath, taskConfig.file));
self.childsInstance[taskInfo.name + taskInfo.id] = child;
child.on('exit', function (code) {
// 子进程退出 code 为 0,代表正常退出,这时可以向监控平台发送任务已完成信号
if(code === 0) {
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
}
});
}
});
}, undefined, true);
}
_startTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_endTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_registerServer(taskClientConfig, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',
method: 'POST',
timeout: 5000,
form: taskClientConfig
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
}
module.exports = TaskStatusManagementClient;
// 调度系统的使用
'use strict';
const config = {
// 监控平台信息,必须
management: {
host: 'http://127.0.0.1',
port: 3000
},
// 当前服务器信息,必须
server: {
host: 'http://127.0.0.1',
port: 3001
},
// 当前服务器任务文件地址(绝对路径),必须
taskHomePath: path.join(__dirname, 'tasks'),
// 任务配置信息,必须
tasks:[{
name: 'exampleTaskOne',
type: 'daemon',
file: 'example_task_one.js',
timeout: 10000
}, {
name: 'exampleTaskTwo',
type: 'crontab',
file: 'example_task_two.js',
crontabRule: '*/20 * * * * *', // 任务类型为 crontab 是此字段为必须
timeout: 10000
}]
};
let taskStatusManagementClient = new TaskStatusManagementClient(config);
taskStatusManagementClient.start();
监控平台 HTTP 服务器比较简单,三个 API,用来将服务器信息、任务开始状态、任务结束状态写入数据库,这里就不在赘述。
// 监控平台任务状态监管
'use strict';
const async = require('async');
const config = require('config');
const mail = require('nodemailer').createTransport({
service: 'your email service',
auth: {
user: 'your email username',
pass: 'your email password'
}
});
const request = require('request');
const logger = require('../log/logger');
const moment = require('../libs/moment');
// 一下三个为数据库操作连接,不需要关注内部代码,不影响代码阅读
const TaskResult = require('../models/task_result');
const TaskStatus = require('../models/task_status');
const TaskServer = require('../models/task_server');
/**
* 发送告警邮件
* @param {String} to 邮件接收者
* @param {String} subject 邮件主题
* @param {String} message 邮件内容
* @param {Function} callback
*/
function sendWarningMessage(to, subject, message, callback) {
var options = {
'from': 'xxx@xxx.xxx',
'to': to,
'subject': subject,
'text': message,
'encoding': 'UTF-8'
};
mail.sendMail(options, function(error) {
console.log('send message to: ' + to);
return callback(error);
});
}
/**
* 处理执行成功任务
* @param {String} task 任务状态对象
* @param {Function} callback
*/
function handleSingleSucceedTask(task, callback) {
TaskResult.increaseTaskSuccessCount(task.name, function (error) {
return callback(error);
});
}
/**
* 处理执行失败的任务
* @param {String} task 任务状态对象
* @param {Function} callback
*/
function handleSingleErrorTask(task, callback) {
async.waterfall([
// 增加任务执行失败次数
async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),
async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),
function (count, callback) {
callback(undefined);
// 给调度系统发信号
TaskServer.findTaskServerHost(task.name, function (error, serverHost) {
// 超过限制失败次数,发送告警邮件
if (count >= config.get('task.limitErrorCount')) {
sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定时任务告警', `${ serverHost }: "${ task.name }" 执行失败超过预定失败次数`, function (error) {
if (error) {
logger.error(error);
}
});
TaskResult.resetTaskErrorCount(task.name, function (error) {
if (error) {
logger.error(error);
}
});
}
if (!error && serverHost) {
// send 'error' signal to task server
request({
uri: serverHost,
method: 'POST',
timeout: 5000,
form: {
name: task.name,
id: task.taskId,
pid: task.pid
}
}, function (error, response, body) {
if (error) {
logger.error(error);
}
});
}
});
}
], function (error) {
return callback(error);
});
}
/**
* 获取所有已经到达预定超时时间的任务并处理
*/
function process() {
let currentTime = new Date().getTime();
async.waterfall([
async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),
// 并行处理每个任务
function (tasks, callback) {
if (tasks.length > 0) {
async.parallel(tasks.map(function (task) {
return function (callback) {
// 超时后任务状态仍然为 running,代表任务执行失败
if (task.status === 'running') {
handleSingleErrorTask(task, function (error) {
return callback(error);
});
} else {
handleSingleSucceedTask(task, function (error) {
return callback(error);
});
}
}
}), function (error) {
return callback(error);
});
} else {
return callback(new Error('no tasks need to exec'));
}
},
// 删除已经处理完成的任务
async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)
], function (error) {
if (error && error.message === 'no tasks need to exec') {
let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));
console.log(`no tasks, delay ${ delay }`);
setTimeout(process, config.get('task.noTaskDelayTime'));
} else if (error) {
logger.error(error);
process();
} else {
process();
}
});
}
module.exports = process;
实现代码
后续会把关键信息从代码中抽离出来放到配置文件中,然后放到 GitHub 上,暂时以贴代码的形式简单展示一下。
// 任务运行服务器调度系统
'use strict';
// 内建模块
const fork = require('child_process').fork;
const path = require('path');
// 第三方模块
const _ = require('lodash');
const CronJob = require('cron').CronJob;
const bodyParser = require('body-parser');
const express = require('express');
const request = require('request');
const uuid = require('uuid');
class TaskStatusManagementClient {
/**
* 初始化 TaskStatusClient
* @param {Object} taskClientConfig 服务器配置信息
*/
constructor(taskClientConfig) {
this.taskClientConfig = taskClientConfig;
this.taskHomePath = taskClientConfig.taskHomePath;
this.childsInstance = {};
this.crontabJobsInstance = {};
}
/**
* start TaskStatusClient
*/
start() {
this._process();
}
_process() {
let self = this;
// 根据服务器配置信息启动所有任务
for(let taskConfig of self.taskClientConfig.tasks) {
switch(taskConfig.type) {
case 'daemon': {
self._daemonTaskHandler(taskConfig);
break;
}
case 'crontab': {
if(taskConfig.crontabRule) {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
}
// 在程序退出时结束所有子进程任务
process.on('exit', function (code) {
for(let child in self.childsInstance) {
if(self.childsInstance.hasOwnProperty(child)) {
self.childsInstance[child].kill('SIGHUP');
}
}
});
// 启动 HTTP 服务器,监听任务状态监控平台发回的信号
let app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
app.post('/', function (req, res) {
let body;
try {
body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;
} catch (error) {
return res.status(400).json({ message: 'invalid json' });
}
res.status(200).json({ message: 'ok' });
// 收到任务状态监控平台发回的信号后重启任务
let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });
let taskIdentifier = '';
// daemon 类型的任务在 childsInstance 中的 key 是任务名
// crontab 类型的任务在 childsInstance 中的 key 是任务名 + 任务 ID
switch (taskConfig.type) {
case 'daemon': {
taskIdentifier = taskConfig.name;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.daemonTaskHandler) {
self.handlers.daemonTaskHandler(taskConfig);
} else {
self._daemonTaskHandler(taskConfig);
}
break;
}
case 'crontab': {
taskIdentifier = taskConfig.name + taskConfig.id;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.crontabTaskHandler) {
self.handlers.crontabTaskHandler(taskConfig);
} else {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
});
app.listen(self.taskClientConfig.server.port, function () {
console.log('server start at: ' + self.taskClientConfig.server.port);
self._registerServer(self.taskClientConfig, function (error, result) {
if (error) {
console.log(error);
}
});
});
}
_daemonTaskHandler(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
expires: taskConfig.timeout
};
let child = fork(path.join(self.taskHomePath,taskConfig.file));
self.childsInstance[taskInfo.name] = child;
child.on('message', function (message) {
switch (message.signal) {
case 'start': {
taskInfo.id = message.id;
self._startTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
break;
}
case 'end': {
taskInfo.id = message.id;
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error)
}
});
break;
}
default: {
console.log('unknow signal');
break;
}
}
});
}
_crontabTaskHanlder(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
id: uuid.v4(),
expires: taskConfig.timeout
};
self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {
self._startTask(taskInfo, function (error) {
if(error) {
console.log(error);
} else {
let child = fork(path.join(self.taskHomePath, taskConfig.file));
self.childsInstance[taskInfo.name + taskInfo.id] = child;
child.on('exit', function (code) {
// 子进程退出 code 为 0,代表正常退出,这时可以向监控平台发送任务已完成信号
if(code === 0) {
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
}
});
}
});
}, undefined, true);
}
_startTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_endTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_registerServer(taskClientConfig, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',
method: 'POST',
timeout: 5000,
form: taskClientConfig
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
}
module.exports = TaskStatusManagementClient;
// 调度系统的使用
'use strict';
const config = {
// 监控平台信息,必须
management: {
host: 'http://127.0.0.1',
port: 3000
},
// 当前服务器信息,必须
server: {
host: 'http://127.0.0.1',
port: 3001
},
// 当前服务器任务文件地址(绝对路径),必须
taskHomePath: path.join(__dirname, 'tasks'),
// 任务配置信息,必须
tasks:[{
name: 'exampleTaskOne',
type: 'daemon',
file: 'example_task_one.js',
timeout: 10000
}, {
name: 'exampleTaskTwo',
type: 'crontab',
file: 'example_task_two.js',
crontabRule: '*/20 * * * * *', // 任务类型为 crontab 是此字段为必须
timeout: 10000
}]
};
let taskStatusManagementClient = new TaskStatusManagementClient(config);
taskStatusManagementClient.start();
监控平台 HTTP 服务器比较简单,三个 API,用来将服务器信息、任务开始状态、任务结束状态写入数据库,这里就不在赘述。
// 监控平台任务状态监管
'use strict';
const async = require('async');
const config = require('config');
const mail = require('nodemailer').createTransport({
service: 'your email service',
auth: {
user: 'your email username',
pass: 'your email password'
}
});
const request = require('request');
const logger = require('../log/logger');
const moment = require('../libs/moment');
// 一下三个为数据库操作连接,不需要关注内部代码,不影响代码阅读
const TaskResult = require('../models/task_result');
const TaskStatus = require('../models/task_status');
const TaskServer = require('../models/task_server');
/**
* 发送告警邮件
* @param {String} to 邮件接收者
* @param {String} subject 邮件主题
* @param {String} message 邮件内容
* @param {Function} callback
*/
function sendWarningMessage(to, subject, message, callback) {
var options = {
'from': 'xxx@xxx.xxx',
'to': to,
'subject': subject,
'text': message,
'encoding': 'UTF-8'
};
mail.sendMail(options, function(error) {
console.log('send message to: ' + to);
return callback(error);
});
}
/**
* 处理执行成功任务
* @param {String} task 任务状态对象
* @param {Function} callback
*/
function handleSingleSucceedTask(task, callback) {
TaskResult.increaseTaskSuccessCount(task.name, function (error) {
return callback(error);
});
}
/**
* 处理执行失败的任务
* @param {String} task 任务状态对象
* @param {Function} callback
*/
function handleSingleErrorTask(task, callback) {
async.waterfall([
// 增加任务执行失败次数
async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),
async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),
function (count, callback) {
callback(undefined);
// 给调度系统发信号
TaskServer.findTaskServerHost(task.name, function (error, serverHost) {
// 超过限制失败次数,发送告警邮件
if (count >= config.get('task.limitErrorCount')) {
sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定时任务告警', `${ serverHost }: "${ task.name }" 执行失败超过预定失败次数`, function (error) {
if (error) {
logger.error(error);
}
});
TaskResult.resetTaskErrorCount(task.name, function (error) {
if (error) {
logger.error(error);
}
});
}
if (!error && serverHost) {
// send 'error' signal to task server
request({
uri: serverHost,
method: 'POST',
timeout: 5000,
form: {
name: task.name,
id: task.taskId,
pid: task.pid
}
}, function (error, response, body) {
if (error) {
logger.error(error);
}
});
}
});
}
], function (error) {
return callback(error);
});
}
/**
* 获取所有已经到达预定超时时间的任务并处理
*/
function process() {
let currentTime = new Date().getTime();
async.waterfall([
async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),
// 并行处理每个任务
function (tasks, callback) {
if (tasks.length > 0) {
async.parallel(tasks.map(function (task) {
return function (callback) {
// 超时后任务状态仍然为 running,代表任务执行失败
if (task.status === 'running') {
handleSingleErrorTask(task, function (error) {
return callback(error);
});
} else {
handleSingleSucceedTask(task, function (error) {
return callback(error);
});
}
}
}), function (error) {
return callback(error);
});
} else {
return callback(new Error('no tasks need to exec'));
}
},
// 删除已经处理完成的任务
async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)
], function (error) {
if (error && error.message === 'no tasks need to exec') {
let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));
console.log(`no tasks, delay ${ delay }`);
setTimeout(process, config.get('task.noTaskDelayTime'));
} else if (error) {
logger.error(error);
process();
} else {
process();
}
});
}
module.exports = process;