多线程和队列

Python025

多线程和队列,第1张

1、python提供两种方式使用多线程:一个是基于函数:_thread模块或者threading模块。一个是基于类:theading.Thread

使用多线程函数包装线程对象:_thread

_thead.start_new_thead(func,*args,**kwargs)

args,**kwargs是被包装函数的入参,必须传入元祖或字典

使用多线程函数包装线程对象:threading

threading._start_new_thread(func,*args,**kwargs):开启线程,带元祖或字典

threading.currentThread():返回当前线程变量

threading.enumerate():正在运行的线程列表,不含未启动和已结束线程

threading.activeCount():返回正在运行的线程数量

threading.settrace(func):为所有threading模块启动的线程设置追踪函数,在调用run方法之前,func会被传给追踪函数

threading.setprofile(func):为所有threading模块启动的线程设置性能测试函数,也是在run方法调用前就传递给性能测试函数

使用多线程类包装线程对象:threading.Thread

Thread类提供以下方法:

run():表示线程活动的方法,线程需要控制些什么活动都在这里面定义。当线程对象一但被创建,其活动一定会因调用线程的 start() 方法开始。这会在独立的控制线程调用 run() 方法。

start():开启线程活动

join():等待线程中止,阻塞当前线程直到被调用join方法的线程中止。线程A调用线程B的join方法,那线程A将会被阻塞至线程B中止。

isAlive():返回线程是否还活动

getName():获取线程名字

setName():设置线程名字

Lock对象:实例化线程锁,包含acquire方法获取锁 和 release 方法释放锁,在最开始创建锁的时候,锁为未锁定状态,调用acquire方法后锁置为锁定状态,此时其他线程再调用acquire方法就将会被阻塞至其他线程调用release方法释放锁,如果释放一个并未被锁定的锁将会抛出异常。支持上下文管理协议,直接with lock 无需调用锁定,释放方法

Rlock对象:重入锁,相比lock增加了线程和递归的概念。比如:线程目标函数F,在获得锁之后执行函数G,但函数G也需要先获得锁,此时同一线程,F获得锁,G等待,F等待G执行,就造成了死锁,此时使用rlock可避免。一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;但线程必须在每次获取它时释放一次。

daemon属性:设置该线程是否是守护线程,默认为none,需要在调用start方法之前设置好

事件对象:一个线程发出事件信号 ,其他线程收到信号后作出对应活动。实例化事件对象后,初始事件标志为flase。调用其wait方法将阻塞当前所属线程,至事件标志为true时。调用set方法可将事件标志置为true,被阻塞的线程将被执行。调用clear方法可将事件标志置为flase

注意点:

1、继承threading.Thread类,初始化时要记得继承父类的__init__方法

2、run()方法只能有一个入参,故尽量把启动线程时的参数入参到初始化的时候

3、锁要设定全局的,一个子线程获得一个锁没有意义

以下实例:有一个列表,线程A从尾到头遍历元素,线程B从头到尾将元素值重置为1,设置线程锁之前线程A遍历到头部的数据已经被修改,设置线程锁之后不会再有数据不一致的情况

import threading,time

class tt(threading.Thread):

    def __init__(self,name,func,ll):

        threading.Thread.__init__(self) #继承父级的初始化方法

        self.name=name

        self.func=func  #run方法只能带一个入参,故把方法入参到初始化的时候

        self.ll=ll

    def run(self):

        print(self.name)

        threadlock.acquire() #获得锁

        self.func(self.ll)

        threadlock.release() #释放锁

def readd(x):

    a=len(x)

    while a>0:

        print(x[a-1])

        a-=1

def sett(x):

    for i in range(len(x)):

        x[i]=1

    print(x)

if __name__=="__main__":

    l = [0,0,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]

    threadlock=threading.Lock() #实例化全局锁

    th1=tt("read",readd,l)

    th2=tt("set",sett,l)

    th1.start()

    th2.start()

    th_list=[] 

    th_list.append(th1)

    th_list.append(th2)

    for li in th_list:

        li.join()        #主线程被阻塞,直到两个子线程处理结束

    print("主线程结束")

2、队列

queue模块包含queue.Queue(maxsize=0)先入先出队列,queue.LifoQueue()先入后出队列,和queue.PriorityQueue()优先级可设置的队列

Queue 模块中的常用方法:

Queue.qsize() 返回队列的大小,获取的数据不可靠,因为一直有线程在操作队列,数据一直变化

