码迷,mamicode.com
首页 > 编程语言 > 详细

python学习笔记-Day11 (线程、进程、queue队列、生产消费模型、携程)

时间:2016-07-23 07:25:32      阅读:594      评论:0      收藏:0      [点我收藏+]

标签:

线程使用

###方式一

import  threading

def f1(arg):
    print(arg)

t = threading.Thread(target=f1, args=(123,))
t.start()    # start会调用run方法执行

# t是threading.Thread类的一个对象
# t.start()就会以线程的方式执行函数,可以使用pycharm ctrl选择start方法
# 找到Thread类的start方法,在start方法的注释中就已经写明,会去调用run()方法执行
###方式二

# 既然我们通过方法一了解到使用线程运行时t.start()最后会调用
# threading.Thread类的run方法,
# 所以我们可以继承Thread类,自定义线程

class MyThread(threading.Thread):
    def __init__(self,func,args):
        self.func = func
        self.args = args
        super(MyThread, self).__init__()

    def run(self):
        self.func(self.args)

def f2(args):
    print(args)

obj = MyThread(f2, 123)
obj.start()

Queue队列

# put方法、get方法

import queue

q = queue.Queue(2)  # 允许队列的最大长度

# put方法
q.put(111)   # 向队列传入一个111元素
q.put(222)
q.put(333, block=False)   # 默认队列满时,再次传入会阻塞,block设置不阻塞,如果队列满直接报错
q.put(444, timeout=2)      # 设置阻塞时间最多等待2秒,等待结束没有进入队列报错

# get方法
# 先进先出
print(q.get())
print(q.get())
print(q.get(block=False))  # 同Put,获取不到值时不阻塞
print(q.get(timeout=2))    # 同put,获取队列值时,最多等待2秒
import queue

# empty方法
q2 = queue.Queue()
print(q2.empty())    # 判断队列是否为空,为空则为真
q2.put(11)
q2.put(22)
print(q2.empty())    # 队列中有元素,返回False
print(q2.qsize())    # 队列元素长度


# task_done方法(一般会配合join方法)
q2.get()
q2.task_done()  # task_done告诉队列,我已经取完值了
q2.get()
q2.task_done()

# join方法
q2.join()   # 当队列有未取值时,阻塞; 取值完成时,不阻塞
# python 四种队列
# queue.Queue              先进先出队列
# queue.LifoQueue        后进先出队列
# queue.PriorityQueue   优先级队列
# queue.deque               双向队列   

# 后劲先出队列
q3 = queue.LifoQueue()
q3.put(123)
q3.put(456)
print(q3.get())   # 取出值为456

# 优先级队列
q4 = queue.PriorityQueue()
q4.put((1, alex1))  # 传入优先级和值到队列中
q4.put((1, alex2))
q4.put((3, alex3))
print(q4.get())   # 取出数字最小的(如果有优先级相同情况,为先进先出)

# 双端队列
q5 = queue.deque()
q5.append(123)   # 向队列右侧添加值
q5.append(456)
q5.appendleft(789) # 向队列左侧添加值
q5.pop()   # 从队列右侧取值
q5.popleft() # 从队列左侧取值

生产者消费模型

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法[1]等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

                                                                                                      --维基百科

# 生产者消费者
# 上引用是一个比较完善的概念,这里我们实现它一部分功能
# 即: 生产者有三个窗口不断地卖票
#        消费者300人不断地消耗票
# 实现如下

import queue,threading,time


q = queue.Queue()

def f1(arg):  # 会源源不断的去向队列提交
    """
    买票
    :param arg:
    :return:
    """
    q.put(     NO.%s - 票 % str(arg))

for i in range(300):  # 创建线程,将300人添加到队列(相当于有300个人去排队买票),因为我们封装q对象时没有指定数量,所以到此循环结束,队列会有300值
    t = threading.Thread(target=f1, args=(i,))
    t.start()

# 这里for循环使用线程方式排队纯粹是因为线程并行,执行的快
# 同样的,使用普通方法也是可以,上for循环可换做以下
# for i in range(300):
#     f1(i)

def f2(arg): # 会源源不断的去队列拿请求,处理
    """
    服务器后台
    :param arg:
    :return:
    """
    while True:
        print("窗口:%s" % arg,q.get())
        time.sleep(2)

