sanic

sanic

app.run:

def run(
    self,
    host=None,
    port=None,
    debug=False,
    ssl=None,
    sock=None,
    workers=1,
    protocol=None,
    backlog=100,
    stop_event=None,
    register_sys_signals=True,
    access_log=True,
    **kwargs
):
    """Serve the application and block until the server stops.

    ``self.is_running`` is set to ``True`` while serving and always reset
    to ``False`` afterwards. With more than one worker the work is handed
    to ``serve_multiple``; with a single worker either the reloader
    watchdog is started (auto reload enabled and the
    ``SANIC_SERVER_RUNNING`` environment variable is not ``"true"``) or
    ``serve`` is called directly. Any exception is logged and re-raised.

    NOTE(review): abridged excerpt -- ``auto_reload`` and
    ``server_settings`` are not defined here; presumably they are derived
    from the arguments in the elided part of the method.
    """
    try:
        self.is_running = True
        multi_worker = not (workers == 1)
        if multi_worker:
            serve_multiple(server_settings, workers)
        elif auto_reload and os.environ.get("SANIC_SERVER_RUNNING") != "true":
            # Not yet running under the reloader: start the watchdog.
            reloader_helpers.watchdog(2)
        else:
            serve(**server_settings)
    except BaseException:
        error_logger.exception("Experienced exception while trying to serve")
        raise
    finally:
        self.is_running = False
    # Only reached when serving ended without an exception.
    logger.info("Server Stopped")

sanic的启动函数除了配置相关的处理之外,当workers为1时直接调用server.serve(开启auto_reload时则先启动reloader),否则调用server.serve_multiple函数启动多个进程来提供服务,serve_multiple的实现如下:

server.serve_multiple

def serve_multiple(server_settings, workers):
    """Run ``workers`` server processes that share one listening socket.

    When ``server_settings`` carries no ``"sock"``, a socket is created,
    bound to the configured host/port, marked inheritable and stored in
    ``server_settings`` (``"host"`` and ``"port"`` are then cleared so each
    worker uses the shared socket). SIGINT and SIGTERM received by this
    process are forwarded to every worker as SIGTERM.

    :param server_settings: keyword arguments passed to ``serve`` in each
        worker process; mutated in place as described above.
    :param workers: number of worker processes to start.
    """
    if server_settings.get("sock") is None:
        sock = socket()
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind((server_settings["host"], server_settings["port"]))
        sock.set_inheritable(True)
        server_settings["sock"] = sock
        server_settings["host"] = None
        server_settings["port"] = None

    # Bound before the handlers are installed so that a signal arriving
    # early iterates an empty list instead of hitting an unbound name.
    processes = []

    def sig_handler(signal, frame):
        logger.info("Received signal %s. Shutting down.", Signals(signal).name)
        for process in processes:
            os.kill(process.pid, SIGTERM)

    signal_func(SIGINT, sig_handler)
    signal_func(SIGTERM, sig_handler)

    for _ in range(workers):
        process = Process(target=serve, kwargs=server_settings)
        process.daemon = True
        process.start()
        processes.append(process)

    # Blocks here until every worker has exited.
    for process in processes:
        process.join()

    # Final cleanup once the joins above have returned.
    for process in processes:
        process.terminate()
    server_settings["sock"].close()

serve_multiple主要是(在未传入sock时)创建socket并绑定端口,然后根据参数中workers的数量启动对应数量的进程,每个进程执行serve,serve的代码如下:
server.serve

def serve():
    """Run one worker: start the server on an event loop and serve.

    NOTE(review): this is an abridged excerpt. The parameter list is
    elided, so every free name used below (``run_async``, ``loop``,
    ``protocol``, ``host``, ``sock``, ``after_start``, ...) is presumably a
    parameter of the full function -- confirm against the library source.
    """
    if not run_async:
        # create new event_loop after fork
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Fall back to a fresh set when no connection registry was supplied.
    connections = connections if connections is not None else set()
    # Pre-bind the protocol's constructor arguments; the resulting callable
    # is passed to ``loop.create_server`` below as its first argument.
    server = partial(
        protocol,
        loop=loop,
        connections=connections,
        signal=signal,
        request_handler=request_handler,
        error_handler=error_handler,
        request_timeout=request_timeout,
        response_timeout=response_timeout,
        keep_alive_timeout=keep_alive_timeout,
        request_max_size=request_max_size,
        request_class=request_class,
        access_log=access_log,
        keep_alive=keep_alive,
        is_request_stream=is_request_stream,
        router=router,
        websocket_max_size=websocket_max_size,
        websocket_max_queue=websocket_max_queue,
        websocket_read_limit=websocket_read_limit,
        websocket_write_limit=websocket_write_limit,
        state=state,
        debug=debug,
    )

    server_coroutine = loop.create_server(
        server,
        host,
        port,
        ssl=ssl,
        reuse_port=reuse_port,
        sock=sock,
        backlog=backlog,
    )

    try:
        http_server = loop.run_until_complete(server_coroutine)
    except BaseException:
        # Start-up failures are logged and the worker returns without raising.
        logger.exception("Unable to start server")
        return

    trigger_events(after_start, loop)

    pid = os.getpid()
    try:
        logger.info("Starting worker [%s]", pid)
        loop.run_forever()
    finally:
        logger.info("Stopping worker [%s]", pid)

        # Run the on_stop function if provided
        trigger_events(before_stop, loop)

        # Wait for event loop to finish and all connections to drain
        # NOTE(review): only close() is shown here; any waiting/draining
        # presumably happens in code elided after this point.
        http_server.close()

