其实网上已经有许多python语言书写的串口,但大部分都是python2写的,没有找到一个合适的python编写的串口助手,只能自己来写一个串口助手,由于我只需要串口能够接收读取数据就可以了,故而这个串口助手只实现了数据的接收读取。
创建串口助手首先需要创建一个类,重构类的实现过程如下:
- #coding=gb18030
-
- import threading
- import time
- import serial
-
- class ComThread:
- def __init__(self, Port='COM3'):
- #构造串口的属性
- self.l_serial = None
- self.alive = False
- self.waitEnd = None
- self.port = Port
- self.ID = None
- self.data = None
- #定义串口等待的函数
- def waiting(self):
- if not self.waitEnd is None:
- self.waitEnd.wait()
-
- def SetStopEvent(self):
- if not self.waitEnd is None:
- self.waitEnd.set()
- self.alive = False
- self.stop()
- #启动串口的函数
- def start(self):
- self.l_serial = serial.Serial()
- self.l_serial.port = self.port
- self.l_serial.baudrate = 115200
- #设置等待时间,若超出这停止等待
- self.l_serial.timeout = 2
- self.l_serial.open()
- #判断串口是否已经打开
- if self.l_serial.isOpen():
- self.waitEnd = threading.Event()
- self.alive = True
- self.thread_read = None
- self.thread_read = threading.Thread(target=self.FirstReader)
- self.thread_read.setDaemon(1)
- self.thread_read.start()
- return True
- else:
- return False
创建好类后,就要实现串口读取的功能,其读取数据的函数如下:
首先要创建一个字符串来存放接收到的数据:
- data = ''
- data = data.encode('utf-8')#由于串口使用的是字节,故而要进行转码,否则串口会不识别
在创建好变量来接收数据后就要开始接收数据:
- n = self.l_serial.inWaiting()#获取接收到的数据长度
- if n:
- #读取数据并将数据存入data
- data = data + self.l_serial.read(n)
- #输出接收到的数据
- print('get data from serial port:', data)
- #显示data的类型,便于如果出错时检查错误
- print(type(data))
将数据接收完后,就要对接收到的数据进行处理,提取出有用信息,由于下位机使用的协议不一样,因此处理的方法也不一样,我使用的协议是**+ID+*Data+*,因此我的提取方法如下:
- #获取还没接收到的数据长度
- n = self.l_serial.inWaiting()
- #判断是否已经将下位机传输过来的数据全部提取完毕,防止之前没有获取全部数据
- if len(data)>0 and n==0:
- try:
- #将数据转换为字符型
- temp = data.decode('gb18030')
- #输出temp类型,看转码是否成功
- print(type(temp))
- #输出接收到的数据
- print(temp)
- 将数据按换行分割并输出
- car,temp = str(temp).split("\n",1)
- print(car,temp)
-
- #将temp按':'分割,并获取第二个数据
- string = str(temp).strip().split(":")[1]
- #由于前面分割后的数据类型是列表,因此需要转换成字符串,而后按照'*'分割,得到的也就是我们需要的Id和data
- str_ID,str_data = str(string).split("*",1)
-
- print(str_ID)
- print(str_data)
- print(type(str_ID),type(str_data))
- #判断data最后一位是否是'*',若是则退出,若不是则输出异常
- if str_data[-1]== '*':
- break
- else:
- print(str_data[-1])
- print('str_data[-1]!=*')
- except:
- print("读卡错误,请重试!\n")
其输出结果为:
- get data from serial port:b'ID:4A622E29\n\xbf\xa8\xd6\xd0\xbf\xe924\xca\xfd\xbe\xdd\xce\xaa:1*80*\r\n'
- <class 'bytes'>
- <class 'str'>
- ID:4A622E29
- 卡中块24数据为:1*80*
-
- ID:4A622E29 卡中块24数据为:1*80*
- 80*
- <class 'str'> <class 'str'>
串口助手完整代码如下:
- #coding=gb18030
-
- import threading
- import time
- import serial
-
- class ComThread:
- def __init__(self, Port='COM3'):
- self.l_serial = None
- self.alive = False
- self.waitEnd = None
- self.port = Port
- self.ID = None
- self.data = None
-
- def waiting(self):
- if not self.waitEnd is None:
- self.waitEnd.wait()
-
- def SetStopEvent(self):
- if not self.waitEnd is None:
- self.waitEnd.set()
- self.alive = False
- self.stop()
-
- def start(self):
- self.l_serial = serial.Serial()
- self.l_serial.port = self.port
- self.l_serial.baudrate = 115200
- self.l_serial.timeout = 2
- self.l_serial.open()
- if self.l_serial.isOpen():
- self.waitEnd = threading.Event()
- self.alive = True
- self.thread_read = None
- self.thread_read = threading.Thread(target=self.FirstReader)
- self.thread_read.setDaemon(1)
- self.thread_read.start()
- return True
- else:
- return False
-
- def SendDate(self,i_msg,send):
- lmsg = ''
- isOK = False
- if isinstance(i_msg):
- lmsg = i_msg.encode('gb18030')
- else:
- lmsg = i_msg
- try:
- # 发送数据到相应的处理组件
- self.l_serial.write(send)
- except Exception as ex:
- pass;
- return isOK
-
- def FirstReader(self):
- while self.alive:
- time.sleep(0.1)
-
- data = ''
- data = data.encode('utf-8')
-
- n = self.l_serial.inWaiting()
- if n:
- data = data + self.l_serial.read(n)
- print('get data from serial port:', data)
- print(type(data))
-
- n = self.l_serial.inWaiting()
- if len(data)>0 and n==0:
- try:
- temp = data.decode('gb18030')
- print(type(temp))
- print(temp)
- car,temp = str(temp).split("\n",1)
- print(car,temp)
-
- string = str(temp).strip().split(":")[1]
- str_ID,str_data = str(string).split("*",1)
-
- print(str_ID)
- print(str_data)
- print(type(str_ID),type(str_data))
-
- if str_data[-1]== '*':
- break
- else:
- print(str_data[-1])
- print('str_data[-1]!=*')
- except:
- print("读卡错误,请重试!\n")
-
- self.ID = str_ID
- self.data = str_data[0:-1]
- self.waitEnd.set()
- self.alive = False
-
- def stop(self):
- self.alive = False
- self.thread_read.join()
- if self.l_serial.isOpen():
- self.l_serial.close()
- #调用串口,测试串口
- def main():
- rt = ComThread()
- rt.sendport = '**1*80*'
- try:
- if rt.start():
- print(rt.l_serial.name)
- rt.waiting()
- print("The data is:%s,The Id is:%s"%(rt.data,rt.ID))
- rt.stop()
- else:
- pass
- except Exception as se:
- print(str(se))
-
- if rt.alive:
- rt.stop()
-
- print('')
- print ('End OK .')
- temp_ID=rt.ID
- temp_data=rt.data
- del rt
- return temp_ID,temp_data
-
-
- if __name__ == '__main__':
-
- #设置一个主函数,用来运行窗口,便于若其他地方下需要调用串口是可以直接调用main函数
- ID,data = main()
-
- print("******")
- print(ID,data)
以上这篇对python3 Serial 串口助手的接收读取数据方法详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持w3xue。