import argparse
import logging
import os
import re
import subprocess
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
import time
from functools import wraps
from flask import Flask, request, jsonify
from mysql.connector import pooling
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
DEFAULT_MAP_NUMS = 10
DEFAULT_MAP_MAX_BANDWIDTH = 20
RUNNING_RETURN_CODE = 200  # return_code stored while a distcp job is still running
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT)
app = Flask("distcp")
executor = ThreadPoolExecutor()
pattern = r"job_[a-zA-Z0-9]+_[0-9]+"
progress_rate_pattern = r"map (\d+)%"
pool = None
# How often the client that submitted the distcp MapReduce job flushes progress logs; defaults to 1000 ms (see monitor_poll_interval below)
monitor_total_bandwidth = 0
monitor_bandwidth_by_database = False
def retry_on_exception(retries=10, wait_time=2):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for _ in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    logging.warning(f"Exception occurred: {e}")
                    logging.warning(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
            raise last_exception  # Re-raise the last exception once all retries are exhausted
        return wrapper
    return decorator
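# Illustrative usage of the decorator above: a decorated function is retried up to
# `retries` times, sleeping `wait_time` seconds between attempts, and the last
# exception is re-raised once all retries are exhausted. The function name here is
# hypothetical:
#   @retry_on_exception(retries=3, wait_time=1)
#   def flaky_call(): ...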
def create_pool(host, port, user, password, database):
config = {
"host": host,
"port": port,
"user": user,
"password": password,
"database": database
}
return pooling.MySQLConnectionPool(pool_name="mypool", pool_size=20, **config)
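# For reference, a minimal sketch of the distcp_logs table this service assumes.
# The column set is inferred from the INSERT/UPDATE/SELECT statements below; the
# exact column types, ordering and any extra columns in the real table may differ.
#
# CREATE TABLE distcp_logs (
#     id                INT AUTO_INCREMENT PRIMARY KEY,
#     cmd               TEXT,
#     output            LONGTEXT,
#     job_id            VARCHAR(64),
#     return_code       INT,
#     progress_rate     VARCHAR(8),
#     map_nums          VARCHAR(16),
#     map_max_bandwidth VARCHAR(16)
# );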
@app.route('/verify', methods=['POST'])
def verify():
data = request.get_json()
source_path = data.get('source_path')
target_path = data.get('target_path')
result = {}
if source_path and target_path:
command_source = f'hadoop fs -du -s {source_path}'
command_target = f'hadoop fs -du -s {target_path}'
output_source = subprocess.check_output(command_source, shell=True)
output_target = subprocess.check_output(command_target, shell=True)
result_source = re.sub(r"\s+", " ", output_source.decode("utf-8")).split(" ")[0]
result_target = re.sub(r"\s+", " ", output_target.decode("utf-8")).split(" ")[0]
        # Compare the storage sizes of the two directories: equal sizes mean the data is consistent, otherwise it is not.
        # Since distcp preserves file attributes during the copy, comparing the total storage size of the two directories is normally sufficient.
if result_source == result_target:
result = {'verify_result': 'true'}
else:
result = {'verify_result': 'false'}
return jsonify(result)
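# A hypothetical example call to the /verify endpoint above (host, port and paths
# are illustrative only; the port assumes Flask's default of 5000):
#   curl -X POST http://localhost:5000/verify \
#        -H 'Content-Type: application/json' \
#        -d '{"source_path": "/warehouse/db/tbl", "target_path": "/backup/db/tbl"}'
# Expected response: {"verify_result": "true"} or {"verify_result": "false"}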
def perm_to_int(perm_str):
if "rwx" == perm_str:
return 7
elif "rw-" == perm_str:
return 6
elif "r-x" == perm_str:
return 5
elif "r--" == perm_str:
return 4
elif "-wx" == perm_str:
return 3
elif "-w-" == perm_str:
return 2
elif "--x" == perm_str:
return 1
else:
return 0
def parse_permission(attribute):
if len(attribute) != 9:
        raise ValueError("permission attribute length must be 9 characters.")
user_permission = perm_to_int(attribute[0:3])
group_permission = perm_to_int(attribute[3:6])
other_permission = perm_to_int(attribute[6:9])
return str(user_permission) + str(group_permission) + str(other_permission)
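# Illustrative examples of the conversion above (the result is the octal mode
# string later passed to `hadoop fs -chmod`):
#   parse_permission("rwxr-xr-x")  -> "755"
#   parse_permission("rw-r-----")  -> "640"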
def get_attribute_dict(table_path):
results = {}
if table_path:
command = f'hadoop fs -ls -d {table_path}'
command_result = subprocess.check_output(command, shell=True)
attribute_outputs = re.sub(r"\s+", " ", command_result.decode("utf-8")).split(" ")
permission = parse_permission(attribute_outputs[0][1:10])
user = attribute_outputs[2]
group = attribute_outputs[3]
results["permission"] = permission
results["user"] = user
results["group"] = group
return results
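# Illustrative `hadoop fs -ls -d` output that the parsing above assumes
# (fields: permissions, replication, owner, group, size, date, time, path;
# the path and owner/group here are hypothetical):
#   drwxr-xr-x   - hive hadoop          0 2023-01-01 12:00 /warehouse/db/tbl
# which yields {'permission': '755', 'user': 'hive', 'group': 'hadoop'}.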
def do_sync_attribute(source_attr_dict, target_attr_dict, target_table_path):
try:
if target_attr_dict["permission"] != source_attr_dict["permission"]:
new_permission = int(source_attr_dict["permission"])
perm_command = f'hadoop fs -chmod {new_permission} {target_table_path}'
print(f'sync permission for {target_table_path} command is {perm_command}')
subprocess.check_output(perm_command, shell=True)
if target_attr_dict["user"] != source_attr_dict["user"] \
or target_attr_dict["group"] != source_attr_dict["group"]:
new_owner = source_attr_dict["user"] + ":" + source_attr_dict["group"]
owner_command = f'hadoop fs -chown {new_owner} {target_table_path}'
print(f'sync owner for {target_table_path} command is {owner_command}')
subprocess.check_output(owner_command, shell=True)
return True
except Exception as e:
        logging.error(f'do sync table directory attribute failed, cause is {e.args}, stack info is {traceback.format_exc()}')
return False
@app.route('/sync_dir_attribute', methods=['POST'])
def sync_dir_attribute():
data = request.get_json()
source_table_path = data.get('source_table_path')
target_table_path = data.get('target_table_path')
source_table_attr_dict = get_attribute_dict(source_table_path)
target_table_attr_dict = get_attribute_dict(target_table_path)
print(f'source table attribute dict is {source_table_attr_dict}, '
f'target table attribute dict is {target_table_attr_dict}')
sync_result = do_sync_attribute(source_table_attr_dict, target_table_attr_dict, target_table_path)
if sync_result:
result = {'sync_result': 'true'}
else:
result = {'sync_result': 'false'}
return jsonify(result)
@app.route('/get_storage', methods=['POST'])
def get_storage():
data = request.get_json()
target_path = data.get('target_path')
target_storage = None
if target_path:
command_storage = f'hadoop fs -du -s {target_path}'
output_target = subprocess.check_output(command_storage, shell=True)
target_storage = re.sub(r"\s+", " ", output_target.decode("utf-8")).split(" ")[0]
    # Return the storage size of the specified HDFS directory
result = {'storage': target_storage}
return jsonify(result)
@app.route('/distcp', methods=['POST'])
def distcp():
data = request.get_json()
source_path = data.get('source_path')
target_path = data.get('target_path')
run_params = data.get('run_params')
job_name = data.get('job_name', 'distcp-job')
hadoop_user_name = data.get('hadoop_user_name', None)
monitor_poll_interval = data.get('monitor_poll_interval', 5000)
    # The copy itself is executed asynchronously on the thread pool (see executor.submit below)
command = ['hadoop', 'distcp', f'-Dmapreduce.job.name={job_name}',
f'-Dmapreduce.client.progressmonitor.pollinterval={monitor_poll_interval}']
if run_params:
        # Collapse redundant whitespace in the user-supplied run params
run_params = re.sub(r"\s+", " ", run_params)
command.extend(run_params.split(" "))
command.extend([source_path, target_path])
logging.info(' '.join(command))
exec_id = insert(command, '', '', 100)
env_vars = os.environ.copy()
if hadoop_user_name:
env_vars.update({'HADOOP_USER_NAME': hadoop_user_name})
executor.submit(run_distcp, exec_id, command, env_vars)
return jsonify({'exec_id': exec_id})
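# A hypothetical example request for the POST /distcp endpoint above (paths,
# job name and user are illustrative only):
#   curl -X POST http://localhost:5000/distcp \
#        -H 'Content-Type: application/json' \
#        -d '{"source_path": "hdfs://src/warehouse/db/tbl",
#             "target_path": "hdfs://dst/warehouse/db/tbl",
#             "run_params": "-m 20 -bandwidth 50 -update",
#             "job_name": "distcp-db-tbl",
#             "hadoop_user_name": "hdfs",
#             "monitor_poll_interval": 5000}'
# The response carries the database record id, e.g. {"exec_id": 42}, which can
# later be passed to GET /distcp?exec_id=42 to poll progress and logs.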
def run_distcp(exec_id, command, env_vars):
    # Run the distcp command and stream its output, recording progress and the MapReduce job id
logs = []
job_id = None
map_nums, map_max_bandwidth = parse_command(command)
cmd = ' '.join(command)
global monitor_total_bandwidth
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env_vars)
for line in iter(process.stdout.readline, b''):
if line:
line = line.decode('utf-8')
logs.append(line)
logging.info(line)
# progress_rate
matches = re.findall(progress_rate_pattern, line)
if matches:
progress_rate = matches[0]
update_progress_rate(exec_id, cmd, '', progress_rate, 200)
# job_id
matches = re.findall(pattern, line)
if not job_id and len(matches) > 0:
job_id = matches[0]
monitor_total_bandwidth += int(map_nums) * int(map_max_bandwidth)
update(exec_id, cmd, '', job_id, 200)
process.stdout.close()
process.wait()
return_code = process.returncode
    # Only release the reserved bandwidth if a job id was found and the reservation above was made
    if job_id:
        monitor_total_bandwidth -= int(map_nums) * int(map_max_bandwidth)
    # Record the execution result
logging.info(f'Return code: {return_code}\n')
logging.info(f'Job id: {job_id}\n')
logging.info('------------------------\n')
update(exec_id, cmd, ''.join(logs), job_id, return_code)
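# Illustrative MapReduce client log lines that the regexes used above are meant
# to match (the exact format may vary by Hadoop version; the job id is hypothetical):
#   "INFO mapreduce.Job: Running job: job_1600000000000_0001"  -> job id "job_1600000000000_0001"
#   "INFO mapreduce.Job:  map 75% reduce 0%"                   -> progress rate "75"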
@retry_on_exception()
def insert(command, output, job_id, return_code):
map_nums, map_max_bandwidth = parse_command(command)
cmd = ' '.join(command)
connection = None
try:
connection = pool.get_connection()
cursor = connection.cursor()
sql = "INSERT INTO distcp_logs (cmd, output, job_id, return_code, map_nums, map_max_bandwidth) " \
"VALUES (%s, %s, %s, %s, %s, %s)"
val = (cmd, output, job_id, return_code, map_nums, map_max_bandwidth)
cursor.execute(sql, val)
connection.commit()
        exec_id = cursor.lastrowid
        cursor.close()
        logging.info(f"Distcp record inserted successfully. ID: {exec_id}")
except Exception as e:
raise e
finally:
if connection:
connection.close()
    return exec_id
@retry_on_exception()
def update_progress_rate(id, cmd, output, progress_rate, return_code):
connection = None
try:
connection = pool.get_connection()
cursor = connection.cursor()
sql = "UPDATE distcp_logs SET cmd = %s, output = %s, progress_rate = %s, return_code = %s WHERE id = %s"
val = (cmd, output, progress_rate, return_code, id)
cursor.execute(sql, val)
connection.commit()
cursor.close()
logging.info(f"Distcp record {id} with progress_rate {progress_rate} updated successfully.")
except Exception as e:
raise e
finally:
if connection:
connection.close()
@retry_on_exception()
def update(id, cmd, output, job_id, return_code):
connection = None
try:
connection = pool.get_connection()
cursor = connection.cursor()
sql = "UPDATE distcp_logs SET cmd = %s, output = %s, job_id = %s, return_code = %s WHERE id = %s"
val = (cmd, output, job_id, return_code, id)
cursor.execute(sql, val)
connection.commit()
cursor.close()
logging.info(f"Distcp record with id {id} updated successfully.")
except Exception as e:
raise e
finally:
if connection:
connection.close()
@app.route('/distcp', methods=['GET'])
def distcp_logs():
exec_id = request.args.get('exec_id')
    # Query the execution record(s)
connection = pool.get_connection()
cursor = connection.cursor()
if exec_id:
sql = "SELECT * FROM distcp_logs WHERE id = %s"
val = (exec_id,)
cursor.execute(sql, val)
else:
sql = "SELECT * FROM distcp_logs"
cursor.execute(sql)
result = cursor.fetchall()
cursor.close()
connection.close()
    # Convert the query result into JSON-serializable objects
data = []
for row in result:
entry = {
"id": row[0],
"cmd": row[1],
"job_id": row[2],
"return_code": row[3],
"progress_rate": f"{row[4]}%",
"logs": row[5]
}
data.append(entry)
return jsonify(data)
@app.route('/distcp/total_bandwidth', methods=['GET'])
def distcp_total_bandwidth():
total_bandwidth = 0
if monitor_bandwidth_by_database:
return_code = request.args.get("return_code")
if return_code is None:
            return_code = RUNNING_RETURN_CODE
        # Query matching execution records from the database
connection = pool.get_connection()
cursor = connection.cursor()
sql = "SELECT * FROM distcp_logs WHERE return_code = %s"
val = (return_code,)
cursor.execute(sql, val)
result = cursor.fetchall()
cursor.close()
connection.close()
for row in result:
            map_nums = row[0]
            map_max_bandwidth = row[1]
if map_nums and map_max_bandwidth:
total_bandwidth += int(map_nums) * int(map_max_bandwidth)
else:
                logging.warning(f'Invalid map_nums {map_nums} '
                                f'or map_max_bandwidth {map_max_bandwidth}')
else:
total_bandwidth = monitor_total_bandwidth
data = {'total_bandwidth': total_bandwidth}
return jsonify(data)
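# Illustrative arithmetic for the endpoint above: a running job submitted with
# '-m 20 -bandwidth 50' contributes 20 * 50 = 1000 to total_bandwidth
# (distcp's -bandwidth option is specified in MB/second per map).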
@app.route('/distcp/kill', methods=['GET'])
def distcp_kill():
app_id = request.args.get('app_id')
command = ['yarn', 'application', '-kill', app_id]
code = subprocess.call(command)
    return jsonify({'code': code})
# Extract the number of maps and the per-map max bandwidth from the command list
def parse_command(command):
map_nums = DEFAULT_MAP_NUMS
map_max_bandwidth = DEFAULT_MAP_MAX_BANDWIDTH
i = 0
while i < len(command):
if command[i] == '-m':
i += 1
map_nums = command[i]
elif command[i] == '-bandwidth':
i += 1
map_max_bandwidth = command[i]
i += 1
return map_nums, map_max_bandwidth
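# Illustrative example for parse_command: a command such as
#   ['hadoop', 'distcp', '-m', '20', '-bandwidth', '50', '/src', '/dst']
# returns ('20', '50'); when -m / -bandwidth are absent, the defaults
# DEFAULT_MAP_NUMS and DEFAULT_MAP_MAX_BANDWIDTH are returned instead.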
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--host', type=str, default='10.52.46.123', help='Database host')
parser.add_argument('--port', type=int, default=33066, help='Database port')
parser.add_argument('--user', type=str, default='diagnose', help='Database username')
parser.add_argument('--password', type=str, default='diagnose123456', help='Database password')
parser.add_argument('--database', type=str, default='spark_diagnose', help='Database name')
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
logging.info(' '.join(f'\n{k}={v}' for k, v in vars(args).items()))
pool = create_pool(args.host, args.port, args.user, args.password, args.database)
app.run(host="0.0.0.0", debug=True)