server.serve主要通过asyncio库的loop.create_server实现,这是一个针对四层(传输层)的实现,具体的http处理需要用户自己实现。这个方法需要传入一个处理连接的类,这个类需要实现几个方法:connection_made、connection_lost、pause_writing、resume_writing、data_received。sanic的http处理定义在server.HttpProtocol中,connection_made将传入的transport对象保存到对象的成员中,方便后续处理,同时启动request_timeout任务,即定时判断请求是否超时。

   def connection_made(self, transport):
        """Register a newly opened connection and arm its request timeout.

        The protocol adds itself to ``self.connections``, schedules
        ``self.request_timeout_callback`` to run after
        ``self.request_timeout``, keeps ``transport`` for later use and
        records ``current_time`` (a name from the enclosing module, not
        shown in this excerpt) as the last request time.
        """
        self.connections.add(self)
        loop = self.loop
        self._request_timeout_handler = loop.call_later(
            self.request_timeout, self.request_timeout_callback
        )
        self.transport = transport
        self._last_request_time = current_time

sanic的data_received是借助于httptools库的HttpRequestParser来处理的:

def data_received(self, data):
    """Feed a chunk of received bytes to the HTTP request parser.

    NOTE(review): abridged excerpt -- each ``...`` line stands for code
    that was left out of the listing.
    """
    ...
    if self.parser is None:
        # First chunk of a request: no request object may exist yet, so
        # start with an empty header list and a new parser bound to self.
        assert self.request is None
        self.headers = []
        self.parser = HttpRequestParser(self)
    ...        
    try:
        self.parser.feed_data(data)
    except HttpParserError:
        # Malformed input is answered with an InvalidUsage error; the
        # traceback is appended to the message only when debugging.
        message = "Bad Request"
        if self._debug:
            message += "\n" + traceback.format_exc()
        self.write_error(InvalidUsage(message))

可以通过提供on_headers、on_body、on_message_complete等方法来干预不同阶段的请求处理。sanic中除了on_message_complete阶段,其他阶段用户均不能干涉,这些阶段主要做的是构建request对象,将必要的信息赋给这个对象。on_message_complete阶段创建一个任务调用request_handler来处理请求:

def on_message_complete(self):
    """Handle the end of a fully received request (headers and body).

    A still-pending request timeout is cancelled and cleared. For a
    streaming request with a stream handler, a task is created that puts
    ``None`` on the request stream; otherwise the request body is
    finalized and the request handler is executed.
    """
    pending_timeout = self._request_timeout_handler
    if pending_timeout:
        # The whole request arrived, so the timeout no longer applies.
        pending_timeout.cancel()
        self._request_timeout_handler = None

    streaming = self.is_request_stream and self._is_stream_handler
    if not streaming:
        self.request.body_finish()
        self.execute_request_handler()
        return

    self._request_stream_task = self.loop.create_task(
        self.request.stream.put(None)
    )

def execute_request_handler(self):
    """Arm the response timeout and start the request handler task.

    ``self.response_timeout_callback`` is scheduled to run after
    ``self.response_timeout``, ``current_time`` (a module-level name not
    shown in this excerpt) is recorded as the last request time, and the
    request handler is started as a task on the loop with the request and
    the two response-writing callbacks.
    """
    schedule = self.loop.call_later
    self._response_timeout_handler = schedule(
        self.response_timeout, self.response_timeout_callback
    )
    self._last_request_time = current_time

    spawn = self.loop.create_task
    handler_coro = self.request_handler(
        self.request, self.write_response, self.stream_response
    )
    self._request_handler_task = spawn(handler_coro)

request_handler即app中定义的handle_request

