content: 1. 为什么要多进程编程?和多线程有什么区别? 2. python 多进程编程 3. 进程间通信
=======================================   一. 为什么要多进程编程?和多线程有什么区别?
由于GIL的存在,所以对于某一些多线程任务来说,无法利用多核的优势,对这些耗cpu的任务,用多进程反而能利用多cpu。 所以多cpu的操作用多进程编程。
对io操作较多的任务来说,瓶颈不在于cpu,更多的在于io的切换中的消耗和时间等待。用多线程反而能在io挂起的时候,进行线程切换。
虽然io操作多的时候,也可以用多进程编程,但是因为进程的切换系统的代价是十分大的,所以能使用多线程的情况下,尽量用多线程。  
所以,对于耗费cpu的操作,比如计算、挖矿等,多进程优于多线程。 例:同计算一组斐波拉契数列的时间比较(耗cpu的操作) #多线程 from
concurrent.futuresimport ThreadPoolExecutor,as_completed from concurrent.futures
import ProcessPoolExecutor import time def fib(n): if n <= 2: return 1 return
fib(n-1)+fib(n-2) with ThreadPoolExecutor(3) as excutor: all_task
=[excutor.submit(fib,(num))for num in range(25,35)] start_time=time.time() for
futurein as_completed(all_task): data=future.result() print("result:{}"
.format(data)) end_time=time.time() print("last time : {}".format(end_time-
start_time))#output: result:75025 result:121393 result:196418 result:317811
result:514229 result:832040 result:1346269 result:2178309 result:3524578 result:
5702887 last time : 98.66604399681091
 
#多进程 from concurrent.futures import ThreadPoolExecutor,as_completed from
concurrent.futuresimport ProcessPoolExecutor import time def fib(n): if n <= 2:
return 1 return fib(n-1)+fib(n-2) if __name__ == "__main__": with
ProcessPoolExecutor(3) as excutor: all_task = [excutor.submit(fib, (num)) for
numin range(25, 35)] start_time = time.time() for future in
as_completed(all_task): data= future.result() print("result:{}".format(data))
end_time= time.time() print("last time : {}".format(end_time - start_time)) #
output: result:75025 result:121393 result:196418 result:317811 result:514229
result:832040 result:1346269 result:2178309 result:3524578 result:5702887 last
time :14.470988988876343
 
进程和线程的区别:
* 进程是资源分配的最小单位,线程是程序执行的最小单位。
*
进程有自己的独立地址空间,每启动一个进程,系统就会为它分配地址空间,建立数据表来维护代码段、堆栈段和数据段,这种操作非常昂贵。而线程是共享进程中的数据的,使用相同的地址空间,因此CPU切换一个线程的花费远比进程要小很多,同时创建一个线程的开销也比进程要小很多。
*
线程之间的通信更方便,同一进程下的线程共享全局变量、静态变量等数据,而进程之间的通信需要以通信的方式(IPC)进行。不过如何处理好同步与互斥是编写多线程程序的难点。
* 但是多进程程序更健壮,多线程程序只要有一个线程死掉,整个进程也死掉了,而一个进程死掉并不会对另外一个进程造成影响,因为进程有自己独立的地址空间。  
二、python 多进程编程 1.from concurrent.futures import  ProcessPoolExecutor
ProcessPoolExecutor 和上一章 讲到的多线程的用法是一样的。包括其中用到的Futures类。 基本看它的入口函数就明白,这里不再赘述。  
2.更加底层的multiprocessing 其实在ProcessPoolExecutor底层用的其实也是multiprocessing。
在multiprocess里,有个Progress类。跟Thread用法又是相似的。 #input from concurrent.futures import
ProcessPoolExecutorimport multiprocessing #多进程编程 import time def get_html(n):
time.sleep(n)print("sub_progress sccess") if __name__=="__main__": progress =
multiprocessing.Process(target=get_html,args=(3,)) print(progress.pid)
progress.start()print(progress.pid) progress.join() #output None 12864
sub_progress sccess
 
