经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Python » 查看文章
python之多进程
来源:cnblogs  作者:xiaomogugu  时间:2018/9/26 18:52:31  对本文有异议

一、multiprocessing模块

python中的多线程无法利用多核优势,如果想要充分地使用多核cpu的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。python提供了multiprocessing

multiprocessing 模块用来开启子进程。并在子进程中执行我们定制的任务(例如函数)。与多线程threading类似

 

multiprocessing 模块支持很多功能 :子进程、通信、共享数据,执行不同的同步,提供Process、Queue、Pipe、Lock 等组件

ps:与线程不同,进程没有任何共享状态,修改的数据(改动等等)仅限于该进程内。

 

二、Process 类的介绍

 创建进程的类:

Procss([group [, target [,name [, args [, kwargs ] ] ] ] ] ) ,由该类实例化得到对象,表示一个子进程中的人物(尚未启动)

ps:1. 需要指定关键字的方式来制定参数

  2. args 指定为传给 target 函数位置,是一个元组形式,必须有逗号

参数介绍:

  1. 1. group 参数 未使用 ,值始终为 None
  2. 2. target 表示 调用对象 即子进程要执行的任务(不加括号)
  3. 3. args 表示调用对象的位置参数元组,args = x , y ,)
  4. 4. kwargs 表示调用对象的字典,kwargs={'x':1 , 'y':99}
  5. 5. name 为子进程的名称

方法介绍:

  1. 1. p.start() :启动进程,并调用该子进程的 p.run()
  2. 2. p.run() :进程启动时运行的方法 ,它去调用target指定的函数,自定义的类一定要实现该方法。
  3. 3. p.terminate() :强制终止进程p ,不会进行任何清理操作,如果p创建了子进程,该子进程就变成僵尸进程了,使用该方法需要小心这个情况,如果p还保存了一个锁 那么也将不会被释放,进而导致死锁
  4. 4. p.is_alive() 如果p仍然运行,返回True
  5. 5. p.join([timeout]) :主进程等待p终止 ps:是主线程处于等 的状态,而p是处于运行的状态),timeout是可选的超时时间,需要强调的是,p.join 只能join start 开启的进程,而不能join run开启的进程

属性介绍:

  1. 1. p.daemon : 默认为False 如果设为True ,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设置完后,p不能创建自己的新进程,必须在p.start() 之前设置。
  2. 2. p.name :进程的名称
  3. 3. p.pid :进程的pid
  4. 4. p.exitcode :进程运行时为None、如果为-N ,表示被信号N结束(了解)
  5. 5. p.authkey :进程的身份验证键,默认是由 os.urandom()随机生成的32位字符串,这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同身份验证键时才算成功(了解)

 

三、Process类的使用

在windows中Process()必须放到# if __name__ == '__main__':下

创建并开启子进程的两种方式

  1. # 开启子进程的方式1:
  2.  
  3. from multiprocessing import Process
  4. import time
  5. def task(name):
  6. print('%s is running' %name)
  7. time.sleep(3)
  8. print('%s is done' %name)
  9. # 在windows系统上开启子进程的操作必须放到该行代码下
  10. if __name__ == '__main__':
  11. p=Process(target=task,args=('子进程',)) # Process(target=task,kwargs={'name':'子进程'}) #
  12. p.start() # 仅仅只是向操作系统发送一个创造子进程的信号
  13. print('')
方式一
  1. # 开启子进程的方式2:
  2. from multiprocessing import Process
  3. import time
  4. class Myprocess(Process):
  5. def __init__(self,name):
  6. super().__init__()
  7. self.name=name
  8. def run(self):
  9. print('%s is running' %self.name)
  10. time.sleep(3)
  11. print('%s is done' %self.name)
  12. # 在windows系统上开启子进程的操作必须放到该行代码下
  13. if __name__ == '__main__':
  14. p=Myprocess('子进程')
  15. p.start() # 仅仅只是向操作系统发送一个创造子进程的信号,start会调用run方法
  16. p.join()
  17. print('')
方式二

 

进程之间的内存空间是隔离的

  1. from multiprocessing import Process
  2. n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了
  3. def work():
  4. global n
  5. n=0
  6. print('子进程内: ',n)
  7. if __name__ == '__main__':
  8. p=Process(target=work)
  9. p.start()
  10. print('主进程内: ',n)
View Code

 

