Python实现简单多线程任务队列

Python010

Python实现简单多线程任务队列,第1张

Python实现简单多线程任务队列

最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):

defgradient_descent(): # the gradient descent code plotly.write(X, Y)

一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。

一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。

那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。

fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue):

首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。

def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()

初始化的时候,我们可以不用考虑工作线程的数量。

defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))

我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。

defstart_workers(self): foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start()

我们为每个 worker 创建一个线程,然后在后台删除。

下面是 worker 函数的代码:

defworker(self): whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done()

worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:

我们可以通过下面的代码测试:

defblokkah(*args,**kwargs): time.sleep(5) print“Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print“Alldone!”

Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。

defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()

使用Python中的线程模块,能够同时运行程序的不同部分,并简化设计。如果你已经入门Python,并且想用线程来提升程序运行速度的话,希望这篇教程会对你有所帮助。

线程与进程

什么是进程

进程是系统进行资源分配和调度的一个独立单位 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。每个进程都有自己的独立内存空间,不同进程通过进程间通信来通信。由于进程比较重量,占据独立的内存,所以上下文进程间的切换开销(栈、寄存器、虚拟内存、文件句柄等)比较大,但相对比较稳定安全。

什么是线程

CPU调度和分派的基本单位 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。线程间通信主要通过共享内存,上下文切换很快,资源开销较少,但相比进程不够稳定容易丢失数据。

进程与线程的关系图

线程与进程的区别:

进程

现实生活中,有很多的场景中的事情是同时进行的,比如开车的时候 手和脚共同来驾驶 汽车 ,比如唱歌跳舞也是同时进行的,再比如边吃饭边打电话;试想如果我们吃饭的时候有一个领导来电,我们肯定是立刻就接听了。但是如果你吃完饭再接听或者回电话,很可能会被开除。

注意:

多任务的概念

什么叫 多任务 呢?简单地说,就是操作系统可以同时运行多个任务。打个比方,你一边在用浏览器上网,一边在听MP3,一边在用Word赶作业,这就是多任务,至少同时有3个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。

现在,多核CPU已经非常普及了,但是,即使过去的单核CPU,也可以执行多任务。由于CPU执行代码都是顺序执行的,那么,单核CPU是怎么执行多任务的呢?

答案就是操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒,这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。

真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。 其实就是CPU执行速度太快啦!以至于我们感受不到在轮流调度。

并行与并发

并行(Parallelism)

并行:指两个或两个以上事件(或线程)在同一时刻发生,是真正意义上的不同事件或线程在同一时刻,在不同CPU资源呢上(多核),同时执行。

特点

并发(Concurrency)

指一个物理CPU(也可以多个物理CPU) 在若干道程序(或线程)之间多路复用,并发性是对有限物理资源强制行使多用户共享以提高效率。

特点

multiprocess.Process模块

process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

语法:Process([group [, target [, name [, args [, kwargs]]]]])

由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)。

注意:1. 必须使用关键字方式来指定参数;2. args指定的为传给target函数的位置参数,是一个元祖形式,必须有逗号。

参数介绍:

group:参数未使用,默认值为None。

target:表示调用对象,即子进程要执行的任务。

args:表示调用的位置参数元祖。

kwargs:表示调用对象的字典。如kwargs = {'name':Jack, 'age':18}。

name:子进程名称。

代码:

除了上面这些开启进程的方法之外,还有一种以继承Process的方式开启进程的方式:

通过上面的研究,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题,我们可以考虑加锁,我们以模拟抢票为例,来看看数据安全的重要性。

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改。加锁牺牲了速度,但是却保证了数据的安全。

因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。

mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性( 后续扩展该内容 )。

线程

Python的threading模块

Python 供了几个用于多线程编程的模块,包括 thread, threading 和 Queue 等。thread 和 threading 模块允许程序员创建和管理线程。thread 模块 供了基本的线程和锁的支持,而 threading 供了更高级别,功能更强的线程管理的功能。Queue 模块允许用户创建一个可以用于多个线程之间 共享数据的队列数据结构。

python创建和执行线程

创建线程代码

1. 创建方法一:

2. 创建方法二:

进程和线程都是实现多任务的一种方式,例如:在同一台计算机上能同时运行多个QQ(进程),一个QQ可以打开多个聊天窗口(线程)。资源共享:进程不能共享资源,而线程共享所在进程的地址空间和其他资源,同时,线程有自己的栈和栈指针。所以在一个进程内的所有线程共享全局变量,但多线程对全局变量的更改会导致变量值得混乱。

