celery 7 优秀开源项目kombu源码分析之registry和entrypoint

有多年后台开发的工程师想必清楚,Celery本身涉及到的技术点其实在业界应用是很广泛的。Celery能这么流行,我们先排除没有进行技术深入下的盲从,和它诞生的非常早以外,我认为这和项目的内部设计的非常好也是有关的。

接下来分析Celery使用的Kombu库中的一些设计实现让大家对这个优秀项目更了解,并从中学习可扩展开发的实践。

Kombu是什么?
当一个项目变得越来越复杂,就要考虑只保留核心,并把其他部分分拆到不同的项目中以便减少未来的维护和开发的成本。Flask、IPython都是这样做的。

Kombu是一个把消息传递封装成统一接口的库。

Celery一开始先支持的RabbitMQ,也就是使用AMQ协议。由于要支持越来越多的消息代理,但是这些消息代理是不支持AMQ协议的,需要一个东西把所有的消息代理的处理方式统一起来,甚至可以理解为把它们「伪装成支持AMQ协议」。Kombu的最初的实现叫做carrot, 后来经过重构才成了Kombu。

registry

registry也就是「注册」,有按需加入的意思,在Python标准库和一些优秀开源项目中都有应用。我们先看个django的[场景]

### source code start
from itertools import chain


class CheckRegistry:

    def __init__(self):
        self.registered_checks = []
        self.deployment_checks = []

    def register(self, check=None, *tags, **kwargs):
        kwargs.setdefault('deploy', False)

        def inner(check):
            check.tags = tags
            if kwargs['deploy']:
                if check not in self.deployment_checks:
                    self.deployment_checks.append(check)
            elif check not in self.registered_checks:
                self.registered_checks.append(check)
            return check

        if callable(check):
            return inner(check)
        else:
            if check:
                tags += (check, )
            return inner

    def tag_exists(self, tag, include_deployment_checks=False):
        return tag in self.tags_available(include_deployment_checks)

    def tags_available(self, deployment_checks=False):
        return set(chain(*[check.tags for check in self.get_checks(deployment_checks) if hasattr(check, 'tags')]))

    def get_checks(self, include_deployment_checks=False):
        checks = list(self.registered_checks)
        if include_deployment_checks:
            checks.extend(self.deployment_checks)
        return checks


registry = CheckRegistry()
register = registry.register
tag_exists = registry.tag_exists

### source code end
@register('mytag', 'another_tag')
def my_check(apps, **kwargs):
    pass


print tag_exists('another_tag')
print tag_exists('not_exists_tag')

可以看到每次用registry.register都能动态的添加新的tag,最后还用 register= registry.register这样的方式列了个别名。执行结果如下:

❯ python django_example.py
True
False

<meta charset="utf-8">

kombu库包含对消息的序列化和反序列化工作的实现,可以同时支持多种序列化方案,如pickle、json、yaml和msgpack。假如你从前没有写过这样可扩展的项目,可能想的是每种的方案的loads和dumps都封装一遍,然后用一个大的if/elif/else来控制最后的序列化如何执行。

那么在kombu里面是怎么用的呢?

import codecs
from collections import namedtuple

codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))

class SerializerNotInstalled(Exception):
    pass


class SerializerRegistry(object):
    def __init__(self):
        self._encoders = {}
        self._decoders = {}
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        if encoder:
            self._encoders[name] = codec(
                content_type, content_encoding, encoder,
            )
        if decoder:
            self._decoders[content_type] = decoder

    def _set_default_serializer(self, name):
        try:
            (self._default_content_type, self._default_content_encoding,
             self._default_encode) = self._encoders[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name))

    def dumps(self, data, serializer=None):
        if serializer and not self._encoders.get(serializer):
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(serializer))

        if not serializer and isinstance(data, unicode):
            payload = data.encode('utf-8')
            return 'text/plain', 'utf-8', payload

        if serializer:
            content_type, content_encoding, encoder = \
                self._encoders[serializer]
        else:
            encoder = self._default_encode
            content_type = self._default_content_type
            content_encoding = self._default_content_encoding

        payload = encoder(data)
        return content_type, content_encoding, payload

    def loads(self, data, content_type, content_encoding):
        content_type = (content_type if content_type
                        else 'application/data')
        content_encoding = (content_encoding or 'utf-8').lower()

        if data:
            decode = self._decoders.get(content_type)
            if decode:
                return decode(data)
        return data


registry = SerializerRegistry()
dumps = registry.dumps
loads = registry.loads
register = registry.register

其实kombu还实现了unregister限于篇幅我就不展开了。现在我们想添加yaml的支持,只需要加这样一个函数:

def register_yaml():
    try:
        import yaml
        registry.register('yaml', yaml.safe_dump, yaml.safe_load,
                          content_type='application/x-yaml',
                          content_encoding='utf-8')
    except ImportError:

        def not_available(*args, **kwargs):
            """Raise SerializerNotInstalled.
            Used in case a client receives a yaml message, but yaml
            isn't installed.
            """
            raise SerializerNotInstalled(
                'No decoder installed for YAML. Install the PyYAML library')
        registry.register('yaml', None, not_available, 'application/x-yaml')


register_yaml()

这样就支持yaml了。如果希望默认使用yaml来序列化,可以执行:

registry._set_default_serializer('yaml')

是不是非常好扩展,如果哪天我希望去掉对pickle(安全问题),就可以直接注释对应的函数就好了。写个小例子试验下

yaml_data = """\
float: 3.1415926500000002
int: 10
list: [george, jerry, elaine, cosmo]
string: The quick brown fox jumps over the lazy dog
unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
"""

content_type, content_encoding, payload = dumps(yaml_data, serializer='yaml')
print content_type, content_encoding

assert loads(payload, content_type=content_type, content_encoding=content_encoding) == yaml_data

运行的结果就是:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容