博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
tornado 源码分析 之 异步io的实现方式
阅读量:6832 次
发布时间:2019-06-26

本文共 20415 字,大约阅读时间需要 68 分钟。

前言

本文将尝试详细的带大家一步步走完一个异步操作,从而了解tornado是如何实现异步io的.其实本文是对[上一篇文][1]的实践和复习主旨在于关注异步io的实现,所以会忽略掉代码中的一些异常处理.文字较多,凑合下吧接下来只会贴出部分源码,帮助理解,希望有耐心的同学打开tornado源码,一起跟踪一遍吧.

AsyncHTTPClient :

AsyncHTTPClient 继承 Configurable ,从__new__重看出是单例模式.  根据 Configurable 的__new__和 AsyncHTTPClient 的 configurable_base 和 configurable_default 得知,实例化后一定是 SimpleAsyncHTTPClient 的实例

fetch

def fetch(self, request, callback=None, raise_error=True, **kwargs):        if self._closed:            raise RuntimeError("fetch() called on closed AsyncHTTPClient")        if not isinstance(request, HTTPRequest):            request = HTTPRequest(url=request, **kwargs)        # We may modify this (to add Host, Accept-Encoding, etc),        # so make sure we don't modify the caller's object.  This is also        # where normal dicts get converted to HTTPHeaders objects.        request.headers = httputil.HTTPHeaders(request.headers)        request = _RequestProxy(request, self.defaults)        future = TracebackFuture()        if callback is not None:            callback = stack_context.wrap(callback)            def handle_future(future):                exc = future.exception()                if isinstance(exc, HTTPError) and exc.response is not None:                    response = exc.response                elif exc is not None:                    response = HTTPResponse(                        request, 599, error=exc,                        request_time=time.time() - request.start_time)                else:                    response = future.result()                self.io_loop.add_callback(callback, response)            future.add_done_callback(handle_future)        ##fetch_impl带上handle_response,重点        def handle_response(response):            if raise_error and response.error:                future.set_exception(response.error)            else:                future.set_result(response)        self.fetch_impl(request, handle_response)        return future
fetch 中调用 fetch_impl,fetch_impl 中其中一个参数是 callback ,而代码中的 callback 包含了 future 的 set_result ,所以,当 callback 被调用时,外部的 yield 操作将被激活,程序会在 ioloop 中调用此 callback ,然后回到原函数的 yield 处,并且原函数返回此次 qeust 的 future 对象,以便在函数外部增加别的 callback

fetch_impl

def _connection_class(self):        return _HTTPConnectiondef _handle_request(self, request, release_callback, final_callback):        self._connection_class()(            self.io_loop, self, request, release_callback,            final_callback, self.max_buffer_size, self.tcp_client,            self.max_header_size, self.max_body_size)
在 return 之前,继续查看 fetch_impl 内部是如何处理,根据推测,他一定是将继续处理网络请求,肯定会将网络请求交由 ioloop 的 epoll 部分处理,设定好处理的 hanlder 再返回 future.set_result ,接下来继续分析 fetch_impl 内部是如果设置网络请求的. fetch_impl 的实现代码中查看,实例化中创建了 tcpclient 对象,这个肯定是关键根据之前的分析 SimpleAsyncHTTPClient 是单例模式,那他怎么处理各种 http 请求呢?查看代码得知,他将请求的 request 和 callback 存储在 self.queue 中,每次 fetch_impl 的时候,一个个 pop 出来处理就好了,这样就能处理n个请求了一步步跟踪到 _handle_request ,发现最后到了 _HTTPConnection 的实例化中了.实例化的参数有之前那个包含 future 的 callback .这样就可以保证 yield 操作可以回到原处了.好了,继续走

_HTTPConnection

