multitask 小析
在对一个开源项目的分析过程中接识了multitask,它又涉及了不少完全不了解或者几乎不 了解的 python 概念。网上对 python-multitask
的介绍不多,而我一直认为,必须真正明白工具的实现机制才能得心应手的使用它。所以自
视理解了multitask的流程逻辑后,我试图将其源代码进行一下分析,班门弄斧了。
Prerequisites
python-multitask Cooperative multitasking Coroutine
metaclass:
- http://en.wikipedia.org/wiki/Metaclass
- http://www.ibm.com/developerworks/linux/library/l-pymeta/index.html
- http://www.voidspace.org.uk/python/articles/metaclasses.shtml
- http://gnosis.cx/publish/programming/metaclass_1.html
- http://gnosis.cx/publish/programming/metaclass_2.html
generator:
- PEP 255 -- Simple Generators
- PEP 288 -- Generators Attributes and Exceptions
- PEP 342 -- Coroutines via Enhanced Generators
- The C10K problem
How to Use?
multitask的主要代码全部包含在multitask.py文件中。根据其文档描述,multitask有如下 使用方式:
例1. 使用multitask并发运行两个毫不相干的task:
>>> def printer(message):
... while True:
... print message
... yield
...
>>> multitask.add(printer('hello'))
>>> multitask.add(printer('goodbye'))
>>> multitask.run()
hello
goodbye
hello
goodbye
hello
goodbye
[and so on ...]
例2. 使用multitask实现一个可以处理并发客户端连接的服务程序:
def listener(sock):
while True:
conn, address = (yield multitask.accept(sock))
multitask.add(client_handler(conn))
def client_handler(sock):
while True:
request = (yield multitask.recv(sock, 1024))
if not request:
break
response = handle_request(request)
yield multitask.send(sock, response)
multitask.add(listener(sock))
multitask.run()
例3. task (parent task)可以yield其它task (child task). child
task会一直执行,
直到运行结束抛出StopIteration
. child
task还能通过StopIteration
的参数, 将输出
数据返回给parent
task:
>>> def parent():
... print (yield return_none())
... print (yield return_one())
... print (yield return_many())
... try:
... yield raise_exception()
... except Exception, e:
... print 'caught exception: %s' % e
...
>>> def return_none():
... yield
... # do nothing
... # or return
... # or raise StopIteration
... # or raise StopIteration(None)
...
>>> def return_one():
... yield
... raise StopIteration(1)
...
>>> def return_many():
... yield
... raise StopIteration(2, 3) # or raise StopIteration((2, 3))
...
>>> def raise_exception():
... yield
... raise RuntimeError('foo')
...
>>> multitask.add(parent())
>>> multitask.run()
None
1
(2, 3)
caught exception: foo
Why it works?
-- 异步IO --
multitask使用select/poll来处理异步IO, 并使用coroutine来实现所谓的Cooperative Multi-tasking. 我们知道, 由于select本身的限制, 和linux平台上poll调用的低效, 使得 使用multitask实现一个100k量级并发的高性能服务程序也许并不合适. 同时, 由于 select/poll/epoll/kqueue无法用来准确检测普通文件的状态变化, 所以, multitask也不 能用于对磁盘文件的读写的 (python v2.7 documentation. 16.1 select -- Waiting for I/O completion).
multitask为什么没有使用linux平台的epoll和BSD平台的kqueue呢? 我还没有这个问题的答 案, 也许是基于跨平台的考虑(但是可以针对特殊平台特殊处理啊), 也许是作者想提供一个 尽量平台无关的实现. 无论如何, 对其针对平台进行扩展, 并不是难事儿.
异步I/O处理在_FDSelector中实现. 除了注册文件描述符, 解注文件描述符等常规操作外, 让我们来看看其事件处理函数process()的实现 (这里选取了以select.poll为底层的实现版 本):
354 def process(self, tm, timeout):
355 try:
356 ready = self._poller.poll(timeout)
357 except (select.error, IOError, OSError), err:
358 if err.args[0] != errno.EINTR:
359 raise
360 else:
361 for fd, event in ready:
362 fd = self._waits[fd]
363
364 if event & select.POLLNVAL:
365 err = errno.EINVAL
366 elif event & select.POLLHUP:
367 err = errno.ECONNRESET
368 else:
369 err = 0
370
371 if err == 0:
372 fd._reenqueue(tm)
373 else:
374 fd._reenqueue(tm,
375 exc_info=(_socket_error_from_errno(err),))
为了理清流程, 我们暂时先忽略代码的错误处理. 并且, 结合代码上下文,
我们知道了
_waits
是形如{fileno: FDReady()}
的字典变量,
FDReady
是对应一个需要异步处理
的task, tm变量是multitask的主干对象TaskManager
.
正是它驱动着各个coroutine的运
转, 这个我们下文再议. 那么,
fd._reenqueue(tm)
就是把FDReady()
本身再追加至TaskManager
的处理队列中了. 然后,
再在TaskManager
的处理流程中调用 读/写 函数 接收/发送 数据.
-- 一个读操作 --
当使用
def foo():
data = yield (multitask.recv(sock, 1024))
时, 发生了什么事儿呢? 我们具体来看一看:
714 @yieldable
715 def recv(sock, *args, **kwargs):
733 return _fdaction(sock, sock.recv, args, kwargs, read=True)
575 @yieldable
576 def _fdaction(fd, func, args=(), kwargs={}, read=False, write=False, exc=False):
577 timeout = kwargs.pop('timeout', None)
578
579 yield FDReady(fd, read, write, exc, timeout)
580
581 while True:
582 try:
583 raise StopIteration(func(*(args), **(kwargs)))
584 except (socket.error, IOError, OSError), err:
585 if err.args[0] != errno.EINTR:
586 raise
那么 * 号处的yield返回了另外一个generator '_fdaction', _fdaction在第一次被调用时 返回
FDReady(fd, True, False, False, timeout)
的事件监听instance. 当fd上有读事件发生时, TaskManager会再次调用此_fdaction, 并把
sock.recv(func)
读取到的数据以StopIteration
参数的形式返回给multitask.recv()
的调用者, 即, 数据此时被交给了 * 号处的data变量
(这个移交过程由TaskManager
调用
_fdaction的send()完成).
每一次recv()操作, 就是由一个FDReady和sock.recv()函数完成的.
而这个非阻塞的执行挂
起由 yield (multitask.recv(sock, 1024)
和
yield FDReady(fd, True, False, False, timeout)
一起完成.
-- YieldCondition --
FDReady
是什么, 它又是怎样和FDSelector
结合起来的呢?
FDReady._waits
是一个所有FDReady
实例共同使用的
TaskManager
和对应于此
TaskManger
的_FDSelector
的索引. 例如,
FDReady
在TaskManager x
中生成, 那么
它就会被加入x
对应的_FDSelector
的监控列表中. 于是,
FDReady
和_FDSelector
结
合起来了.
根据代码的docstring, YieldCondition
是所有会被一个task
yield至TaskManager
, 并
在某些事件发生时, 重启此task的类的基类.
YieldCondition
有一个metaclass
MetaYieldCondtion
,
用来跟踪全局存活的YieldCondtion
类的继承类(FDReady
,
_QueueAction
).
FDReady
类又可以通过_waits跟踪其所属的TaskManger
和使用的
_FDSelector
, 那么,
通过MetaYieldCondition
就能访问到现存的所有TaskManager
(可
能是有多个实例, 比如多线程模式中,
每个线程实例化自己的TaskManager
和其对应的
_FDSelector
了.
-- 驱动力 --
曾经有一段时间, 知识的匮乏让我陷入了一阵"可笑"的思索: linux内核是什么, 为什么会 不停的运转, 进程又为什么会被执行, 谁来推动的. 当然, 我现在知道了, 计算机的心肌纤 维 - 时钟 - 是这一切的"始作俑者". 我看到一段代码, 必须先明白生命载体(进程, 线程) 和行为模式, 这样, 才能明白, 代码从哪里来, 到哪里去.
multitask的载体是调用它的进程或者线程, 那么它的行为模式就是由TaskManager来表现 的. 当你调用那个事件驱动程序中唯一的阻塞函数multitask.run()时, 这个"世界"就开始 了运转.
1106 def run(self):
1117 while (self._queue or
1118 self._timeouts or
1119 MetaYieldCondition._has_waits(self)):
1120
1121 while self._queue:
1122 self._run_next_task()
1123
1124 MetaYieldCondition._handle_waits(self)
1125
1126 if self._timeouts:
1127 self._handle_timeouts(self._get_run_timeout())
self._queue
元素是(task, input, exc_info)
这样的元组.
每一个通过TaskManager
的add()
添加的任务都会以(task, None, ())
的形式追加到其中. 它还包括
TaskManager
在运行过程中生成的中间任务.
MetaYieldCondition._has_waits()
和MetaYieldCondition._handle_waits()
等等主要
用于超时处理, 我们这里暂时先不用讨论. 那么, 暂时就把注意力集中到
self._run_next_task()
身上.
1186 def _run_next_task(self):
1187 task, input, exc_info = self._queue.popleft()
1188 while True:
1189 try:
1190 if exc_info:
1191 output = task.throw(*exc_info)
1192 else:
1193 output = task.send(input)
1194 except StopIteration, e:
1195 if not isinstance(task, _ChildTask):
1196 break
1197 else:
1198 if not e.args:
1199 output = None
1200 elif len(e.args) == 1:
1201 output = e.args[0]
1202 else:
1203 output = e.args
1204 task, input, exc_info = task.parent, output, ()
1205 except:
1206 if isinstance(task, _ChildTask):
1207 # Propagate exception to parent
1208 task, input, exc_info = task.parent, None, sys.exc_info()
1209 else:
1210 # No parent task, so just die
1211 raise
1212 else:
1213 if isinstance(output, types.GeneratorType):
1214 task, input, exc_info = _ChildTask(task, output), None, ()
1215 else:
1216 if isinstance(output, YieldCondition):
1217 output.task = task
1218 output._handle(self)
1219 else:
1220 # Return any other output as input and send task to
1221 # end of queue
1222 self._enqueue(task, input=output)
1223 break
为了简化和清晰, 假设现在TaskManager
中只有一个上述示例调用生成的task:
def foo():
data = yield (multitask.recv(sock, 1024)) # *
生成的generator 'foo'.
下面将一步一步分析其在TaskManager
中的变换和执行流程.
首先, 第一次执行_run_next_task()
时, 将上述的generator 'foo'
从队列中取出:
1187 task, input, exc_info = self._queue.popleft()
其中, task为generator 'foo', input为初始化值None, exc_info为初始化值().
1190 if exc_info:
1191 output = task.throw(*exc_info)
1192 else:
1193 output = task.send(input)
实际将以output = task.send(None)
的形式执行上述代码. 这就是PEP
342中提到的, 使
用None初次调用generator, 此generator 'foo'
返回multitask.recv(sock, 1024)
并在
yield返回前挂起. 由上面的分析得知, multitask.recv()
返回generator
'_fdaction'.
假设在此过程中没有任何异常状况, 那么代码将会执行到:
1213 if isinstance(output, types.GeneratorType):
1214 task, input, exc_info = _ChildTask(task, output), None, ()
1215 else:
1216 if isinstance(output, YieldCondition):
1217 output.task = task
1218 output._handle(self)
1219 else:
1220 # Return any other output as input and send task to
1221 # end of queue
1222 self._enqueue(task, input=output)
1223 break
显然, 代码1213的判断为真, 同时, output为generator '_fdaction'. 执行1214行:
1214 task, input, exc_info = _ChildTask(task, output), None, ()
这里, 把generator 'foo'和generator '_fdaction' 父子化:
1029 class _ChildTask(object):
1030
1031 __slots__ = ('parent', 'send', 'throw')
1032
1033 def __init__(self, parent, task):
1034 self.parent = parent
1035 self.send = task.send
1036 self.throw = task.throw
两个generator通过_ChildTask确立了父子关系, 并且暂时合并成了一个task -- generator '_ChildTask'. 然后, 在1214行, 变量task, input, exc_info都被重新赋值.
接着再回到循环的开始:
1190 if exc_info:
1191 output = task.throw(*exc_info)
1192 else:
1193 output = task.send(input)
上述代码块, 最终以output = task.send(None)
的形式调用.
记得上面提到的PEP 342么,
使用None
值参数第一次调用一个generator,
这时这个generator已经变成了第一次循环生
成的generator '_ChildTask', 也即generator '_fdaction'本身.
回顾上面的代码,
generator '_fdaction'会返回一个FDReady实例, 这个实例赋给了output.
接着第二次循环接着往下走.
FDReady
是YieldCondition
的子类, 并且它本身不是generator, 那么,
1215 else:
1216 if isinstance(output, YieldCondition):
1217 output.task = task
1218 output._handle(self)
1219 else:
1220 # Return any other output as input and send task to
1221 # end of queue
1222 self._enqueue(task, input=output)
1223 break
哈, 1217和1218执行了. FDReady的task变量 (继参自YieldCondition)暂存generator '_fdaction'的包装generator '_ChildTask', 然后调用其_handle方法:
508 def _handle(self, tm):
509 self._waits[tm].add(self)
510 super(FDReady, self)._handle(tm)
FDReady的_handle方法将此FDReady实例注册到目前这个TaskManager
对应的_FDSelector
中, 然后调用FDReady父类YieldCondition的_handle方法:
297 def _handle(self, tm):
298 if self.expiration is not None:
299 tm._add_timeout(self)
嗯, 超时处理, 咱上面说了, 超时什么的, 暂时不予接待的.
接着第二次循环往下走. break了. 那么此时, TaskManager此时的_queue空了. FDReady被 注册到了_FDSelector中, FDReady的task变量引用着generator '_fdaction'的包装 generator '_ChildTask', '_ChildTask'的parent成员又引用着最起始的generator 'foo'. 很完美!
是的,
熟悉异步IO的你可能在想该select()/poll()/epoll_wait()/kqueue()
之类操作了
吧. 嗯, 该收获事件了.
再次回到run()函数的循环中, 现在_run_next_task()执行结束了, 下面到了1124行:
1124 MetaYieldCondition._handle_waits(self)
248 @staticmethod
249 def _handle_waits(tm, timeout=None):
250 for cls in MetaYieldCondition.__custom_wait_handlers:
251 if cls._has_waits(tm):
252 cls._handle_waits(tm, tm._get_run_timeout(timeout))
还记得上面说的"YieldCondition有一个metaclass MetaYieldCondtion, 用来跟踪全局存 活的YieldCondtion类的继承类(FDReady, _QueueAction)"不? __custom_wait_handlers就 是这个用于跟踪的变量, 在上面所有假设的前提下, 这个变量中只有类FDReady, 那好了, 上面几行代码就相当于:
if FDReady._has_waits(tm):
FDReady._handle_waits(tm, tm._get_run_timeout(timeout))
实现如下:
520 @classmethod
521 def _has_waits(cls, tm):
522 return bool(cls._waits[tm])
523
524 @classmethod
525 def _handle_waits(cls, tm, timeout):
526 if (timeout is None) or (timeout > 0.0):
527 cls._waits[tm].process(tm, timeout)
现在tm对象的_FDSelector刚刚加入了一个新的FDReady的fd, _has_waits为True. timeout 又被假设为None. cls._waits[tm].process(tm, timeout)就是在调用文章开头所分析的 _FDSelector的process函数了. 有始有终, 一切被衔接起来了.
等fd上有数据到达时(记得咱们的generator 'foo'是multitask.recv()的吧), FDReady的 _reenqueue()函数被调用:
516 def _reenqueue(self, tm, input=None, exc_info=()):
517 self._waits[tm].remove(self)
518 super(FDReady, self)._reenqueue(tm, input, exc_info)
FDReady先把自己从_FDSelector
中解注(事件驱动, 你懂的), 然后,
304 def _reenqueue(self, tm, input=None, exc_info=()):
305 tm._enqueue(self.task, input, exc_info)
306 if self.expiration is not None:
307 tm._remove_timeout(self)
Yes! tm._enqueue(task, None, ()), FDReady的fd上有了读事件, 就又回到了TaskManager 的_queue中. 这里的task是FDReady的变量引用那个"generator '_fdaction'的包装generator '_ChildTask'".
稍安毋躁, 现在只是有了读事件, 真正的数据还在TCP/IP栈里呢, 必须得读出来啊.
直接跳到TaskManager的_run_next_task()中吧:
1190 if exc_info:
1191 output = task.throw(*exc_info)
1192 else:
1193 output = task.send(input)
task是那个"generator '_fdaction'的包装generator '_ChildTask'", input是None, 于是, send()调用后, _fdaction中代码开始从yield下一行执行, 开始正经读数据了:
583 raise StopIteration(func(*(args), **(kwargs)))
func引用sock.recv(). 读出的数据返回给上面的output. .........
还没完, 我们的终点是generator 'foo'. 但是我觉得上面的过程要是理解的话, 下面的过 程也应该不难了.
-- 通讯组件 Queue --
再议.
Look back
上面只在简化的情况下分析了例2的流程, 例1和例3, 你能理解了么?
其实, 我是花了很久, 才有上面肤浅的理解. 在读coroutine, metaclass包括multitask本 身的定义和相关代码时, 我脑子里一直在念叨"brain fuck"这个词. 以前没接触过 Functional Programming, 新的概念新的领域, 刚开始有点苦恼也是正常现象吧?!
--- Yeah, I smoke a lot, still on top.
Comments
不要轻轻地离开我,请留下点什么...