代码演示:

得到的结果是:

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行(其中的JPython就没有GIL)。

那么CPython实现中的GIL又是什么呢?GIL全称Global Interpreter Lock为了避免误导,我们还是来看一下官方给出的解释:

主要意思为:

因此,解释器实际上被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。在多线程环境中,Python 虚拟机按以下方式执行:

由于GIL的存在,Python的多线程不能称之为严格的多线程。因为 多线程下每个线程在执行的过程中都需要先获取GIL,保证同一时刻只有一个线程在运行。

由于GIL的存在,即使是多线程,事实上同一时刻只能保证一个线程在运行, 既然这样多线程的运行效率不就和单线程一样了吗,那为什么还要使用多线程呢?

由于以前的电脑基本都是单核CPU,多线程和单线程几乎看不出差别,可是由于计算机的迅速发展,现在的电脑几乎都是多核CPU了,最少也是两个核心数的,这时差别就出来了:通过之前的案例我们已经知道,即使在多核CPU中,多线程同一时刻也只有一个线程在运行,这样不仅不能利用多核CPU的优势,反而由于每个线程在多个CPU上是交替执行的,导致在不同CPU上切换时造成资源的浪费,反而会更慢。即原因是一个进程只存在一把gil锁,当在执行多个线程时,内部会争抢gil锁,这会造成当某一个线程没有抢到锁的时候会让cpu等待,进而不能合理利用多核cpu资源。

但是在使用多线程抓取网页内容时,遇到IO阻塞时,正在执行的线程会暂时释放GIL锁,这时其它线程会利用这个空隙时间,执行自己的代码,因此多线程抓取比单线程抓取性能要好,所以我们还是要使用多线程的。

GIL对多线程Python程序的影响

程序的性能受到计算密集型(CPU)的程序限制和I/O密集型的程序限制影响,那什么是计算密集型和I/O密集型程序呢?

计算密集型:要进行大量的数值计算,例如进行上亿的数字计算、计算圆周率、对视频进行高清解码等等。这种计算密集型任务虽然也可以用多任务完成,但是花费的主要时间在任务切换的时间,此时CPU执行任务的效率比较低。

IO密集型:涉及到网络请求(time.sleep())、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。

当然为了避免GIL对我们程序产生影响,我们也可以使用,线程锁。

Lock&RLock

常用的资源共享锁机制:有Lock、RLock、Semphore、Condition等,简单给大家分享下Lock和RLock。

Lock

特点就是执行速度慢,但是保证了数据的安全性

RLock

使用锁代码操作不当就会产生死锁的情况。

什么是死锁

死锁:当线程A持有独占锁a,并尝试去获取独占锁b的同时,线程B持有独占锁b,并尝试获取独占锁a的情况下,就会发生AB两个线程由于互相持有对方需要的锁,而发生的阻塞现象,我们称为死锁。即死锁是指多个进程因竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法向前推进。

所以,在系统设计、进程调度等方面注意如何不让这四个必要条件成立,如何确定资源的合理分配算法,避免进程永久占据系统资源。

死锁代码

python线程间通信

如果各个线程之间各干各的,确实不需要通信,这样的代码也十分的简单。但这一般是不可能的,至少线程要和主线程进行通信,不然计算结果等内容无法取回。而实际情况中要复杂的多,多个线程间需要交换数据,才能得到正确的执行结果。

python中Queue是消息队列,提供线程间通信机制,python3中重名为为queue,queue模块块下提供了几个阻塞队列,这些队列主要用于实现线程通信。

在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。

简单代码演示

此时代码会阻塞,因为queue中内容已满,此时可以在第四个queue.put('苹果')后面添加timeout,则成为 queue.put('苹果',timeout=1)如果等待1秒钟仍然是满的就会抛出异常,可以捕获异常。

同理如果队列是空的,无法获取到内容默认也会阻塞,如果不阻塞可以使用queue.get_nowait()。

在掌握了 Queue 阻塞队列的特性之后,在下面程序中就可以利用 Queue 来实现线程通信了。

下面演示一个生产者和一个消费者,当然都可以多个

使用queue模块,可在线程间进行通信,并保证了线程安全。

协程

协程,又称微线程,纤程。英文名Coroutine。

协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。为啥说它是一个执行单元,因为它自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。

通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。

在实现多任务时,线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。

