python 多进程

Python018

python 多进程,第1张

基于官方文档:

https://docs.python.org/zh-cn/3/library/multiprocessing.html

日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥

我就是喜欢抄官方的,哈哈

通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。

这种方法和 Thread 是一样的。

上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")

否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:

主进加个 sleep

所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了

上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id

下面就讲一下,这两个函数的用法:

os.getpid()

返回当前进程的id

os.getppid()

返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个

windows返回相同的id (可能被其他进程使用了)

这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。

而子进程的父级 process id 是调用他的那个进程的 id : 1940

视频笔记:

多进程:使用大致方法:

参考: 进程通信(pipe和queue)

pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表

poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)

报错:

参考 : https://blog.csdn.net/xiemanR/article/details/71700531

把 pool = Pool() 放到 if name == " main ": 下面初始化搞定。

结果:

这个肯定有解释的

测试多进程计算效果:

进程池运行:

结果:

普通计算:

我们同样传入 1 2 10 三个参数测试:

其实对比下来开始快了一半的;

我们把循环里的数字去掉一个 0;

单进程:

多进程:

两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。

问题 二:

视图:

post 视图里面

Music 类:

直接报错:

写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。

最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:

前台也能显示搜索的音乐结果了

总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。

还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??

使用 pool.map 子进程 函数报错,导致整个 pool 挂了:

参考: https://blog.csdn.net/hedongho/article/details/79139606

主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。

关于map 传多个函数参数

我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:

报错:

参考:

https://blog.csdn.net/qq_15969343/article/details/84672527

普通的 process 当让可以穿多个参数,map 却不知道咋传的。

apply_async 和map 一样,不知道咋传的。

最简单的方法:

使用 starmap 而不是 map

结果:

子进程结束

1.8399453163146973

成功拿到结果了

关于map 和 starmap 不同的地方看源码:

关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈

关于 上面源码里面有 itertools.starmap

itertools 用法参考:

https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions

有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:

上面就是预留了 一个cpu 干其他事的

后面直接使用 Queue 遇到这个问题:

解决:

Manager().Queue() 代替 Queue()

因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:

使用 queue.empty() 空为True

这里是自己写下关于 Python 跟进程相关的 threading 模块的一点笔记,跟有些跟 Linux 调用挺像的,有共通之处。

https://docs.python.org/3/library/threading.html?highlight=threading#thread-objects

直接传入

继承 Thread 重写 run 方法

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

group 线程组,未实现

start() 线程就绪

join([timeout]) 阻塞其他线程,直到调用这方法的进程结束或时间到达

RuntimeError: cannot join thread before it is started

get/setName(name) 获取/设置线程名。

isAlive() 返回线程是否在运行。

is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)

The entire Python program exits when no alive non-daemon threads are left.

没有非后台进程运行,Python 就退出。

主线程执行完毕后,后台线程不管是成功与否,主线程均停止

t.start()

t.join()

start() 后 join() 会顺序执行,失去线程意义

https://docs.python.org/3/library/threading.html?#lock-objects

Lock属于全局,Rlock属于线程(R的意思是可重入,线程用Lock的话会死锁,来看例子)

acquire(blocking=True, timeout=-1) 申请锁,返回申请的结果

release() 释放锁,没返回结果

https://docs.python.org/3/library/threading.html#condition-objects

可以在构造时传入rlock lock实例,不然自己生成一个。

acquire([timeout])/release(): 与lock rlock 相同

wait([timeout]): 调用这个方法将使线程进入等待池,并释放锁。调用方法前线程必须已获得锁定,否则将抛出异常。

notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。调用方法前线程必须已获得锁定,否则将抛出异常。

notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

threading.Semaphore(value=1)

https://docs.python.org/3/library/threading.html#semaphore-objects

acquire(blocking=True, timeout=None)

资源数大于0,减一并返回,等于0时等待,blocking为False不阻塞进程

返回值是申请结果

release()

资源数加1

https://docs.python.org/3/library/threading.html#event-objects

事件内置了一个初始为False的标志

is_set() 返回内置标志的状态

set() 设为True

clear() 设为False

wait(timeout=None) 阻塞线程并等待,为真时返回。返回值只会在等待超时时为False,其他情况为True

https://docs.python.org/3/library/threading.html#timer-objects

threading.Timer(interval, function, args=None, kwargs=None)

第一个参数是时间间隔,单位是秒,整数或者浮点数,负数不会报错直接执行不等待

可以用cancel() 取消

https://docs.python.org/3/library/threading.html#barrier-objects

threading.Barrier(parties, action=None, timeout=None)

调用的进程数目达到第一个设置的参数就唤醒全部进程

wait(timeout=None)

reset() 重置,等待中的进程收到 BrokenBarrierError 错误