python 制作串口工具(二)

个人博客

所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 nixgnauhcuy's blog

如需转载,请标明出处!

Beautiful is better than ugly.

前言

继续上一篇使用 python 制作串口工具(一),完成要实现的串口工具代码逻辑!


实现

最终效果:

image

获取接入的 com 口

首先,我们先实现获取电脑当前所接入的串口,实现效果为:每次点击 Combo Box,就把当前电脑接入的串口号信息读取出来。所以我们需要对 Combo Box 这个控件进行重写,这里新建个 my_combobox.py,代码如下:

my_combobox.py

import serial
import serial.tools.list_ports

from PyQt5.QtWidgets import QComboBox

class My_ComBoBox(QComboBox):

    def __init__(self, parent = None):
        super(My_ComBoBox,self).__init__(parent)

    # 重写showPopup函数
    def showPopup(self):
        # 先清空原有的选项
        self.clear()
        index = 1

        # 获取接入的所有串口信息,插入combobox的选项中
        port_list = self.get_port_list(self)
        if port_list is not None:
            for i in port_list:
                self.insertItem(index, i)
                index += 1
        QComboBox.showPopup(self)# 弹出选项框

    @staticmethod
    # 获取接入的COM
    def get_port_list(self):
        try:
            port_list = list(serial.tools.list_ports.comports())
            for port in port_list:
                yield str(port)
        except Exception as err:
            print("获取接入的串口设备出错!错误信息为:" + str(err))

上面实现了每次点击 Combo Box 后获取所有接入串口的信息。文件创建好并且写入代码后,我们需要引用它,回到 UI 文件 Ui_uart_tool_ui.py,引入刚才所写的内容,

image

搞定后,我们重新运行下代码,看看效果吧,这里我调试的时候刚好旁边没有串口,所以直接拿了两个 Jlink 演示一下。(我现在是有空的时候写一点,所以偶尔所处的地方,装备有可能不齐全,请谅解!),效果如下:

image

可以看到,每次点击,都会刷新获取一次当前接入的串口,无论是新接入还是刚断开的串口,在每次点击后都会刷新。

实现串口底层接口

上面完成后,我们先来实现串口底层的接口,后面在应用逻辑交互上,需要对串口底层操作,串口底层单独新建个文件来写, 这里我命名为 uart.py,部分代码如下:

uart.py

class Uart_Recv_Data_Thread(threading.Thread):
    def __init__(self, cur_self, main_self):
        super(Uart_Recv_Data_Thread, self).__init__()
        self.cur_self = cur_self
        self.thread = threading.Event()
        self.main_self = main_self

    def stop(self):
        self.thread.set()

    def stopped(self):
        return self.thread.is_set()

    def run(self):
        while True:
            time = ''
            if self.stopped():
                break
            try:
                if False == self.cur_self.recv_queue.empty():
                    show_data = ''
                    data = self.cur_self.recv_queue.get()
                    data_num = len(data)
                    if self.cur_self.uart_time_stamp_flag == 1:# 时间戳开关打开
                        time = datetime.datetime.now().strftime('[%Y-%m-%d %H:%M:%S:%f]\r\n')
                    
                    if self.cur_self.uart_rec_hex_lock == 1:
                        data_list = []
                        data_bytes = bytes(data, encoding='utf-8')
                        for i in range(len(data_bytes)):
                            data_list.append(hex(data_bytes[i])[2:].zfill(2))
                        send_text_to_hex = ' '.join(data_list)
                        show_data += send_text_to_hex
                    else:
                        show_data = data
                    
                    self.main_self.uart_recv_updata_show_data_signal.emit(time + show_data + '\r\n')

                    # 统计接收字符的数量
                    self.main_self.uart_updata_recv_num_signal.emit(data_num)

    
                nums = self.cur_self.serial.inWaiting()
                if (nums > 0):
                    recv_msg = self.cur_self.serial.read(nums)
                else:
                    continue
                if self.cur_self.recv_queue.full():
                    self.cur_self.recv_queue.get()
                self.cur_self.recv_queue.put(recv_msg.decode())

                
            except Exception as e:
                print(e)
                continue

class Uart_Send_Data_Thread(threading.Thread):
    def __init__(self, cur_self, main_self):
        super(Uart_Send_Data_Thread, self).__init__()
        self.cur_self = cur_self
        self.main_self = main_self
        self.thread = threading.Event()

    def stop(self):
        self.thread.set()

    def stopped(self):
        return self.thread.is_set()

    def run(self):
        while True:
            if self.stopped():
                break
            try:
                if not self.cur_self.send_queue.empty():
                    send_data = self.cur_self.send_queue.get(False)
                    data_num = len(send_data)
                    # 统计发送字符的数量
                    self.main_self.uart_updata_send_num_signal.emit(data_num)
                    #ascii 发送
                    self.cur_self.serial.write(send_data)
                else:
                    continue
            except queue.Empty:
                continue


