第一段代码实现功能:
1、将数据根据指定的协议转换成txt文件
2、将列表的数值的长度变为随机数
3、校验txt文件内容是否正确(这个功能比较鸡肋,后来发现WinHex自带搜索功能)
4、建立TCP服务端发送指定数据,可以自定义数据量(待优化:检测对端何时断开连接)
第二段代码实现功能:
将以下格式的db文件的data9列转换成图像(注意:下方代码实际查询的是data19列)
CREATE TABLE member (
id INT PRIMARY KEY,
DATE VARCHAR (256),
[data1 ] VARCHAR (128),
[data2 ] VARCHAR (128),
[data3 ] VARCHAR (128),
[data4 ] VARCHAR (128),
[data5 ] VARCHAR (128),
[data6 ] VARCHAR (128),
[data7 ] VARCHAR (128),
[data8 ] VARCHAR (128),
[data9 ] VARCHAR (128)
);
第三段代码实现功能:
因为发送的数据是固定格式并且一直循环,这里根据造数据的公式,将对端处理过后的数据算出来,然后读取db文件,再和前20列数据进行对比校验,这样可以验证数据内容是否正确
import time
import socket
import struct
import math
import datetime
import threading
import random
def the_not(value, pos):
    """
    Encode the negative of ``value`` as a fixed-width hex string.

    ``value - 1`` is written in binary, left-padded with zeros to
    ``pos * 8 - 1`` digits, every digit is flipped, and a leading ``1``
    is prepended before the result is converted to hexadecimal.  For
    ``1 <= value <= 2 ** (pos * 8 - 1)`` this equals the two's-complement
    encoding of ``-value`` in ``pos`` bytes.

    :param value: magnitude to negate; ``0`` is handled as if it were ``1``
    :param pos: field width in bytes
    :return: lowercase hex text, left-padded with zeros to ``pos * 2`` characters
    """
    magnitude = 1 if value == 0 else value
    digits = format(magnitude - 1, 'b').rjust(pos * 8 - 1, '0')
    # Swap every 0 and 1 in a single pass.
    flipped = digits.translate(str.maketrans('01', '10'))
    encoded = hex(int('1' + flipped, 2))[2:]
    return encoded.rjust(pos * 2, '0')
# -> object 描述返回值数据类型
def Override_override(ov: object, value: object, leng: object) -> object:
    """
    Overwrite part of a string, starting at a byte offset.

    :param ov: original string
    :param value: text written over the original
    :param leng: start position in bytes (two characters per byte)
    :return: ``ov`` with ``len(value)`` characters replaced from character
        ``leng * 2`` on; when the offset lies past the end of ``ov`` the
        value is simply appended
    """
    first = leng * 2
    last = first + int(len(value))
    return ov[:first] + value + ov[last:]
def float_to_hex(f):
    """Return ``hex()`` of the 32-bit pattern obtained by packing ``f`` as a single-precision float."""
    packed = struct.pack('<f', f)
    (bits,) = struct.unpack('<I', packed)
    return hex(bits)
def type_hex(value_, type_, Positive_and_Negative=0) -> str:
    """
    Convert a value to big-endian hex text for the given field type.

    :param value_: the value (anything ``int()`` / ``float()`` accepts)
    :param type_: ``'c'`` (1 byte), ``'s'`` (2 bytes), ``'i'`` (4 bytes)
        or ``'f'`` (4-byte float)
    :param Positive_and_Negative: ``0`` encodes the value as given, ``1``
        encodes its negative (``'c'`` is encoded the same either way)
    :return: hex text padded to the field width; ``None`` for any other
        ``type_`` or ``Positive_and_Negative``
    """
    negative = Positive_and_Negative == 1
    if not negative and Positive_and_Negative != 0:
        return None

    if type_ == 'c':
        return hex(int(value_))[2:].rjust(2, '0')

    if type_ == 'f':
        digits = float_to_hex(float(value_))[2:]
        if negative:
            # Put a 1 in front of the 31 low bits, i.e. set the sign bit.
            bit_text = bin(int(digits, 16))[2:]
            digits = hex(int('1' + bit_text.rjust(31, '0'), 2))[2:]
        return digits.rjust(8, '0')

    if type_ in ('i', 's'):
        size = 4 if type_ == 'i' else 2
        number = int(value_)
        if negative:
            return the_not(number, size)
        return hex(number)[2:].rjust(size * 2, '0')

    return None
