Kombu 源码解析三

声明:本文仅限于简书发布,其他第三方网站均为盗版,原文地址: Kombu 源码解析三

Kombu 源码解析三

在上一篇中,我们从一个 Consumer 的 Sample 中聊起,看了一些 Kombu 的内部实现,了解了一些消息是如何拿回来的。现在我们接着继续看看消息是如何发出去的,同样,我们还是从第一篇文章中提到的 Sample 出发,看看真实情况是如何的:

首先第一步还是先创建 Connection,这个毫无疑问,因为肯定要有连接才能发送消息。这个代码我们在上一篇文章中已经看过了,目前阶段就先不看了,继续看下面一个。

Line 29 我们明确得声明了一个 Producer,这个和我们在第二篇文章中看到的不太一样,第二篇中我们只是简单得构造了一个 SimpleQueue,而 SimpleQueue 会创建 Consumer 和 Producer,但是这里我们明确得声明了一个 Producer,所以我们可以探究一下发生了什么,代码的位置在:kombu/messageing.py Line 61:

从这里可以看到很多熟悉的名词,channel/exchange 等等,然而我们发现我们都没有传递这些参数,那么默认都是 None 了,只有 channel 我们传的是 connection,ok,记住这些,继续出发看看调用 publish 的时候发生了啥:

publish 的注释很长,所以我把他忽略了,看下我们调用的代码,我们也传了很多参数,先一一对应起来看下我们的参数都起到什么作用了,首先我们传递的参数有:

  • body:{'hello': 'world'}
  • exchange:Exchange('kombu_demo', type='direct')
  • routing_key:'kombu_demo'
  • serializer:json
  • compression:zlib

纵观一下,发现这里没做啥太多有价值的事情,只是对参数进行校验和规整,比较有用的估计也就在 Line 138-140 这三行了,这里对数据进行了压缩处理,然后 Line 148 也有点用,这里对重试进行了策略设置。最后就直接在 Line 149 调用内部的 _publish 方法了。

这里是比较有意思的地方啦,这里在 Line 188 就将消息封装成了 Message,然后 Line 194 对 mq 的 queue 进行声明,然后最后还是通过 Channel 来发送消息。

话说,看到这里,我们还是没看到 Exchange 和 Routing Key 是如何工作的,所以我们希望 basic_publish 能够给我们一点启发,跟踪 Channel 的基类,我们看到 virtual/base.py Line 600:

可以看到,这里的消息发送原来都是交给 Exchange 来做的,所以我们是时候看下 Exchange 的实现了,代码跳到 komu/transport/virtual/exchange.py,我们会发现很多有趣的东西,例如在 Line 155

我们会发现 Kombu 默认会支持 3 种不同的 Exchange,并且定义了它的 Exchange 实现,我们来看看 DirectExchange 的具体 deliver

所以关键是在于 _lookup 是如何实现的,其实 _lookup 具体实现我们不太需要关心,我们往上看一下,查看一下 lookup 心中就有一些了解了:

这里有一个问题就在于 table 是什么,我们找出 table 的实现事情就清楚多了,但是跟踪一下代码,你会发现这个 table 的获取比较复杂,这里抽取几块比较典型的代码,记录一下这个 table 的来源,位置其实都在 komu/transport/virtual/base.py


从这些代码片段中我们就大概知道了 Exchange routing 的原理了,关键的点应该是在每一个 Exchange 的 prepare_bind,然后我们再来看看 Direct Exchange 的 put 操作具体是怎样的,这个代码就在 kombu/transport/redis.py Line 762 这个位置:

没啥亮点了,因为我们都已经知道了 queue 是哪个,所以直接放进去就可以了,留个心眼,这里还做了一个 q_for_pri 的操作,以后可能会看到。

总结

OK,这就是一个简单的生产消息的流程,我们简单得走了一遍,又了解了不少的知识:

  • Producer 包含了很多东西,有 Exchange、routing_key 和 channel 等等
  • Message 只是一些要素的封装,没有干什么事情
  • Exchange 只是将发送的 routing_key 转化为 queue 的名字
  • 实际发送还是得 channel 来干活,每个不同的 Transport 都有对应的 Channel

Reference

  1. Kombu Code
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • cinder RPC 分析 [TOC] 我们都知道在Cinder内部,各组件之间通讯是通过RPC api,比如c...
    笨手笨脚越阅读 1,855评论 0 3
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,948评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,407评论 2 34
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 3,041评论 3 41
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,123评论 3 51