码迷,mamicode.com
首页 > 编程语言 > 详细

python coroutine测试

时间:2016-10-23 02:54:42      阅读:272      评论:0      收藏:0      [点我收藏+]

标签:编写   hello   future   import   self   python   read   next   callback   

目的:实现一个类似于asyn await的用法,来方便的编写callback相关函数

 

from __future__ import print_function
import time
import threading

def asyn_task(callback):
def SleepTask():
time.sleep(1)
callback("world")
# t = threading.Timer(1.0, lambda :callback("world"))
t = threading.Thread(target=SleepTask)
t.start() # after 30 seconds, "hello, world" will be printed


def main():
asyn_task(lambda s:print(s))
print ("hello")

def main2():
class myCoroutine:
def __init__(self):
self.evt = threading.Event()
self.co=self.myCOroutine()
self.co.next()
def myCOroutine(self):
while 1:
self.evt.clear()
line = (yield)
# print (‘yield:‘ + line)
self.line = line
self.evt.set()

def await(self):
self.evt.wait()
return self.line

def send(self, *args):
self.co.send(*args)

g = myCoroutine()
asyn_task(lambda s:g.send(s))
print ("hello")
s = g.await()
print (s)


main2()

python coroutine测试

标签:编写   hello   future   import   self   python   read   next   callback   

原文地址:http://www.cnblogs.com/cutepig/p/5988878.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!