3.继承Progress类(与之前的Thread类一样) import multiprocessing #多进程编程 import time class
progress_get_html(multiprocessing.Process):def __init__(self,n): self.n=n
super().__init__() def run(self): time.sleep(self.n) print("sub progress success
") class MyProgress(multiprocessing.Process): def __init__(self,n): self.n=n
super().__init__() def run(self): pro=progress_get_html(self.n) pro.start()
print("progress end") if __name__=="__main__": progress = MyProgress(3) print
(progress.pid) progress.start()print(progress.pid) progress.join() #output: None
8744 progress end sub progress success
 
4.使用进程池 指明进程数,不指明的话,可以直接默认为cpu数(cpu_count() or 1)。 from concurrent.futures
import ProcessPoolExecutor from multiprocessing import pool import
multiprocessing#多进程编程 import time def get_html(n): time.sleep(n) print("
sub_progress sccess") return n if __name__=="__main__": pool=
multiprocessing.Pool(multiprocessing.cpu_count()) result
=pool.apply_async(get_html,args=(3,)) print(result.get()) #pool在调用join之前
需要调用close 来让它不再接收任务。否则会报错 pool.close() pool.join() print(result.get()) #output
sub_progress sccess3 3
 
其他方法: - imap:按照参数输入顺序 if __name__=="__main__": pool=
multiprocessing.Pool(multiprocessing.cpu_count())for result in
pool.imap(get_html,[1,5,3]): print("sleep {} successed ".format(result)) #
output: sub_progress sccess sleep 1 successed sub_progress sccess sub_progress
sccess sleep5 successed sleep 3 successed imap_unordered: 按照执行完成顺序 if __name__==
"__main__": pool=multiprocessing.Pool(multiprocessing.cpu_count()) #for result
in pool.imap(get_html,[1,5,3]): # print("sleep {} successed ".format(result))
for result in pool.imap_unordered(get_html,[1,5,3]): print("sleep {} successed "
.format(result))#output: sub_progress sccess sleep 1 successed sub_progress
sccess sleep3 successed sub_progress sccess sleep 5 successed
 
三. 进程间通信 与线程间不同的是,线程间同步的类和锁是不可用的。 1.Queue(注意是multiprocessing而不是thread的) from
multiprocessingimport Process,Queue import time def producer(queue): queue.put("
a") time.sleep(2) def consumer(queue): time.sleep(2) data=queue.get() print
(data)if __name__== "__main__": queue=Queue(10) my_producer =
Process(target=producer,args=(queue,)) my_consumer =
Process(target=consumer,args=(queue,)) my_producer.start() my_consumer.start()
my_producer.join() my_consumer.join()#outpu: a
注意:multprocess中的Queue是不能用于pool进程池的   2.Manager(与进程池共用)
Manager中有个Queue,如果像实现pool中的进程间通信,需要使用Manager中的Queue。 from multiprocessing import
Process,pool,Manager,Poolimport time def producer(queue): queue.put("a")
time.sleep(2) def consumer(queue): time.sleep(2) data=queue.get() print(data) if
__name__== "__main__": queue=Manager().Queue() pool=Pool(3)
pool.apply_async(producer,args=(queue,)) pool.apply_async(consumer,args=
(queue,)) pool.close() pool.join()#output: a   3.管道pipe pipe只能适用于两个指定的进程。
pipe的性能高于queue的,queue加了很多的锁操作。 from multiprocessing import
Process,pool,Manager,Pool,Pipeimport time def producer(pipe): pipe.send("hello")
def consumer(pipe): print(pipe.recv()) if __name__== "__main__":
recv_pipe,send_pipe=Pipe() my_producer=Process(target=producer,args=
(send_pipe,)) my_consumer=Process(target=consumer,args=(recv_pipe,))
my_producer.start() my_consumer.start() my_producer.join() my_consumer.join()#
output: hello
 
4.进程间共享内存操作 Mnager的dict、list、value等。 from multiprocessing import
Process,pool,Manager,Pool,Pipeimport time def add_data(p_dict,key,value):
p_dict[key]=value if __name__ == "__main__": progress_dict= Manager().dict()
first_progress= Process(target=add_data,args=(progress_dict,"name","tangrong"))
second_progress= Process(target=add_data,args=(progress_dict,"age","18"))
first_progress.start() second_progress.start() first_progress.join()
second_progress.join()print(progress_dict) #output: {'name': 'tangrong', 'age':
'18'} 在使用的时候,可以用Manager中的数据结构,但是注意数据同步(LOCK,RLOCK等)

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信