一、可用的技术路线
- 通过com层操作。建议学习LMS帮助文档Automation.pdf,里面有示例和接口说明。~~
- 通过导出.mat文件,使用matlab进行操作。
- 通过导出.mat文件,使用python进行操作,本文使用的方法。
根据个人喜欢,选择对应方法。
使用LMS处理数据,每个人都有自己比较顺手的方法。因为我个人比较讨厌重复性工作以及抄数据,所以一直都在追求自动化的方法,哪怕只有点点都行。
自动化能够解放我的工作,同时能够减少出错率,废话不多说,直接上代码。
代码块
1. 导入包
import os #系统操作包,我主要用来操作文件夹与文件
import pandas as pd # 非常热门的数据处理工具
from scipy.io import loadmat #科学计算处理包,对标matlab
from scipy import signal #信号处理包,主要用了滤波功能
from scipy.fftpack import fft, fftfreq, fftshift #快速傅里叶变换
from scipy import math #数学计算包
import matplotlib.pyplot as plt # 画图包
import win32com.client #win32com包,这里用不到,因为我看不懂
import numpy as np # 矩阵计算工具,非常厉害
import pymongo #mongo数据库包
2. 定义常用的方法
#求时域信号的有效值,不是均值
def get_rms(records):
return math.sqrt(sum([x**2 for x in records]) / len(records))
def get_max_level(a_rms, a0=10**-6):
# 求加速度振级
return 20*math.log(a_rms/a0, 10)
#求振级
def vibt_level_diff(lvf, lvr):
deltav = lvf - lvr
pj = ''
if deltav>0:
pj = '悬置有隔振效果'
else:
pj = '振动经悬置传递后被放大'
return deltav, pj
# 滤波
# wn = 2*1024/12800 # 采样率12800, 截止频率1024,
def mylowpass(data,wn = 2*1024/12800):
b, a = signal.butter(10, wn, 'lowpass')
filtedData = signal.filtfilt(b, a, data)
return filtedData
# 傅里叶变换
def myfft(mydata, t, n):
l_max_hz = []
l_max_a = []
n = fourier.size
freq = fftfreq(n, t)
freq = fftshift(freq)[int(freq.size/2.0): freq.size]
fourier = fftshift(fourier)[int(fourier.size/2.0): fourier.size]
fourier = 2/n*np.abs(fourier)
max_three_index = fourier.argsort()[::-1][0:1] #最大的前三位数
big1 = [[fourier[i], freq[i]] for i in max_three_index]
l_max_a.append(big1[0][0])
l_max_hz.append(big1[0][1])
return l_max_a, l_max_hz
上述方法可以根据自己的需求添加。建议将方法设置在方法类中,需要什么就取什么。
3. 载入文件
os.chdir(r'E:\2020test\truck01_data\truck01_hulan') #切换到数据目录
print(f'已经切换到路径:{os.getcwd()}')
print('*'*50)
matdirs = [matdir for matdir in os.listdir() if matdir.endswith('.mat')] #读取目录下后缀是.mat的所有文件
print(f'当前路径下的文件有:{[x for x in os.listdir()]}')
4. 计算并输出数据
for matdir in matdirs:
filename = matdir #需要把这个文件名也存到数据库重
print(f'开始处理{matdir}')
m = loadmat(filename) #读取mat文件
if 'Signal' in m.keys():
data1 = pd.DataFrame(m['Signal'][0])
else:
data1 = pd.DataFrame(m['Signal_0'][0])
datay = pd.DataFrame(data1['y_values'][0][0])
datay1 = pd.DataFrame(datay['values'][0]) #这里就是所有振动数据的合集了
dataname1 = pd.DataFrame(data1['function_record'][0][0])
dataname2 = pd.DataFrame(dataname1['name'][0]).T
dataname2['测点位置'] = dataname2[0].apply(lambda a: ''.join(list(a))) # 包含测点的信息
dataname2.drop(columns=[0], inplace=True)
datay1.columns = list(dataname2['测点位置'])
testdata = pd.DataFrame(dataname1['creation_time'][0][0]) #获取文件创建时间
dataname2['创建时间'] = testdata[0].apply(lambda a: ''.join(list(a)))
testdata= pd.DataFrame(dataname1.last_modification_time[0][0]) #获取文件最后修改时间信息
dataname2['修改时间'] = testdata[0].apply(lambda a: ''.join(list(a)))
testdata= pd.DataFrame(dataname1.weighting[0][0]) #获取文件最后修改时间信息
dataname2['加权函数'] = testdata[0].apply(lambda a: ''.join(list(a)))
dataname2['数采通道'] = [x[0][0]for x in list(pd.DataFrame(pd.DataFrame(dataname1.primary_channel[0][0])['id'][0][0])[0])]
dataname2['窗函数'] = pd.DataFrame(pd.DataFrame(pd.DataFrame(dataname1.energy_amplitude_transform[0][0])['windows'][0][0])['type'][0][0])[0].apply(lambda a: ''.join(list(a)))
dataname2['项目文件'] = pd.DataFrame(pd.DataFrame(dataname1['TL_export_properties_annotation'][0][0])['project_name'][0][0])[0].apply(lambda a: ''.join(list(a)))
dataname2['section文件'] = pd.DataFrame(pd.DataFrame(dataname1['TL_export_properties_annotation'][0][0])['section_name'][0][0])[0].apply(lambda a: ''.join(list(a)))
dataname2['run目录'] = pd.DataFrame(pd.DataFrame(dataname1['TL_export_properties_annotation'][0][0])['run_name'][0][0])[0].apply(lambda a: ''.join(list(a)))
dataname2['实际开始测时间'] = pd.DataFrame(pd.DataFrame(dataname1['TL_export_properties_annotation'][0][0])['absolute_time'][0][0])[0].apply(lambda a: ''.join(list(a)))
dataname2['通道类型'] = pd.DataFrame(pd.DataFrame(dataname1['TL_export_properties_annotation'][0][0])['channel_group'][0][0])[0].apply(lambda a: ''.join(list(a)))
dataname2['文件路径'] = pd.DataFrame(pd.DataFrame(dataname1['TL_export_properties_annotation'][0][0])['orig_location'][0][0])[0].apply(lambda a: ''.join(list(a)))
datax = pd.DataFrame(data1['x_values'][0][0])
startvalue = datax.start_value[0][0][0] #时域开始点
increment = datax.increment[0][0][0] #步长
datalen = datax.number_of_values[0][0][0] #时间长度,这个值也可以通过 len(datay1)求出
t = len(datay1)*increment+startvalue #信号长度,这个可以用来校核信号是否取完整了
datainfo = pd.DataFrame(datax.quantity[0][0])
info = datainfo['info'][0][0] #这是显示data保存的信息
dataname2['采集开始点'] = startvalue
dataname2['步长'] = increment
dataname2['采样频率'] = 1/increment
dataname2['样本长度'] = datalen
dataname2['官方信息'] = info
### 经纬度信息
latlong = pd.DataFrame(m['Signal_1'][0][0][1][0][0][0], columns = ['维度', '精度']) # 经纬度信息
factor = pd.DataFrame(m['Signal_1'][0][0][1][0][0][1][0][0][1][0])['factor'][0][0][0] #因子
log_reference = pd.DataFrame(m['Signal_1'][0][0][1][0][0][1][0][0][1][0])['log_reference'][0][0][0] #我也不知道是啥
offset = pd.DataFrame(m['Signal_1'][0][0][1][0][0][1][0][0][1][0])['offset'][0][0][0]
unit = m['Signal_1'][0][0][1][0][0][1][0][0][0][0] #经纬度单位
speed = pd.DataFrame(m['Signal_4'][0][0][1][0][0][0]*3.6, columns=['车速']) # gps车速,单位m/s
altitude = pd.DataFrame(m['Signal_2'][0][0][1][0][0][0], columns = ['altitude'])# Altitulde 海拔信息
data_lat_log_velocity = pd.concat([latlong, speed, altitude], axis=1)
# m['Signal_3'] # 东速度 西速度 啥的 不用管它,不读
# m['Signal_5'] #卫星数
# m['Signal_6'] #can转速,导不出来
# 进行低通滤波
datay1_lp = datay1.apply(mylowpass, axis=0)
# 计算常规评价指标如RMS等
l_rms = [] # 求RMS值
l_zj = [] # 求振级
l_max = [] #求最大值
l_mean = [] # 求均值
l_std = [] # 求均方差
l_min = [] # 求最小值
l_true_max = [] #求绝对值最大值
for col in datay1_lp.columns:
data_describle = datay1_lp[col].describe()
rms = get_rms(datay1_lp[col])
l_rms.append(float('%.3f'%(rms)))
l_zj.append(float('%.3f'%(get_max_level(rms))))
l_max.append('%.3f'%data_describle['max'])
l_mean.append('%.3f'%data_describle['mean'])
l_std.append('%.3f'%data_describle['std'])
l_min.append('%.3f'%data_describle['min'])
l_true_max.append('%.3f'%abs(datay1_lp[col]).max())
data_result = pd.DataFrame([list(dataname2['测点位置']), l_rms, l_zj, l_max, l_mean, l_std, l_min, l_true_max])
data_result = data_result.T
data_result.columns = ['测点', 'RMS', '振级','正向最大值','平均值','方差','最小值','绝对值最大值']
#计算RMS综合值
data_result_v1 = data_result.copy()
data_result_v1['RMS_综合'] = 0
data_result_v1['振级_综合'] = 0
for row in range(int(len(data_result) / 3)):
data_result_v1.loc[3*row+2,['RMS_综合']] = '%.3f'%(np.sqrt(data_result.loc[3*row,'RMS']**2+data_result.loc[3*row+1,'RMS']**2+data_result.loc[3*row+2,'RMS']**2))
data_result_v1.loc[3*row+2,['振级_综合']] = '%.3f'%(np.sqrt(data_result.loc[3*row,'振级']**2+data_result.loc[3*row+1,'振级']**2+data_result.loc[3*row+2,'振级']**2))
# 做傅里叶变换
# def myfft()
datay1.apply(myfft, args=(dataname2.loc[0,'步长'], len(datay1),),axis=0)
data_result_v1['平均车速'] = speed.mean()
dataiii = pd.DataFrame(datay1.apply(myfft, args=(dataname2.loc[0,'步长'], len(datay1),),axis=0))
data_result_v1['最大加速度_a'] = list(pd.DataFrame(dataiii)[0].apply(lambda a:a[0][0]))
data_result_v1['最大加速度_Hz'] = list(pd.DataFrame(dataiii)[0].apply(lambda a:a[1][0]))
#保存文件
data_result_v1.to_excel(dataname2['section文件'][1]+'_'+dataname2['run目录'][1]+'.xlsx',index=False, dtype='float64')
print(f'{matdir}已经处理完毕')
print('Successed!!')
- 振动时域信号放在
datay1
中
- 信号相关信息存放在
dataname2
中
- 输出结果放在
data_result_v1
中
根据业务需求制定相关方法,代入脚本计算出相关结果。
我个人认为上面的自动化脚本好处还是有的。 - 输出的三张表分别存储了时域数据,数据信息,输出结果。其中数据信息和输出结果的索引都是通道,所以他们可以直接关联的。
时域数据列标签就是其他两张表的行标签,也可以关联。针对上述的数据,我大概做了两个数据库的存储模型,非常简陋。 - 数据库模型一
{
"_id" : ObjectId("5e73f335d913cc99e304489d"),
"测点" : "the test point",
"RMS" : "2.323",
"振级" : "127.321",
"正向最大值" : "11.193",
"平均值" : "-0.006",
"方差" : "2.323",
"最小值" : "-12.654",
"绝对值最大值" : "12.654",
"测点位置" : "the test point",
"创建时间" : "2020/03/07 17:37:57",
"修改时间" : "2020/03/07 17:37:57",
"加权函数" : "NONE",
"数采通道" : "3",
"窗函数" : "UNKNOWN",
"项目文件" : "Project1",
"section文件" : "the section name",
"run目录" : "平直路40km_h 1",
"实际开始测时间" : "2020-03-07 09:37:52 ms 726.999",
"通道类型" : "Vibration",
"文件路径" : "filepath" #
"采集开始点" : "1.2251250597895408e-05",
"步长" : "7.8125e-05",
"采样频率" : "12800.0",
"样本长度" : "301940",
"官方信息" : "Data saved by Testlab in MKS",
"mat文件地址" : "J:\\testlabfile", #对应输出的mat文件
"mat文件" : "3.mat"
}
- 数据库模型二
"result" : [
{
resultinfo,
},... ]
...
"info": {...}
存放在数据库中的好处有很多,比如方便crud(增删查改),数据共享……等等。
5. 后续计划
-
共享数据
后续主要是打算通过api的方式共享数据,可能选择flask,fastapi,echarts这些比较简单亲民的框架。 -
数据挖掘
通过使用sklearn,XGBOOST,pytorch,paddlepaddle做一些数据分析。
🤦😂😂🤦,求不要嘲笑。