def type_hex_new(value_, type_, Positive_and_Negative=0) -> str:
    """
    Like ``type_hex`` but with the byte order of the result reversed.

    :param value_: the value to encode
    :param type_: ``'c'``, ``'s'``, ``'i'`` or ``'f'``
    :param Positive_and_Negative: passed through to ``type_hex``
    :return: byte-swapped hex text (``'c'`` is returned unchanged);
        ``None`` for any other ``type_``
    """
    hex_ = type_hex(value_, type_, Positive_and_Negative=Positive_and_Negative)
    if type_ == 'c':
        return hex_
    if type_ in ('f', 'i'):
        width = 8
    elif type_ == 's':
        width = 4
    else:
        return None

    # The first byte pair becomes the least significant byte, the next one
    # is shifted by 8 bits, and so on.
    swapped = 0
    for pos in range(0, width, 2):
        swapped += int(hex_[pos:pos + 2], 16) << (pos * 4)
    return hex(swapped)[2:].rjust(width, '0')
def Generate_data(message_len: int, type_='00', models: str = '01') -> object:
    """
    Build the list of hex-encoded test payloads, one per ``z`` in -200..200.

    Each payload starts as ``message_len`` zero bytes written as hex text.
    With the local switch ``data_type == 1`` twenty encoded fields form a
    55-byte record, the record is copied up to byte 990, a shortened record
    (the first twelve fields plus the last one) is written from byte 990,
    and the buffer is then re-inserted at multiples of 1024 bytes.  With
    ``data_type == 0`` the payload is one counter byte repeated instead.
    Every payload is finally cut to ``message_len`` bytes.

    :param message_len: payload length in bytes
    :param type_: message type text; it does not influence the returned payloads
    :param models: extension number text; it does not influence the returned payloads
    :return: list of payload hex strings
    """
    payloads = []
    counter = 0
    for z in range(-200, 201):
        z = int(math.fabs(z))
        a = int((z * z) / (2 * 100))
        data_type = 1
        if data_type == 1:
            # (byte offset inside the 55-byte record, encoded field)
            fields = [
                (0, type_hex(str(z % 2), 'c')),                                                 # char
                (1, type_hex_new(str(31267 + a), 's')),                                         # short
                (3, type_hex_new(str((z * z) / (2 * 100)), 'f')),                               # float
                (7, type_hex_new(str(100000 + (z * 100000)), 'i')),                             # int
                (11, type_hex(str(((z + 1) % 2) * 5), 'c')),                                    # char
                (12, type_hex_new(str(0.99 + (z * 10)), 'f')),                                  # float
                (16, type_hex_new(str(31267 - z), 's', Positive_and_Negative=1)),               # negative short
                (18, type_hex_new(str(800000 + (a * 50000)), 'i')),                             # int
                (22, type_hex_new(str(100000 + (z * 100000)), 'i', Positive_and_Negative=1)),   # negative int
                (26, type_hex_new(str((z * z) / (2 * 100)), 'f', Positive_and_Negative=1)),     # negative float
                (30, type_hex_new(str(z % 2 * 85), 'c')),                                       # char, 0x55 when z is odd
                (31, type_hex_new(str(31267 + z), 's', Positive_and_Negative=1)),               # negative short
                (33, type_hex_new(str(0.4 + (z * 2)), 'f')),                                    # float
                (37, type_hex_new(str(100000 + (z * 83000)), 'i')),                             # int
                (41, type_hex_new(str(0.3 + (z * 0.1)), 'f')),                                  # float
                (45, type_hex_new(str(150 * z), 's')),                                          # short
                (47, type_hex_new(str(z % 2 * 165), 'c')),                                      # char
                (48, type_hex_new(str((30 * a)), 's')),                                         # short
                (50, type_hex_new(str((a * 20000)), 'i')),                                      # int
                (54, type_hex(str((z % 2) * 90), 'c')),                                         # char
            ]
            print(fields[3][1],'==',(100000 + (z * 100000)))

            # Zero-filled buffer of the requested length.
            Data_Sum = '0'.ljust((2 * message_len), "0")
            for offset, encoded in fields:
                Data_Sum = Override_override(Data_Sum, encoded, offset)

            # Copy the first 55-byte record 17 more times (bytes 55..989).
            for copy_no in range(1, 18):
                Data_Sum = Override_override(Data_Sum, Data_Sum[0:55 * 2], 55 * copy_no)

            # Shortened record from byte 990: first twelve fields, then the last field.
            start = 990
            for offset, encoded in fields[:12]:
                Data_Sum = Override_override(Data_Sum, encoded, start + offset)
            Data_Sum = Override_override(Data_Sum, fields[19][1], start + 33)

            # Re-insert the whole buffer at 1024-byte multiples.
            for block_no in range(1, 8):
                Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * block_no)
        elif data_type == 0:
            counter = counter + 1
            if counter > 255:
                break
            counter_hex = hex(counter)[2:].rjust(2, '0')
            Data_Sum = str()
            for _ in range(message_len + 1):
                Data_Sum = Data_Sum + counter_hex

        # Cut the payload to the requested length.
        byte_len = int(message_len) * 2
        payloads.append(Data_Sum[0:byte_len])
    print(f'数据长度为{len(payloads[0]) / 2}Generate_data')
    return payloads