class _HTTPConnection(httputil.HTTPMessageDelegate):    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])    def __init__(self, io_loop, client, request, release_callback,                 final_callback, max_buffer_size, tcp_client,                 max_header_size, max_body_size):        self.start_time = io_loop.time()        self.io_loop = io_loop        self.client = client        self.request = request        self.release_callback = release_callback        self.final_callback = final_callback        self.max_buffer_size = max_buffer_size        self.tcp_client = tcp_client        self.max_header_size = max_header_size        self.max_body_size = max_body_size        self.code = None        self.headers = None        self.chunks = []        self._decompressor = None        # Timeout handle returned by IOLoop.add_timeout        self._timeout = None        self._sockaddr = None        with stack_context.ExceptionStackContext(self._handle_exception):            self.parsed = urlparse.urlsplit(_unicode(self.request.url))            if self.parsed.scheme not in ("http", "https"):                raise ValueError("Unsupported url scheme: %s" %                                 self.request.url)            # urlsplit results have hostname and port results, but they            # didn't support ipv6 literals until python 2.7.            netloc = self.parsed.netloc            if "@" in netloc:                userpass, _, netloc = netloc.rpartition("@")            host, port = httputil.split_host_and_port(netloc)            if port is None:                port = 443 if self.parsed.scheme == "https" else 80            if re.match(r'^\[.*\]$', host):                # raw ipv6 addresses in urls are enclosed in brackets                host = host[1:-1]            self.parsed_hostname = host  # save final host for _on_connect            if request.allow_ipv6 is False:                af = socket.AF_INET            else:                af = socket.AF_UNSPEC            ssl_options = self._get_ssl_options(self.parsed.scheme)            timeout = min(self.request.connect_timeout, self.request.request_timeout)            if timeout:                self._timeout = self.io_loop.add_timeout(                    self.start_time + timeout,                    stack_context.wrap(self._on_timeout))            self.tcp_client.connect(host, port, af=af,                                    ssl_options=ssl_options,                                    max_buffer_size=self.max_buffer_size,                                    callback=self._on_connect)
_HTTPConnection 的实例化中有一堆成员变量,有点晕,先不管这么多,关注我们的 callback ,和 tcpclient .一行行往下看,是 host 和 port 的初始化操作 ,http 和 https 是不一样的嘛,当然得处理一下,终于到了最后,是 tcpclient.connect ,从 connect 的参数中看到 callback=self._on_connect ,应该是个重要的方法,出去那些字符串处理,发现 self.connection.write_headers(start_line ,  self.request.headers ) ,这应该是发送 http 头的操作吧,是网络请求,所以这是处理 connect 这个 url 后,发送 http 头的操作.还是回头看看是如何 connect 的吧,因为这是异步的关键,搞懂了这个,那剩下来的也是同出一则

TCPClient

转到 tcpclient 的代码去看他的实例化和 connect 操作,看来剩下的路还很长呢 TCPClient 实例化的代码很短,有个 resolver 对象,先不管

connect

@gen.coroutine    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,                max_buffer_size=None):        """Connect to the given host and port.        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if        ``ssl_options`` is not None).        """        addrinfo = yield self.resolver.resolve(host, port, af)        connector = _Connector(            addrinfo, self.io_loop,            functools.partial(self._create_stream, max_buffer_size))        af, addr, stream = yield connector.start()        # TODO: For better performance we could cache the (af, addr)        # information here and re-use it on subsequent connections to        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)        if ssl_options is not None:            stream = yield stream.start_tls(False, ssl_options=ssl_options,                                            server_hostname=host)        raise gen.Return(stream)
去到 connect 方法里,发现 coroutine 装饰器,并且调用时设置了 callback=self._on_connect ,所以当这个 coroutine 的 future 被解决时,会调用 self._on_connect ,你也可以看到 _on_connect 的参数是 stream ,就是 gen.Return(stream )传过去的. 因为 gen.coroutine 实现时的代码中, send 了 value 后,代码继续走,走到 gen.Return (其实这是个 exception ,就会走到 gen.coroutine 里的 set_result 了.)第一个 yield  右边是 self.resolver.resolve ,左边是 addrinfo ,是地址信息,这个异步操作处理的便是解析 url 的地址信息.此处 tornado 默认使用了阻塞的实现,暂时先不看,以后在新的篇幅补充,主要内容是 run_on_executor 装饰器的内容,此处其实是同步返回的,因为默认用的是 BlockingResolver 的代码,直接看下一个 yield

_Connector