将之前的基于TCP的套接字通信改为支持并发

  1. from socket import *
  2. from multiprocessing import Process
  3. FTPS=socket(AF_INET,SOCK_STREAM)
  4. FTPS.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
  5. FTPS.bind(('127.0.0.1',9999))
  6. FTPS.listen(5)
  7. def talk(conn):
  8. while True:
  9. try:
  10. msg=conn.recv(1024)
  11. if not msg :break
  12. conn.sendall(msg.upper())
  13. except Exception:
  14. break
  15.  
  16. if __name__ == '__main__':
  17. while True:
  18. conn,client_addr=FTPS.accept()
  19. p=Process(target=talk,args=(conn,))
  20. p.start()
服务端
  1. from socket import *
  2. FTPC=socket(AF_INET,SOCK_STREAM)
  3. FTPC.connect(('127.0.0.1',9999))
  4. while True:
  5. cmd=input('>>:').strip()
  6. if not cmd:continue
  7. FTPC.sendall(cmd.encode('utf-8'))
  8. msg=FTPC.recv(1024)
  9. print(msg.decode('utf-8'))
客户端

这样做虽然可以实现并发的效果  但是有一个问题

  1. 每来一个客户端,都在服务端开启一个进程,如果并发来十万个客户端,要开启十万个进程吗
  2. 解决方法:进程池

 Process对象的join方法

  1. from multiprocessing import Process
  2. import time
  3. import random
  4. class Task(Process):
  5. def __init__(self,name):
  6. super().__init__()
  7. self.name=name
  8. def run(self):
  9. print('%s is running' %self.name)
  10. time.sleep(random.randint(1,3))
  11. print('%s is ending' %self.name)
  12. if __name__ == '__main__':
  13. p=Task('子进程')
  14. p.start()
  15. p.join(0.5) #等待p停止,等0.5秒就不再等了
  16. print('开始')
View Code
  1. from multiprocessing import Process
  2. import time
  3. import random
  4. class Task(Process):
  5. def __init__(self,name):
  6. super().__init__()
  7. self.name=name
  8. def run(self):
  9. print('%s is running' %self.name)
  10. time.sleep(random.randint(1,3))
  11. print('%s is ending' %self.name)
  12. if __name__ == '__main__':
  13. p_msg = []
  14. for p in range(4):
  15. p=Task('子进程%s'%p)
  16. p_msg.append(p)
  17. p.start()
  18. for p in p_msg:
  19. p.join()
  20. #当然不是了,必须明确:p.join()是让谁等?
  21. #很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,
  22.  
  23. #详细解析如下:
  24. #进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
  25. #而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
  26. #join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
  27. # 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
  28.  
  29.  
  30. print('主线程')
  31. #上述启动进程与join进程可以简写为
  32. # p_l=[p1,p2,p3,p4]
  33. #
  34. # for p in p_l:
  35. # p.start()
  36. #
  37. # for p in p_l:
  38. # p.join()
p.join详细说明

process对象的其他方法或属性

  1. from multiprocessing import Process
  2. import time
  3. import random
  4. class Myprocess(Process):
  5. def __init__(self,name):
  6. # self.name=name
  7. # super().__init__() #Process的__init__方法会执行self.name=Myprocess-1,
  8. # #所以加到这里,会覆盖我们的self.name=name
  9.  
  10. #为我们开启的进程设置名字的做法
  11. super().__init__()
  12. self.name=name
  13. def run(self):
  14. print('%s is running' %self.name)
  15. time.sleep(random.randrange(1,3))
  16. print('%s is ending' %self.name)
  17. if __name__ == '__main__':
  18. p=Myprocess('子进程')
  19. p.start()
  20. print('开始')
  21. print(p.pid) #查看pid
name和pid
  1. #进程对象的其他方法一:terminate,is_alive
  2. from multiprocessing import Process
  3. import time
  4. import random
  5. class Myprocess(Process):
  6. def __init__(self,name):
  7. # self.name=name
  8. # super().__init__() #Process的__init__方法会执行self.name=Myprocess-1,
  9. # #所以加到这里,会覆盖我们的self.name=name
  10.  
  11. #为我们开启的进程设置名字的做法
  12. super().__init__()
  13. self.name=name
  14. def run(self):
  15. print('%s is running' %self.name)
  16. time.sleep(random.randrange(1,3))
  17. print('%s is ending' %self.name)
  18. if __name__ == '__main__':
  19. p=Myprocess('子进程')
  20. p.start()
  21. p.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
  22. print(p.is_alive()) #结果为True
  23.  
  24. print('开始')
  25. print(p.is_alive()) #结果为False
terminate和is_alive

 

