异步调用中的上下文控制Tornado stack context

邹业盛 2016-01-10 16:55 更新
  1. 面临的问题
  2. 尝试解决
  3. Python中的上下文对象及with语句
  4. Tornado中的stack_context相关内容
  5. 总结与比较

1. 面临的问题

这里说的"异步调用", 在形式上是指那种"调用函数"里, 需要传入一个"回调函数", 然后调用函数马上就返回. 在这种情况下, 回调函数的异常显然与调用函数的异常是不同的, 并且如果不作特殊处理, 回调函数的异常并不会影响调用函数的表现, 比如下例:

# -*- coding: utf-8 -*-

import tornado.ioloop
IL = tornado.ioloop.IOLoop.instance()


def callback():
    raise Exception, 'in callback'

def func():
    IL.add_callback(callback)

def out():
    try:
        func()
    except:
        print 'ok'

out()
IL.start()

上面的代码中, callback 抛出的异常并没有影响到 func 函数的表现, 于是我们看不到 ok 的输出.

但是在实践当中, 因为 callback 的调用, 间接地是由 func 造成的, 通常我们希望的一个结果是, callback 抛出的异常, 应该就像是 func 抛出的异常一样, 触发对应的异常处理, 输出 ok .

注意, 这里对异常的行为, 我们只说 func 抛出的, 或者说, 一个非 func 抛出的异常, 与一个就是由 func 抛出的异常, 它们会造成相同的结果, 这是本质的问题.

2. 尝试解决

为了达到" callback 抛出的异常, 就像是 func 抛出的一样 " , 这个目的, 我们最直观地会想到这样处理代码:

def callback():
    try:
        raise Exception, 'in callback'
    except:
        print 'ok'

或者这样:

def like_other(func):
    def wrapper(*args, **karg):
        try:
            func(*args, **kargs)
        except:
            print 'ok'
    return wrapper

IL.add_callback(like_other(callback))

好吧, 从现象上我不能说这样做不对, 但是逻辑上, 这种做法只能说是"碰巧"一样了. 顺着这个思路, 我们很容易想到这样的"正确"代码:

def callback():
    raise Exception, 'in callback'

def func():
    IL.add_callback(lambda: env(callback))

def env(func):
    try:
        func()
    except:
        print 'ok'

def out():
    env(func)

out()
IL.start()

我们把 funccallback 这两个函数都埋到同样的坑里, 这样它们抛出的异常就可以保证有着同样的表现.

这样做当然没有问题, 除了代码逻辑上的一个缺陷 -- func 需要知道它的调用者, out 中的细节, 即 func 要知道 out 在调用它时挖了一个叫 env 的坑. 这个问题就大了, 而且很显然地, 调用 func 的地方可能有许多, 除了 env 它们还使用着各种不同的坑. 所以在 func 中写死了 env 这点必须得到改进. 比如引入一个中间的容器:

def callback():
    raise Exception, 'in callback'

HOLE = {}

def func():
    IL.add_callback(lambda: HOLE[''](callback))


def env(func):
    try:
        func()
    except:
        print 'ok'

def out():
    HOLE.setdefault('', env)(func)

out()
IL.start()

至此, 理想状态下问题似乎得到解决了. 但是实践当中, 情况则会复杂得多, 有不同的 out , 不同的 env , 但是基本的处理方式是没有变的, 我们把 env 抽象成函数执行的"上下文", 把 HOLE 抽象成一个全局的上下文管理容器, 两者配合, 就可以实现出更通用的结构.

在了解 Tornado 中具体的代码之前, 需要先了解 Python 中的 Context Manager 上下文对象, 及与之相关的 with 语句的一些基本知识.

3. Python中的上下文对象及with语句

定义一个可以被 with 使用的对象, 只需要实现 __enter____exit__ 这两个方法就可以了的:

class Context(object):

    def __enter__(self):
        return 123

    def __exit__(self, type, value, traceback):
        return True


with Context() as i:
    print i
    raise Exception, 1

__exit__ 方法中接收的参数就是异常的一些信息, 可以从中判断是否抛出了异常. 最后如果返回是 True , 则忽略异常, 否则会将异常抛出.

另外, Python 有一个 contextlib 的官方模块, 提供了一些方法, 以便更容易使用上下文对象, 比如一个把生成器转换成上下文对象的装饰器:

import contextlib

@contextlib.contextmanager
def process():
    try:
        yield 123
    except:
        pass

with process() as i:
    print i
    raise Exception, 2

生成器中的 yield 出来的值, 就是 with 语句的接收值, yield 语句的表现就是 with 结构中语句的表现.

4. Tornado中的stack_context相关内容

