Python实现简单多线程任务队列

Python011

Python实现简单多线程任务队列,第1张

Python实现简单多线程任务队列

最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):

defgradient_descent(): # the gradient descent code plotly.write(X, Y)

一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。

一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。

那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。

fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue):

首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。

def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()

初始化的时候,我们可以不用考虑工作线程的数量。

defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))

我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。

defstart_workers(self): foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start()

我们为每个 worker 创建一个线程,然后在后台删除。

下面是 worker 函数的代码:

defworker(self): whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done()

worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:

我们可以通过下面的代码测试:

defblokkah(*args,**kwargs): time.sleep(5) print“Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print“Alldone!”

Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。

defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()

在实际处理数据时,因系统内存有限,我们不可能一次把所有数据都导出进行操作,所以需要批量导出依次操作。为了加快运行,我们会采用多线程的方法进行数据处理, 以下为我总结的多线程批量处理数据的模板:

主要分为三大部分:

共分4部分对多线程的内容进行总结。

先为大家介绍线程的相关概念:

在飞车程序中,如果没有多线程,我们就不能一边听歌一边玩飞车,听歌与玩 游戏 不能并行;在使用多线程后,我们就可以在玩 游戏 的同时听背景音乐。在这个例子中启动飞车程序就是一个进程,玩 游戏 和听音乐是两个线程。

Python 提供了 threading 模块来实现多线程:

因为新建线程系统需要分配资源、终止线程系统需要回收资源,所以如果可以重用线程,则可以减去新建/终止的开销以提升性能。同时,使用线程池的语法比自己新建线程执行线程更加简洁。

Python 为我们提供了 ThreadPoolExecutor 来实现线程池,此线程池默认子线程守护。它的适应场景为突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。

其中 max_workers 为线程池中的线程个数,常用的遍历方法有 map 和 submit+as_completed 。根据业务场景的不同,若我们需要输出结果按遍历顺序返回,我们就用 map 方法,若想谁先完成就返回谁,我们就用 submit+as_complete 方法。

我们把一个时间段内只允许一个线程使用的资源称为临界资源,对临界资源的访问,必须互斥的进行。互斥,也称间接制约关系。线程互斥指当一个线程访问某临界资源时,另一个想要访问该临界资源的线程必须等待。当前访问临界资源的线程访问结束,释放该资源之后,另一个线程才能去访问临界资源。锁的功能就是实现线程互斥。

我把线程互斥比作厕所包间上大号的过程,因为包间里只有一个坑,所以只允许一个人进行大号。当第一个人要上厕所时,会将门上上锁,这时如果第二个人也想大号,那就必须等第一个人上完,将锁解开后才能进行,在这期间第二个人就只能在门外等着。这个过程与代码中使用锁的原理如出一辙,这里的坑就是临界资源。 Python 的 threading 模块引入了锁。 threading 模块提供了 Lock 类,它有如下方法加锁和释放锁:

我们会发现这个程序只会打印“第一道锁”,而且程序既没有终止,也没有继续运行。这是因为 Lock 锁在同一线程内第一次加锁之后还没有释放时,就进行了第二次 acquire 请求,导致无法执行 release ,所以锁永远无法释放,这就是死锁。如果我们使用 RLock 就能正常运行,不会发生死锁的状态。

在主线程中定义 Lock 锁,然后上锁,再创建一个子 线程t 运行 main 函数释放锁,结果正常输出,说明主线程上的锁,可由子线程解锁。

如果把上面的锁改为 RLock 则报错。在实际中设计程序时,我们会将每个功能分别封装成一个函数,每个函数中都可能会有临界区域,所以就需要用到 RLock 。

一句话总结就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他线程中的锁进行操作, RLock 只能由本线程进行操作。

在socketserver服务端代码中有这么一句:

server = socketserver.ThreadingTCPServer((ip,port), MyServer)

ThreadingTCPServer这个类是一个支持多线程和TCP协议的socketserver,它的继承关系是这样的:

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