def __init__(self, addrinfo, io_loop, connect):        self.io_loop = io_loop        self.connect = connect        self.future = Future()        self.timeout = None        self.last_error = None        self.remaining = len(addrinfo)        self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
_Connector 实例化,参数有一个 callback ,是本类的 _create_stream ,并把 self.connect 设置成传过来的 callback 所以 self.connect 就是 TCPClient._create_stream 了,成员变量有个 future 实例,我们需要全程高度关注 future 和 callback .实例化后调用了 start 方法 ,start 内部,调用 try_connect,set_timout ,根据函数名得知,是 connect 操作和设置超时的操作.最后返回实例化时创建的 future .

try_connect

def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):        self.try_connect(iter(self.primary_addrs))        self.set_timout(timeout)        return self.future    def try_connect(self, addrs):        try:            af, addr = next(addrs)        except StopIteration:            # We've reached the end of our queue, but the other queue            # might still be working.  Send a final error on the future            # only when both queues are finished.            if self.remaining == 0 and not self.future.done():                self.future.set_exception(self.last_error or                                          IOError("connection failed"))            return        future = self.connect(af, addr)        future.add_done_callback(functools.partial(self.on_connect_done,                                                   addrs, af, addr))
future  =  self.connect(af ,  addr ),执行了 TCPClient._create_stream 方法,返回 future ,并且设置 future 的 callback=on_connect_done

_create_stream

def _create_stream(self, max_buffer_size, af, addr):        # Always connect in plaintext; we'll convert to ssl if necessary        # after one connection has completed.        stream = IOStream(socket.socket(af),                          io_loop=self.io_loop,                          max_buffer_size=max_buffer_size)        return stream.connect(addr)
实例化 IOStream ,执行并返回 stream.connect,stream.connect 返回的 future 便是 try_connect 中的 future ,所以,进去看看 stream.connect 内部是怎么”解决”这个 future 的.

IOStream

connect

def connect(self, address, callback=None, server_hostname=None):        self._connecting = True        if callback is not None:            self._connect_callback = stack_context.wrap(callback)            future = None        else:            future = self._connect_future = TracebackFuture()        try:            self.socket.connect(address)        except socket.error as e:            if (errno_from_exception(e) not in _ERRNO_INPROGRESS and                    errno_from_exception(e) not in _ERRNO_WOULDBLOCK):                if future is None:                    gen_log.warning("Connect error on fd %s: %s",                                    self.socket.fileno(), e)                self.close(exc_info=True)                return future        self._add_io_state(self.io_loop.WRITE)        return future
self._connecting  =  True  设置此实例正在连接中,连接完毕设置成 false 如果没有 callback 传入,生成 future 对象, 刚才返回的 future 记录在这个实例的成员变量 self._connect_future 中.然后执行 socket 的 connect 操作,因为 socket 设置成非阻塞,所以此处会立即返回,不会阻塞,当连接成功时,缓冲区可写,失败时缓冲区可读可写.这是基础知识,详情百度.然后调用 self._add_io_state ,返回 future

_add_io_state

def _add_io_state(self, state):        if self.closed():            # connection has been closed, so there can be no future events            return        if self._state is None:            self._state = ioloop.IOLoop.ERROR | state            with stack_context.NullContext():                self.io_loop.add_handler(                    self.fileno(), self._handle_events, self._state)        elif not self._state & state:            self._state = self._state | state            self.io_loop.update_handler(self.fileno(), self._state)
终于到了这一步,要用 epoll 了!!!根据实例化的代码得知 self._state=None ,会走 self.io_loop.add_handler 这步,根据我之前发的[文章][2],会将当前的 fd ,当前实例的 _handle_events ,和写,错误操作注册到 epoll 中接着!!!!!终于走完了这个 yield 的流程了!!!!!!

小总结:

请一定弄清 future 是怎么传递的,每个 future 管理的 callback 是什么操作.  _HTTPConnection  中  tcpclient.connect 一个 future   ,callback=self._on_connect  . 他将在 raise   gen.Return(stream )时被添加到 ioloop 执行.   tcpclient.connect 内部的  connector.start 一个 future ,   callback 是 on_connect_done ,他将在 poll 检测到 write 事件时,被添加到 ioloop 执行

ioloop