for j in range(3): # 后台创建线程,使用3人处理队列中内容 (相当于3个窗口处理买票的请求)
    t = threading.Thread(target=f2, args=(j,))
    t.start()
# 因为我们的f2函数中是while True使用,并且有sleep,
# 所以,这里的for循环会先将三个线程创建完成,
# 然后三个线程,并行的去while 循环get数据

线程锁之互斥锁

# 当我们线程需要访问共同资源时

import threading
import time
# 线程锁
NUM = 10

def func(arg):
    global NUM
    NUM -= 1
    time.sleep(1)
    print(NUM)
for i in range(10):
    t = threading.Thread(target=func, args=(None,))
    t.start()


# 其结果输出为:
0
0
0
0
0
0
0
0
0
0
# 因为我for循环瞬间开了10个线程去每线程处理你的函数
# 所以,NUM瞬间就会减到0
# 那以上情况,我既要使用线程又要NUM按我的想法去依次减后输出
# 这里就要使用线程锁

import threading
import time
# 线程锁
NUM = 10

def func(l):
    global NUM
    # 上锁
    l.acquire()
    NUM -= 1
    time.sleep(1)
    print(NUM)
    # 开锁
    l.release()

# 锁对象
# RLock  支持多重锁(嵌套),一般经常使用RLock,这里仅拿Lock实验
lock = threading.Lock()

for i in range(10):
    t = threading.Thread(target=func, args=(lock,))
    t.start()

# 其输出为
9
8
7
6
5
4
3
2
1
0

线程锁之信号量

# 信号量
# 互斥锁一次可以放一个,信号量就可以定义,我一次可以放行多少个
NUM = 10

def func(i,l):
    global NUM
    # 上锁
    l.acquire()  # 注意在这里信号量不是一个个线程放进去,下边定义了一次5个,所以这里会一下放进去5个锁住
    NUM -= 1
    time.sleep(1)
    print(NUM,i)
    # 开锁
    l.release()

# 锁对象
lock = threading.BoundedSemaphore(5)  # 这里定义,一次可以放出几个(从队列中)

for i in range(30):
    t = threading.Thread(target=func, args=(i,lock,))
    t.start()

线程锁之事务锁

# 事务锁, 也成红绿灯锁
# 信号量可以设置每次锁几个,执行,解锁
# 而事务锁是,你有多少过来,遇见红灯,全部停下;  当我变成绿灯,全部通过。
def func(i,e):
    print(i)
    e.wait()  # 在这里阻塞,监测是什么灯,如果是红灯,停; 绿灯,行
    print(i+100)

event = threading.Event()

for i in range(10):
    t = threading.Thread(target=func, args=(i,event))
    t.start()

event.clear() # 默认灯为红灯,左为设置成红灯
inp = input(">>>")
if inp == 1:
    event.set() # 设置为绿灯

 线程锁之条件锁

# 方法一
import threading
def func(i, con):
    print(i)
    con.acquire()  # 加锁,配合下边的wait使用
    con.wait()    # 在这里全部阻塞
    print(i+100)
    con.release()

# 创建条件锁对象
c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i,c))
    t.start()

while True:
    inp = input(">>>")
    if inp == q:
        break
    # 到此,我们的for循环创建完进程了,但是所有进程都被func函数中的con.wait()阻塞了
    # 现在,掌控权也就是我允许放行几个,全部在我们定义的条件锁对象:c
    c.acquire()   # 传达我要加锁
    c.notify(int(inp))   # 传达一个数字,我要放行多少
    c.release()   # 传达执行完毕,解锁
# 方法二
import threading

def condition():
    ret = False
    r = input(">>>")
    if r == true:
        ret = True
    else:
        ret = False
    return ret

def func(i, con):
    print(i)
    con.acquire()
    con.wait_for(condition)  # wait_for传入一个函数体, 根据函数体返回的真则加锁进入,假则阻塞
    print(i+100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i,c))
    t.start()

线程小模块:Timer定时器

# 定时器
from threading import Timer

def hello():
    print(hello world)

t = Timer(1,hello)  # 等待1s之后,执行函数
t.start()

 线程池

在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。

# 低级版本线程池
# 完全仅供理解

import queue
import threading
import time

