Python中级精华-并发之启动和停止线程

Python012

Python中级精华-并发之启动和停止线程,第1张

为了让代码能够并发执行,向创建线程并在核实的时候销毁它。

由于目的比较单纯,只是讲解基础的线程创建方法,所以可以直接使用threading库中的Thread类来实例化一个线程对象。

例子,用户输入两个数字,并且求其两个数字的四则运算的结果:

除了以上的一些功能以外,在python线程

中没有其他的诸如给线程发信号、设置线程调度属性、执行任何其他高级操作的功能了,如果需要这些功能,就需要手工编写了。

另外,需要注意的是,由于GIL(全局解释器锁)的存在,限制了在python解释器当中只允许运行一个线程。基于这个原因,不停该使用python线程来处理计算密集型的任务,因为在这种任务重我们希望在多个CPU核心上实现并行处理。Python线程更适合于IO处理以及设计阻塞操作的并发执行任务(即等待IO响应或等待数据库取出结果等)。

如何判断线程是否已经启动?

目的:我们加载了一个线程,但是想要知道这个线程什么时候才会开始运行?

方法:

线程的核心特征我认为就是不确定性,因为其什么时候开始运行,什么时候被打断,什么时候恢复执行,这不是程序员能够控制的,而是有系统调度

来完成的。如果遇到像某个线程的运行依托于其他某个线程运行到某个状态时该线程才能开始运行,那么这就是线程同步

问题,同样这个问题非常棘手。要解决这类问题我们要借助threading中的Event对象。

Event其实和条件标记类似,匀速线程

等待某个时间发生。初始状态时事件被设置成0。如果事件没有被设置而线程正在等待该事件,那么线程就会被阻塞,直到事件被设置位置,当有线程设置了这个事件之后,那么就会唤醒正在等待事件的线程,如果线程等待的事件已经设置了,那么线程会继续执行。

一个例子:

如上能够确定的是,主线程会在线程t运行结束时再运行。

您的意思是要将进程挂起(Suspend) 而非 阻塞(Block)

如果用sleep() 进程将阻塞

假设进程下有两个线程 那么这两个线程会继续运行

要使进程挂起 可以考虑使用psutil

import psutil

p = psutil.Process(pid)

p.suspend() #挂起进程

p.resume()#恢复进程

为了证明效果 我写了一个简单的进程Process

其下有两个线程 读者Reader 和 写者Writer(简单的读者写者问题)

Process:

import threading

from time import ctime, sleep

import ThreadInReadAndWriteProblem

import multiprocessing

import os

class Process(multiprocessing.Process):

def __init__(self):

multiprocessing.Process.__init__(self) #手动实现父类

pid = os.getpid()

def run(self):

print '当前运行进程PID : %s ' %self.pid #子线程的id与父进程的pid相同 属于 同一个进程

for i in range(0,5):

r = ThreadInReadAndWriteProblem.Reader()

w = ThreadInReadAndWriteProblem.Writer()

w.start()

r.start()

print '进程阻塞'

sleep(10) #总共运行时间10秒

Reader&Writer

import threading

from time import ctime, sleep

import os

mutex = threading.Lock()#互斥锁

mutex_readercount = threading.Lock() #计数时的互斥 计算当前正在读的数目

readerCount = 0 number = 0

#不满足条件的 进入阻塞状态

class Reader(threading.Thread): #读者

def __init__(self):

threading.Thread.__init__(self) #继承父类构造函数

def run(self):

global mutex

global readerCount

#print '线程PID: %s ' %os.getpid()

while True:

mutex_readercount.acquire()

readerCount +=1

if readerCount == 1:

print '读者进程等待中,编号%s' %(self.name)

mutex.acquire() == False # 第一个需要申请

mutex_readercount.release()

print '开始读 , 读者编号 %s ,现在时间是 %s' %(self.name,ctime())

sleep(2)

print '完成读 , 读者编号 %s , 现在时间是 %s' %(self.name,ctime())

mutex_readercount.acquire()

readerCount -= 1

if readerCount == 0: #所有读者均完成

print '最后一个读者完成读 '

mutex.release()

mutex_readercount.release()

class Writer(threading.Thread): #写者

def __init__(self):

threading.Thread.__init__(self)

def run(self):

global mutex

global writerCount

#print '线程PID: %s' %os.getpid()

while True:

print '写者进程等待中 编号: %s' %(self.name)

mutex.acquire()

print '开始写 编号:%s 现在时间是: %s ' %(self.name,ctime())

sleep(5)

print '结束写 编号: %s 现在时间是 %s' %(self.name,ctime())

mutex.release()

测试程序

import ThreadInReadAndWriteProblem

import SingleProcessSchedulerMultiprocess

import psutil

import Scheduler

from time import ctime, sleep

def main():

p = SingleProcessSchedulerMultiprocess.Process()

p.start()

sleep(3)

stop(p.pid)

print '进程挂起 %s' %ctime()

sleep(5)

wake(p.pid)

print '唤醒进程 %s' %ctime()

