经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Ruby » 查看文章
进程与线程
来源:cnblogs  作者:panda/勇  时间:2018/9/25 20:34:14  对本文有异议

生产者消费者模型

  主要用于解耦

  1. from multiprocessing import Queue
  2. #队列是安全的,不用加锁.
  3. q = Queue(num)
  4. num : 队列的最大长度
  5. q.get()#阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
  6. q.put()#阻塞,如果可以继续往队列中放数据就直接放,不能放就阻塞等待
  7. q.get_nowait()#不阻塞,如果有数据就直接获取,没有数据就报错
  8. q.put_nowait()#不阻塞, 如果能继续往队列中放数据,就直接放,不能放就报错
  9. q = Queue(3)
  10. q.put(1)
  11. q.put('abc')
  12. q.put([4,5,6])
  13. print('此时队列已不能再放入了')
  14. q.put('呵呵')#此处阻塞等待空位置放入
  15. #q.putnowait('呵呵')#队列已满,不再等待,直接报错
  16. print('此处不会被打印')

    print(q.get())#先进先出,先取出 1
  1. print(q.get())
  1. print(q.get())
  1. #print(q.get())#队列为空,取不出会阻塞等待新数据取出
  1. print(q.getnowait())#不再等待直接报错

 

  借助队列实现生产者消费者模型 (队列(First In First Out 简称 : FIFO) : 先进先出 )

  1. from multiprocessing import Queue ,Process
  2. def consumer(q,name):
  3. while 1:
  4. pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
  5. if pro_info :
  6. print('%s拿走了%s' % (name,pro_info))
  7. else:#当收到None时,结束获取,退出程序
  8. break
  9.  
  10. def producer(q,product):
  11. for i in range(10):
  12. pro_info = product + '的成品%s号' % str(i)
  13. q.put(pro_info)
  14. q.put(None)#生产者停止生产的标识
  15.  
  16. if __name__ == '__main__' :
  17. q = Queue(5)#规定队列最大为5
  18. pro = Process(target=producer, args=(q,'版本一'))
  19. con = Process(target=consumer, args=(q,'小潘'))
  20. pro.start()
  21. con.start()
  22. #把成产表示符放入父进程
  23. from multiprocessing import Queue ,Process
  24. def consumer(q,name):
  25. while 1:
  26. pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
  27. if pro_info :
  28. print('%s拿走了%s' % (name,pro_info))
  29. else:#当收到None时,结束获取,退出程序
  30. break
  31.  
  32. def producer(q,product):
  33. for i in range(10):
  34. pro_info = product + '的成品%s号' % str(i)
  35. q.put(pro_info)
  36. if __name__ == '__main__' :
  37. q = Queue(5)#规定队列最大为5
  38. pro = Process(target=producer, args=(q,'版本一'))
  39. con = Process(target=consumer, args=(q,'小潘'))
  40. pro.start()
  41. con.start()
  42. pro.join()
  43. q.put(None)#生产者停止生产的标识



  1. #多个生产者消费者
    from multiprocessing import Queue ,Process

    def consumer(q,name):
    while 1:
    pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
    if pro_info :
    print('%s拿走了%s' % (name,pro_info))
    else:#当收到None时,结束获取,退出程序
    break

    def producer(q,product):
    for i in range(20):
    pro_info = product + '的成品%s号' % str(i)
    q.put(pro_info)

    if __name__ == '__main__' :
    q = Queue(5)#规定队列最大为5
    pro1 = Process(target=producer, args=(q,'版本一'))
    pro2 = Process(target=producer, args=(q, '版本二'))
    pro3 = Process(target=producer, args=(q, '版本三'))
    con1 = Process(target=consumer, args=(q,'小潘'))
    con2 = Process(target=consumer, args=(q, '李四'))
    li = [pro1,pro2,pro3,con1,con2]
    [i.start() for i in li]
    pro1.join()
    pro2.join()
    pro3.join()
    q.put(None)#生产者停止生产的标识
    q.put(None)
  1.  

 

  joinablequeue模块

  1. from multiprocessing import JoinableQueue
  2. #继承了multiprocessing.Queue 类,新添加了join(),q.task_done()
  3. q = JoinableQueue()
  4. q.join()#等待q.task_done的返回结果
  5. q.task_done()#用于消费者,表示每消费队列中一个数据,就给join返回一个标识
  6.  
  7.  
  8.  
  9.  
  10. from multiprocessing import JoinableQueue ,Process
  11. def consumer(q,name):
  12. while 1:
  13. pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
  14. if pro_info :
  15. print('%s拿走了%s' % (name,pro_info))
  16. q.task_done()#从队列中,每拿走一个数据,就传给join发送一个标识,共十个数据,则十个标识
  17.  
  18. def producer(q,product):
  19. for i in range(10):
  20. pro_info = product + '的成品%s号' % str(i)
  21. q.put(pro_info)
  22. q.join()#记录生产了20个数据在队列中,此时阻塞等待着对列中的所有数据均被拿取
  23.  
  24. if __name__ == '__main__' :
  25. q = JoinableQueue(5)#规定队列最大为5
  26. pro = Process(target=producer, args=(q,'版本一'))
  27. con = Process(target=consumer, args=(q,'小潘'))
  28. con.daemon = True#把消费者进程设为守护进程,由于主进程等待成产者进程,生产者进程等待消费者进程,
  29. # 所以把消费者进程设为守护进程,主进程代码执行完毕,消费者进程结束,则程序结束.
  30. pro.start()
  31. con.start()
  32. pro.join()#等待生产者进程结束

 