Queue.empty() 如果队列为空,返回True,反之False

Queue.full() 如果队列满了,返回True,反之False

Queue.full 与 maxsize 大小对应

Queue.put(block=true,timeout=none) 将item数据写入队列,block=True,设置线程是否阻塞,设置阻塞当队列数据满了之后就会阻塞,一直到队列数据不满时继续添加,如果设置不阻塞,当队列满了就会一直到timeout到后报错

Queue.get([block[, timeout]]) 取出队列数据,block=True,设置线程是否阻塞。设置阻塞,将会等待直到队列不为空有数据可取出,设置不阻塞直到超过timeout等待时间后报错

Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号

Queue.join() 实际上意味着等到队列为空,再执行别的操作。会在队列有未完成时阻塞,等待队列无未完成的任务,取出数据get()之后还需要配置task_done使用才能让等待队列数-1

import queue,time

import threading

q=queue.Queue(maxsize=5)

def sett():

    a=0

    while a<20:

        q.put(a,True)

        print("%d被put"%a)

        a+=1

def gett():

    time.sleep(1)

    while not q.empty(): #只要队列没空,一直取数据

        print("%d被取出"%q.get(True))

        q.task_done() #取出一次数据,将未完成任务-1,不然使用join方法线程会一直阻塞

if __name__=="__main__":

    th1=threading._start_new_thread(sett,()) #不带参数也要传入空元祖不然会报错

    th2=threading._start_new_thread(gett,())

    time.sleep(1) #延时主线程1S,等待put线程已经put部分数据到队列

    q.join()#阻塞主线程,直到未完成任务为0

在线程世界⾥,⽣产者就是⽣产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果⽣产者处理速度很快,⽽消费者处理速度很慢,那么⽣产者就必须等待消费者处理完,才能继续⽣产数据。同样的道理,如果消费者的处理能⼒⼤于⽣产者,那么消费者就必须等待⽣产者。为了解决这个问题于是引⼊了⽣产者和消费者模式。

⽣产者消费者模式是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。

比如,对于同时爬取多个网页的多线程爬虫,在某一时刻你可能无法保证他们在处理不同的网站,在某些时刻他们极有可能在处理相同的网站,这岂不浪费?为了解决这个问题,可以将不同网页的url放在queue中,然后多个线程来读取queue中的url进行解析处理,而queue只允许一次出一个,出一个少一个。相同网站上不同网页的url通常有某种规律,比如某个字段的数字加1,这种情况完全可以用这种模式,“生产者程序”负责根据规律把完整的url制作出来,再塞进queue里面(如果queue满了,则等待);“消费者程序(网页解析程序)”从queue的后面挨个取出url进行解析(如果queue里面是空的,则等待),即使是多线程也能保证每个线程得到的是不同的url。这个过程中,生产者和消费彼此互不干涉。

下面以实例说明如何将queue与多线程相结合形成所谓的“ 生产者+消费者 ”模式,同时解决 多线程如何退出 的问题(注意下例中是“一个生产者+多个消费者”的形式,多生产者+多消费者的模式可在此基础上进一步实现):

上述程序的过程如下图:

注意

(1)上述程序中生产者插入queue的时间间隔为0.1s,而消费者的取出时间间隔为2s,显然消费速度不如生产速度,一开始queue是空的,一段时间后queue就变满了,输出结果正说明了这一点。如果将两个时间调换,则结果相反,queue永远不会满,甚至只有1个值,因为只要进去就被消费了。

(2)消费者程序是通过“while”来推动不断执行的,何时结束?上例中通过在queue中增加None的形式告诉消费者,生产者已经结束了,消费者也可以结束了。但消费者有多个,到底由哪个消费者得到None?为解决这个问题,上例中在消费者中先判断当前取出的是不是None,如果是,则先在queue里插入一个None,然后再break当前这个消费者线程,最后的结果是所有的消费者线程都退出了,但queue中还剩下None没有被取出。因此在程序的后面增加了一个for循环来挨个把queue中的元素取出,否则最后的q.join()将永远阻塞,程序无法往下执行。

(3)程序中每一个q.get()后面都跟有一个q.task_done(),其作用是从queue中取出一个元素就给q.join()发送一个信息,否则q.join()将永远处于阻塞状态,直到所有queue元素都被取出。

多线程“生产者-消费者”模式一般性结构图