所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 nixgnauhcuy's blog!
如需转载,请标明出处!
Beautiful is better than ugly.
前言
继续上一篇使用 python 制作串口工具(一),完成要实现的串口工具代码逻辑!
实现
最终效果:
获取接入的 com 口
首先,我们先实现获取电脑当前所接入的串口,实现效果为:每次点击 Combo Box,就把当前电脑接入的串口号信息读取出来。所以我们需要对 Combo Box 这个控件进行重写,这里新建个 my_combobox.py,代码如下:
my_combobox.py
import serial
import serial.tools.list_ports
from PyQt5.QtWidgets import QComboBox
class My_ComBoBox(QComboBox):
def __init__(self, parent = None):
super(My_ComBoBox,self).__init__(parent)
# 重写showPopup函数
def showPopup(self):
# 先清空原有的选项
self.clear()
index = 1
# 获取接入的所有串口信息,插入combobox的选项中
port_list = self.get_port_list(self)
if port_list is not None:
for i in port_list:
self.insertItem(index, i)
index += 1
QComboBox.showPopup(self)# 弹出选项框
@staticmethod
# 获取接入的COM
def get_port_list(self):
try:
port_list = list(serial.tools.list_ports.comports())
for port in port_list:
yield str(port)
except Exception as err:
print("获取接入的串口设备出错!错误信息为:" + str(err))
上面实现了每次点击 Combo Box 后获取所有接入串口的信息。文件创建好并且写入代码后,我们需要引用它,回到 UI 文件 Ui_uart_tool_ui.py,引入刚才所写的内容,
搞定后,我们重新运行下代码,看看效果吧,这里我调试的时候刚好旁边没有串口,所以直接拿了两个 Jlink 演示一下。(我现在是有空的时候写一点,所以偶尔所处的地方,装备有可能不齐全,请谅解!),效果如下:
可以看到,每次点击,都会刷新获取一次当前接入的串口,无论是新接入还是刚断开的串口,在每次点击后都会刷新。
实现串口底层接口
上面完成后,我们先来实现串口底层的接口,后面在应用逻辑交互上,需要对串口底层操作,串口底层单独新建个文件来写, 这里我命名为 uart.py,部分代码如下:
uart.py
class Uart_Recv_Data_Thread(threading.Thread):
def __init__(self, cur_self, main_self):
super(Uart_Recv_Data_Thread, self).__init__()
self.cur_self = cur_self
self.thread = threading.Event()
self.main_self = main_self
def stop(self):
self.thread.set()
def stopped(self):
return self.thread.is_set()
def run(self):
while True:
time = ''
if self.stopped():
break
try:
if False == self.cur_self.recv_queue.empty():
show_data = ''
data = self.cur_self.recv_queue.get()
data_num = len(data)
if self.cur_self.uart_time_stamp_flag == 1:# 时间戳开关打开
time = datetime.datetime.now().strftime('[%Y-%m-%d %H:%M:%S:%f]\r\n')
if self.cur_self.uart_rec_hex_lock == 1:
data_list = []
data_bytes = bytes(data, encoding='utf-8')
for i in range(len(data_bytes)):
data_list.append(hex(data_bytes[i])[2:].zfill(2))
send_text_to_hex = ' '.join(data_list)
show_data += send_text_to_hex
else:
show_data = data
self.main_self.uart_recv_updata_show_data_signal.emit(time + show_data + '\r\n')
# 统计接收字符的数量
self.main_self.uart_updata_recv_num_signal.emit(data_num)
nums = self.cur_self.serial.inWaiting()
if (nums > 0):
recv_msg = self.cur_self.serial.read(nums)
else:
continue
if self.cur_self.recv_queue.full():
self.cur_self.recv_queue.get()
self.cur_self.recv_queue.put(recv_msg.decode())
except Exception as e:
print(e)
continue
class Uart_Send_Data_Thread(threading.Thread):
def __init__(self, cur_self, main_self):
super(Uart_Send_Data_Thread, self).__init__()
self.cur_self = cur_self
self.main_self = main_self
self.thread = threading.Event()
def stop(self):
self.thread.set()
def stopped(self):
return self.thread.is_set()
def run(self):
while True:
if self.stopped():
break
try:
if not self.cur_self.send_queue.empty():
send_data = self.cur_self.send_queue.get(False)
data_num = len(send_data)
# 统计发送字符的数量
self.main_self.uart_updata_send_num_signal.emit(data_num)
#ascii 发送
self.cur_self.serial.write(send_data)
else:
continue
except queue.Empty:
continue
class Uart(object):
def __init__(self, parent):
self.err = 0
self.parent = parent
self.recv_queue = queue.Queue(1000)
self.send_queue = queue.Queue(1000)
self.uart_time_stamp_flag = 0
self.uart_rec_hex_lock = 0
def uart_init(self, port, baud, stopbit, databit, checkbit):
try:
checkbitlist = {'None': 'N', 'Odd': 'O', 'Even': 'E'}
stopbitlist = {'1': 'serial.STOPBITS_ONE', '1.5': 'serial.STOPBITS_ONE', '2': 'serial.STOPBITS_ONE'}
self.serial = serial.Serial(port.split()[0], baud, int(databit), checkbitlist[checkbit], serial.STOPBITS_ONE)
# 创建线程
self.recv_thread = Uart_Recv_Data_Thread(self, self.parent)
self.send_thread = Uart_Send_Data_Thread(self, self.parent)
self.err = 0
except Exception as e:
print(e)
self.err = -1
def open_uart_thread(self):
self.recv_thread.start()
self.send_thread.start()
def close_uart_thread(self):
self.recv_thread.stop()
self.send_thread.stop()
self.serial.close()
def uart_send_func(self, data):
self.send_queue.put(data)
上面实现了对串口的配置初始化,并且设置了两个线程,一个线程用于接收,一个线程用于发送,为了保证数据不会丢失,这里还用到了队列,保证即使快速发送时,也不会丢失数据。
界面逻辑
底层接口完成后,接下来要做界面的逻辑,先从基础的发送和接收的显示开始,要实现这两点,首先要完成的就有串口的打开和关闭
、发送处理
和接收显示
。
串口的打开和关闭
回到 main.py,先来实现串口打开和关闭,设置一个标志,作为判断串口是否运行的标志,这里我将标志命名为 self.uart_com_run_statu
,然后我们要绑定该按纽的触发回调事件, self.uart_en_push_button.clicked.connect(self.uart_en_push_button_cb)
,在每次按下打开串口按钮后,我们获取 combo_box 当前内容的波特率、停止位、数据位等串口相关的信息,调用底层串口初始化 self.uart.uart_init(port, baud, stopbit, databit, checkbit)
传入这些内容,判断初始化是否成功,成功的话,将上面的运行标志置为 1,然后将打开串口按钮的文本设置为关闭串口,并且开启串口的线程,错误的话,说明有可能是串口已经被其他软件或应该打开,这里我们只要提示警告,不做任何处理就可以了。部分代码如下:
def uart_en_push_button_cb(self):
if self.uart_com_run_status == 0:
port = self.com_combo_box.currentText()
if port == '':
win32api.MessageBox(0, "请选择串口", "警告",win32con.MB_ICONWARNING)
return
baud = self.baud_combo_box.currentText()
stopbit = self.stopbit_combo_box.currentText()
databit = self.databit_combo_box.currentText()
checkbit = self.checkbit_combo_box.currentText()
self.uart.uart_init(port, baud, stopbit, databit, checkbit)
if self.uart.err == -1:
self.uart_com_run_status = 0
win32api.MessageBox(0, port+"已被使用", "警告",win32con.MB_ICONWARNING)
else:
self.uart_com_run_status = 1
self.uart.open_uart_thread()
self.uart_en_push_button.setText('关闭串口')
else:
self.uart_com_run_status = 0
self.uart.close_uart_thread()
self.uart_en_push_button.setText('打开串口')
串口开启成功,在此点击时关闭串口,这个时候将标志置为 0,并且关闭串口和串口线程,设置按钮文本为打开串口即可。
附上成功开启串口和失败开启串口的演示:
串口的发送
接下来实现串口的发送,发送这里,我们要知道发送的是 ascii 还是 hex 格式,所以我们要先判断当前勾选的发送格式,将之前 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.send_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_send_radio_button_cb)
和 self.send_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_send_radio_button_cb)
,这两个回调事件实现了将发送编辑框的两种格式的相互转换,并且用 self.uart_send_hex_lock
作为当前格式的标志,代码如下:
def uart_ascii_to_hex_send_radio_button_cb(self):
if self.send_ascii_radio_button.isChecked() == True:
self.uart_send_hex_lock = 0
send_text = self.uart_send_show.toPlainText().replace(' ', '')
self.uart_send_show.clear()
hex_send_text = self.hex2bin(send_text)
self.uart_send_show.setText(hex_send_text)
else:
return
def uart_hex_to_ascii_send_radio_button_cb(self):
if self.send_hex_radio_button.isChecked() == True:
self.uart_send_hex_lock = 1
text_list = []
send_text = bytes(self.uart_send_show.toPlainText(), encoding='utf-8')
for i in range(len(send_text)):
text_list.append(hex(send_text[i])[2:])
send_text_to_hex = ' '.join(text_list)
self.uart_send_show.clear()
self.uart_send_show.setText(send_text_to_hex)
else:
return
随后,设置一下发送按键的回调事件 self.uart_send_push_button.clicked.connect(self.uart_send_push_button_cb)
,这里判断当前的格式,将编辑框的内容获取后,相应的处理后,调用底层接口发送出去,代码如下:
def uart_send_push_button_cb(self):
if self.uart_com_run_status == 0:
return
send_data = ''
send_text = self.uart_send_show.toPlainText()
if send_text == '':
return
if self.send_hex_radio_button.isChecked() == True: # 十六进制发送
hex_send_text = self.hex2bin(send_text.replace(' ', ''))
send_data = bytes(hex_send_text,encoding='utf-8')
else:
send_data = send_text.encode()
self.uart.uart_send_func(send_data)
附上,具体效果:
串口的接收
发送也成功后,开始实现接收,接收的话,同样需要将接收分为 ascii 和 hex 格式,所以同样要将接收 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.rec_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_rec_radio_button_cb)
和 self.rec_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_rec_radio_button_cb)
,这两个回调事件实现了将接收编辑框接收的的两种格式的相互转换。具体代码如下:
def uart_ascii_to_hex_rec_radio_button_cb(self):
if self.rec_ascii_radio_button.isChecked() == True:
self.uart.uart_set_rec_hex_lock(0)
else:
return
def uart_hex_to_ascii_rec_radio_button_cb(self):
if self.rec_hex_radio_button.isChecked() == True:
self.uart.uart_set_rec_hex_lock(1)
else:
return
上面代码可以看到,这里单纯只是设定标志,因为处理是在底层串口接收线程中对这个标志做了处理,具体可以看上面串口底层的代码,这里就不再赘述了,显示接口通过信号的方式进行更新,设置回调事件 self.uart_recv_updata_show_data_signal.connect(self.update_uart_recv_show_cb)
,在里面获取接收的数据,数据是由接收线程处理好后发送过来的,所以这边的回调处理同样很简单,具体如下:
def update_uart_recv_show_cb(self, data):
self.uart_rec_show.insertPlainText(data)
cursor = self.uart_rec_show.textCursor()
self.uart_rec_show.moveCursor(cursor.End)
具体效果:
定时发送
接收搞定后,我们增加定时发送功能,定时发送功能需要用到定时器,同时要获取定时功能的开启和定时时间,相关的代码如下:
# 定时器
self.uart_timer_num = 1000
self.uart_timer_line_edit.setText('1000')
self.uart_timer_send = QTimer()
self.uart_timer_send.timeout.connect(self.uart_timer_send_cb)
def uart_timer_send_cb(self):
self.uart_send_push_button_cb()
上面代码可以看出,创建了个定时器,并且将事件回调绑定在了 uart_timer_send_cb
中,回调里实现的就是不断的调用发送事件的回调,定时器的打开则通过 check box 控件,具体代码如下:
def uart_time_en_check_box_cb(self):
if self.uart_com_run_status == 0:
self.uart_timer_check_box.setChecked(False)
return None
if self.uart_timer_check_box.isChecked() == True:
self.uart_timer_send.start(int(self.uart_timer_num))
else:
self.uart_timer_send.stop()
上述代码实现了,如果串口未运行,则不使用定时器,判断用户是否勾选该功能来开启和停止定时器。
串口关闭后也需要相应的关闭定时器,防止无用的调用发送。
具体效果如下:
其他
定时功能做好后,还有个时间戳的功能,时间戳则是获取当前时间,根据标志判断时间戳是否开启,开启的话,则添加到显示数据中,相关的代码在串口底层的接收线程中,并且功能也比较简单,就不再详细的说了。同样的,还有发送清除和接收清除等简单的小功能,也不浪费太多篇幅。具体实现可以访问我的 github 中的 python_uart_tool 库中,详细的代码都会提交在上边。
结语
python 串口逻辑代码的编写,就到这里结束了,这是一个简单的 demo,很多例如:限制输入内容、回显、快速发送时显示问题等功能我没有添加上去。
发现自己对写这种代码加解释的博文不是很熟悉,总感觉会有些地方不清除该怎样描述,而且我本身不是主学 python 的,所以难免会有出错或者你认为代码不合理的地方,这些也希望多多谅解和指教,我会及时修改的!
关于本篇的相关代码我也上传到 github 和 csdn 上去了,有兴趣的可以访问 github 或 csdn 查看,后面的代码也会在这个仓库中补充,不介意也可以点个 star。