class Uart(object):
    def __init__(self, parent):
        self.err = 0
        self.parent = parent

        self.recv_queue = queue.Queue(1000)
        self.send_queue = queue.Queue(1000)
        self.uart_time_stamp_flag = 0
        self.uart_rec_hex_lock = 0


    def uart_init(self, port, baud, stopbit, databit, checkbit):
        try:
            checkbitlist = {'None': 'N', 'Odd': 'O', 'Even': 'E'}
            stopbitlist = {'1': 'serial.STOPBITS_ONE', '1.5': 'serial.STOPBITS_ONE', '2': 'serial.STOPBITS_ONE'}
            self.serial = serial.Serial(port.split()[0], baud, int(databit), checkbitlist[checkbit], serial.STOPBITS_ONE)
            # 创建线程
            self.recv_thread = Uart_Recv_Data_Thread(self, self.parent)
            self.send_thread = Uart_Send_Data_Thread(self, self.parent)
            self.err = 0
        except Exception as e:
            print(e)
            self.err = -1


    def open_uart_thread(self):
        self.recv_thread.start()
        self.send_thread.start()
        

    def close_uart_thread(self):
        self.recv_thread.stop()
        self.send_thread.stop()
        self.serial.close()
    
    def uart_send_func(self, data):
        self.send_queue.put(data)

上面实现了对串口的配置初始化,并且设置了两个线程,一个线程用于接收,一个线程用于发送,为了保证数据不会丢失,这里还用到了队列,保证即使快速发送时,也不会丢失数据。

界面逻辑

底层接口完成后,接下来要做界面的逻辑,先从基础的发送和接收的显示开始,要实现这两点,首先要完成的就有串口的打开和关闭发送处理接收显示

串口的打开和关闭

回到 main.py,先来实现串口打开和关闭,设置一个标志,作为判断串口是否运行的标志,这里我将标志命名为 self.uart_com_run_statu,然后我们要绑定该按纽的触发回调事件, self.uart_en_push_button.clicked.connect(self.uart_en_push_button_cb),在每次按下打开串口按钮后,我们获取 combo_box 当前内容的波特率、停止位、数据位等串口相关的信息,调用底层串口初始化 self.uart.uart_init(port, baud, stopbit, databit, checkbit) 传入这些内容,判断初始化是否成功,成功的话,将上面的运行标志置为 1,然后将打开串口按钮的文本设置为关闭串口,并且开启串口的线程,错误的话,说明有可能是串口已经被其他软件或应该打开,这里我们只要提示警告,不做任何处理就可以了。部分代码如下:

def uart_en_push_button_cb(self):
        if self.uart_com_run_status == 0:
            port = self.com_combo_box.currentText()
            if port == '':
                win32api.MessageBox(0, "请选择串口", "警告",win32con.MB_ICONWARNING)
                return
            baud = self.baud_combo_box.currentText()
            stopbit = self.stopbit_combo_box.currentText()
            databit = self.databit_combo_box.currentText()
            checkbit = self.checkbit_combo_box.currentText()
            self.uart.uart_init(port, baud, stopbit, databit, checkbit)
            if self.uart.err == -1:
                self.uart_com_run_status = 0
                win32api.MessageBox(0, port+"已被使用", "警告",win32con.MB_ICONWARNING)
            else:
                self.uart_com_run_status = 1
                self.uart.open_uart_thread()
                self.uart_en_push_button.setText('关闭串口')
        else:
            self.uart_com_run_status = 0
            self.uart.close_uart_thread()
            self.uart_en_push_button.setText('打开串口')

串口开启成功,在此点击时关闭串口,这个时候将标志置为 0,并且关闭串口和串口线程,设置按钮文本为打开串口即可。

附上成功开启串口和失败开启串口的演示:

image

串口的发送

接下来实现串口的发送,发送这里,我们要知道发送的是 ascii 还是 hex 格式,所以我们要先判断当前勾选的发送格式,将之前 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.send_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_send_radio_button_cb)self.send_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_send_radio_button_cb),这两个回调事件实现了将发送编辑框的两种格式的相互转换,并且用 self.uart_send_hex_lock 作为当前格式的标志,代码如下:

def uart_ascii_to_hex_send_radio_button_cb(self):
        if self.send_ascii_radio_button.isChecked() == True:
                self.uart_send_hex_lock = 0
                send_text = self.uart_send_show.toPlainText().replace(' ', '')
                self.uart_send_show.clear()
                hex_send_text = self.hex2bin(send_text)
                self.uart_send_show.setText(hex_send_text)
        else:
            return