僵尸进程与孤儿进程

 参考博客:http://www.cnblogs.com/Anker/p/3271773.html

  1. #总结僵尸进程和孤儿进程
  2.  
  3. #僵尸进程:有害
  4. 例如有个进程,它定期的产 生一个子进程,这个子进程需要做的事情很少,做完它该做的事情之后就退出了,因此这个子进程的生命周期很短,但是,父进程只管生成新的子进程,至于子进程 退出之后的事情,则一概不闻不问,这样,系统运行上一段时间之后,系统中就会存在很多的僵死进程,倘若用ps命令查看的话,就会看到很多状态为Z的进程。 严格地来说,僵死进程并不是问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。因此,当我们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是通过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程之后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。
  5. #孤儿进程无害:会被系统接管
总结孤儿和僵尸进程

 

四、守护进程

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

  1. from multiprocessing import Process
  2. import time
  3. def foo():
  4. print(123)
  5. time.sleep(1)
  6. print("end123")
  7. if __name__ == '__main__':
  8. p1=Process(target=foo)
  9. p1.daemon = True
  10. p1.start()
  11. print('主进程')
View Code
  1. #主进程代码运行完毕,守护进程就会结束
  2. from multiprocessing import Process
  3. from threading import Thread
  4. import time
  5. def foo():
  6. print(123)
  7. time.sleep(1)
  8. print("end123")
  9. def bar():
  10. print(456)
  11. time.sleep(3)
  12. print("end456")
  13. if __name__ == '__main__':
  14. p1=Process(target=foo)
  15. p2=Process(target=bar)
  16. p1.daemon=True
  17. p1.start()
  18. p2.start()
  19. print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止
特殊的例子

 

五、互斥锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

  1. from multiprocessing import Process,Lock
  2. import os,time
  3. #互斥锁 可理解为生活中卫生间的锁
  4.  
  5. def work(mutex):
  6. # mutex.acquire() 抢锁操作
  7. print('%s is running' %os.getpid())
  8. time.sleep(2)
  9. print('%s is done' %os.getpid())
  10. # mutex.release() #释放锁操作
  11.  
  12. if __name__ == '__main__':
  13. # mutex=Lock() #多进程需加在main后
  14. for i in range(3):
  15. p=Process(target=work)#args=(mutex,)#传mutex
  16. p.start()
互斥锁mutex

加锁:由并发变成‘串行’,牺牲了运行效率,但保障了数据安全

例子:模拟12306抢票

  1. from multiprocessing import Process,Lock
  2. import json
  3. import time,random
  4. def search(i):
  5. with open('db.json','rt',encoding='utf-8') as f: #db.json 文件 {"count": 1},表示余一张票
  6. dic=json.load(f)
  7. time.sleep(1)
  8. print('路人%s查看到剩余票数:%s' %(i,dic['count']))
  9. def get(i):
  10. with open('db.json','rt',encoding='utf-8') as f:
  11. dic=json.load(f)
  12. if dic['count'] > 0:
  13. # 有票
  14. dic['count']-=1
  15. time.sleep(random.randint(1,3)) #关键在于此处 睡的同时所有子进程都读取到1张票
  16. with open('db.json','wt',encoding='utf-8') as f:
  17. json.dump(dic,f)
  18. print('路人%s抢票成功' % i)
  19. else:
  20. print('路人%s抢票失败' %i)
  21. def task(i,mutex):
  22. search(i)
  23. # mutex.acquire()
  24. get(i)
  25. # mutex.release()
  26.  
  27. if __name__ == '__main__':
  28. mutex = Lock()
  29. for i in range(1,11):
  30. p=Process(target=task,args=(i,mutex))
  31. p.start()
  32. # p.join()
  33.  
  34. print('主进程')
模拟12306抢票:互斥锁的应用

 

总结

  1. #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
  2. 虽然可以用文件共享数据实现进程间通信,但问题是:
  3. 1.效率低(共享数据基于文件,而文件是硬盘上的数据)
  4. 2.需要自己加锁处理
  5. #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
  6. 1 队列和管道都是将数据存放于内存中
  7. 2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
  8. 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

 

六、IPC机制

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

1、队列

创建队列的类(底层就是以管道和锁定的方式实现的)

  Queue(maxsize):创建共享的进程队列,Queue是多进程安全的队列

  1. 可以使用Queue实现多进程之间的数据传递。参数:maxsize 是队列中允许最大数,不写则无限制
  1. from multiprocessing import Queue

 

主要方法:

  1. '''
  2. multiprocessing模块支持进程间通信的两种主要形式:管道和队列
  3. 都是基于消息传递实现的,但是队列接口
  4. '''
  5.  
  6. from multiprocessing import Process,Queue
  7. import time
  8. q=Queue(3)
  9. #put ,get ,put_nowait,get_nowait,full,empty
  10. q.put(3)
  11. q.put(3)
  12. q.put(3)
  13. print(q.full()) #满了
  14.  
  15. print(q.get())
  16. print(q.get())
  17. print(q.get())
  18. print(q.empty()) #空了
