part 1
https://blog.csdn.net/mr_evanchen/article/details/77879967
# -*- coding: UTF-8 -*-
import bottle
import json
import re
import time
import uuid
from datetime import date, datetime, timedelta
from bottle import get, post, request
config = {
'user' : 'demo',
'password' : '123456',
'host' : '10.12.2.87',
'database' : 'testdb',
'raise_on_warnings' : True,
}
client_tokens = {
#token for FOXTEL sanity test, PC in Beijing
#'Y' == enabled
'foxtel_beijing_pc':'Y'
}
def verify_client_token(client_tok):
'''
If the token is there and the token is enabled.
'''
ret_val = False
if client_tokens.has_key(client_tok):
if client_tokens[client_tok] == 'Y':
ret_val = True
return ret_val
@bottle.route('/task/add', method= ['GET', 'POST'])
def execution_schedule_add():
'''
Add a task into the queue
1. Abstract the parameters
2. Append a task into the queue
'''
#method should not be 'get'
'''
if bottle.request.method == 'GET':
return ''
#to-do: abstract parameters
token = bottle.request.POST.get('tok')
if verify_client_token(token) is False:
return ''
'''
print str(bottle.request.body.readlines())
job_assigment_id = bottle.request.POST.get('job_assigment_id')
script_name = bottle.request.POST.get('script_name')
script_parameter = bottle.request.POST.get('script_parameter')
slot_id = bottle.request.POST.get('slot_id')
if job_assigment_id is None:
#debug, error
job_assigment_id = "123123"
script_name = "test_tools_ft_0000_SampleScript.py"
script_parameter = "None"
slot_id = "1"
print job_assigment_id, script_name, script_parameter, slot_id
#create a task with time-stamp
t_timestamp = str(int(time.time() * 100))
f_name = "s" + t_timestamp + ".txt"
task_file = open(".\\dart_task_q\\" + f_name,'w')
task_file.write(job_assigment_id + '\n')
task_file.write(script_name + '\n')
task_file.write(script_parameter + '\n')
task_file.write(slot_id + '\n')
task_file.close()
return f_name
@bottle.route('/')
def index():
return 'hello'
if __name__ == '__main__':
#bottle.run(host = '127.0.0.1', port = 9996)
bottle.run(host = '10.12.2.147', port = 9091)
part 2
# -*- coding: UTF-8 -*-
import datetime
import glob
import os
import re
import shutil
import stat
import subprocess
import time
def clear_folder(target_path):
for item in os.listdir(target_path):
path_file = os.path.join(target_path, item)
if os.path.isfile(path_file):
pass
else:
shutil.rmtree(path_file)
return
def abstract_script_result_from_log_line(the_line):
if "Result = Pass" in the_line:
return "Pass"
if "Result = Fail" in the_line:
return "Fail"
else:
return "Error"
def convert_script_result_to_code(the_result):
if the_result == "Pass":
return '111'
if the_result == "Fail":
return '100'
if the_result == "Block":
return '101'
else:
return '000'
def abstract_time_tick_from_log_line(the_line):
the_tick = ""
the_tick = the_line.split(':')[0]
the_tick = time.strptime(the_tick, '%Y-%m-%d %H-%M-%S')
the_tick = time.mktime(the_tick)
return the_tick
def analyze_log(target_path):
'''
Pre-condition:
There is only one log inside the folder
Post-condition:
Return the data collected from the log
Exception:
If the folder is empty, still return None/Error values
'''
script_start_time = None
script_end_time = None
script_status_code = "000" #default to be Error(000)
for item in os.listdir(target_path):
path_file = os.path.join(target_path, item)
if not os.path.isfile(path_file):
stdout_path = os.path.join(path_file, "stdout.txt")
stdout_f = open(stdout_path, 'r')
stdout_contents = stdout_f.readlines()
first_line = stdout_contents[0].strip()
script_start_time = abstract_time_tick_from_log_line(first_line)
last_line = stdout_contents[-1].strip()
script_end_time = abstract_time_tick_from_log_line(last_line)
script_execution_status = abstract_script_result_from_log_line(last_line)
script_status_code = convert_script_result_to_code(script_execution_status)
return script_start_time, script_end_time, script_status_code
def tick_to_string(time_tick):
return time.strftime('%Y-%m-%dT%H:%M:%SZ',time.localtime(time_tick))
def tester():
script_start_time, script_end_time, script_status_code = analyze_log(".\\Log_cases")
print script_start_time, script_end_time, script_status_code
print time.time()
print tick_to_string(time.time())
print datetime.datetime.strptime(tick_to_string(time.time()), "%Y-%m-%dT%H:%M:%SZ")
exit(0)
def schedule_get_parameters(schedule_file):
sche_f = open(schedule_file, 'r')
sche_contents = sche_f.readlines()
sche_f.close()
assignment_id = None
script_name = None
try:
#1st line = assignment id
assignment_id = sche_contents[0].strip()
#2nd line = script name
script_name = sche_contents[1].strip()
#3rd line = script_parameter
script_parameter = sche_contents[2].strip()
#4rd line = slot id
slot_id = sche_contents[3].strip()
except:
return None
return (assignment_id, script_name, script_parameter, slot_id)
if __name__ == '__main__':
while True:
f_selected = None
arr_files = []
#filter the task files
arr_files = glob.glob(".\\dart_task_q\\"+"/s*.txt")
if len(arr_files) > 0:
#find the 1st one in the queue
min_val = 999999000000
for sub_file in arr_files:
f_name = os.path.basename(sub_file)
v_tick = long(f_name.replace('s', '').replace('.txt','').strip())
if v_tick < min_val:
min_val = v_tick
f_selected = "s" + str(min_val) + ".txt"
f_selected = os.path.join(".\\dart_task_q\\", f_selected)
else:
pass
#collect the parameters
p_tuple = None
if f_selected is not None:
p_tuple = schedule_get_parameters(f_selected)
if p_tuple is not None:
assignment_id = p_tuple[0]
script_name = p_tuple[1]
slot_id = p_tuple[3]
print assignment_id, script_name, slot_id
#notify the server of the start
#clear the log file folder
clear_folder(".\\Log_cases")
#clear the old schedule file
if os.path.exists("execution_schedule.txt"):
os.remove("execution_schedule.txt")
#prepare the schedule file
o_f = open("execution_schedule.txt", 'w')
o_f.write(script_name)
o_f.close()
#run the script
r_start_time = time.time()
str_command = r"python O_runner.py --device {0}".format(slot_id)
print str_command
process = subprocess.Popen(str_command.strip(), shell=True)
process.wait()
r_end_time = time.time()
#collect the data from log
script_start_time, script_end_time, script_status_code = analyze_log(".\\Log_cases")
if script_start_time is None:
#script is not executed
script_start_time = r_start_time
script_end_time = r_end_time
pass
print tick_to_string(script_start_time)
print tick_to_string(script_end_time)
print script_status_code
#copy the log-file to some public address
#report back the data
#delete the task file
if os.path.exists(f_selected):
os.remove(f_selected)
else:
pass
#wait for next one
time.sleep(5)