管道  

  管道是不安全的 , 一般单进程不要用管道

  用于多进程之间通信的一种方式

  如果在单进程中使用管道,那么就是con1收数据,con2发数据 ; 如果是con1发数据 , con2收数据

  如果是多进程中使用管道,那么必须是父进程使用con1收,子进程就必须使用con2发 ;

    父进程用con1发 , 子进程必须用con2收 ; 

    父进程用con2收 , 子进程必须用con1发 ;

    父进程用con2收 , 子进程必须用con1收

  管道中EOFError错误,是指父进程中如果关闭了发送端,子进程还继续接收数据,就会引

  发EOFError错误.

 

  1. from multiprocessing import Pipe,Process
  2. #单进程下的管道
  3. # con1 , con2 = Pipe()
  4. #
  5. # con1.send('adc')
  6. # print(con2.recv())
  7. # con2.send(123)
  8. # print(con1.recv())
  9.  
  10.  
  11.  
  12. #多进程
  13. def func(con):
  14. con1,con2 = con
  15. con1.close()
  16. print(con2.recv())
  17. con2.send('主进程con2收')
  18. #print(con1.recv())#在同一进程中,con1和con2不能同时开启,否则程序不能关闭
  19.  
  20. if __name__ == '__main__':
  21. con1 , con2 = Pipe()
  22. p = Process(target=func,args=((con1,con2),))
  23. p.start()
  24. con2.close()
  25. con1.send('子进程con2收')#con1发送,必须是con2接收
  26. print(con1.recv())
  27. def func(con):
  28. con1,con2 = con
  29. con1.close()
  30. con2.send('主进程con2收')
  31. while 1 :
  32. try :
  33. print(con2.recv())#如果父进程不关闭con1管道,则子进程一直阻塞在此处等待接收,报错
  34. except EOFError :#try 一下当报该类型错误时自动执行下面程序
  35. con2.close()
  36. break
  37.  
  38. if __name__ == '__main__':
  39. con1 , con2 = Pipe()
  40. p = Process(target=func,args=((con1,con2),))
  41. p.start()
  42. con2.close()
  43. print(con1.recv())
  44. for i in range(10):
  45. con1.send('子进程con2收%s' % i)#con1发送,必须是con2接收
  46. con1.close()#发送完毕后,关闭管道

 

