打开串口后启动一个线程来监听串口数据的进入,有数据时,就做数据的处理。
用python写串口通信程序的示例:
#coding=gb18030
import sys,threading,time
import serial
import binascii,encodings
import re
import socket
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None
self.alive = False
self.waitEnd = None
self.bFirstMethod = i_FirstMethod
self.sendport = ''
self.log = Log
self.output = Output
self.port = Port
self.re_num = None
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait()
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set()
self.alive = False
self.stop()
def start(self):
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = 9600
self.l_serial.timeout = 2
self.re_num = re.compile('\d')
try:
if not self.output is None:
self.output.WriteTex
if not self.log is None:
self.log.info
self.l_serial.open()
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close()
self.l_serial = None
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.error(u'%s' % ex)
return False
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.waitEnd = threading.Event()
self.alive = True
self.thread_read = None
self.thread_read = threading.Thread(target=self.FirstReader)
self.thread_read.setDaemon(1)
self.thread_read.start()
return True
else:
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
return False
def InitHead(self):
try:
time.sleep(3)
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.l_serial.flushInput()
self.l_serial.write('\x11')
data1 = self.l_serial.read(1024)
except ValueError,ex:
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.error(u'%s' % ex)
self.SetStopEvent()
return
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.output.WriteText(u'===================================\r\n')
def SendData(self, i_msg):
lmsg = ''
isOK = False
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030')
lmsg = i_msg
pass
except Exception, ex:
pass
return isOK
def FirstReader(self):
data1 = ''
isQuanJiao = True
isFirstMethod = True
isEnd = True
readCount = 0
saveCount = 0
RepPos = 0
#read Head Infor content
self.InitHead()
while self.alive:
try:
data = ''
n = self.l_serial.inWaiting()
if n:
data = data + self.l_serial.read(n)
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data[l])==0x8E:
isQuanJiao = True
continue
if ord(data[l])==0x8F:
isQuanJiao = False
continue
if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1
if RepPos==0:
RepPos = self.output.GetInsertionPoint()
self.output.Remove(RepPos,self.output.GetLastPosition())
self.SendData(data1)
data1 = ''
continue
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex)
self.waitEnd.set()
self.alive = False
def stop(self):
self.alive = False
self.thread_read.join()
if self.l_serial.isOpen():
self.l_serial.close()
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
def printHex(self, s):
s1 = binascii.b2a_hex(s)
print s1
if __name__ == '__main__':
rt = ReadThread()
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting()
rt.stop()
else:
pass
except Exception,se:
print str(se)
if rt.alive:
rt.stop()
print 'End OK .'
del rt
python里面使用serial库来操作串口,serial的使用流程跟平常的类似,也是打开、关闭、读、写 一般就是设置端口,波特率。 使用serial.Serial创建实体的时候会去打开串口,之后可以使用is_open开判断下是否串口是否打开正常。 使用ser.close即可关闭串口 数据的写使用ser.write接口,如果写的是十六进制的数据使用bytearray来定义,如 writebuf = bytearray([0x55, 0xaa, 0x00, 0x01, 0x00, 0x00])读数据使用ser.read接口,一般会先使用in_waiting来判断下是否有数据,然后开始读 下面举一个例子,说明下我们在实际的使用情况。 一般会单独创建一个进程来作为数据的接收,然后再配合上标记位或者信号量来处理逻辑Python非常适合写一些测试的脚本,如快速的串口通信测试等。如果使用VC++ QT开发,可能用时较多,使用python,如果掌握使用方法,可以直接读写测试,配合设备或是串口助手,很快验证与实现。Python有没有现成的串口API直接调用呢?经过实践验证,需要安装一个叫 Pyserial的组件即可。这个可以在github上下载。在windows 7 64bit 上可以使用吗?当然可以使用,我安装的python3.5为64位的。把下载后的文件,其中有一个serial的文件夹,拷贝到python35安装路径, C:\Python35\Lib\site-packages\serial网上可以搜一下windows的安装包,安装完也是:C:\Python35\Lib\site-packages\serial ,可以用最新的版本,替换即可。测试的方法:在python IDE里测试:>>>import serial这里如果报错,是python版本与pyserial版本没有配合好。如果正常,不返回,即可以导入serial模块。>>>ser=serial.Serial("COM5",115200)这里为COM5,115200的波特率。如果打不开,请检查安装环境。>>>ser.write('hello,serial test'.encode())17发送测试(如果返回字节数,说明返回成功),这里需要转换一个编码为字节。以上测试,可以使用现在的设备或是串口助手,如安装Virtual Serial Port Driver 7.2 虚拟串口软件,设置一对串口,进行自发自收的测试。>>>print(ser.read()line())b'abcdefg\r\n'这里是串口接收,有接收的超时。设备或是串口助手发送一个字符串,以回车换行结束,这里就可以收到打印出来。也可以用ser.read(),这里只接收一个字符来实现。上面已经实现了基本的串口操作。关闭串口为:>>>ser.close()如果使用python,一般写个py文件,就像windows bat 批处理一样,这是python强大的地方。如果写一个py脚本呢?其实只要把上面的命令,一条条写下来,就是一个脚本,测试如下:import serialser=serial.Serial("COM5",115200,timeout=0.5)for i in range(0,100-1):ser.write('hello\r\n'.encode())print(ser.readline())ser.close()