
Python asyncio coroutines

Date: 2021-03-02 11:44:41


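1. Basic coroutines

  1. Declare a function with async def.
  2. Create an event loop and hand it a coroutine object to run.
async def test():  # declares a coroutine function named test
asyncio.ensure_future(obj)  # wraps a coroutine object in a Task (a Future subclass); a Future is returned as-is
asyncio.gather(coroutines/futures)  # bundles coroutine objects or futures into a single future
asyncio.wait([task1, task2])  # waits for the given tasks/futures to finish; calling it returns a coroutine (since Python 3.11 it no longer accepts bare coroutine objects)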
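import asyncio

async def test():  # the async keyword defines a coroutine function
    print('1')
    await asyncio.sleep(2)  # simulates an IO operation; awaitables are coroutine objects, Futures, and Tasks
    print('3')

t = test()  # does not run the body of test(); t is just a coroutine object

loop = asyncio.get_event_loop()  # get the event loop (newer Python versions deprecate relying on this call to create the loop)
loop.run_until_complete(t)  # run the coroutine object on the event loop until it finishes
# loop.close()  # closing is optional, but a closed loop cannot call run_until_complete() again
# asyncio.run(t)  # Python 3.7+; roughly equivalent to the three loop calls above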
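
As a minimal sketch of the same flow on Python 3.7+, the snippet below lets asyncio.run() manage the loop instead of doing it by hand. The names main and events are illustrative only and are not part of the original example; the sleep is shortened so the demo finishes quickly.

```python
import asyncio

events = []  # illustrative log of what ran, in order


async def test():
    events.append('1')
    await asyncio.sleep(0.1)  # simulated IO; the coroutine is suspended here
    events.append('3')
    return 'done'


def main():
    coro = test()               # nothing runs yet: coro is only a coroutine object
    events.append('created')
    result = asyncio.run(coro)  # creates a loop, runs coro to completion, closes the loop
    return result


print(main(), events)
```

The 'created' entry is logged before '1', which shows that calling test() alone does not execute its body.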

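2. Running multiple coroutine functions

  1. Bundle several coroutine objects or futures into one future: asyncio.gather(fu1, fu2, ...)
import asyncio


async def test():  # the async keyword defines a coroutine function
    print('1')
    await asyncio.sleep(2)  # simulates an IO operation; awaitables are coroutine objects, Futures, and Tasks
    print('3')

async def test2():
    print('2')

t = test()  # does not run the body of test(); t is just a coroutine object
t2 = test2()
loop = asyncio.get_event_loop()  # get the event loop
combined = asyncio.gather(t, t2)  # gather() bundles several coroutine objects or futures into one future
loop.run_until_complete(combined)  # run_until_complete() actually takes a future; a coroutine passed to it is wrapped in a Task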
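
One property of gather() worth seeing in isolation: the result list follows the order of the arguments, not the order of completion. The sketch below assumes Python 3.7+; the helper work and its delays are made up for illustration.

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    # 'slow' finishes last, but gather() still lists it first because
    # results follow the order of the awaitables passed in.
    return await asyncio.gather(work('slow', 0.2), work('fast', 0.05))


results = asyncio.run(main())
print(results)  # ['slow', 'fast']
```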

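3. Future

A Future is a low-level awaitable that holds the eventual result of an asynchronous operation. Task is a subclass of Future that wraps a coroutine.

def done_callback(futu):  # called with the finished future as its only argument
    print('Done')

futu = asyncio.ensure_future(test())  # test() and loop come from the examples above
futu.add_done_callback(done_callback)
loop.run_until_complete(futu)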
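
A Future does not have to come from a coroutine at all. The following sketch, assuming Python 3.7+, creates a bare Future with loop.create_future() and lets a timer fill in its result; the calls list and the value 42 are purely illustrative.

```python
import asyncio

calls = []  # illustrative record of what the callback saw


def done_callback(fut):
    # Callbacks receive the finished future as their only argument.
    calls.append(fut.result())


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()                 # a bare Future, not tied to any coroutine
    fut.add_done_callback(done_callback)
    loop.call_later(0.05, fut.set_result, 42)  # some other code fills in the result later
    value = await fut                          # suspends until set_result() is called
    await asyncio.sleep(0)                     # yield to the loop once so pending callbacks have run
    return value


print(asyncio.run(main()), calls)
```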

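4. Task

A Task runs a coroutine object on the event loop.

import asyncio


async def test():  # the async keyword defines a coroutine function
    print('1')
    await asyncio.sleep(2)  # simulates an IO operation; awaitables are coroutine objects, Futures, and Tasks
    print('3')
    return 'Finished.'

async def test2():
    # create_task() schedules test() on the running loop as soon as it is called; after both calls the loop holds [test2(), test(), test()]
    task1 = asyncio.create_task(test())
    task2 = asyncio.create_task(test())

    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(test2())  # asyncio.run() creates the event loop first; at that point the loop holds only test2()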

5. Connecting coroutines with threads and processes

import asyncio
import requests
import functools

async def crawler(url):
    print('Start crawling:', url)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
    }
    # BaseEventLoop.run_in_executor() lets a coroutine run a blocking third-party call such as requests.get()
    # Positional and keyword arguments for that call are bound with functools.partial
    future = asyncio.get_event_loop().run_in_executor(None, functools.partial(requests.get, url, headers=headers))
    response = await future
    print('Response received:', url)
    # Save the response body to a file named after the last six characters of the URL
    with open(url[-6:], 'wb') as f:
        f.write(response.content)

url_list = ['https://w.wallhaven.cc/full/57/wallhaven-57lzd8.jpg',
            'https://w.wallhaven.cc/full/zm/wallhaven-zm5lyg.jpg',
            'https://w.wallhaven.cc/full/28/wallhaven-28ey2g.jpg',
            'https://w.wallhaven.cc/full/p8/wallhaven-p8gvvp.jpg']

async def main():
    # gather() is used because asyncio.wait() no longer accepts bare coroutine objects since Python 3.11
    await asyncio.gather(*(crawler(url) for url in url_list))

asyncio.run(main())  # Python 3.7+
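
The same pattern can be tried without network access. In the sketch below, blocking_fetch is a hypothetical stand-in for requests.get() that simply sleeps; the point is only how functools.partial carries keyword arguments into run_in_executor(), which forwards positional arguments alone. Python 3.7+ is assumed.

```python
import asyncio
import functools
import time


def blocking_fetch(url, timeout=1.0):
    # Hypothetical stand-in for requests.get(): blocks the calling thread.
    time.sleep(0.1)
    return f'response from {url} (timeout={timeout})'


async def crawler(url):
    loop = asyncio.get_running_loop()
    # run_in_executor() only forwards positional arguments, so keyword
    # arguments are bound with functools.partial first.
    call = functools.partial(blocking_fetch, url, timeout=2.0)
    return await loop.run_in_executor(None, call)  # None selects the default thread pool


async def main(urls):
    return await asyncio.gather(*(crawler(u) for u in urls))


print(asyncio.run(main(['a', 'b'])))
```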

Appendix:

1. Future,Task and Event loop

Event Loop

On any platform, doing something asynchronously usually involves an event loop. An event loop is a loop that can register tasks to be executed, execute them, delay or even cancel them, and handle the events related to these operations. Generally, we schedule multiple async functions on the event loop. The loop runs one function; while that function waits for IO, the loop pauses it and runs another. When the first function's IO completes, it is resumed. Thus two or more functions can run together cooperatively. This is the main goal of an event loop.

The event loop can also pass resource-intensive functions to a thread pool for processing. The internals of the event loop are quite complex, and we don't need to worry much about them right away. We just need to remember that the event loop is the mechanism through which we schedule our async functions and get them executed.
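
The pause-and-switch behaviour described above can be observed directly. This is a small sketch, assuming Python 3.7+; the worker names and delays are invented for the demonstration, and asyncio.sleep() stands in for real IO.

```python
import asyncio

order = []  # illustrative trace of what ran when


async def worker(name, delay):
    order.append(f'{name} start')
    await asyncio.sleep(delay)  # "waiting for IO": control returns to the loop
    order.append(f'{name} end')


async def main():
    await asyncio.gather(worker('A', 0.2), worker('B', 0.05))


asyncio.run(main())
print(order)  # ['A start', 'B start', 'B end', 'A end']
```

Worker B starts while A is still waiting and finishes first, even though everything runs in a single thread.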

Futures / Tasks

If you are into JavaScript too, you probably know about Promise. Python has similar concepts: Future and Task. A Future is an object that is supposed to have a result in the future. A Task is a subclass of Future that wraps a coroutine. When the coroutine finishes, the result of the Task is realized.

Coroutines

We discussed coroutines in our last blog post. A coroutine is a way of pausing a function and returning a series of values periodically. A coroutine can pause the execution of the function by using the yield, yield from, or await (Python 3.5+) keywords in an expression. The function stays paused until the yield expression actually receives a value.

Fitting Event Loop and Future/Task Together

It's simple. We need an event loop, and we need to register our future/task objects with it. The loop will schedule and run them. We can add callbacks to our future/task objects so that we are notified when a future has its result.

Very often we choose to use coroutines for our work. We wrap a coroutine in a Future and get a Task object. When a coroutine yields, it is paused. When it has a value, it is resumed. When it returns, the Task has completed and gets a value. Any associated callback is run. If the coroutine raises an exception, the Task fails and is not resolved with a result.
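
A short sketch of both outcomes, assuming Python 3.7+: one task returns a value, the other raises, and the same callback is notified in each case. The names on_done, ok, boom and log are illustrative.

```python
import asyncio

log = []  # illustrative record of what each callback observed


def on_done(task):
    # task.exception() returns the exception, or None if the task succeeded.
    exc = task.exception()
    log.append(repr(exc) if exc else task.result())


async def ok():
    return 'value'


async def boom():
    raise ValueError('failed')


async def main():
    tasks = [asyncio.create_task(ok()), asyncio.create_task(boom())]
    for t in tasks:
        t.add_done_callback(on_done)
    # return_exceptions=True keeps gather() from re-raising the ValueError.
    await asyncio.gather(*tasks, return_exceptions=True)
    await asyncio.sleep(0)  # yield once so all done-callbacks have run


asyncio.run(main())
print(log)
```

The callback runs whether the coroutine returned or raised; only the way the outcome is read (result() versus exception()) differs.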

2. Combining Coroutines with Threads and Processes

A lot of existing libraries are not ready to be used with asyncio natively. They may block, or depend on concurrency features not available through the module. It is still possible to use those libraries in an application based on asyncio by using an executor from concurrent.futures to run the code either in a separate thread or a separate process.

Threads

The run_in_executor() method of the event loop takes an executor instance, a regular callable to invoke, and any arguments to be passed to the callable. It returns a Future that can be used to wait for the function to finish its work and return something. If no executor is passed in, a ThreadPoolExecutor is created. This example explicitly creates an executor to limit the number of worker threads it will have available.

A ThreadPoolExecutor starts its worker threads and then calls each of the provided functions once in a thread. This example shows how to combine run_in_executor() and wait() to have a coroutine yield control to the event loop while blocking functions run in separate threads, and then wake back up when those functions are finished.

asyncio_executor_thread.py

import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


if __name__ == '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited thread pool.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

asyncio_executor_thread.py uses logging to conveniently indicate which thread and function are producing each log message. Because a separate logger is used in each call to blocks(), the output clearly shows the same threads being reused to call multiple copies of the function with different arguments.

$ python3 asyncio_executor_thread.py

MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0          blocks(0): running
ThreadPoolExecutor-0_1          blocks(1): running
ThreadPoolExecutor-0_2          blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0          blocks(0): done
ThreadPoolExecutor-0_1          blocks(1): done
ThreadPoolExecutor-0_2          blocks(2): done
ThreadPoolExecutor-0_0          blocks(3): running
ThreadPoolExecutor-0_1          blocks(4): running
ThreadPoolExecutor-0_2          blocks(5): running
ThreadPoolExecutor-0_0          blocks(3): done
ThreadPoolExecutor-0_2          blocks(5): done
ThreadPoolExecutor-0_1          blocks(4): done
MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exiting
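
The results in the log above come out in an arbitrary order ([0, 9, 16, 25, 1, 4]) because asyncio.wait() returns unordered sets. If submission order matters, one option is asyncio.gather(), as in this sketch. It assumes Python 3.7+, drops the logging for brevity, and uses a shorter sleep than the original example.

```python
import asyncio
import concurrent.futures
import time


def blocks(n):
    time.sleep(0.05)  # blocking call, runs in a worker thread
    return n ** 2


async def run_blocking_tasks(executor):
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, blocks, i) for i in range(6)]
    # gather() returns results in the order the futures were passed in,
    # unlike the unordered sets returned by wait().
    return await asyncio.gather(*futures)


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        return asyncio.run(run_blocking_tasks(executor))


print(main())  # [0, 1, 4, 9, 16, 25]
```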

Processes

A ProcessPoolExecutor works in much the same way, creating a set of worker processes instead of threads. Using separate processes requires more system resources, but for computationally-intensive operations it can make sense to run a separate task on each CPU core.

asyncio_executor_process.py

# changes from asyncio_executor_thread.py

if __name__ == '__main__':
    # Configure logging to show the id of the process
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited process pool.
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

The only change needed to move from threads to processes is to create a different type of executor. This example also changes the logging format string to include the process id instead of the thread name, to demonstrate that the tasks are in fact running in separate processes.

$ python3 asyncio_executor_process.py

PID 40498 run_blocking_tasks: starting
PID 40498 run_blocking_tasks: creating executor tasks
PID 40498 run_blocking_tasks: waiting for executor tasks
PID 40499          blocks(0): running
PID 40500          blocks(1): running
PID 40501          blocks(2): running
PID 40499          blocks(0): done
PID 40500          blocks(1): done
PID 40501          blocks(2): done
PID 40500          blocks(3): running
PID 40499          blocks(4): running
PID 40501          blocks(5): running
PID 40499          blocks(4): done
PID 40500          blocks(3): done
PID 40501          blocks(5): done
PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
PID 40498 run_blocking_tasks: exiting

3. How does asyncio work? (answer by Bharel)

Before answering this question we need to understand a few basic terms. Skip any that you already know.

Generators

Generators are objects that allow us to suspend the execution of a Python function. User-defined generators are implemented using the yield keyword. By creating a normal function containing the yield keyword, we turn that function into a generator:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

As you can see, calling next() on the generator causes the interpreter to load test's frame and return the yielded value. Calling next() again causes the frame to be loaded onto the interpreter stack again and to continue until it yields another value.

By the third call to next(), our generator has finished, and StopIteration is raised.
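
The suspended frame can be inspected from the outside. The following sketch uses inspect.getgeneratorstate() from the standard library to show the generator moving through its states; the variable names states and remaining are illustrative.

```python
import inspect


def test():
    yield 1
    yield 2


gen = test()
states = [inspect.getgeneratorstate(gen)]      # not started yet
next(gen)
states.append(inspect.getgeneratorstate(gen))  # paused at the first yield
remaining = list(gen)                          # list() consumes values until StopIteration
states.append(inspect.getgeneratorstate(gen))  # finished

print(states)     # ['GEN_CREATED', 'GEN_SUSPENDED', 'GEN_CLOSED']
print(remaining)  # [2]
```

Constructs such as list() and for loops catch StopIteration themselves, which is why it rarely shows up in everyday code.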

Communicating with a generator

A lesser-known feature of generators is that you can communicate with them using two methods: send() and throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Upon calling gen.send(), the value is passed as a return value from the yield keyword.

gen.throw(), on the other hand, allows throwing exceptions inside generators, with the exception raised at the spot where the generator is paused at yield.
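
As a slightly larger sketch of both methods working together, the generator below keeps a running average of the values sent to it and treats a thrown ValueError as a reset signal. The averager function and the choice of ValueError are illustrative assumptions, not part of the original answer.

```python
def averager():
    total, count, average = 0.0, 0, None
    while True:
        try:
            value = yield average  # send() makes this yield expression return a value
        except ValueError:
            # throw(ValueError) surfaces here, at the paused yield: reset the state
            total, count, average = 0.0, 0, None
            continue
        total += value
        count += 1
        average = total / count


gen = averager()
next(gen)                  # prime the generator: run to the first yield
a = gen.send(10)           # 10.0
b = gen.send(20)           # 15.0
c = gen.throw(ValueError)  # handled inside; the next yield produces None
d = gen.send(4)            # 4.0
print(a, b, c, d)
```

Because the generator catches the exception itself, throw() returns the next yielded value instead of propagating the error to the caller.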

Returning values from generators

Returning a value from a generator results in the value being stored inside the StopIteration exception. We can later recover the value from the exception and use it as needed.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Behold, a new keyword: yield from

Python 3.3 added a new syntax: yield from (PEP 380). It lets us pass any next(), send() and throw() call through to the inner-most nested generator. If the inner generator returns a value, that value is also the value of the yield from expression:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

I've written an article to further elaborate on this topic.

Putting it all together

With yield from available since Python 3.3, we were able to create generators inside generators that, just like a tunnel, pass data back and forth between the inner-most and the outer-most generators. This gave generators a new meaning: coroutines.

Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

async def inner():
    return 1

async def outer():
    await inner()

Like every iterator or generator that implements the __iter__() method, coroutines implement __await__(), which allows them to continue every time await coro is called.
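
To see __await__() at work without any event loop, a coroutine can be driven by hand with send(), exactly like a generator. In this sketch the class Ready is a hypothetical awaitable invented for the demonstration; inner and outer mirror the functions above.

```python
class Ready:
    """Hypothetical awaitable that suspends once, then produces a value."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        received = yield 'waiting'  # bubbles up to whoever drives the coroutine
        return (self.value, received)


async def inner():
    return await Ready(1)


async def outer():
    return await inner()


coro = outer()
first = coro.send(None)       # runs until the inner-most yield
try:
    coro.send('resumed')      # the value travels down to the paused yield
except StopIteration as exc:
    result = exc.value        # the coroutine's return value

print(first)   # waiting
print(result)  # (1, 'resumed')
```

The yielded value passes through both await expressions untouched, which is the same tunnelling that yield from provides for generators.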

There's a nice sequence diagram inside the Python docs that you should check out.

In asyncio, apart from coroutine functions, we have 2 important objects: tasks and futures.

Futures

Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

  1. PENDING - future does not have any result or exception set.
  2. CANCELLED - future was cancelled using fut.cancel()
  3. FINISHED - future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()

The result, just like you have guessed, can either be a Python object, that will be returned, or an exception which may be raised.

Another important feature of future objects is that they have a method called add_done_callback(). This method allows functions to be called once the future is done, whether it finished with a result or with an exception.

Tasks

Task objects are special futures, which wrap around coroutines, and communicate with the inner-most and outer-most coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.

Next, the task binds itself to the future by calling add_done_callback() on it. From now on, whenever the future becomes done, whether by being cancelled, given an exception, or given a Python object as a result, the task's callback is called and the task comes back to life.
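
The binding between task and future can be imitated in a few dozen lines. What follows is a toy model only, not asyncio's actual implementation: ToyFuture, ToyTask, ready and run are hypothetical names, and cancellation and exceptions are left out entirely.

```python
from collections import deque

ready = deque()  # callbacks waiting to be run by the toy loop


class ToyFuture:
    def __init__(self):
        self.done = False
        self.result = None
        self.callbacks = []

    def add_done_callback(self, fn):
        self.callbacks.append(fn)

    def set_result(self, result):
        self.done, self.result = True, result
        for fn in self.callbacks:
            ready.append(lambda fn=fn: fn(self))  # schedule, do not call directly

    def __await__(self):
        if not self.done:
            yield self  # hand this future all the way up to the task
        return self.result


class ToyTask(ToyFuture):
    def __init__(self, coro):
        super().__init__()
        self.coro = coro
        ready.append(self.step)  # schedule the first step

    def step(self, _fut=None):
        try:
            waited_on = self.coro.send(None)  # run until the next pending future
        except StopIteration as exc:
            self.set_result(exc.value)        # coroutine returned: the task is done
        else:
            waited_on.add_done_callback(self.step)  # wake up when it completes


def run():
    while ready:
        ready.popleft()()


async def child(fut):
    return (await fut) + 1


async def parent(fut):
    return (await child(fut)) * 2


fut = ToyFuture()
task = ToyTask(parent(fut))
run()               # the task runs until it blocks on fut
fut.set_result(20)  # pretend some IO completed
run()               # the task is woken up and finishes
print(task.result)  # 42
```

The future yielded inside child() reaches ToyTask.step() through the whole await chain, and the task resumes only after set_result() schedules its callback.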

Asyncio

The final burning question we must answer is - how is the IO implemented?

Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon receiving data it wakes up, and returns the sockets which received data, or the sockets which are ready for writing.

When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked if it has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered to the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object, tied to that socket.

When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer has drained, asyncio looks up the future object tied to that socket and sets it to done.

Now all the magic happens. The future is set to done, the task that added itself earlier with add_done_callback() comes back to life and calls .send() on the coroutine, which resumes the inner-most coroutine (because of the await chain), and you read the newly received data from the nearby buffer it was spilled into.

Method chain again, in case of recv():

  1. select.select waits.
  2. A ready socket, with data is returned.
  3. Data from the socket is moved into a buffer.
  4. future.set_result() is called.
  5. Task that added itself with add_done_callback() is now woken up.
  6. Task calls .send() on the coroutine which goes all the way into the inner-most coroutine and wakes it up.
  7. Data is being read from the buffer and returned to our humble user.
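
The chain above can be compressed into a toy loop built directly on select.select(). This is a deliberately simplified sketch: it uses plain generators that yield the socket they are waiting on, and it skips the future and callback steps, so it illustrates the idea rather than asyncio's real code. The names reader, run and demo are hypothetical.

```python
import select
import socket


def reader(sock, results):
    # Generator-based "coroutine": yield the socket we need to wait on.
    yield sock                       # paused until select() reports it readable
    results.append(sock.recv(1024))  # data is ready, so recv() will not block


def run(tasks):
    waiting = {}                     # socket -> generator paused on it
    for task in tasks:
        waiting[next(task)] = task   # run each task up to its first yield
    while waiting:
        readable, _, _ = select.select(list(waiting), [], [])
        for sock in readable:
            task = waiting.pop(sock)
            try:
                waiting[next(task)] = task  # resume; it may wait on a socket again
            except StopIteration:
                pass                        # the task finished


def demo():
    a, b = socket.socketpair()
    results = []
    b.sendall(b'hello')              # pretend a peer sent some data
    try:
        run([reader(a, results)])
    finally:
        a.close()
        b.close()
    return results


print(demo())  # [b'hello']
```

While one generator is parked in the waiting table, the loop is free to resume any other generator whose socket became ready.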

In summary, asyncio uses generator capabilities that allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the inner-most generator to the outer-most. It uses all of those in order to halt function execution while it's waiting for IO to complete (by using the OS select function).

And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.


Original article: https://www.cnblogs.com/wztshine/p/14460847.html