def start(self):        if self._running:            raise RuntimeError("IOLoop is already running")        self._setup_logging()        if self._stopped:            self._stopped = False            return        old_current = getattr(IOLoop._current, "instance", None)        IOLoop._current.instance = self        self._thread_ident = thread.get_ident()        self._running = True        old_wakeup_fd = None        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':            try:                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())                if old_wakeup_fd != -1:                    signal.set_wakeup_fd(old_wakeup_fd)                    old_wakeup_fd = None            except ValueError:                old_wakeup_fd = None        try:            while True:                with self._callback_lock:                    callbacks = self._callbacks                    self._callbacks = []                due_timeouts = []                if self._timeouts:                    now = self.time()                    while self._timeouts:                        if self._timeouts[0].callback is None:                            heapq.heappop(self._timeouts)                            self._cancellations -= 1                        elif self._timeouts[0].deadline <= now:                            due_timeouts.append(heapq.heappop(self._timeouts))                        else:                            break                    if (self._cancellations > 512                            and self._cancellations > (len(self._timeouts) >> 1)):                        self._cancellations = 0                        self._timeouts = [x for x in self._timeouts                                          if x.callback is not None]                        heapq.heapify(self._timeouts)                for callback in callbacks:                    self._run_callback(callback)                for timeout in due_timeouts:                    if timeout.callback is not None:                        self._run_callback(timeout.callback)                callbacks = callback = due_timeouts = timeout = None                if self._callbacks:                    poll_timeout = 0.0                elif self._timeouts:                    poll_timeout = self._timeouts[0].deadline - self.time()                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))                else:                    poll_timeout = _POLL_TIMEOUT                if not self._running:                    break                if self._blocking_signal_threshold is not None:                    signal.setitimer(signal.ITIMER_REAL, 0, 0)                try:                    event_pairs = self._impl.poll(poll_timeout)                except Exception as e:                    if errno_from_exception(e) == errno.EINTR:                        continue                    else:                        raise                if self._blocking_signal_threshold is not None:                    signal.setitimer(signal.ITIMER_REAL,                                     self._blocking_signal_threshold, 0)                self._events.update(event_pairs)                while self._events:                    fd, events = self._events.popitem()                    try:                        fd_obj, handler_func = self._handlers[fd]                        handler_func(fd_obj, events)                    except (OSError, IOError) as e:                        if errno_from_exception(e) == errno.EPIPE:                            # Happens when the client closes the connection                            pass                        else:                            self.handle_callback_exception(self._handlers.get(fd))                    except Exception:                        self.handle_callback_exception(self._handlers.get(fd))                fd_obj = handler_func = None        finally:            # reset the stopped flag so another start/stop pair can be issued            self._stopped = False            if self._blocking_signal_threshold is not None:                signal.setitimer(signal.ITIMER_REAL, 0, 0)            IOLoop._current.instance = old_current            if old_wakeup_fd is not None:                signal.set_wakeup_fd(old_wakeup_fd)
接下来 tornado 终于也回到了 ioloop 代码中了(泪奔)!!当连接成功时,该 fd 的缓冲区可写, epoll 收到 fd 的 write 操作通知~进入到了 epoll 的 loop 中处理.然后!回到刚才注册的 _handle_events 了!注意这个 _handle_events 是 IOStream 的实例里的 _handle_events ,他有刚才我们处理的所有信息哦~接下来看 _handle_events 的代码,看他如果解决掉 future

IOStream._handle_events

