class SaltAPI(object):
def __init__(self, url=None, user=None, password=None):
self.__url = settings.SALT_API_URL
self.__user = settings.SALT_API_USER
self.__password = settings.SALT_API_PASSWORD
self.__headers = {'Accept': 'application/json', 'Content-type': 'application/json', 'Connection': 'close'}
self.__data = {'client': 'local'}
self.__token = None
def get_token(self):
"""
用户登陆和获取token
:return:
"""
params = {'eauth': 'pam', 'username': self.__user, 'password': self.__password}
content = self.postRequest(params, self.__headers, prefix='login')
try:
self.__token = content['return'][0]['token']
self.__headers['X-Auth-Token'] = self.__token
except Exception as e:
logger.error(e)
return content
def get_grains(self, target=None):
"""
获取系统基础信息
:return:
"""
data = copy.deepcopy(self.__data)
if target:
data['tgt'] = target
else:
data['tgt'] = '*'
data['fun'] = 'grains.items'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def get_auth_keys(self):
"""
获取所有已认证的主机
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.list_all'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['data']['return']['minions']
except Exception as e:
logger.error(e)
return content
def get_minion_status(self):
"""
获取所有主机的连接状态
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'manage.status'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def delete_key(self, minion=None):
'''
删除指定主机的认证信息
'''
if not minion:
return {'success': False, 'msg': 'minion-id is none'}
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.delete'
data['match'] = minion
content = self.postRequest(data, self.__headers)
try:
return {'success': content['return'][0]['data']['success']}
except Exception as e:
logger.error(e)
return content
def minion_alive(self, minion=None):
'''
Minion主机存活检测
'''
data = copy.deepcopy(self.__data)
if minion:
data['tgt'] = minion
result = {minion: False}
else:
data['tgt'] = '*'
result = {'success': False}
data['fun'] = 'test.ping'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return result
def passwd(self, target=None, user=None, password=None, pass_length=16):
"""
修改密码
:param target: 目标客户端
:param user: 目标客户端的系统用户名
:param password: 新的密码,必须大于等于12位
:return:
"""
if not target:
return {'success': False, 'msg': 'target is none.'}, password
if not user:
return {'success': False, 'msg': 'user is none.'}, password
if password:
if len(password) < pass_length:
return {'success': False, 'msg': 'password must be greater than or equal to {} bits.'.format(pass_length)}, password
if password.isalpha() or password.isdigit() or password.islower() or password.isupper():
return {'success': False, 'msg': 'password must be have lowercase, uppercase and digit.'}, password
else:
password = make_pass(pass_length)
_password = crypt(password, 'cmdb')
try:
self.cmd(target=target, arg='usermod -p "{}" {}'.format(_password, user))
except Exception as e:
logger.debug(e)
res = {'success': True,
'msg': 'Changing password for user {}.all authentication tokens updated successfully.'.format(user),
}
return res, password
def get_users(self, target=None):
"""
获取系统用户
:param target: 目标客户端
:return:
"""
if not target: return {'success': False, 'msg': 'target is none.'}
content = self.cmd(target=target, arg="grep /bin/bash /etc/passwd|awk -F ':' '{print $1}'")
return content
def run_cmdb_agent(self, target=None):
"""
运行cmdb agent
:param target: 目标客户端
:return:
"""
if not target:
return {'success': False, 'msg': 'target is none.'}
content = self.cmd(target=target, arg="/etc/init.d/vmagent")
return content
def cmd(self, target=None, fun='cmd.run', arg=None, async=False):
"""
远程执行任务
:param target: 目标客户端,为空return False
:param fun: 模块
:param arg: 参数,可为空
:param async: 异步执行,默认非异步
:return:
"""
data = copy.deepcopy(self.__data)
if not target:
return {'success': False, 'msg': 'target is none'}
if arg:
data['arg'] = arg
if async:
data['client'] = 'local_async'
data['tgt'] = target
data['fun'] = fun
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def jobs(self, fun=None, jid=None):
"""
任务
:param fun: active,detail
:param jid: Job ID
:return:
"""
data = {'client': 'runner'}
if fun == 'active':
data['fun'] = 'jobs.active'
elif fun == 'detail':
if not jid: return {'success': False, 'msg': 'job id is none'}
data['fun'] = 'jobs.lookup_jid'
data['jid'] = jid
else:
return {'success': False, 'msg': 'fun is active or detail'}
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def postRequest(self, data, headers, prefix=None):
if prefix:
url = '{}/{}'.format(self.__url, prefix)
else:
url = self.__url
try:
s = requests.Session()
s.mount('https://', HTTPAdapter(max_retries=10))
ret = s.post(url, data=json.dumps(data), headers=headers, verify=False, timeout=(30, 60))
if ret.status_code == 401:
logger.error('Salt Unauthorized')
return {'return': [{'success': False, 'msg': 'Salt Unauthorized'}]}
elif ret.status_code == 200:
return ret.json()
else:
return {'return': [{'success': False, 'msg': ret.content}]}
except Exception as e:
logger.error(e)
return {'return': [{'success': False, 'msg': e}]}
def get_pre_auth_keys(self):
"""
获取未授权的salt主机
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.list_all'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['data']['return']['minions_pre']
except Exception as e:
logger.error(e)
return content
def accept_key(self, minion):
"""
授权salt主机
:param minion:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.accept'
data['match'] = minion
content = self.postRequest(data, self.__headers)
try:
return {'success': content['return'][0]['data']['success']}
except Exception as e:
logger.error(e)
return content
def salt_alive(self, tgt):
'''
Minion主机存活检测
'''
data = copy.deepcopy(self.__data)
if tgt:
data['tgt'] = tgt
else:
data['tgt'] = '*'
data['fun'] = 'test.ping'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return None
def getRequest(self, headers, prefix=None):
if prefix:
url = '{}/{}'.format(self.__url, prefix)
else:
url = self.__url
try:
s = requests.Session()
s.mount('https://', HTTPAdapter(max_retries=10))
ret = s.get(url, headers=headers, verify=False, timeout=(30, 60))
if ret.status_code == 401:
logger.error('Salt Unauthorized')
return {'return': [{'success': False, 'msg': 'Salt Unauthorized'}]}
elif ret.status_code == 200:
return ret.json()
except Exception as e:
logger.error(e)
return {'return': [{'success': False, 'msg': e}]}
def salt_runner_requests(self, jid):
'''
通过jid获取执行结果
'''
content = self.getRequest(prefix='/jobs/{}'.format(jid), headers=self.__headers)
return content
def salt_runner(self, jid):
"""
获取job的执行结果
:param jid:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.lookup_jid'
data['jid'] = jid
content = self.postRequest(data, self.__headers)
return content
def salt_running_jobs(self):
"""
获取在运行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['clent'] = 'runner'
data['fun'] = 'jobs.active'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def remote_execution(self, tgt, fun, arg, expr_form):
"""
异步执行远程指令
:param tgt:
:param fun:
:param arg:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local_async'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['jid']
except Exception as e:
logger.error(e)
return content
def remote_module(self, tgt, fun, arg, kwarg, expr_form, client='local_async'):
"""
异步部署模块
:param tgt:
:param fun:
:param arg:
:param kwarg:
:param expr_form:
:param client local_async 异步 or local 同步
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = client
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg
data['kwarg'] = {"pillar": kwarg}
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
if client == "local_async":
try:
return content['return'][0]['jid']
except Exception as e:
logger.error(e)
return content
else:
return content
def remote_localexec(self, tgt, fun, arg, expr_form):
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['jid']
except Exception as e:
logger.error(e)
return content
def salt_state(self, tgt, arg, expr_form):
"""
sls文件
:param tgt:
:param arg:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = 'state.sls'
data['arg'] = arg
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def project_manage(self, tgt, fun, arg1, arg2, arg3, arg4, arg5, expr_form):
"""
项目管理
:param tgt:
:param fun:
:param arg1:
:param arg2:
:param arg3:
:param arg4:
:param arg5:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg1
data['arg2'] = arg2
data['arg3'] = arg3
data['arg4'] = arg4
data['arg5'] = arg5
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def file_copy(self, tgt, fun, arg1, arg2, expr_form):
"""
文件copy
:param tgt: ./
:param fun: file.manager
:param arg1:
:param arg2:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg1
data['arg2'] = arg2
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def file_bak(self, tgt, fun, arg, expr_form):
"""
文件备份到master上
:param tgt:
:param fun:
:param arg:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def file_manage(self, tgt, fun, arg1, arg2, arg3, expr_form):
"""
文件回滚
:param tgt:
:param fun:
:param arg1:
:param arg2:
:param arg3:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg1
data['arg2'] = arg2
data['arg3'] = arg3
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def remote_server_info(self, tgt, fun):
"""
获取远程主机信息
:param tgt:
:param fun:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
content = self.postRequest(data, self.__headers)
try:
return content['return'][0][tgt]
except Exception as e:
logger.error(e)
return content
def get_list_job(self):
"""
获取正在运行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.list_jobs'
content = self.postRequest(data, self.__headers)
ret = content['return'][0]
return [{k: v} for k, v in zip(ret.keys(), ret.values())]
def get_running_job(self):
"""
获取正在运行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.active'
content = self.postRequest(data, self.__headers)
ret = content['return'][0]
return [{k: v} for k, v in zip(ret.keys(), ret.values())]
def term_running_job(self, jid):
"""
终止正在运行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['tgt'] = '*'
data['client'] = 'local'
data['fun'] = 'saltutil.term_job'
data['arg'] = jid
content = self.postRequest(data, self.__headers)
return content['return'][0]
def get_job_info(self, jid):
"""
获取正在运行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.lookup_jid'
data['arg'] = jid
content = self.postRequest(data, self.__headers)
return content
def check_job_result(self, jid):
"""
检查job是否已经运行完成
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.exit_success'
data['arg'] = jid
content = self.postRequest(data, self.__headers)
return content
使用方式
client = SaltAPI()
client.get_token()
然后用client调用方法
异步执行模块部署
入参
module.module_path # 模块路径
"tgt_list": [
"SHTL00706921"
],
"arg": "init.6_env_init"
expr_form = 'list'
调用并返回jid
jid = sapi.remote_module(tgt_select, 'state.sls', 'module.{}.{}'.format(module.module_path, module.module),
{'SALTSRC': 'module/{}'.format(module.module_path)}, expr_form)