Gunicorn源码分析(二)Worker进程

Gunicorn.worker实现了不同类型的work进程,有单进程、多线程、多协程等形式。

gunicorn.worker目录结构:

workers/
├── __init__.py
├── _gaiohttp.py
├── base.py
├── base_async.py
├── gaiohttp.py
├── geventlet.py
├── ggevent.py
├── gthread.py
├── gtornado.py
├── sync.py
└── workertmp.py

主要看以下几个源码文件

  1. base.py:基类文件
  2. gthread.py:单进程多线程工作模式
  3. sync.py:单进程单线程模式
  4. workertmp:tmp文件,master监控worker进程的机制

剩下的其他文件大同小异。

Worker

下面是将Worker类实现的简略。

class Worker(object):
    SIGNALS = [getattr(signal, "SIG%s" % x)
            for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()] # 支持的信号

    PIPE = []

    def __init__(self, age, ppid, sockets, app, timeout, cfg, log)
    def __str__(self)
    def notify(self)
    def run(self)
    def init_process(self)
    def load_wsgi(self)                  # 获得实现wsgi协议的app,如Flask
    def init_signals(self)
    def handle_usr1(self, sig, frame)
    def handle_exit(self, sig, frame)
    def handle_quit(self, sig, frame)
    def handle_abort(self, sig, frame)
    def handle_error(self, req, client, addr, exc)
    def handle_winch(self, sig, fname)
Worker调用过程
    def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
        """\
        This is called pre-fork so it shouldn't do anything to the
        current process. If there's a need to make process wide
        changes you'll want to do that in ``self.init_process()``.
        """
        self.age = age
        self.pid = "[booting]"
        self.ppid = ppid
        self.sockets = sockets
        self.app = app                      
        self.timeout = timeout  #超时时间
        self.cfg = cfg              # 配置
        # 状态
        self.booted = False  #已启动
        self.aborted = False  #已终止

        self.reloader = None  

        self.nr = 0
        jitter = randint(0, cfg.max_requests_jitter)
        self.max_requests = cfg.max_requests + jitter or sys.maxsize
        self.alive = True           # 是否存活
        self.log = log                #日志对象
        self.tmp = WorkerTmp(cfg)  # worker tmp文件

__init__()做的事情相对简单,就是将一些相关的参数,如cfgapp等作为Worker对象的属性,同时创建一个tmpfile父进程通过检查该文件的时间戳,来确认子进程是否存活。

def notify(self):
        """\
        Your worker subclass must arrange to have this method called
        once every ``self.timeout`` seconds. If you fail in accomplishing
        this task, the master process will murder your workers.
        """
        self.tmp.notify()