async def handle_request(self, request, write_callback, stream_callback):
    # Look up the handler for ``request``, call it and pass the resulting
    # response to ``stream_callback`` or ``write_callback``.
    # NOTE(review): abridged excerpt -- every ``...`` line stands for code
    # that was left out (including the handlers of the ``try`` below), so
    # this fragment is not runnable as shown.
    ...
    response = None
    cancelled = False
    ...
    # The router returns the handler plus the arguments to call it with.
    handler, args, kwargs, uri = self.router.get(request)
    try:
    ...
        response = handler(request, *args, **kwargs)
        # Handlers may be plain functions or coroutines; await only when
        # the returned value is awaitable.
        if isawaitable(response):
            response = await response
    ...
    # Streaming responses are awaited through stream_callback; anything
    # else is written synchronously through write_callback.
    if isinstance(response, StreamingHTTPResponse):
        await stream_callback(response)
    else:
        write_callback(response)

这个方法先通过路由获取处理该请求的方法(即用户定义的处理函数),调用后根据返回值是否awaitable来得到response,即用户定义的路由处理方法的返回值。返回对象是一个response.HTTPResponse/StreamingHTTPResponse对象:如果是StreamingHTTPResponse则调用stream_callback,否则调用write_callback将结果返回给client。write_callback是server.HttpProtocol的write_response方法,实现如下:

def write_response(self, response):
    # Serialize ``response`` and write it to the transport, then either
    # close the connection or arm the keep-alive timeout.
    # NOTE(review): abridged excerpt -- the ``...`` lines stand for omitted
    # code (including the ``except`` clauses of the ``try``), so this
    # fragment is not runnable as shown.
    ...
    try:
        keep_alive = self.keep_alive
        # response.output() produces the bytes that are sent on the wire.
        self.transport.write(
            response.output(
                self.request.version, keep_alive, self.keep_alive_timeout
            )
        )
    ...
    finally:
        if not keep_alive:
            # No keep-alive: close the transport and drop the reference.
            self.transport.close()
            self.transport = None
        else:
            # Keep-alive: schedule the keep-alive timeout callback, record
            # the response time and call cleanup().
            self._keep_alive_timeout_handler = self.loop.call_later(
                self.keep_alive_timeout, self.keep_alive_timeout_callback
            )
            self._last_response_time = current_time
            self.cleanup()

其主要通过transport.write写出response.output按照http协议格式生成的bytes响应,response.output的定义如下:

   def output(self, version="1.1", keep_alive=False, keep_alive_timeout=None):
        """Serialize this response into raw HTTP bytes.

        Note that ``self.headers`` is updated as a side effect:
        ``Content-Length`` (only when the status allows a body) and
        ``Content-Type`` are filled in when absent, and for status 304/412
        the headers are replaced by ``remove_entity_headers(self.headers)``.

        :param version: HTTP version string placed in the status line.
        :param keep_alive: when true the ``Connection`` header is
            ``keep-alive``, otherwise ``close``.
        :param keep_alive_timeout: value for the ``Keep-Alive`` header; the
            header is only emitted when ``keep_alive`` is true and this is
            not ``None``.
        :return: bytes made of the status line, the ``Connection`` header,
            the optional ``Keep-Alive`` header, the formatted headers, a
            blank line and the body (empty when the status has no body).
        """
        timeout_header = b""
        if keep_alive and keep_alive_timeout is not None:
            timeout_header = b"Keep-Alive: %d\r\n" % keep_alive_timeout

        body = b""
        if has_message_body(self.status):
            body = self.body
            self.headers["Content-Length"] = self.headers.get(
                "Content-Length", len(self.body)
            )

        self.headers["Content-Type"] = self.headers.get(
            "Content-Type", self.content_type
        )

        if self.status in (304, 412):
            self.headers = remove_entity_headers(self.headers)

        headers = self._parse_headers()

        # Compare by value: ``is`` against an int literal only works through
        # an interpreter caching detail and emits a SyntaxWarning on 3.8+.
        if self.status == 200:
            status = b"OK"
        else:
            status = STATUS_CODES.get(self.status, b"UNKNOWN RESPONSE")

        # The "\r\n" following the headers placeholder is the blank line
        # that separates the header section from the body.
        return (
            b"HTTP/%b %d %b\r\n" b"Connection: %b\r\n" b"%b" b"%b\r\n" b"%b"
        ) % (
            version.encode(),
            self.status,
            status,
            b"keep-alive" if keep_alive else b"close",
            timeout_header,
            headers,
            body,
        )

不论是否有body,header之后都会有一个空行:_parse_headers的返回值是以\r\n结尾的,后面再接格式串中的\r\n就构成了一个空行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。