multitask 小析

在对一个开源项目的分析过程中接识了multitask,它又涉及了不少完全不了解或者几乎不 了解的 python 概念。网上对 python-multitask

的介绍不多,而我一直认为,必须真正明白工具的实现机制才能得心应手的使用它。所以自

视理解了multitask的流程逻辑后,我试图将其源代码进行一下分析,班门弄斧了。

Prerequisites

python-multitask Cooperative multitasking Coroutine

metaclass:

generator:

How to Use?

multitask的主要代码全部包含在multitask.py文件中。根据其文档描述,multitask有如下 使用方式:

例1. 使用multitask并发运行两个毫不相干的task:

>>> def printer(message):
...     while True:
...         print message
...         yield
...
>>> multitask.add(printer('hello'))
>>> multitask.add(printer('goodbye'))
>>> multitask.run()
hello
goodbye
hello
goodbye
hello
goodbye
[and so on ...]

例2. 使用multitask实现一个可以处理并发客户端连接的服务程序:

def listener(sock):
    while True:
        conn, address = (yield multitask.accept(sock))
        multitask.add(client_handler(conn))

def client_handler(sock):
    while True:
        request = (yield multitask.recv(sock, 1024))
        if not request:
            break
        response = handle_request(request)
        yield multitask.send(sock, response)

multitask.add(listener(sock))
multitask.run()

例3. task (parent task)可以yield其它task (child task). child task会一直执行, 直到运行结束抛出StopIteration. child task还能通过StopIteration的参数, 将输出 数据返回给parent

task:

>>> def parent():
...     print (yield return_none())
...     print (yield return_one())
...     print (yield return_many())
...     try:
...         yield raise_exception()
...     except Exception, e:
...         print 'caught exception: %s' % e
...
>>> def return_none():
...     yield
...     # do nothing
...     # or return
...     # or raise StopIteration
...     # or raise StopIteration(None)
...
>>> def return_one():
...     yield
...     raise StopIteration(1)
...
>>> def return_many():
...     yield
...     raise StopIteration(2, 3)  # or raise StopIteration((2, 3))
...
>>> def raise_exception():
...     yield
...     raise RuntimeError('foo')
...
>>> multitask.add(parent())
>>> multitask.run()
None
1
(2, 3)
caught exception: foo

Why it works?

-- 异步IO --

multitask使用select/poll来处理异步IO, 并使用coroutine来实现所谓的Cooperative Multi-tasking. 我们知道, 由于select本身的限制, 和linux平台上poll调用的低效, 使得 使用multitask实现一个100k量级并发的高性能服务程序也许并不合适. 同时, 由于 select/poll/epoll/kqueue无法用来准确检测普通文件的状态变化, 所以, multitask也不 能用于对磁盘文件的读写的 (python v2.7 documentation. 16.1 select -- Waiting for I/O completion).

multitask为什么没有使用linux平台的epoll和BSD平台的kqueue呢? 我还没有这个问题的答 案, 也许是基于跨平台的考虑(但是可以针对特殊平台特殊处理啊), 也许是作者想提供一个 尽量平台无关的实现. 无论如何, 对其针对平台进行扩展, 并不是难事儿.

异步I/O处理在_FDSelector中实现. 除了注册文件描述符, 解注文件描述符等常规操作外, 让我们来看看其事件处理函数process()的实现 (这里选取了以select.poll为底层的实现版 本):

354         def process(self, tm, timeout):
355             try:
356                 ready = self._poller.poll(timeout)
357             except (select.error, IOError, OSError), err:
358                 if err.args[0] != errno.EINTR:
359                     raise
360             else:
361                 for fd, event in ready:
362                     fd = self._waits[fd]
363
364                     if event & select.POLLNVAL:
365                         err = errno.EINVAL
366                     elif event & select.POLLHUP:
367                         err = errno.ECONNRESET
368                     else:
369                         err = 0
370
371                     if err == 0:
372                         fd._reenqueue(tm)
373                     else:
374                         fd._reenqueue(tm,
375                            exc_info=(_socket_error_from_errno(err),))

为了理清流程, 我们暂时先忽略代码的错误处理. 并且, 结合代码上下文, 我们知道了 _waits 是形如{fileno: FDReady()}的字典变量, FDReady是对应一个需要异步处理 的task, tm变量是multitask的主干对象TaskManager. 正是它驱动着各个coroutine的运 转, 这个我们下文再议. 那么,