def _handle_events(self, fd, events):        if self.closed():            gen_log.warning("Got events for closed stream %s", fd)            return        try:            if self._connecting:                # Most IOLoops will report a write failed connect                # with the WRITE event, but SelectIOLoop reports a                # READ as well so we must check for connecting before                # either.                self._handle_connect()            if self.closed():                return            if events & self.io_loop.READ:                self._handle_read()            if self.closed():                return            if events & self.io_loop.WRITE:                self._handle_write()            if self.closed():                return            if events & self.io_loop.ERROR:                self.error = self.get_fd_error()                # We may have queued up a user callback in _handle_read or                # _handle_write, so don't close the IOStream until those                # callbacks have had a chance to run.                self.io_loop.add_callback(self.close)                return            state = self.io_loop.ERROR            if self.reading():                state |= self.io_loop.READ            if self.writing():                state |= self.io_loop.WRITE            if state == self.io_loop.ERROR and self._read_buffer_size == 0:                # If the connection is idle, listen for reads too so                # we can tell if the connection is closed.  If there is                # data in the read buffer we won't run the close callback                # yet anyway, so we don't need to listen in this case.                state |= self.io_loop.READ            if state != self._state:                assert self._state is not None, \                    "shouldn't happen: _handle_events without self._state"                self._state = state                self.io_loop.update_handler(self.fileno(), self._state)        except UnsatisfiableReadError as e:            gen_log.info("Unsatisfiable read, closing connection: %s" % e)            self.close(exc_info=True)        except Exception:            gen_log.error("Uncaught exception, closing connection.",                          exc_info=True)            self.close(exc_info=True)            raise    def _handle_connect(self):        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)        if err != 0:            self.error = socket.error(err, os.strerror(err))            # IOLoop implementations may vary: some of them return            # an error state before the socket becomes writable, so            # in that case a connection failure would be handled by the            # error path in _handle_events instead of here.            if self._connect_future is None:                gen_log.warning("Connect error on fd %s: %s",                                self.socket.fileno(), errno.errorcode[err])            self.close()            return        if self._connect_callback is not None:            callback = self._connect_callback            self._connect_callback = None            self._run_callback(callback)        if self._connect_future is not None:            future = self._connect_future            self._connect_future = None            future.set_result(self)        self._connecting = False
判断是否在连接中,当然是了,刚才我也强调过了,然后进入 _handle_connect,_handle_connect 先判断 connect 有没成功,成功了就是设置 _connect_future 的 result,set_result(self ),把 self(iostream )设置进去了!   然后 _connect_future 的 callbacks 会在下一次循环被 ioloop 消化掉!!一步步返回看,这个 future 正是我们之前的那个 yiled 操作的右边的返回的 future ,所以刚才 _Connector.try_connect 设置的 callback   ,on_connect_done 会在 ioloop 的 callback 里执行. 根据上一篇[文章][3]讲的 coroutine 的源码得知,此 future 里还有 Runner.run 的 callback 哦~所以 ,run 里 send 了 vaule 到 gen . 终于终于!!程序回到了刚才的 yield 处了!!!!!tornado正是如此实现异步io的感觉一直讲完整个操作不太现实,剩下的大家还是自己跟踪吧,道理跟这个流程类似. yield 操作右边,一定是返回一个 future (旧版本貌似是 YieldPoint ,因为没看过旧版,所以不太清楚) ,然后在返回 future 之前,设置好 fd 的 handler ,和其他的解析工作,然后等待 epoll 检测到关心的 io   event ,在 io 的 handler 里把 future 解决,从而回到 yield 处~ 核心就是 ioloop 三部分 ,future,gen.coroutine . 相互配合完成异步操作. 跟踪几遍消化一下,就可以写 tornado 的扩展了. 祝大家武运亨通

转载地址:http://jmnkl.baihongyu.com/

你可能感兴趣的文章
开始带队开发
查看>>
myeclipse中java web项目修改内容,不能自动发布到tomcat的问题
查看>>
如何准备BAT技术面试答案(下)——Java研发方向
查看>>
2.C#基本语法和变量
查看>>
SQL笔记 --- 表完整性
查看>>
RBAC权限系统设计之我见
查看>>
玩转redis之redis 分片集群方案与实现
查看>>
如何让Redis Server运行更稳定
查看>>
weak_ptr 的 operator== 操作问题
查看>>
IOS写文件
查看>>
mysql小数数据类型
查看>>
js判断input输入保留正整数和两位小数实现方法
查看>>
redisson学习示例
查看>>
升级到 PHP 7.0
查看>>
error lnk2019 无法解析的外部符号
查看>>
ITSM--IT服务管理注意细则
查看>>
java list去重
查看>>
android中网络操作使用总结(http)
查看>>
JAVA中使用代码创建多数据源,并实现动态切换(一)
查看>>
理解Scroll View
查看>>