python多线程并发数量控制

Python018

python多线程并发数量控制,第1张

python多线程如果不进行并发数量控制,在启动线程数量多到一定程度后,会造成线程无法启动的错误。

控制多线程并发数量的方法有好几钟,下面介绍用queue控制多线程并发数量的方法。python3

Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。

借助这个包,可以轻松完成从单进程到并发执行的转换。

1、新建单一进程

如果我们新建少量进程,可以如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

if __name__ == "__main__":

p = multiprocessing.Process(target=func, args=("hello", ))

p.start()

p.join()

print "Sub-process done."12345678910111213

2、使用进程池

是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。

注意要用apply_async,如果落下async,就变成阻塞版本了。

processes=4是最多并发进程数量。

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=4)

for i in xrange(10):

msg = "hello %d" %(i)

pool.apply_async(func, (msg, ))

pool.close()

pool.join()

print "Sub-process(es) done."12345678910111213141516

3、使用Pool,并需要关注结果

更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

return "done " + msg

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=4)

result = []

for i in xrange(10):

msg = "hello %d" %(i)

result.append(pool.apply_async(func, (msg, )))

pool.close()

pool.join()

for res in result:

print res.get()

print "Sub-process(es) done."1234567891011121314151617181920

2014.12.25更新

根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:

multiprocessing.freeze_support()1

附录(自己的脚本):

#!/usr/bin/python

import threading

import subprocess

import datetime

import multiprocessing

def dd_test(round, th):

test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)

command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg

print command

subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)

def mds_stat(round):

p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)

out = p.stdout.readlines()

if out[0].find('active') != -1:

command = "echo '0205pm %s round mds status OK, %s' >>/round_record" %(round, datetime.datetime.now())

command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"

command_3 = "ls /zbkc/test_mds_crash | wc -l >>/round_record"

subprocess.call(command,shell=True)

subprocess.call(command_2,shell=True)

subprocess.call(command_3,shell=True)

return 1

else:

command = "echo '0205 %s round mds status abnormal, %s, %s' >>/round_record" %(round, out[0], datetime.datetime.now())

subprocess.call(command,shell=True)

return 0

#threads = []

for round in range(1, 1600):

pool = multiprocessing.Pool(processes = 10) #使用进程池

for th in range(10):

#th_name = "thread-" + str(th)

#threads.append(th_name) #添加线程到线程列表

#threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务

pool.apply_async(dd_test, (round, th))

pool.close()

pool.join()

#等待线程完成

#for t in threads:

#t.join()

if mds_stat(round) == 0:

subprocess.call("zbkc -s",shell=True)

break