进程之间的共享内存

  

  1. from multiprocessing import Manager , Process
  2. # m = Manager()
  3. # num = m.dict({'键':'值'})#数据可以是字典或者其他形式
  4. # num = m.list([1,2,3])
  5.  
  6. def func(num):
  7. num[0] -= 1
  8. print('子进程中的num的值是', num)
  9. if __name__ == '__main__':
  10. m = Manager()
  11. num = m.list([1,2,3])
  12. p = Process(target=func , args=(num,))
  13. p.start()
  14. p.join()
  15. print('父进程中的num',num)

 

进程池

  在实际业务中,任务量是有多有少的,如果任务量特别多,不可能要开对应那摩多的进程数,开启那摩多进程首先

  需要大量的时间让操作系统来为你管理他,其次还需要消耗大量时间让CPU帮你调度他.

  进程池还会帮程序员管理进程池中的进程

 进程池 : 一个形象化的池子,里面有给定的进程,这些进程一直处于待命状态,一旦有任务,就有进程去处理.

  进程池中的进程都是守护进程,主进程代码执行完毕,守护进程就结束了 

  1. from multiprocessing import Pool
  2. import os
  3. import time
  4. def func(num):
  5. num += 1
  6. print(num)
  7. # if __name__ == '__main__':
  8. # p = Pool(os.cpu_count()+1)#oscpu_count+1 最佳进程数量
  9. # start = time.time()
  10. # p.map(func , [i for i in range(20)])
  11. # p.close()#不允许再向进程池中添加任务
  12. # p.join()#等待进程池中所有进程执行完所有任务

  13. #p.apply()#让进程池中的进程同步的做任务
  14. # if __name__ == '__main__':
  15. # p = Pool(5)
  16. # for i in range(20):#同步处理20个任务,同步是指不管进程池中有多少个进程依然一个进程一个进程的执行,不需要join等待和close.
  17. # p.apply(func , args=(i ,))
  18. # time.sleep(0.5)

  19. #p.apply_async()#让进程池中的进程异步做任务
  20. if __name__ == '__main__':
  21. p = Pool(5)
  22. l = []
  23. for i in range(20):#异步处理20个任务,异步是指进程池中有几个进程,一下就处理几个任务,那个进程任务处理完了,就接收下一个任务.
  24. re = p.apply_async(func , args=(i ,))
  25. l.append(re)
  1.    res= [i.get() for i in l]
  1.    p.close()#不再接受新的任务,准备关闭
       p.join()#等待进程池中所有进程执行任务完毕.

       print(res)
  2. time.sleep(0.5)

 

  回调函数(只有异步有)

   在进程池中的回调函数是父进程调用的,和子进程无关.

  1. from multiprocessing import Pool
  2. import requests
  3. def func(url):
  4. re = requests.get()
  5. print(re.text)
  6. if re.status_code == 200:
  7. return url , re.text
  8. def call_back(sta):#func函数的返回值,会被回调函数的形参接收,
  9. url ,text = sta
  10. #print('回调函数',sta)
  11. print('回调',url)
  12. if __name__ == '__main__':
  13. p = Pool(4)
  14. l = ['https//www.baidu.com',
  15. 'https // www.jd.com'
  16. 'https // www.taobao'
  17. 'https // www.mi.com'
  18. 'https // www.bilibili'
  19. ]
  20. for i in l :
  21. p.apply_async(func,args=(i,),call_back=call_back)
  22. #异步执行func任务,每一个进程执行完任务,在func中return一个结果,结果会自动被callback指定的函数
  23. #当成形参来接收到.
  24. p.close()
  25. p.join()

 