Queue 队列
  1. 1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blockedFalse,但该Queue已满,会立即抛出Queue.Full异常。
  2. 2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blockedtimeout。如果blockedTrue(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blockedFalse,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
  3. 3
  4. 4 q.get_nowait():同q.get(False)
  5. 5 q.put_nowait():同q.put(False)
  6. 6
  7. 7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
  8. 8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
  9. 9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
详细

2、管道(了解)

创建管道的类

Pipe(duplex)在进程之间创建一条管道,并返回元组(coon1,coon2)表示管道的两端

  ps:必须在产生process对象之前产生管道

  1. 参数:dumplex:默认管道是全双工的,如果将duplex设成Falseconn1只能用于接收,conn2只能用于发送。

主要方法:

  1. conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError
  2. conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
  3. #其他方法:
  4. conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
  5. conn1.fileno():返回连接使用的整数文件描述符
  6. conn1.poll([timeout]):如果连接上的数据可用,返回Truetimeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
  7. conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
  8. conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
  9. conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
Pipe 管道
  1. from multiprocessing import Process,Pipe
  2. import time,os
  3. def consumer(p,name):
  4. left,right=p
  5. left.close()
  6. while True:
  7. try:
  8. baozi=right.recv()
  9. print('%s 收到包子:%s' %(name,baozi))
  10. except EOFError:
  11. right.close()
  12. break
  13. def producer(seq,p):
  14. left,right=p
  15. right.close()
  16. for i in seq:
  17. left.send(i)
  18. # time.sleep(1)
  19. else:
  20. left.close()
  21. if __name__ == '__main__':
  22. left,right=Pipe()
  23. c1=Process(target=consumer,args=((left,right),'c1'))
  24. c1.start()
  25. seq=(i for i in range(10))
  26. producer(seq,(left,right))
  27. right.close()
  28. left.close()
  29. c1.join()
  30. print('主进程')
Pipe 示例

七、生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

 特点:

  • 实现了 生产者 与 消费者 解耦合
  • 平衡了生产者的生产能力和消费者的消费能力

 为什么使用此模式:

  1. 当程序中存在明显的两类任务,一类负责造数据,另外一类负责处理数据,就可以用生产者消费者模型来实现
    解耦和从而提升效率

如何实现

  1. 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
  2. 基于队列实现生产者消费者模型
实现方法
  1. 生产者进程-------->queue<----------消费者进程
  1. from multiprocessing import Process,Queue
  2. import time,random,os
  3. def consumer(q):
  4. while True:
  5. res=q.get()
  6. time.sleep(random.randint(1,3))
  7. print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
  8. def producer(q):
  9. for i in range(10):
  10. time.sleep(random.randint(1,3))
  11. res='包子%s' %i
  12. q.put(res)
  13. print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
  14. if __name__ == '__main__':
  15. q=Queue()
  16. #生产者们:即厨师们
  17. p1=Process(target=producer,args=(q,))
  18. #消费者们:即吃货们
  19. c1=Process(target=consumer,args=(q,))
  20. #开始
  21. p1.start()
  22. c1.start()
  23. print('')
示例

上述示例存在一个问题:主进程永远不会结束,因为子进程的生产者在生产完后就结束了,但是消费者在取空q之后,则之后一直处于死循环且阻塞在q.get()这步。

  解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,此时即可在收到结束信号时break出死循环

 

另外一种队列提供了这种机制

  1. #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
  2.  
  3. #参数介绍:
  4. maxsize是队列中允许最大项数,省略则无大小限制。
  5.   #方法介绍:
  6. JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
  7. q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  8. q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
  1. from multiprocessing import Process,JoinableQueue
  2. import time,random
  3. def producer(food,name,q):
  4. for i in range(3):
  5. time.sleep(random.randint(1,3))
  6. res='%s%s' %(food,i)
  7. q.put(res)
  8. print('厨师%s生产了%s' %(name,res))
  9. def consumer(name,q):
  10. while True:
  11. res=q.get()
  12. if res is None:break
  13. time.sleep(random.randint(1,3))
  14. print('吃货%s吃了%s' %(name,res))
  15. q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了
  16.  
  17. if __name__ == '__main__':
  18. q=JoinableQueue()
  19. #生产者们
  20. p1=Process(target=producer,args=('包子','tom',q))
  21. p2=Process(target=producer,args=('蛋汤','jerry',q))
  22. p3=Process(target=producer,args=('红烧肉','bob',q))
  23. #消费者们
  24. c1 = Process(target=consumer,args=('吃货1',q))
  25. c2 = Process(target=consumer,args=('吃货2',q))
  26. c1.daemon=True
  27. c2.daemon=True
  28. p1.start()
  29. p2.start()
  30. p3.start()
  31. c1.start()
  32. c2.start()
  33. p1.join()
  34. p2.join()
  35. p3.join()
  36. q.join() #主进程最后一行代码运行完毕,生产者全部正常死亡,消费者也没有存在的意义了
  37.  
  38. print('')
另一种Queue
  1. #主进程等--->p1,p2,p3等---->c1,c2
  2. #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
  3. #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程

 

八、信号量(了解)

  1. 互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁
  2. 信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
  3. from multiprocessing import Process,Semaphore
  4. import time
  5. def test(sem,user):
  6. sem.acquire()
  7. print('%s 占一个位置' %user)
  8. time.sleep(2)
  9. sem.release()
  10. if __name__ == '__main__':
  11. sem=Semaphore(3)
  12. p_l=[]
  13. for i in range(12):
  14. p=Process(target=test,args=(sem,'user%s' %i,))
  15. p.start()
  16. p_l.append(p)
  17. for i in p_l:
  18. i.join()
  19. print('')
信号量

九、事件(了解)

  1. python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 setwaitclear
  2. 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
  3. clear:将“Flag”设置为False
  4. set:将“Flag”设置为True
  5. from multiprocessing import Process,Event
  6. import time,random
  7. def car(e,n):
  8. while True:
  9. if not e.is_set(): #Flase
  10. print('\033[31m红灯亮\033[0m,car%s等着' %n)
  11. e.wait()
  12. print('\033[32m车%s 看见绿灯亮了\033[0m' %n)
  13. time.sleep(random.randint(3,6))
  14. if not e.is_set():
  15. continue
  16. print('走你,car', n)
  17. break
  18.  
  19. def police_car(e,n):
  20. while True:
  21. if not e.is_set():
  22. print('\033[31m红灯亮\033[0m,car%s等着' % n)
  23. e.wait(1)
  24. print('灯的是%s,警车走了,car %s' %(e.is_set(),n))
  25. break
  26.  
  27. def traffic_lights(e,inverval):
  28. while True:
  29. time.sleep(inverval)
  30. if e.is_set():
  31. e.clear() #e.is_set() ---->False
  32. else:
  33. e.set()
  34. if __name__ == '__main__':
  35. e=Event()
  36. # for i in range(10):
  37. # p=Process(target=car,args=(e,i,))
  38. # p.start()
  39.  
  40. for i in range(5):
  41. p = Process(target=police_car, args=(e, i,))
  42. p.start()
  43. t=Process(target=traffic_lights,args=(e,10))
  44. t.start()
  45. print('============》')
View Code

十、进程池(重要)

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

  1. 很明显需要并发执行的任务通常要远大于核数
  2. 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
  3. 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

 

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

  1. 用池是为了将并发的进程或线程数目控制在计算机可承受的范围内

我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数... 

ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

    创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

  1. from concurrent.futures import ProcessPoolExecutor
  1. from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
  2. import os
  3. import time
  4. import random
  5. def task(x):
  6. print('%s is running' %os.getpid())
  7. time.sleep(random.randint(1,3))
  8. return x**2
  9.  
  10. if __name__ == '__main__':
  11. p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
  12. for i in range(10):
  13. p.submit(task,i) #提交任务
  14. print('')
进程池

重点:

submit  提交任务

  1. 同步调用 vs 异步调用
    提交任务的两种方式

    同步调用:提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一行代码
    同步调用下任务的执行是串行执行

    异步调用:提交完任务后,不会原地等待任务执行完毕,不等结果 直接执行下一行代码
    同步调用下任务的执行是并发执行
  1. if __name__ == '__main__':
  2. # 异步调用
  3. p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
  4. futures=[]
  5. for i in range(10):
  6. future=p.submit(task,i) #提交任务
  7. futures.append(future)
  8. p.shutdown(wait=True) #关闭了进程池的入口,然后执行join操作
  9. for future in futures:
  10. print(future.result()) #拿到结果
  11. print('')
  12. #main上方代码 与上面示例一致
异步调用
  1. if __name__ == '__main__':
  2. p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
  3. for i in range(10):
  4. res=p.submit(task,i).result()
  5. print(res)
  6. print('')
同步调用

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号