class ThreadPool:  # 自定义一个线程池类
    def __init__(self, maxsize=5):  # 定义一个参数maxsize,相当于我每次只放行数量
        self.maxsize = maxsize
        # 创建一个队列
        self._q = queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)

    def get_thread(self):  # 从队列取值
        return self._q.get()

    def add_thread(self): # 往队列添加线程模块
        self._q.put(threading.Thread)

pool = ThreadPool(5)

def task(arg,p):
    # 打印执行次数,和每秒
    print(arg)
    time.sleep(1)
    p.add_thread()  # 向队列添加一个线程

for i in range(100):
    # 每取一个线程,我就向队列添加一个线程
    t = pool.get_thread()
    obj = t(target=task, args=(i,pool))
    obj.start()
# 线程池
import queue
import threading
import contextlib
import time

StopEvent = object()  # 线程池线程没有任务了,需要终止时使用,不是必要使用object类的


class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:  # 指定最大队列则使用最大数,不指定则无限制
            self.q = queue.Queue(max_task_num)   # 这里创建的队列,是要装任务的,而不是线程
        else:
            self.q = queue.Queue()
        self.max_num = max_num   # 指定最多有多少个线程
        self.cancel = False
        self.terminal = False
        self.generate_list = []   # 保存当前已经创建了多少线程(假设线程池为10,你使用了2个,他就只有两个线程)
        self.free_list = []      # 当前空闲的进程

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:   # 判断1:查看空闲线程列表还有没有空闲线程,有则不创建,无进入下一个判断,判断2:已经创建的线程,有没有大于最大可以创建的线程数,小于则创建
            self.generate_thread()  # 调用创建线程函数
        w = (func, args, callback,)  # 将传入的将要执行的任务存入元组中
        self.q.put(w)  # 将任务放入队列

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)   # t对象封装了一个self.call函数,每一个线程在t.start时,都会执行call方法
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)   # 我没创建一个线程,就将线程保存到generate_list列表中

        event = self.q.get()   # 取任务
        while event != StopEvent: # 取到任务不等于StopEvent,表示一直有任务,一直循环

            func, arguments, callback = event  # 刚刚我们放入队列时,使用的是 (函数,元祖,函数),同样使用3个参数取出
            try:
                result = func(*arguments)  # 执行第一个函数
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result) # 执行第二个函数
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread): #当正常执行完以上函数后,我就去执行worker_state函数
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)  # 当没有任务可取时,表示线程结束

    def close(self):
        """
        执行完所有的任务后,所有线程停止

        """
        self.cancel = True
        full_size = len(self.generate_list)  # 有几个线程,终止介个
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager  # 上下文管理器
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)



# How to use


pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    print(i)

for i in range(300):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
pool.terminate()

进程基本使用

# 进程基本使用
from multiprocessing import Process

def f1(i):
    print(say hi ,i)

if __name__ == __main__:
    for i in range(10):
        p = Process(target=f1, args=(i,))
        p.start()

进程之间共享数据

# 进程之间共享数据
def foo(i,arg):
    arg.put(i)
    print(say hi, i, arg.qsize())

if __name__ == "__main__":
    li = queues.Queue(20,ctx=multiprocessing)   # 注意这里并不是队列的queue模块,而是进程的queues模块,queues支持封装时传入ctx=multiprocessing 使进程共享
    for i in range(10):
        p = Process(target=foo, args=(i,li))
        p.start()
    # import time
    # time.sleep(1)

进程池

# 进程池

from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)

if __name__ == __main__:
    pool = Pool(5)

    for i in range(30):
        # pool.apply(func=f1, args=(i,))
        pool.apply_async(func=f1, args=(i,))

    # 等待所有进程执行完毕结束
    # pool.close()

    # 当前任务执行到哪,就到哪结束
    # time.sleep(10)
    pool.terminate()  # 立即终止

    pool.join()

携程

# 携程
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print(GET: %s % url)
    resp = requests.get(url)
    data = resp.text
    print(%d bytes received from %s. % (len(data),url))

gevent.joinall([
    gevent.spawn(f, https://www.python.org),
    gevent.spawn(f, https://www.yahoo.com),
    gevent.spawn(f, https://github.com) 
])

 

python学习笔记-Day11 (线程、进程、queue队列、生产消费模型、携程)

标签:

原文地址:http://www.cnblogs.com/coolking/p/5692337.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!