右边的TCPServer实际上是主要的功能父类,而左边的ThreadingMixIn则是实现了多线程的类, ThreadingTCPServer自己本身则没有任何代码。

MixIn在Python的类命名中很常见,称作“混入”,戏称“乱入”,通常为了某种重要功能被子类继承。

我们看看一下ThreadingMixIn的源代码:

class ThreadingMixIn:

daemon_threads = False

def process_request_thread(self, request, client_address):

try:

self.finish_request(request, client_address)

self.shutdown_request(request)

except:

self.handle_error(request, client_address)

self.shutdown_request(request)

def process_request(self, request, client_address):

t = threading.Thread(target = self.process_request_thread,

args = (request, client_address))

t.daemon = self.daemon_threads

t.start()

在ThreadingMixIn类中,其实就定义了一个属性,两个方法。其中的process_request()方法实际调用的正是Python内置的多线程模块threading。这个模块是Python中所有多线程的基础,socketserver本质上也是利用了这个模块。

socketserver通过threading模块,实现了多线程任务处理能力,可以同时为多个客户提供服务。

那么,什么是线程,什么是进程?

进程是程序(软件,应用)的一个执行实例,每个运行中的程序,可以同时创建多个进程,但至少要有一个。每个进程都提供执行程序所需的所有资源,都有一个虚拟的地址空间、可执行的代码、操作系统的接口、安全的上下文(记录启动该进程的用户和权限等等)、唯一的进程ID、环境变量、优先级类、最小和最大的工作空间(内存空间)。进程可以包含线程,并且每个进程必须有至少一个线程。每个进程启动时都会最先产生一个线程,即主线程,然后主线程会再创建其他的子线程。

线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不独立拥有系统资源,但它可与同属一个进程的其它线程共享该进程所拥有的全部资源。每一个应用程序都至少有一个进程和一个线程。在单个程序中同时运行多个线程完成不同的被划分成一块一块的工作,称为多线程。

举个例子,某公司要生产一种产品,于是在生产基地建设了很多厂房,每个厂房内又有多条流水生产线。所有厂房配合将整个产品生产出来,单个厂房内的流水线负责生产所属厂房的产品部件,每个厂房都拥有自己的材料库,厂房内的生产线共享这些材料。公司要实现生产必须拥有至少一个厂房一条生产线。换成计算机的概念,那么这家公司就是应用程序,厂房就是应用程序的进程,生产线就是某个进程的一个线程。

线程的特点:

线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令。假设你正在读一本书,没有读完,你想休息一下,但是你想在回来时继续先前的进度。有一个方法就是记下页数、行数与字数这三个数值,这些数值就是execution context。如果你的室友在你休息的时候,使用相同的方法读这本书。你和她只需要这三个数字记下来就可以在交替的时间共同阅读这本书了。

线程的工作方式与此类似。CPU会给你一个在同一时间能够做多个运算的幻觉,实际上它在每个运算上只花了极少的时间,本质上CPU同一时刻只能干一件事,所谓的多线程和并发处理只是假象。CPU能这样做是因为它有每个任务的execution context,就像你能够和你朋友共享同一本书一样。

进程与线程区别:

同一个进程中的线程共享同一内存空间,但进程之间的内存空间是独立的。

同一个进程中的所有线程的数据是共享的,但进程之间的数据是独立的。

对主线程的修改可能会影响其他线程的行为,但是父进程的修改(除了删除以外)不会影响其他子进程。

线程是一个上下文的执行指令,而进程则是与运算相关的一簇资源。

同一个进程的线程之间可以直接通信,但是进程之间的交流需要借助中间代理来实现。

创建新的线程很容易,但是创建新的进程需要对父进程做一次复制。

一个线程可以操作同一进程的其他线程,但是进程只能操作其子进程。

线程启动速度快,进程启动速度慢(但是两者运行速度没有可比性)。

由于现代cpu已经进入多核时代,并且主频也相对以往大幅提升,多线程和多进程编程已经成为主流。Python全面支持多线程和多进程编程,同时还支持协程。