码迷,mamicode.com
首页 > 其他好文 > 详细

并发编程的那些事儿(四)

时间:2018-08-22 22:48:11      阅读:179      评论:0      收藏:0      [点我收藏+]

标签:lse   可迭代对象   还需   llb   程序   大量   close   sum   one   

1、生产者和消费者模型

  作用:用于解耦。

  原理:生产者将生产的数据放到缓存区,消费者从缓存区直接取。借助队列来实现该模型(队列就是缓存区)

  队列是安全的,自带锁机制。

  q = Queue(num)  num 为队列的最大长度,可以自己设定。

  q.put():  向队列里放数据,如果数据满了就阻塞等待,如果还能放就直接放如。

  q.get():  阻塞等待获取数据,如果队列中有数据直接拿,没有就等待。

  q.put_nowait():  不阻塞,向队列里放数据,如果数据满了就直接报错。

  q.get_nowait():  不阻塞,直接从队列拿数据,如果没有数据就直接报错。

  方法一:

  用 Queue (队列)模块编写

技术分享图片
from multiprocessing import Process, Queue


def producer(q, production):
    for i in range(10):
        info = production + 打败了第%s只 小怪兽 % (i + 1)
        q.put(info)
    q.put(None)         # 当生产者生产完之后,放一个None


def consumer(q, name):
    while 1:
        info = q.get()
        if info:
            print(%s%s % (name, info))
        else:        # 当info 中没有产品了,就会收到一个None,此时结束进程
            break


if __name__ == __main__:
    q = Queue()
    p1 = Process(target=producer, args=(q, 奥德曼))
    p2 = Process(target=consumer, args=(q, 迪迦))
    p1.start()
    p2.start()
生产者、消费者模型(Queue)put(None)放在子进程中

  

  方法二:

  用Queue(队列)模块编写

技术分享图片
from multiprocessing import Process, Queue


def producer(q, production):
    for i in range(10):
        info = production + 打败了第%s只 小怪兽 % (i + 1)
        q.put(info)


def consumer(q, name, color):
    while 1:
        info = q.get()
        if info:
            print(%s%s%s\033[0m % (color, name, info,))
        else:
            break


if __name__ == __main__:
    q = Queue(5)
    p1 = Process(target=producer, args=(q, 奥特曼))
    p2 = Process(target=producer, args=(q, 蝙蝠侠))
    p3 = Process(target=producer, args=(q, 绿巨人))
    p4 = Process(target=consumer, args=(q, 刘奶奶操纵, \033[33m))
    p5 = Process(target=consumer, args=(q, 李二毛骑着, \033[36m))
    p_lst = [p1, p2, p3, p4, p5]
    [i.start() for i in p_lst]
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
生产者、消费者(put(None))放在父进程

  当有多个生产者和消费者时,有几个消费者取,就放几个put(None)

   

   方法三:

  用 JionableQueue (可加入队列)编写

  JionableQueue是继承Queue,同时多了两个方法

  q.jion():  用于生产者。等待q.task_done的返回结果。生产者就能获得消费者当前消费了多少数据。

  q.task_done():  用于消费者,消费者每拿一个数据都会给jion返回一个标识,标记数据。

 

技术分享图片
from multiprocessing import Process, JoinableQueue


def consumer(j, name, color):
    while 1:
        info = j.get()
        print(%s%s拿走了%s\033[0m % (color, name, info))
        j.task_done()


def producer(j, name):
    for i in range(20):
        info = 第%s个%s % (i + 1, name)
        j.put(info)
    j.join()


if __name__ == __main__:
    j = JoinableQueue(10)
    p1 = Process(target=consumer, args=(j, 猥琐大叔, \033[35m))
    p2 = Process(target=producer, args=(j, 巴拉巴拉小魔仙,))
    p1.daemon = True
    p1.start()
    p2.start()
    p2.join()
生产者、消费者模型(JionableQueue)

 

2、管道

  分为: 单进程管道和多进程管道

  但进程管道收发:

    con1 发送,只能con2接收。

    con2发送, 只能con1接收

  多进程管道收发:

    父进程con1发送,  子进程只能con2接收

    父进程con2发送,  子进程只能con1接收

    父进程con1接收,  子进程只能con2发送

    父进程con2接收,  子进程只能con1发送

技术分享图片
from multiprocessing import Pipe, Process


def fn(*args):
    con1, con2 = args
    con1.close()
    while 1:
        try:
            b = con2.recv()
            print(b)
        except EOFError:
            con2.close()
            break


if __name__ == __main__:
    con1, con2 = Pipe()
    p = Process(target=fn, args=(con1, con2))
    p.start()
    con2.close()
    for i in range(10):
        con1.send(i)
    con1.close()
多进程管道

3、进程间的数据共享

  Manager

  from multiprocessing import Manager

  技术分享图片

4、进程池

  1)定义:一个池子里(其实相当于一个存储场所)有一定数量的进程。这些进城一直处于待命状态,一旦有任务就去执行。

  2)为什么要有进程池?在实际业务中,任务量有多有少,如果任务特别多,不可能有正好开启对应的任务。如果要开启那么多进程就会需要消耗大量的时间让操作系统为你管理。其次还需要消耗大量的时间让cpu调度。

因此,进程池就会帮程序员去管理池中的进程。

from  multiprocessing  import  Pool

  3)规则:进程池中所存放的进程数量最好是cpu核数 + 1

  4)进程池中的三种方法:

    (1)map (func, iterable)

      func:进程池中的进程执行的任务函数

      iterable:可迭代对象,把可迭代对象中的每个元素依次传递给任务函数。

    (2)apply(func,agrs=())

      同步的效率。进程回一个一个的执行任务

      func:进程池中的进程执行的任务函数

      args:可迭代对象型的参数,是传给任务函数的参数

      同步处理时,不需要close 和 join

      同步处理时,进程池中的所有进程都是普通进程

    (3)apply_async(func, args=()), cllback=None):  

      异步效率:进程池中的进程会同时去执行任务。

      func: 进程池中的进程执行的函数

      args:可迭代对象的参数,是传给任务函数的参数

      callback: 回调函数,就是每当进程池中的进程处理任务完成后,会返回结果给回调函数,you回调函数进一步处理,回调函数只有异步才有,同步没有。

      异步处理任务时:进程中的所有进程时守护进程。

      异步处理任务是:必须加close 和 join

    回调函数的使用:

      进程的任务函数的返回值,被当成回调函数的形参被接收到,以此进行进一步操作处理

      回调函数是由主进程调用的,而不是子进程,子进程只负责把结果返回给回调函数。

 

  

并发编程的那些事儿(四)

标签:lse   可迭代对象   还需   llb   程序   大量   close   sum   one   

原文地址:https://www.cnblogs.com/wf123/p/9520784.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!