Tornado 中, 我觉得 stack_context.py 是一个同 gen.py 一样让人很难一下看明白的东西. 为了简单, 我们从 1.0.11.1.0 这两个比较旧的版本着手, 因为 stack_context 的机制是从 1.1.0 开始加入的, 那时的 stack_context.py 内容比现在的少多了.

看下面的代码:

# -*- coding: utf-8 -*-

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web

IL = tornado.ioloop.IOLoop.instance()

from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)


class MainHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):
        IL.add_callback(self.callback)

    def callback(self):
        raise Exception
        self.finish("Hello, world")


def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()

最简单的一个应用, 在两个不同版本的 Tornado 下, 实际运行访问当中, 有没有 stack_context 结果区别很明显, 有 stack_context , 则得到 500 的响应, 否则一直没有响应.

实现的细节当中, 有两部分内容, 一是 web.py 中, 在执行具体的业务函数的 _execute 函数里, 挖了一个上下文管理的坑:

def _execute(self, transforms, *args, **kwargs):
    """Executes this request with the given output transforms."""
    self._transforms = transforms
    with stack_context.StackContext(self._stack_context):
        ... ...

同时在 ioloop.py 中, add_callback 也对传入的回调函数作了上下文有关的包装:

def add_callback(self, callback):
    """Calls the given callback on the next I/O loop iteration."""
    self._callbacks.add(stack_context.wrap(callback))
    self._wake()

就像前面说的, 为了通用, Tornado 在这些上下文的管理上可能做得比较巧妙, 在之后的版本当中, 这部分也可能变得比较复杂, 但是本质上的形式, 与最开始的那个理想化的例子是一致的.

这里的 statck_context.StackContext 对应于 HOLE , 是一个上下文的管理容器(准确地说它只是一个操作接口, 容器是 _state.contexts ). 而 statck_context.wrap 则是一个类似于调度器的东西, 作用是给出正确的上下文环境, 理解它的关键是搞清楚"定义时状态"与"执行时状态"的关系与区别.

接下来分块说一下 Tornado 中 stack_context.py 的源码, 以 1.1.0 版本为例.

class _State(threading.local):
    def __init__(self):
        self.contexts = ()
_state = _State()

这里构造了一个线程安全的变量, 在 Tornado 运行时当中, 它就是全局量了, 所有上下文都会保存在里面. 在后面我们还会看到, 这部分的应用注意两点:

接下来看 StackContext :

@contextlib.contextmanager
def StackContext(context_factory):
    old_contexts = _state.contexts
    try:
        _state.contexts = old_contexts + (context_factory,)
        with context_factory():
            yield
    finally:
        _state.contexts = old_contexts

它是一个接收上下文对象的一个上下文对象, 可能后面版本的不使用装饰器写法更好看一些:

class StackContext(object):

    def __init__(self, context_factory):
        self.context_factory = context_factory

    def __enter__(self):
        self.old_contexts = _state.contexts
        _state.contexts = old_contexts + (context_factory,)

        try:
            self.context = self.context_factory()
            self.context.__enter__()
        except Exception:
            _state.contexts = self.old_contexts
            raise

    def __exit__(self, type, value, traceback):
        try:
            return self.context.__exit__(type, value, traceback)
        finally:
            _state.contexts = self.old_contexts

简单来说, 一个 StackContext 实例的作用, 就是包装一个上下文对象, 代码在具体执行时, 还是使用原来的 context_factory , 但是传入的这个上下文对象会被保存到全局的 _state.contexts 当中.

单看这部分没有什么作用, 把上下文暂存起来的目的, 是为了在之后的 wrap 函数当中的, 可以给出当前的, 定义时的状态信息.

def wrap(fn):
    def wrapped(callback, contexts, *args, **kwargs):
        pairs = itertools.izip(itertools.chain(_state.contexts,
                                               itertools.repeat(None)),
                               contexts)
        new_contexts = []
        for old, new in itertools.dropwhile(lambda x: x[0] is x[1], pairs):
            new_contexts.append(StackContext(new))
        if new_contexts:
            with contextlib.nested(*new_contexts):
                callback(*args, **kwargs)
        else:
            callback(*args, **kwargs)

    if getattr(fn, 'stack_context_wrapped', False):
        return fn

    contexts = _state.contexts
    result = functools.partial(wrapped, fn, contexts)
    result.stack_context_wrapped = True
    return result

wrap 是一个函数装饰, 在定义时, 把当前的 _state.contexts 保存了下来, 而且参照之前的分析, 当前的 _state.contexts 可能是被 StackContext 替换过的(因为 tuple 不可变, 所以这里只能是"替换"). 之后, 在真正执行时, 又取一次 _state.context , 那这时, 定义时的上下文, 与执行时的上下文, 之间就可能存在差异, 这部分差异, 就是定义时有, 而执行时已经脱离的那些上下文, 脱离的这些上下文就是我们需要的 -- 它可以让一些函数的错误表现, 像另一些函数一样. 最后, 我们让回调函数在这些脱离了的上下文中执行(没脱离的上下文仍然有作用), 结果就是我们想要的, 相同的上下文继续作用于回调函数, 某个回调函数抛出了错误, 这些错误都可以按相同的方式处理.

