如何用python写个串口通信的程序?

Python016

如何用python写个串口通信的程序?,第1张

打开串口后启动一个线程来监听串口数据的进入,有数据时,就做数据的处理。

用python写串口通信程序的示例:

#coding=gb18030

import sys,threading,time

import serial

import binascii,encodings

import re

import socket

class ReadThread:

def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):

self.l_serial = None

self.alive = False

self.waitEnd = None

self.bFirstMethod = i_FirstMethod

self.sendport = ''

self.log = Log

self.output = Output

self.port = Port

self.re_num = None

def waiting(self):

if not self.waitEnd is None:

self.waitEnd.wait()

def SetStopEvent(self):

if not self.waitEnd is None:

self.waitEnd.set()

self.alive = False

self.stop()

def start(self):

self.l_serial = serial.Serial()

self.l_serial.port = self.port

self.l_serial.baudrate = 9600

self.l_serial.timeout = 2

self.re_num = re.compile('\d')

try:

if not self.output is None:

self.output.WriteTex

if not self.log is None:

self.log.info

self.l_serial.open()

except Exception, ex:

if self.l_serial.isOpen():

self.l_serial.close()

self.l_serial = None

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.error(u'%s' % ex)

return False

if self.l_serial.isOpen():

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.waitEnd = threading.Event()

self.alive = True

self.thread_read = None

self.thread_read = threading.Thread(target=self.FirstReader)

self.thread_read.setDaemon(1)

self.thread_read.start()

return True

else:

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

return False

def InitHead(self):

try:

time.sleep(3)

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.l_serial.flushInput()

self.l_serial.write('\x11')

data1 = self.l_serial.read(1024)

except ValueError,ex:

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.error(u'%s' % ex)

self.SetStopEvent()

return

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.output.WriteText(u'===================================\r\n')

def SendData(self, i_msg):

lmsg = ''

isOK = False

if isinstance(i_msg, unicode):

lmsg = i_msg.encode('gb18030')

lmsg = i_msg

pass

except Exception, ex:

pass

return isOK

def FirstReader(self):

data1 = ''

isQuanJiao = True

isFirstMethod = True

isEnd = True

readCount = 0

saveCount = 0

RepPos = 0

#read Head Infor content

self.InitHead()

while self.alive:

try:

data = ''

n = self.l_serial.inWaiting()

if n:

data = data + self.l_serial.read(n)

#print binascii.b2a_hex(data),

for l in xrange(len(data)):

if ord(data[l])==0x8E:

isQuanJiao = True

continue

if ord(data[l])==0x8F:

isQuanJiao = False

continue

if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:

if len(data1)>10:

if not self.re_num.search(data1,1) is None:

saveCount = saveCount + 1

if RepPos==0:

RepPos = self.output.GetInsertionPoint()

self.output.Remove(RepPos,self.output.GetLastPosition())

self.SendData(data1)

data1 = ''

continue

except Exception, ex:

if not self.log is None:

self.log.error(u'%s' % ex)

self.waitEnd.set()

self.alive = False

def stop(self):

self.alive = False

self.thread_read.join()

if self.l_serial.isOpen():

self.l_serial.close()

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

def printHex(self, s):

s1 = binascii.b2a_hex(s)

print s1

if __name__ == '__main__':

rt = ReadThread()

f = open("sendport.cfg", "r")

rt.sendport = f.read()

f.close()

try:

if rt.start():

rt.waiting()

rt.stop()

else:

pass

except Exception,se:

print str(se)

if rt.alive:

rt.stop()

print 'End OK .'

del rt

python里面使用serial库来操作串口,serial的使用流程跟平常的类似,也是打开、关闭、读、写 一般就是设置端口,波特率。 使用serial.Serial创建实体的时候会去打开串口,之后可以使用is_open开判断下是否串口是否打开正常。 使用ser.close即可关闭串口 数据的写使用ser.write接口,如果写的是十六进制的数据使用bytearray来定义,如 writebuf = bytearray([0x55, 0xaa, 0x00, 0x01, 0x00, 0x00])读数据使用ser.read接口,一般会先使用in_waiting来判断下是否有数据,然后开始读 下面举一个例子,说明下我们在实际的使用情况。 一般会单独创建一个进程来作为数据的接收,然后再配合上标记位或者信号量来处理逻辑

Python非常适合写一些测试的脚本,如快速的串口通信测试等。如果使用VC++ QT开发,可能用时较多,使用python,如果掌握使用方法,可以直接读写测试,配合设备或是串口助手,很快验证与实现。Python有没有现成的串口API直接调用呢?经过实践验证,需要安装一个叫 Pyserial的组件即可。这个可以在github上下载。在windows 7 64bit 上可以使用吗?当然可以使用,我安装的python3.5为64位的。把下载后的文件,其中有一个serial的文件夹,拷贝到python35安装路径, C:\Python35\Lib\site-packages\serial网上可以搜一下windows的安装包,安装完也是:C:\Python35\Lib\site-packages\serial ,可以用最新的版本,替换即可。测试的方法:在python IDE里测试:>>>import serial这里如果报错,是python版本与pyserial版本没有配合好。如果正常,不返回,即可以导入serial模块。>>>ser=serial.Serial("COM5",115200)这里为COM5,115200的波特率。如果打不开,请检查安装环境。>>>ser.write('hello,serial test'.encode())17发送测试(如果返回字节数,说明返回成功),这里需要转换一个编码为字节。以上测试,可以使用现在的设备或是串口助手,如安装Virtual Serial Port Driver 7.2 虚拟串口软件,设置一对串口,进行自发自收的测试。>>>print(ser.read()line())b'abcdefg\r\n'这里是串口接收,有接收的超时。设备或是串口助手发送一个字符串,以回车换行结束,这里就可以收到打印出来。也可以用ser.read(),这里只接收一个字符来实现。上面已经实现了基本的串口操作。关闭串口为:>>>ser.close()如果使用python,一般写个py文件,就像windows bat 批处理一样,这是python强大的地方。如果写一个py脚本呢?其实只要把上面的命令,一条条写下来,就是一个脚本,测试如下:import serialser=serial.Serial("COM5",115200,timeout=0.5)for i in range(0,100-1):ser.write('hello\r\n'.encode())print(ser.readline())ser.close()