greenlet与gevent

为了更好使用协程来完成多任务,除了使用原生的yield完成模拟协程的工作,其实python还有的greenlet模块和gevent模块,使实现协程变的更加简单高效。

greenlet虽说实现了协程,但需要我们手工切换,太麻烦了,gevent是比greenlet更强大的并且能够自动切换任务的模块。

其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。

模拟耗时操作:

如果有耗时操作也可以换成,gevent中自己实现的模块,这时候就需要打补丁了。

使用协程完成一个简单的二手房信息的爬虫代码吧!

以下文章来源于Python专栏 ,作者宋宋

文章链接:https://mp.weixin.qq.com/s/2r3_ipU3HjdA5VnqSHjUnQ

到这里,我们要聊一下线程通信的内容;

首先,我们抛开语言不谈,先看看比较基础的东西,线程间通信的方式;其实也就是哪几种(我这里说的,是我的所谓的知道的。。。)事件,消息队列,信号量,条件变量(锁算不算?我只是认为是同步的一种);所以我们也就是要把这些掌握了,因为各有各的好处嘛;

条件变量我放到了上面的线程同步里面讲了,我总感觉这算是同步的一种,没有很多具体信息的沟通;同时吧,我认为条件变量比较重要,因为这种可以应用于线程池的操作上;所以比较重要;这里,抛开条件变量不谈,我们看看其他的东西;

1、消息队列:

queue 模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。

关于这三个队列类的简单介绍如下:

queue.Queue(maxsize=0):代表 FIFO(先进先出)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。

queue.LifoQueue(maxsize=0):代表 LIFO(后进先出)的队列,与 Queue 的区别就是出队列的顺序不同。

PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。

这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法:

Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。

Queue.empty():判断队列是否为空。

Queue.full():判断队列是否已满。

Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。

Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。

Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。

Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。

其实我们想想,这个队列,是python进行封装的,那么我们可以用在线程间的通信;同时也是可以用做一个数据结构;先进先出就是队列,后进先出就是栈;我们用这个栈写个十进制转二进制的例子:

没毛病,可以正常的打印;其中需要注意的就是,maxsize在初始化的时候如果是0或者是个负数的话,那么就会是不限制大小;

那么其实我们想想,我们如果用做线程通信的话,我们两个线程,可以把队列设置为1的大小,如果是1对多,比如是创建者和消费者的关系,我们完全可以作为消息队列,比如说创建者一直在创建一些东西,然后放入到消息队列里面,然后供消费着使用;就是一个很好的例子;所以,其实说是消息队列,也就是队列,没差;

=====================================================================

下面来看一下事件

Event 是一种非常简单的线程通信机制,一个线程发出一个 Event,另一个线程可通过该 Event 被触发。

Event 本身管理一个内部旗标,程序可以通过 Event 的 set() 方法将该旗标设置为 True,也可以调用 clear() 方法将该旗标设置为 False。程序可以调用 wait() 方法来阻塞当前线程,直到 Event 的内部旗标被设置为 True。

Event 提供了如下方法:

is_set():该方法返回 Event 的内部旗标是否为True。

set():该方法将会把 Event 的内部旗标设置为 True,并唤醒所有处于等待状态的线程。

clear():该方法将 Event 的内部旗标设置为 False,通常接下来会调用 wait() 方法来阻塞当前线程。

wait(timeout=None):该方法会阻塞当前线程。

这里我想解释一下;其实对于事件来说,事件可以看成和条件变量是一样的,只是我们说说不一样的地方;

1、对于事件来说,一旦触发了事件,也就是说,一旦set为true了,那么就会一直为true,需要clear调内部的标志,才能继续wait;但是conditon不是,他是一次性的唤醒其他线程;

2、conditon自己带锁;事件呢?不是的;没有自己的锁;比如说有一个存钱的线程,有一个是取钱的线程;那么存钱的线程要存钱;需要怎么办呢?1、发现银行没有钱了(is_set判断);2、锁住银行;3、存钱;4、释放银行;5、唤醒事件;对于取钱的人;1、判断是否有钱;2、被唤醒了,然后锁住银行;3、开始取钱;4、清理告诉存钱的人,我没钱了(clear);5、释放锁;6、等着钱存进去;

其实说白了,就是记住一点;这个旗标需要自己clear就对了

写个例子,怕以后忘了怎么用;

其实时间和信号量比较像;但是信号量不用自己清除标志位;但是事件是需要的;