结合 StackContextwrap 来看整个流程, 就是 StackContext 负责在某个地方挖个坑, 在其作用域范围内, 所有的回调函数如果 wrap 了的话, 就都会在相同的坑里执行.

如果你希望某些回调函数不受之前挖的坑的影响, 有一个临时的 NullContext 工具上下文, 它会动 _state.context , 把它清空. 这里有一点可以思考, 为什么不是去动 wrap , 而在 _state.context 上动手脚? 因为 wrap 是硬编码在 add_callback 这类函数当中的, 但是它们都受 _state.context 的影响, 所以换个思维去动 _state.context 就可以达到目的.

5. 总结与比较

可以看到, Tornado 用一种可以说很取巧的方法, 解决了回调函数的异常表现问题.

里面涉及了:

比较我要说的是 nodejs 中的机制. 在官方的 API 中, Domain 是做这块上下文管理的模块.

先回顾一下在简单的状态下我们碰到的问题:

def get(self):
    IL.add_callback(self.callback)

self.callback 的异常给我们造成了麻烦, 为了不让这个异常在全局蔓延开, 我们会很直接地想到在 self.callback 的执行上下文中把它给 try 起来, 就是在 IL 中对所有的回调函数都 try 一下, 这样只能说是单纯地拿到了异常, 但是在 IL 中并没有足够的信息, 让我们可以正确地处理这个异常(比如说返回 500 ). 所以, 这种简单直接的暴力大部分情况是没有用的.

反观 nodejs , 这个问题似乎简单得多了, 它的上下文的概念, 就是一个对象, 应用事件模式, 通过 on 定义事件回调, 通过 bind 调度全局的事件回调函数用以处理异常.

在 nodejs 中, 使用:

process.on('uncaughtException', uncaughtHandler);

就可以直接拿到所有未捕获的异常. domain.bind 的执行会有一个注册行为, 后面执行 bind 中的函数时就设置上当前的 domain , 这时如果有异常被 processuncaughtException 拿到, 就可以被当前的 domain.on('error', callback) 处理.

var d = require('domain').create();
var d2 = require('domain').create();

d.on('error', function(error){
  console.error('error', error);
});

d2.on('error', function(error){
  console.error('error2', error);
});

d.bind(function(){
  setTimeout(function(){
    throw 'I';
  }, 3000);
})();

d2.bind(function(){
  setTimeout(function(){
    throw 'II';
  }, 1000);
})();

现在剩下的一个问题就是, 当你拿到一个异常时, 如果获取到正确的"当前 domain ", 特别是 setTimeout 中的回调函数, 如何知道它对应的 domain 呢? 这个问题我看 domain.js 的代码看了很久, 认为它是不可能的, 后来我到网上去找, 发现, timer.js 中实现的 setTimeout 有处理 domain 的东西, 我去......

至此, 比较 Tornado 和 nodejs 的 Domain , 其实它们本质上的使用行为是一致的, 一是都先要声明一个上下文的东西, 让这个东西在全局范围内被管理起来. 二是之后在具体的函数调用时, 声明使用一个上下文的调度行为.

同时, 它们在实现上的行为与首手点却不相同. nodejs 使用全局的 process 拿到未捕获的异常, 在各个环节小心地控制 process.domain , 通过事件模型实现对异常的统一处理. 从外部看来清晰简单, 但是事实上 setTimeout 这种东西它里面隐藏了太多的细节.

而 Tornado 并没有选择在 ioloop 层面对所有的 callback 做异常处理, 而是把这一个关键的集中点单独做到 stack_context.py 中, 并且以 Python 特殊的"上下文"对象这个更高层面的抽象来定义这个问题--异常处理只是上下文应用的一部分, 在直接结果上, 也只存在一个统一的 stack 来调度上下文就实现了嵌套等逻辑. 而 nodejs 中不光有 active 的概念, 也同样有 stack 的概念.

Tornado 的实现我觉得很巧妙, 但是要弄明白确实要花些时间. nodejs 的 Domain 实现很直观, 只关注异常处理, 整个流程统一于事件模型, 但是需要低层的一些东西来支持.

另外, 它们还有一点不同, Tornado 的上下文管理是一个必选项, 而 nodejs 的 Domain 功能则是可选的. 这点上的不同, 全造成如果 Tornado 按 nodejs 那种方式去解决问题, 会显得很别扭.

评论
©2010-2016 zouyesheng.com All rights reserved. Powered by GitHub , txt2tags , MathJax