fd._reenqueue(tm)

就是把FDReady()本身再追加至TaskManager的处理队列中了. 然后, 再在TaskManager 的处理流程中调用 读/写 函数 接收/发送 数据.

-- 一个读操作 --

当使用

def foo():
    data = yield (multitask.recv(sock, 1024))

时, 发生了什么事儿呢? 我们具体来看一看:

714 @yieldable
715 def recv(sock, *args, **kwargs):
733     return _fdaction(sock, sock.recv, args, kwargs, read=True)

575 @yieldable
576 def _fdaction(fd, func, args=(), kwargs={}, read=False, write=False, exc=False):
577     timeout = kwargs.pop('timeout', None)
578
579     yield FDReady(fd, read, write, exc, timeout)
580
581     while True:
582         try:
583             raise StopIteration(func(*(args), **(kwargs)))
584         except (socket.error, IOError, OSError), err:
585             if err.args[0] != errno.EINTR:
586                 raise

那么 * 号处的yield返回了另外一个generator '_fdaction', _fdaction在第一次被调用时 返回

FDReady(fd, True, False, False, timeout)

的事件监听instance. 当fd上有读事件发生时, TaskManager会再次调用此_fdaction, 并把

sock.recv(func)读取到的数据以StopIteration参数的形式返回给multitask.recv() 的调用者, 即, 数据此时被交给了 * 号处的data变量 (这个移交过程由TaskManager调用 _fdaction的send()完成).

每一次recv()操作, 就是由一个FDReady和sock.recv()函数完成的. 而这个非阻塞的执行挂 起由 yield (multitask.recv(sock, 1024)yield FDReady(fd, True, False, False, timeout) 一起完成.

-- YieldCondition --

FDReady是什么, 它又是怎样和FDSelector结合起来的呢?

FDReady._waits是一个所有FDReady实例共同使用的 TaskManager和对应于此 TaskManger_FDSelector 的索引. 例如, FDReadyTaskManager x中生成, 那么 它就会被加入x对应的_FDSelector的监控列表中. 于是, FDReady_FDSelector结 合起来了.

根据代码的docstring, YieldCondition是所有会被一个task yield至TaskManager, 并 在某些事件发生时, 重启此task的类的基类. YieldCondition有一个metaclass MetaYieldCondtion, 用来跟踪全局存活的YieldCondtion类的继承类(FDReady, _QueueAction). FDReady类又可以通过_waits跟踪其所属的TaskManger和使用的 _FDSelector, 那么, 通过MetaYieldCondition就能访问到现存的所有TaskManager(可 能是有多个实例, 比如多线程模式中, 每个线程实例化自己的TaskManager和其对应的 _FDSelector了.

-- 驱动力 --

曾经有一段时间, 知识的匮乏让我陷入了一阵"可笑"的思索: linux内核是什么, 为什么会 不停的运转, 进程又为什么会被执行, 谁来推动的. 当然, 我现在知道了, 计算机的心肌纤 维 - 时钟 - 是这一切的"始作俑者". 我看到一段代码, 必须先明白生命载体(进程, 线程) 和行为模式, 这样, 才能明白, 代码从哪里来, 到哪里去.

multitask的载体是调用它的进程或者线程, 那么它的行为模式就是由TaskManager来表现 的. 当你调用那个事件驱动程序中唯一的阻塞函数multitask.run()时, 这个"世界"就开始 了运转.

1106     def run(self):
1117         while (self._queue or
1118                self._timeouts or
1119                MetaYieldCondition._has_waits(self)):
1120
1121             while self._queue:
1122                 self._run_next_task()
1123
1124             MetaYieldCondition._handle_waits(self)
1125
1126             if self._timeouts:
1127                 self._handle_timeouts(self._get_run_timeout())

self._queue元素是(task, input, exc_info)这样的元组. 每一个通过TaskManageradd()添加的任务都会以(task, None, ())的形式追加到其中. 它还包括 TaskManager在运行过程中生成的中间任务.

MetaYieldCondition._has_waits()MetaYieldCondition._handle_waits()等等主要 用于超时处理, 我们这里暂时先不用讨论. 那么, 暂时就把注意力集中到 self._run_next_task()身上.

1186     def _run_next_task(self):
1187         task, input, exc_info = self._queue.popleft()
1188         while True:
1189             try:
1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)
1194             except StopIteration, e:
1195                 if not isinstance(task, _ChildTask):
1196                     break
1197                 else:
1198                     if not e.args:
1199                         output = None
1200                     elif len(e.args) == 1:
1201                         output = e.args[0]
1202                     else:
1203                         output = e.args
1204                     task, input, exc_info = task.parent, output, ()
1205             except:
1206                 if isinstance(task, _ChildTask):
1207                     # Propagate exception to parent
1208                     task, input, exc_info = task.parent, None, sys.exc_info()
1209                 else:
1210                     # No parent task, so just die
1211                     raise
1212             else:
1213                 if isinstance(output, types.GeneratorType):
1214                     task, input, exc_info = _ChildTask(task, output), None, ()
1215                 else:
1216                     if isinstance(output, YieldCondition):
1217                         output.task = task
1218                         output._handle(self)
1219                     else:
1220                         # Return any other output as input and send task to
1221                         # end of queue
1222                         self._enqueue(task, input=output)
1223                     break

为了简化和清晰, 假设现在TaskManager中只有一个上述示例调用生成的task:

def foo():
    data = yield (multitask.recv(sock, 1024))    # *

生成的generator 'foo'. 下面将一步一步分析其在TaskManager中的变换和执行流程.

首先, 第一次执行_run_next_task()时, 将上述的generator 'foo' 从队列中取出:

1187         task, input, exc_info = self._queue.popleft()

其中, task为generator 'foo', input为初始化值None, exc_info为初始化值().

1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)