def uart_hex_to_ascii_send_radio_button_cb(self):
    if self.send_hex_radio_button.isChecked() == True:
        self.uart_send_hex_lock = 1
        text_list = []
        send_text = bytes(self.uart_send_show.toPlainText(), encoding='utf-8')
        for i in range(len(send_text)):
            text_list.append(hex(send_text[i])[2:])
        send_text_to_hex = ' '.join(text_list)
        self.uart_send_show.clear()
        self.uart_send_show.setText(send_text_to_hex)
    else:
        return

随后,设置一下发送按键的回调事件 self.uart_send_push_button.clicked.connect(self.uart_send_push_button_cb),这里判断当前的格式,将编辑框的内容获取后,相应的处理后,调用底层接口发送出去,代码如下:

def uart_send_push_button_cb(self):
        if self.uart_com_run_status == 0:
            return
        send_data = ''
        send_text = self.uart_send_show.toPlainText()
        if send_text == '':
            return
        if self.send_hex_radio_button.isChecked() == True:  # 十六进制发送
            hex_send_text = self.hex2bin(send_text.replace(' ', ''))
            send_data = bytes(hex_send_text,encoding='utf-8')
        else:
            send_data = send_text.encode()
        self.uart.uart_send_func(send_data)

附上,具体效果:

image

串口的接收

发送也成功后,开始实现接收,接收的话,同样需要将接收分为 ascii 和 hex 格式,所以同样要将接收 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.rec_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_rec_radio_button_cb)self.rec_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_rec_radio_button_cb),这两个回调事件实现了将接收编辑框接收的的两种格式的相互转换。具体代码如下:

def uart_ascii_to_hex_rec_radio_button_cb(self):
        if self.rec_ascii_radio_button.isChecked() == True:
            self.uart.uart_set_rec_hex_lock(0)
        else:
            return
    
    def uart_hex_to_ascii_rec_radio_button_cb(self):
        if self.rec_hex_radio_button.isChecked() == True:
            self.uart.uart_set_rec_hex_lock(1)
        else:
            return

上面代码可以看到,这里单纯只是设定标志,因为处理是在底层串口接收线程中对这个标志做了处理,具体可以看上面串口底层的代码,这里就不再赘述了,显示接口通过信号的方式进行更新,设置回调事件 self.uart_recv_updata_show_data_signal.connect(self.update_uart_recv_show_cb),在里面获取接收的数据,数据是由接收线程处理好后发送过来的,所以这边的回调处理同样很简单,具体如下:

def update_uart_recv_show_cb(self, data):
        self.uart_rec_show.insertPlainText(data)
        cursor = self.uart_rec_show.textCursor()
        self.uart_rec_show.moveCursor(cursor.End)

具体效果:

image

定时发送

接收搞定后,我们增加定时发送功能,定时发送功能需要用到定时器,同时要获取定时功能的开启和定时时间,相关的代码如下:

# 定时器
self.uart_timer_num = 1000
self.uart_timer_line_edit.setText('1000')
self.uart_timer_send = QTimer()
self.uart_timer_send.timeout.connect(self.uart_timer_send_cb)

def uart_timer_send_cb(self):
    self.uart_send_push_button_cb()

上面代码可以看出,创建了个定时器,并且将事件回调绑定在了 uart_timer_send_cb 中,回调里实现的就是不断的调用发送事件的回调,定时器的打开则通过 check box 控件,具体代码如下:

def uart_time_en_check_box_cb(self):
    if self.uart_com_run_status == 0:
        self.uart_timer_check_box.setChecked(False)
        return None

    if self.uart_timer_check_box.isChecked() == True:
        self.uart_timer_send.start(int(self.uart_timer_num))
    else:
        self.uart_timer_send.stop()

上述代码实现了,如果串口未运行,则不使用定时器,判断用户是否勾选该功能来开启和停止定时器。

串口关闭后也需要相应的关闭定时器,防止无用的调用发送。

具体效果如下:

image

其他

定时功能做好后,还有个时间戳的功能,时间戳则是获取当前时间,根据标志判断时间戳是否开启,开启的话,则添加到显示数据中,相关的代码在串口底层的接收线程中,并且功能也比较简单,就不再详细的说了。同样的,还有发送清除和接收清除等简单的小功能,也不浪费太多篇幅。具体实现可以访问我的 github 中的 python_uart_tool 库中,详细的代码都会提交在上边。

结语

python 串口逻辑代码的编写,就到这里结束了,这是一个简单的 demo,很多例如:限制输入内容、回显、快速发送时显示问题等功能我没有添加上去。

发现自己对写这种代码加解释的博文不是很熟悉,总感觉会有些地方不清除该怎样描述,而且我本身不是主学 python 的,所以难免会有出错或者你认为代码不合理的地方,这些也希望多多谅解和指教,我会及时修改的!

关于本篇的相关代码我也上传到 github 和 csdn 上去了,有兴趣的可以访问 github csdn 查看,后面的代码也会在这个仓库中补充,不介意也可以点个 star。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容