线程

  计算机的最小执行单位是线程;

  进程是资源分配的基本单位.线程是可执行的基本单位,是可被调度的基本单位.

  线程不可以自己独立拥有资源 ,线程的执行必须依赖于所属进程中的资源.

  线程被称为轻量级的进程, 线程的切换速度比进程快

  进程中必须至少有一个线程.

 

  线程分为用户级和内核级线程

    用户级线程 : 对于程序员来说,这样的线程完全被程序员控制执行和调度;

    内核级线程 : 对于计算机内核来说 , 这样的线程完全被内核调度.

    线程组成 : 代码段 ; 数据段 ; TCB(Thread  control  block)

  开启现成的方法

  1. #方法一
    from threading import Thread
  2. import time
  3. def func () :
  4. print('子线程')
  5. time.sleep(1)
  6. #if __name__ == '__main__' :#线程中可以不用写这句代码
  7. t = Thread(target = func , args=())
  8. t.start()


    #方法二
  1. from threading import Thread
  2. import time

    class Mythread(Thread)
      def __init__(self):
        super(
    Mythread,self).__init__()
      def run(self):
        print('我是子线程')

    t = Mythread()
    t.start()

 

  线程和进程的比较

   (1) CPU切换进程要比CPU切换线程慢得多

        在Python中,如果IO操作过多,最好使用线程 ; 

   (2) 在同一个进程中,所有线程共享这个进程的pid,也就是所有线程共享所属进程的资源和内存地址

   (3) 在同一个进程内,所有线程共享该进程中的全局变量(各个线程之间的局部变量不能共享)

   (4) 关于守护进程与守护线程

    守护进程 : 要摸自己正常结束,要摸根据父进程代码的执行结束而结束

    守护线程 : 要摸自己正常结束,要摸根据父进程的执行结束而结束

   (5) 全局解释器锁 , 只有cpython解释器才有,对于线程来说有了GIL,所以没有真正并行,但是有真正的

    多进程并行

    在cPython中,IO密集用多线程,计算密集用多进程

 

  1. from multiprocessing import Process
  2. from threading import Thread
  3. import time
  4. def func():
  5. pass
  6.  
  7. if __name__ == '__main__':
  8. start = time.time()
  9. for i in range(50):
  10. p = Process(target=func)
  11. p.start()
  12. print('开50个进程的时间:',time.time() - start)
  13. start = time.time()
  14. for i in range(50):
  15. p = Thread(target=func)
  16. p.start()
  17. print('开50个线程的时间:', time.time() - start)

 

  GIL锁

    全局解释器锁 , 只有cpython解释器才有,对于线程来说有了GIL,所以没有真正并行,但是有真正的

    多进程并行

    强制线程放弃CPU

    在同一时间内它只允许一个线程执行.

    当你的任务是计算密集的情况下,使用多进程好

    

  1. from multiprocessing import Process
  2. from threading import Thread
  3. import time,os
  4. def func():
  5. global num
  6. number = num
  7. time.sleep(0.1)
  8. #执行此处会等待,GIL会令该线程退出执行,允许下一线程进入,这一线程也要等待,同样退出执行,依次循环.
  9. #当等待时间结束,第一个线程再次进入,会从上一断点开始执行,直接执行下一步,num = number -1 结果为99,
  10. #第二线程同样从上一断点执行,直接执行下一步,num = number -1,结果也为99,以此类推.
  11. num = number -1
  12.  
  13. if __name__ == '__main__':
  14. num = 100
  15. t = []
  16. for i in range(50):
  17. p = Thread(target=func,)
  18. p.start()
  19. t.append(p)
  20. [p.join() for p in t]
  21. print(num)

 

  递归锁 

    RLock 可以有无数把锁,但是只有一把万能钥匙(一把钥匙配若干把锁)

    在同一个线程内,递归锁可以无止境的acquire , 但是互斥锁不行 

    在不同进程内,递归锁是保证只能被一个线程拿到钥匙,然后无止境的acquire,其它线程等待

 

   互斥锁 

    lock() 一把钥匙配一把锁

      一把钥匙配一把锁,主要用于保护数据安全;

      共享资源,又叫玲姐资源.

      共享带码,又叫临界代码.

      对临界资源进行操作时,一定要加锁.

   GIL : 全局锁

    锁的是线程,是cpy解释器上的一把锁,锁的是线程,意思是同一时间只允许一个线程访问CPU

  1. #>>>>>>>死锁
  2. from multiprocessing import Process
  3. from threading import Thread ,Lock
  4. import time,os
  5. def man(m_tot,m_pap):
  6. m_tot.acquire()#男的获得厕所资源,把厕所锁上了
  7. print('男在上厕所')
  8. time.sleep(1)
  9. m_pap.acquire()#男的拿纸资源
  10. print('男的拿到纸资源了')
  11. time.sleep(1)
  12. print('男的上完厕所了')
  13. m_tot.release()#男的还纸资源
  14. m_pap.release()#男的还厕所资源
  15.  
  16. def woman(m_tot,m_pap):
  17. m_pap.acquire() # 女的获得纸资源
  18. print('女的拿到纸资源了')
  19. time.sleep(1)
  20. m_tot.acquire() # 女的拿厕所资源,把厕所锁上了
  21. print('女在上厕所')
  22. time.sleep(1)
  23. print('女的上完厕所了')
  24. m_tot.release() # 女的还厕所资源
  25. m_pap.release() # 女的还纸资源
  26.  
  27. if __name__ == '__main__':
  28. m_tot = Lock()
  29. m_pop = Lock()
  30. m = Thread(target=man,args=(m_tot,m_pop))
  31. w = Thread(target=woman, args=(m_tot, m_pop))
  32. m.start()
  33. w.start()
  34. #结果
  35. #男在上厕所
  36. #女的拿到纸资源了




    #>>>>>>解决死锁
    #>>>>递归锁 : 只有一把钥匙,但是可以开所有锁,层层开锁

  1. from multiprocessing import Process
  2. from threading import Thread ,RLock
  3. import time,os
  4. def man(m_tot,m_pap):
  5. m_tot.acquire()#男的手中有一把钥匙获得厕所资源,把厕所锁上了
  6. print('男在上厕所')
  7. time.sleep(1)
  8. m_pap.acquire()#男的拿纸资源
  9. print('男的拿到纸资源了')
  10. time.sleep(1)
  11. print('男的上完厕所了')
  12. m_tot.release()#男的还纸资源
  13. m_pap.release()#男的还厕所资源
  14. def woman(m_tot,m_pap):
  15. m_pap.acquire() # 女的拿到一把钥匙,获得纸资源
  16. print('女的拿到纸资源了')
  17. time.sleep(1)
  18. m_tot.acquire() # 女的拿厕所资源,把厕所锁上了
  19. print('女在上厕所')
  20. time.sleep(1)
  21. print('女的上完厕所了')
  22. m_tot.release() # 女的还厕所资源
  23. m_pap.release() # 女的还纸资源
  24. if __name__ == '__main__':
  25. m_tot = RLock()
  26. m_pop = RLock()
  27. m = Thread(target=man,args=(m_tot,m_pop))
  28. w = Thread(target=woman, args=(m_tot, m_pop))
  29. m.start()
  30. w.start()
  1.  

 

线程间的通信与进程的用法一样(线程可以不写__main__)

  信号量

   from threading import Semaphore

  事件

  from threading import Event

  条件

  from threading import Condition

  条件是让程序员自行去调度线程的一个机制

            # Condition涉及4个方法

            # acquire()

            # release()

            # wait()    是指让线程阻塞住

            # notify(int)  是指给wait发一个信号,让wait变成不阻塞

            # int是指,你要给多少给wait发信号

 

  定时器

  from threading import Timer

   Timer(time , func )

   time :睡眠时间,(秒为单位)

   func : 睡眠过后,要执行的函数

  1. from threading import Timer
  2. def func():
  3. print('定时器')
  4. Timer(3,func).start()

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号