def change_list_lengths(list_data, v_min, v_max):
    """
    Re-split a list of hex strings into pieces of random length.

    The strings are consumed from the last to the first; pieces of
    ``random.randint(v_min, v_max)`` bytes (two characters per byte) are
    cut from the end of the accumulated text while more than one piece
    remains.  Whatever is left at the end becomes the first piece.

    :param list_data: list of hex strings
    :param v_min: minimum piece length in bytes
    :param v_max: maximum piece length in bytes
    :return: new list of pieces in the original text order
    """
    pieces = []
    pending = ''
    for item in reversed(list_data):
        size = random.randint(v_min, v_max) * 2
        pending = item + pending
        while pending != '' and size < len(pending):
            pieces.append(pending[-size:])
            pending = pending[:-size]
            size = random.randint(v_min, v_max) * 2
    if pending:
        pieces.append(pending)
    # Pieces were collected back to front.
    pieces.reverse()
    return pieces
def abjust_list_length(lst, length):
    """
    Truncate ``lst`` or repeat it cyclically so it has exactly ``length`` items.

    :param lst: source sequence
    :param length: desired number of items
    :return: ``lst`` itself when it already has ``length`` items, otherwise
        a new sequence of ``length`` items
    :raises ValueError: if ``length`` is negative, or if an empty ``lst``
        would have to be extended
    """
    if length < 0:
        raise ValueError("length must not be negative")
    current = len(lst)
    if length == current:
        return lst
    if length < current:
        return lst[:length]
    if current == 0:
        # Nothing to repeat; the original divided by zero here.
        raise ValueError("cannot extend an empty sequence")
    repeats = -(-length // current)  # ceiling division
    return (lst * repeats)[:length]
def write_list_to_txt(lst, filename):
    """Write every string of ``lst`` into ``filename`` back to back, with no separators."""
    with open(filename, 'w') as out:
        out.writelines(lst)
def write_data_to_file(data_list, file_name):
    """
    Write each hex payload as a framed binary packet into ``file_name``.

    Packet layout: ``b"GLSecurity"`` header, six timestamp bytes (two-digit
    year, month, day, hour, minute, second of the local time), the payload
    length as two little-endian bytes, the payload, two zero bytes reserved
    for a checksum, and a ``b"GLSecurity"`` footer.  The timestamps start
    100 days before now and advance by 0.2 s per packet.

    :param data_list: iterable of hex strings
    :param file_name: path of the binary output file
    """
    header = "GLSecurity".encode("utf-8")
    footer = "GLSecurity".encode("utf-8")
    checksum = b"\x00\x00"  # checksum is not computed; two zero bytes are reserved
    interval = 0.2  # seconds between consecutive packet timestamps
    timestamps = time.time() - 86400 * 100
    packet = None  # stays None when data_list is empty
    with open(file_name, "wb") as f:
        packets = []
        for i, data in enumerate(data_list):
            moment = time.localtime(timestamps + i * interval)
            timestamp_bytes = bytes((
                moment.tm_year % 100, moment.tm_mon, moment.tm_mday,
                moment.tm_hour, moment.tm_min, moment.tm_sec,
            ))
            data_length = (len(data) // 2).to_bytes(2, byteorder='little')
            packet = header + timestamp_bytes + data_length + bytes.fromhex(data) + checksum + footer
            packets.append(packet)
            # Flush to disk in batches of ten packets.
            if i % 10 == 9:
                f.write(b''.join(packets))
                f.flush()
                packets.clear()
        if packets:
            f.write(b''.join(packets))
            f.flush()
    if packet is not None:
        # Summary of the last packet; skipped for empty input, where these
        # names were never assigned.
        print(
            f"总长度为:{len(packet)},header长度:{len(header)},时间戳长度:{len(timestamp_bytes)},len长度{len(data_length)},数据长度:{len(bytes.fromhex(data))},数据尾长度:{len(checksum)},footer长度:{len(footer)}")
    print(f"数据已写入文件: {file_name}")
def test_write_data_to_file(data_list, file_name):
    """Write every string of ``data_list`` into the text file ``file_name``, one after another."""
    with open(file_name, "w") as out:
        for chunk in data_list:
            out.write(chunk)
def split_string(string,n):
n = n * 2
substrings = []
while len(string) > n:
substring = string[:n]
substrings.append(substring)
string =string[n:]
if string:
substrings.append(string)
def package_data(data):
data_list = []
for index, value in enumerate(data):
datas = ascii_to_hex('##AABBFF')
# datas = datas + (hex(int(len(value) / 2))[2:].rjust(4, '0'))
# datas = datas + hex(len(data))[2:].rjust(4, '0')
# datas = datas + hex(index + 1)[2:].rjust(4, '0')
# 后续更改
datas = datas + ascii_to_hex(str(hex(int(len(value) / 2))[2:].rjust(4, '0')))
datas = datas + ascii_to_hex(str(hex(len(data))[2:].rjust(4, '0')))
datas = datas + ascii_to_hex(str(hex(index + 1)[2:].rjust(4, '0')))
datas = datas + value
datas = datas + ascii_to_hex('FFBBAA##')
data_list.append(datas)
return data_list
return package_data(substrings)
def ascii_to_hex(string):
    """
    Return the hex text of every character's code point, concatenated.

    Each character is written with at least two hex digits, so code points
    below 0x10 keep the output byte-aligned (the original emitted a single
    digit for them).  Code points above 0xFF still produce more than two
    digits.

    :param string: text to convert
    :return: lowercase hex text
    """
    return ''.join(format(ord(char), '02x') for char in string)
def count_data_in_txt(file_name, footer=b'GLSecurity'):
    """
    Count the occurrences of ``footer`` in a binary file and halve the result.

    :param file_name: path of the file to scan
    :param footer: byte marker to count
    :return: number of occurrences divided by two (a float; ``0`` for an
        empty file)
    """
    with open(file_name, 'rb') as fh:
        content = fh.read()
    occurrences = content.count(footer) if content else 0
    return occurrences / 2
class TcpServer:
    """Interactive single-client TCP server that sends prepared hex payloads."""

    def __init__(self, host, port, data):
        """
        :param host: address to bind to
        :param port: port to listen on
        :param data: list of hex strings; they are sent cyclically
        """
        self.host = host
        self.port = port
        self.server_soket = None
        self.client_soket = None
        self.data = data

    def start(self):
        """
        Accept one client, then send data each time the operator enters ``1``.

        After every send the framed chunks are saved to a text file whose
        name is asked for.  Entering ``0`` ends the session.  Both sockets
        are closed before returning.

        :return: chunks sent by the last send command (empty list if none)
        """
        data_list = []  # defined up front so quitting without sending works
        self.server_soket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        try:
            self.server_soket.bind((self.host,self.port))
            self.server_soket.listen(1)
            print(f"Server listeing on {self.host}:{self.port}")
            self.client_soket, addr = self.server_soket.accept()
            print(f"Connected to {addr[0]}:{addr[1]}")
            while True:
                choice = input("输入 1 发送 0 退出")
                if choice == "1":
                    data_list = self.send_messg()
                    file_name = input("输入保存文件名")
                    test_write_data_to_file(data_list, file_name + '.txt')
                elif choice =='0':
                    break
                else:
                    print("输入无效重新输入")
        finally:
            self.close()
        return data_list

    def close(self):
        """Close the client and listening sockets if they are open."""
        for name in ("client_soket", "server_soket"):
            sock = getattr(self, name)
            if sock is not None:
                sock.close()
                setattr(self, name, None)

    def send_messg(self):
        """
        Ask for a message count and send that many payloads to the client.

        ``self.data`` is cycled until the requested count is reached.  Each
        payload is split into framed chunks of up to 1400 bytes, with a
        50 ms pause after every chunk.

        :return: list of the framed hex chunks that were sent
        """
        cont = 0
        data_list = []
        if not self.data:
            # Cycling over an empty list would never reach the requested count.
            print("no data to send")
            return data_list
        while True:
            try:
                num_ = int(input("请输入发包数量"))
                break
            except ValueError:
                print("输入有误重新输入")
        while cont < num_:
            for i in self.data:
                cont += 1
                if cont > num_:
                    break
                print(cont, 'time:', datetime.datetime.now())
                for j in split_string(i, 1400):
                    self.client_soket.sendall(bytes.fromhex(j))
                    data_list.append(j)
                    time.sleep(0.05)
        return data_list
if __name__ == '__main__':
    # Generate the 10240-byte payloads, then cycle them up to 3000 entries.
    data1 = abjust_list_length(Generate_data(10240, type_='01', models='04'), 3000)

    # Optional steps that are currently switched off:
    #   * serve the payloads over TCP:  TcpServer('192.168.2.10', 10297, data1).start()
    #   * randomise the entry lengths:  data1 = change_list_lengths(data1, 1, 1474)
    #   * count the packets of a written packet file with count_data_in_txt(<path>)

    # Dump all payloads back to back into a text file.
    test_write_data_to_file(data1, 'test22.txt')
import sqlite3
import matplotlib.pyplot as plt
def get_data19(db_path="data_2023_09_13_10_00_57.db"):
    """
    Read the ``data19`` column of the ``member`` table.

    :param db_path: path of the SQLite database; defaults to the file the
        script was written for
    :return: list with the ``data19`` value of every row
    """
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT data19 FROM member")
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Close the connection even when the query fails.
        connection.close()
def plot_data(data):
    """
    Show ``data`` as a line chart with a marker on every point.

    :param data: sequence of values; the x axis is the position in the sequence
    """
    plt.plot(range(len(data)), data, marker='o')
    plt.xlabel('Index')
    plt.ylabel('Value')
    plt.title('Plot of Data')
    plt.show()
# Example usage: read the data19 column, print it, then plot it.
data = get_data19()
print(data)
plot_data(data)
校验sql
import sqlite3
import math
def numberCheck(url, limit=3000):
    """
    Fetch rows of the ``member`` table from an SQLite database.

    :param url: path of the database file
    :param limit: maximum number of rows to fetch (3000 by default)
    :return: list of row tuples
    """
    con = sqlite3.connect(url)
    try:
        cur = con.cursor()
        res = cur.execute("SELECT * FROM member limit ?", (limit,))
        return res.fetchall()
    finally:
        # Close the connection even when the query fails.
        con.close()
# Rows read from the member table of this database; the check loop below compares them with the expected values.
swj = numberCheck("23-03-12-20-20-53-3.db")
def data_():
    """
    Compute the expected column values for every generated message.

    For each ``z`` in -200..200 (taken as an absolute value) twenty values
    are calculated.  A 373-column group is made of those twenty values
    repeated 18 times, the first twelve once more, and the twentieth value;
    the group is repeated 8 times per message.

    :return: list of 802 rows (the 401 messages listed twice)
    """
    rows = []
    for z in range(-200, 201):
        z = int(math.fabs(z))
        a = int((z * z) / (2 * 100))
        fields = [
            (bin(z % 2))[2:].rjust(8, '0'),               # 1  char, as 8 binary digits
            31267 + a,                                    # 2  short
            (z * z) / (2 * 100),                          # 3  float
            100000 + (z * 100000),                        # 4  int
            bin(((z + 1) % 2) * 5)[2:].rjust(8, '0'),     # 5  char, as 8 binary digits
            0.99 + (z * 10),                              # 6  float
            -(31267 - z),                                 # 7  negative short
            800000 + (a * 50000),                         # 8  int
            -(100000 + (z * 100000)),                     # 9  negative int
            -((z * z) / (2 * 100)),                       # 10 negative float
            bin((z % 2 * 85))[2:].rjust(8, '0'),          # 11 char, as 8 binary digits
            -(31267 + z),                                 # 12 negative short
            0.4 + (z * 2),                                # 13 float
            100000 + (z * 83000),                         # 14 int
            0.3 + (z * 0.1),                              # 15 float
            150 * z,                                      # 16 short
            (z % 2 * 165),                                # 17 char, as a number
            ((30 * a)),                                   # 18 short
            a * 20000,                                    # 19 int
            ((z % 2) * 90),                               # 20 char, as a number
        ]
        group = fields * 18 + fields[:12] + [fields[19]]
        rows.append(group * 8)
    return rows + rows
#
# for i in data_():
# print(i)
# Expected values, repeated so that at least 3000 messages are available.
b = data_() * 8
dataError = []
print("实际发送的数据\t|\t上位机解析后的数据")
for i in range(3000):
    # Columns that failed in this message, grouped by expected value type.
    charError = []
    floatError = []
    intErrot = []
    for j in range(373):
        # The parsed values start at the third database column.
        test_data = swj[i][j + 2]
        data = b[i][j]
        kind = type(data)
        if kind == str:
            test_data = str(test_data)
            mismatch = data != test_data
            bucket = charError
        elif kind == float:
            test_data = float(test_data)
            # Floats only have to agree within 0.01.
            mismatch = not (math.fabs(data - test_data) < 0.01)
            bucket = floatError
        elif kind == int:
            test_data = int(test_data)
            mismatch = data != test_data
            bucket = intErrot
        else:
            continue
        if mismatch:
            bucket.append(j + 1)
            print(f'源数据:{data},过滤数据:{test_data}\t列:{j + 1}\t第{i}条报文')