def stop(pid):

print '进程暂停 进程编号 %s ' %(pid)

p = psutil.Process(pid)

p.suspend()

def wake(pid):

print '进程恢复 进程编号 %s ' %(pid)

p = psutil.Process(pid)

p.resume()

if __name__ == '__main__':

main()

结果:

当前运行进程PID : 3096

写者进程等待中 编号: Thread-2

开始写 编号:Thread-2 现在时间是: Mon Nov 30 21:12:12 2015

读者进程等待中,编号Thread-1

写者进程等待中 编号: Thread-4

进程阻塞

写者进程等待中 编号: Thread-6

写者进程等待中 编号: Thread-8

写者进程等待中 编号: Thread-10

进程暂停 进程编号 3096

进程挂起 Mon Nov 30 21:12:15 2015

进程恢复 进程编号 3096

唤醒进程 Mon Nov 30 21:12:20 2015

结束写 编号: Thread-2 现在时间是 Mon Nov 30 21:12:20 2015

写者进程等待中 编号: Thread-2

开始读 , 读者编号 Thread-1 ,现在时间是 Mon Nov 30 21:12:20 2015

开始读 , 读者编号 Thread-3 ,现在时间是 Mon Nov 30 21:12:20 2015

开始读 , 读者编号 Thread-5 ,现在时间是 Mon Nov 30 21:12:20 2015

开始读 , 读者编号 Thread-7 ,现在时间是 Mon Nov 30 21:12:20 2015

开始读 , 读者编号 Thread-9 ,现在时间是 Mon Nov 30 21:12:20 2015

完成读 , 读者编号 Thread-1 , 现在时间是 Mon Nov 30 21:12:22 2015

完成读 , 读者编号 Thread-3 , 现在时间是 Mon Nov 30 21:12:22 2015

完成读 , 读者编号 Thread-5 , 现在时间是 Mon Nov 30 21:12:22 2015

完成读 , 读者编号 Thread-7 , 现在时间是 Mon Nov 30 21:12:22 2015

python 线程 暂停, 恢复, 退出

我们都知道python中可以是threading模块实现多线程, 但是模块并没有提供暂停, 恢复和停止线程的方法, 一旦线程对象调用start方法后, 只能等到对应的方法函数运行完毕. 也就是说一旦start后, 线程就属于失控状态. 不过, 我们可以自己实现这些. 一般的方法就是循环地判断一个标志位, 一旦标志位到达到预定的值, 就退出循环. 这样就能做到退出线程了. 但暂停和恢复线程就有点难了, 我一直也不清除有什么好的方法, 直到我看到threading中Event对象的wait方法的描述时.

wait([timeout])

Block until the internal flag is true. If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.

阻塞, 直到内部的标志位为True时. 如果在内部的标志位在进入时为True时, 立即返回. 否则, 阻塞直到其他线程调用set()方法将标准位设为True, 或者到达了可选的timeout时间.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).

This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.

当给定了timeout参数且不为None, 它应该是一个浮点数,以秒为单位指定操作的超时(或是分数)。

此方法在退出时返回内部标志,因此除非给定了超时且操作超时,否则它将始终返回True。

Changed in version 2.7: Previously, the method always returned None.

2.7版本以前, 这个方法总会返回None.

<br>

利用wait的阻塞机制, 就能够实现暂停和恢复了, 再配合循环判断标识位, 就能实现退出了, 下面是代码示例:

#!/usr/bin/env python

# coding: utf-8

import threading

import time

class Job(threading.Thread):

def __init__(self, *args, **kwargs):

super(Job, self).__init__(*args, **kwargs)

self.__flag = threading.Event() # 用于暂停线程的标识

self.__flag.set()# 设置为True

self.__running = threading.Event() # 用于停止线程的标识

self.__running.set() # 将running设置为True

def run(self):

while self.__running.isSet():

self.__flag.wait() # 为True时立即返回, 为False时阻塞直到内部的标识位为True后返回

print time.time()

time.sleep(1)

def pause(self):

self.__flag.clear() # 设置为False, 让线程阻塞

def resume(self):

self.__flag.set() # 设置为True, 让线程停止阻塞

def stop(self):

self.__flag.set()# 将线程从暂停状态恢复, 如何已经暂停的话

self.__running.clear()# 设置为False

下面是测试代码:

a = Job()

a.start()

time.sleep(3)

a.pause()

time.sleep(3)

a.resume()

time.sleep(3)

a.pause()

time.sleep(2)

a.stop()

<br>

测试的结果:

这完成了暂停, 恢复和停止的功能. 但是这里有一个缺点: 无论是暂停还是停止, 都不是瞬时的, 必须等待run函数内部的运行到达标志位判断时才有效. 也就是说操作会滞后一次.

但是这有时也不一定是坏事. 如果run函数中涉及了文件操作或数据库操作等, 完整地运行一次后再退出, 反而能够执行剩余的资源释放操作的代码(例如各种close). 不会出现程序的文件操作符超出上限, 数据库连接未释放等尴尬的情况.