实际将以output = task.send(None)的形式执行上述代码. 这就是PEP 342中提到的, 使 用None初次调用generator, 此generator 'foo' 返回multitask.recv(sock, 1024)并在 yield返回前挂起. 由上面的分析得知, multitask.recv()返回generator '_fdaction'.

假设在此过程中没有任何异常状况, 那么代码将会执行到:

1213                 if isinstance(output, types.GeneratorType):
1214                     task, input, exc_info = _ChildTask(task, output), None, ()
1215                 else:
1216                     if isinstance(output, YieldCondition):
1217                         output.task = task
1218                         output._handle(self)
1219                     else:
1220                         # Return any other output as input and send task to
1221                         # end of queue
1222                         self._enqueue(task, input=output)
1223                     break

显然, 代码1213的判断为真, 同时, output为generator '_fdaction'. 执行1214行:

1214                     task, input, exc_info = _ChildTask(task, output), None, ()

这里, 把generator 'foo'和generator '_fdaction' 父子化:

1029 class _ChildTask(object):
1030
1031     __slots__ = ('parent', 'send', 'throw')
1032
1033     def __init__(self, parent, task):
1034         self.parent = parent
1035         self.send = task.send
1036         self.throw = task.throw

两个generator通过_ChildTask确立了父子关系, 并且暂时合并成了一个task -- generator '_ChildTask'. 然后, 在1214行, 变量task, input, exc_info都被重新赋值.

接着再回到循环的开始:

1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)

上述代码块, 最终以output = task.send(None)的形式调用. 记得上面提到的PEP 342么, 使用None值参数第一次调用一个generator, 这时这个generator已经变成了第一次循环生 成的generator '_ChildTask', 也即generator '_fdaction'本身. 回顾上面的代码, generator '_fdaction'会返回一个FDReady实例, 这个实例赋给了output.

接着第二次循环接着往下走.

FDReadyYieldCondition的子类, 并且它本身不是generator, 那么,

1215                 else:
1216                     if isinstance(output, YieldCondition):
1217                         output.task = task
1218                         output._handle(self)
1219                     else:
1220                         # Return any other output as input and send task to
1221                         # end of queue
1222                         self._enqueue(task, input=output)
1223                     break

哈, 1217和1218执行了. FDReady的task变量 (继参自YieldCondition)暂存generator '_fdaction'的包装generator '_ChildTask', 然后调用其_handle方法:

508     def _handle(self, tm):
509         self._waits[tm].add(self)
510         super(FDReady, self)._handle(tm)

FDReady的_handle方法将此FDReady实例注册到目前这个TaskManager对应的_FDSelector 中, 然后调用FDReady父类YieldCondition的_handle方法:

297     def _handle(self, tm):
298         if self.expiration is not None:
299             tm._add_timeout(self)

嗯, 超时处理, 咱上面说了, 超时什么的, 暂时不予接待的.

接着第二次循环往下走. break了. 那么此时, TaskManager此时的_queue空了. FDReady被 注册到了_FDSelector中, FDReady的task变量引用着generator '_fdaction'的包装 generator '_ChildTask', '_ChildTask'的parent成员又引用着最起始的generator 'foo'. 很完美!

是的, 熟悉异步IO的你可能在想该select()/poll()/epoll_wait()/kqueue()之类操作了 吧. 嗯, 该收获事件了.

再次回到run()函数的循环中, 现在_run_next_task()执行结束了, 下面到了1124行:

1124             MetaYieldCondition._handle_waits(self)

248     @staticmethod
249     def _handle_waits(tm, timeout=None):
250         for cls in MetaYieldCondition.__custom_wait_handlers:
251             if cls._has_waits(tm):
252                 cls._handle_waits(tm, tm._get_run_timeout(timeout))

还记得上面说的"YieldCondition有一个metaclass MetaYieldCondtion, 用来跟踪全局存 活的YieldCondtion类的继承类(FDReady, _QueueAction)"不? __custom_wait_handlers就 是这个用于跟踪的变量, 在上面所有假设的前提下, 这个变量中只有类FDReady, 那好了, 上面几行代码就相当于:

if FDReady._has_waits(tm):
    FDReady._handle_waits(tm, tm._get_run_timeout(timeout))

实现如下:

520     @classmethod
521     def _has_waits(cls, tm):
522         return bool(cls._waits[tm])
523
524     @classmethod
525     def _handle_waits(cls, tm, timeout):
526         if (timeout is None) or (timeout > 0.0):
527             cls._waits[tm].process(tm, timeout)

现在tm对象的_FDSelector刚刚加入了一个新的FDReady的fd, _has_waits为True. timeout 又被假设为None. cls._waits[tm].process(tm, timeout)就是在调用文章开头所分析的 _FDSelector的process函数了. 有始有终, 一切被衔接起来了.

等fd上有数据到达时(记得咱们的generator 'foo'是multitask.recv()的吧), FDReady的 _reenqueue()函数被调用:

516     def _reenqueue(self, tm, input=None, exc_info=()):
517         self._waits[tm].remove(self)
518         super(FDReady, self)._reenqueue(tm, input, exc_info)

FDReady先把自己从_FDSelector中解注(事件驱动, 你懂的), 然后,

304     def _reenqueue(self, tm, input=None, exc_info=()):
305         tm._enqueue(self.task, input, exc_info)
306         if self.expiration is not None:
307             tm._remove_timeout(self)

Yes! tm._enqueue(task, None, ()), FDReady的fd上有了读事件, 就又回到了TaskManager 的_queue中. 这里的task是FDReady的变量引用那个"generator '_fdaction'的包装generator '_ChildTask'".

稍安毋躁, 现在只是有了读事件, 真正的数据还在TCP/IP栈里呢, 必须得读出来啊.

直接跳到TaskManager的_run_next_task()中吧:

1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)

task是那个"generator '_fdaction'的包装generator '_ChildTask'", input是None, 于是, send()调用后, _fdaction中代码开始从yield下一行执行, 开始正经读数据了:

583             raise StopIteration(func(*(args), **(kwargs)))

func引用sock.recv(). 读出的数据返回给上面的output. .........

还没完, 我们的终点是generator 'foo'. 但是我觉得上面的过程要是理解的话, 下面的过 程也应该不难了.

-- 通讯组件 Queue --

再议.

Look back

上面只在简化的情况下分析了例2的流程, 例1和例3, 你能理解了么?

其实, 我是花了很久, 才有上面肤浅的理解. 在读coroutine, metaclass包括multitask本 身的定义和相关代码时, 我脑子里一直在念叨"brain fuck"这个词. 以前没接触过 Functional Programming, 新的概念新的领域, 刚开始有点苦恼也是正常现象吧?!

                    --- Yeah, I smoke a lot, still on top.
Comments

不要轻轻地离开我,请留下点什么...

comments powered by Disqus

Published

Category

ProgLang

Tags

Contact