notify()调用WorkerTmp.notify()更改所对应tmp文件的时间戳。

    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super(MyWorkerClass, self).init_process() so that the ``run()``
        loop is initiated.
        """

        # set environment' variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v
        #设置进程信息
        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()
        for p in self.PIPE:
            util.set_non_blocking(p)
            util.close_on_exec(p)

        # Prevent fd inheritance
        # close_on_exec 设置对应的文件在创建子进程的时候不会被继承
        for s in self.sockets:
            util.close_on_exec(s)
        util.close_on_exec(self.tmp.fileno())

        self.wait_fds = self.sockets + [self.PIPE[0]]

        self.log.close_on_exec()

        # 设置信号处理函数
        self.init_signals()

        # start the reloader
        if self.cfg.reload:
            def changed(fname):
                self.log.info("Worker reloading: %s modified", fname)
                self.alive = False
                self.cfg.worker_int(self)
                time.sleep(0.1)
                sys.exit(0)

            reloader_cls = reloader_engines[self.cfg.reload_engine]
            self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                         callback=changed)
            self.reloader.start()

        self.load_wsgi()
        self.cfg.post_worker_init(self) 

        # Enter main run loop
        self.booted = True
        self.run()            #主循环

init_process()Work进程的入口文件,启动工作进程调用的是该方法,官方建议所有的实现子类的重载方法应该调用父类的该方法,该方法主要做了以下几件事:

  1. 设置进程的进程组信息;
  2. 创建单进程管道,Worker是通过管道来存储导致中断的信号,不直接处理,先收集起来,在主循环中处理;
  3. 获取要监听的文件描述符,并将描述符设置为不可被子进程继承;
  4. 设置中断信号处理函数;
  5. 设置代码更新时,自动重启的配置
  6. 获取实现了wsgi协议的app对象
  7. 进入主循环方法
    def run(self):
        """\
        This is the mainloop of a worker process. You should override
        this method in a subclass to provide the intended behaviour
        for your particular evil schemes.
        """
        raise NotImplementedError()

Workder类没有实现run(),由子类去实现具体的逻辑。

再来看看WorkerTmp类。

WorkerTmp

# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import os
import platform
import tempfile

from gunicorn import util

PLATFORM = platform.system()
IS_CYGWIN = PLATFORM.startswith('CYGWIN')

class WorkerTmp(object):

    def __init__(self, cfg):
        old_umask = os.umask(cfg.umask)
        fdir = cfg.worker_tmp_dir
        if fdir and not os.path.isdir(fdir):
            raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
        fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)

        # allows the process to write to the file
        util.chown(name, cfg.uid, cfg.gid)
        os.umask(old_umask)

        # unlink the file so we don't leak tempory files
        try:
            if not IS_CYGWIN:
                # 即使这里unlink了文件,已经打开了文件描述符仍然可以访问该文件内容  close能够实施删除文件内容的操作,必定因为在close之前有一个unlink操作。
                util.unlink(name)
            self._tmp = os.fdopen(fd, 'w+b', 1)
        except:
            os.close(fd)
            raise

        self.spinner = 0

    def notify(self):
        self.spinner = (self.spinner + 1) % 2
        os.fchmod(self._tmp.fileno(), self.spinner)         #  更新时间戳

    def last_update(self):
        return os.fstat(self._tmp.fileno()).st_ctime

    def fileno(self):
        return self._tmp.fileno()

    def close(self):
        return self._tmp.close()

WorkTmp类主要的作用是创建一个临时文件,子进程通过更新该文件的时间戳,父进程定期检查子进程临时文件的时间戳确定子进程是否存活。
WorkTmp._init__()在系统创建了临时文件并获取其文件描述符,然后unlink该文件,防止子进程关闭后没有删除文件,,即使被unlink了,已经打开的文件描述符仍然访问文件。
WorkTmp.notify()通过更改文件的权限来更新文件修改时间。
WorkTmp.last_update()用来获取文件最后一次更新的时间。

最后看下工作进程的一个实现子类ThreadWorker

ThreadWorker

该类实现了父类的Woker.run()方法,并重载了部分其他方法。

 def init_process(self):
        """初始化函数
        """
        self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
        self.poller = selectors.DefaultSelector()       # 利用系统提供的selector
        self._lock = RLock()                            # 创建可重入锁
        super(ThreadWorker, self).init_process()

初始化函数做了以下事情:

  1. 通过concurrent.futures.ThreadPoolExecutor创建线程池,线程的数量由配置文件的thread决定;
  2. 通过selectors.DefaultSelector()获取符合所在平台的最优的I/O复用,如Linux使用epollMac下面使用Kqueue,这个模块隐藏了底层的平台细节,对外提供统一的接口;
  3. 创建一个可重入锁;
  4. 调用父类的init_process(),在该方法里面调用了run()方法。
    def run(self):
        """运行的主函数
            ①通知父进程,我还活着
            ②监听事件
            ③处理监听事件
            ④判断父进程是否已经挂了,是的话退出循环
            ⑤murder 超过keep-alive最长的时间的请求
        """

        # init listeners, add them to the event loop
        for sock in self.sockets:
            sock.setblocking(False)                                             # 设置为非阻塞
            # a race condition during graceful shutdown may make the listener
            # name unavailable in the request handler so capture it once here
            server = sock.getsockname()
            acceptor = partial(self.accept, server)  # self.acceptor的偏函数
            self.poller.register(sock, selectors.EVENT_READ, acceptor)          #register(fileobj, events, data=None) 用data来保存callback函数

        while self.alive:                                                       # 主循环
            # notify the arbiter we are alive
            self.notify()                                                       # todo 通知机制?

            # can we accept more connections?
            if self.nr_conns < self.worker_connections:                         # 防止超过并发数
                # wait for an event
                # selector新写法
                events = self.poller.select(1.0)                                # 等待事件
                for key, _ in events:
                    callback = key.data                                         #callback从data获取
                    callback(key.fileobj)

                # check (but do not wait) for finished requests
                result = futures.wait(self.futures, timeout=0,
                        return_when=futures.FIRST_COMPLETED)                    #等待队列事件 futures.wait 接收的第一个参数是一个可迭代对象,无阻塞等待完成
            else:
                # wait for a request to finish
                result = futures.wait(self.futures, timeout=1.0,                # 阻塞等待
                        return_when=futures.FIRST_COMPLETED)

            # clean up finished requests
            for fut in result.done:
                self.futures.remove(fut)

            if not self.is_parent_alive(): # 通过判断ppid是否已经发生变化
                break

            # hanle keepalive timeouts
            self.murder_keepalived()

        self.tpool.shutdown(False)
        self.poller.close()

        for s in self.sockets:
            s.close()

        futures.wait(self.futures, timeout=self.cfg.graceful_timeout)       # 优雅关闭等待的最长时间

run()方法中主要做了以下事情:

  1. 更新tmpfile时间戳
  2. 获取就绪的请求连接;
  3. 如果并发数允许,分配一个线程处理请求;
  4. 判断父进程是否已经停止工作,有的话准备退出主循环;
  5. 杀死已经允许最大连接事件的keep-alive连接。

下面是一个请求刚进来的处理过程:

    def _wrap_future(self, fs, conn):
        """将futuren放入队列中,并设置处理完成后的回调函数
        
        Arguments:
            fs {[type]} -- [description]
            conn {[type]} -- [description]
        """

        fs.conn = conn
        self.futures.append(fs)
        fs.add_done_callback(self.finish_request)

    def enqueue_req(self, conn):
        """将请求放入线程处理
        
        Arguments:
            conn {[type]} -- [description]
        """

        conn.init()
        # submit the connection to a worker
        fs = self.tpool.submit(self.handle, conn)
        self._wrap_future(fs, conn)

    def accept(self, server, listener):
        """监听时间处理函数
        
        Arguments:
            server {[type]} -- [description]
            listener {[type]} -- [description]
        """

        try:
            sock, client = listener.accept()
            # initialize the connection object
            conn = TConn(self.cfg, sock, client, server)
            self.nr_conns += 1                                      # 增加当前正在处理的请求数
            # enqueue the job
            self.enqueue_req(conn)
        except EnvironmentError as e:
            if e.errno not in (errno.EAGAIN,
                    errno.ECONNABORTED, errno.EWOULDBLOCK):
                raise
  1. socketaccept返回一个与客户端连接的socket;
  2. socket作为self.handler()方法的参数启动线程;
  3. 注册线程运行完成后的回调函数。

self.handler()主要的部分在于其调用的self.handle_request(),因此直接看self.handle_request()做了哪些事情:

    def handle_request(self, req, conn):
        """主要的处理函数
        """

        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, conn.sock, conn.client,
                    conn.server, self.cfg)
            environ["wsgi.multithread"] = True
            self.nr += 1
            if self.alive and self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                resp.force_close()
                self.alive = False

            if not self.cfg.keepalive:
                resp.force_close()
            elif len(self._keep) >= self.max_keepalived:
                resp.force_close()

            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)

                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()

            if resp.should_close():
                self.log.debug("Closing connection.")
                return False
        except EnvironmentError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    conn.sock.shutdown(socket.SHUT_RDWR)
                    conn.sock.close()
                except EnvironmentError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")

        return True

除了一些配置和环境相关的处理,关键的在于respiter = self.wsgi(environ, resp.start_response)这行代码,这行代码获取了实现wsgi协议的app并运行,将获取后的结果返回给客户端。
这里就是整个请求处理的关键,只要符合wsgi协议的框架,都可以这样接入Gunicorn

整个ThreadWork的处理流程,如下图:

Gunicorn.ThreadWorker
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容

  • 入口 首先程序的入口为gunicorn/app/wsgiapp这个模块。 WSGIApplication这个类继承...
    _kkk阅读 2,076评论 0 1
  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,299评论 8 265
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,084评论 1 32
  • 一、Python简介和环境搭建以及pip的安装 4课时实验课主要内容 【Python简介】: Python 是一个...
    _小老虎_阅读 5,719评论 0 10
  • 1 秦与明陷入LA的无限魔咒中。开始频繁出入酒吧。有时是想找一点安慰。有时又是胡乱中对于生活放弃的痛恨进而买醉。母...
    viu不老君阅读 184评论 0 0