探索Rabbitmq的Java客户端

原文

AMQPConnection

实例初始化

创建Connection时会通过FrameHandlerFacotry创建一个SocketFrameHandler,SocketFrameHandler对Socket进行了封装。

public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
    {
        checkPreconditions();
        this.username = params.getUsername();
        this.password = params.getPassword();
        this._frameHandler = frameHandler;
        this._virtualHost = params.getVirtualHost();
        this._exceptionHandler = params.getExceptionHandler();

        this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
        this.requestedFrameMax = params.getRequestedFrameMax();
        this.requestedChannelMax = params.getRequestedChannelMax();
        this.requestedHeartbeat = params.getRequestedHeartbeat();
        this.shutdownTimeout = params.getShutdownTimeout();
        this.saslConfig = params.getSaslConfig();
        this.executor = params.getExecutor();
        this.threadFactory = params.getThreadFactory();

        this._channelManager = null;

        this._brokerInitiatedShutdown = false;

        this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
    }

启动连接

初始化WorkService和HeartBeatSender。

创建一个channel0的AMQChannel,这个channel不会被ChannelManager管理

首先channel0会将一个BlockingRpcContinuation作为当前未完成的Rpc请求,用于接收handshake的响应。

然后channel0会向socket中写入一条只有header的消息作为handshake,header中包含了客户端的版本号。

紧接着会启动主循环线程,主循环会通过SocketFrameHandler从socket接收字节流。此时接收到的第一条数据是服务端响应handshake返回的Connection.Start信息(包含服务端版本、机制、基础信息)。

主循环线程启动后,主线程会阻塞地等待服务端的handshake响应。拿到响应之后会对服务器回传的信息进行比对,然后发送Connection.StartOK的信息去服务端(这个请求也还是阻塞式的),等待服务端回传Connection.Tune(包含最大channel数、最大frame长度和heartbeat间隔)。将这些信息与实例初始化是的设置信息进行对比,初始化ChannelManager

紧接着发送Connection.TuneOk和Connection.Open消息去服务端,完成connection的建立。

Connection > MainLoop > readFrame

消息体

Frame是对AMQP消息的封装:包含frame的type、channel号、消息内容

type|channelNumber|payloadSize|payload|frameEndMarker

Payload包含了消息类型、消息头和消息主题

method|header|body

消息发送和接收

消息的发送和接收都要channel来完成。

创建Channel

通过Connection的ChannelManager来创建Channel,通过指定的ChannelNumber或者由分配器分配。创建好的Channel实例会放入ChannelManager的Map中,key为ChannelNumber。由此可见Channel是Connection唯一的。

public ChannelN createChannel(AMQConnection connection);
public ChannelN createChannel(AMQConnection connection, int channelNumber);
private ChannelN addNewChannel(AMQConnection connection, int channelNumber);
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService);

Channel实例化之后会调用Channel.open方法,发送Channel.Open去服务端(阻塞式),等待服务端响应Channel.OpenOk。

消息发送

Channel.transmit 发送消息,调用AMQCommand.transmit完成发送。

AMQCommand.transmit将消息封装成Frame,通过connection的SocketFrameHandler写入OutpuStream。

消息接收

主循环线程在链接创建完成后会监听socket,从InputStream中读取二进制流封装成Frame。通过Frame中的ChannelNumber从ChannelManager中获取对应的Channel实例处理Frame。

while (_running) {
    Frame frame = _frameHandler.readFrame();
    if (frame != null) {
        _missedHeartbeats = 0;
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            // Ignore it: we've already just reset the heartbeat counter.
        } else {
            if (frame.channel == 0) { // the special channel
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    // If we're still _running, but not isOpen(), then we
                    // must be quiescing, which means any inbound frames
                    // for non-zero channels (and any inbound commands on
                    // channel zero that aren't Connection.CloseOk) must
                    // be discarded.
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        cm.getChannel(frame.channel).handleFrame(frame);
                    }
                }
            }
        }
    } else {
        // Socket timeout waiting for a frame.
        // Maybe missed heartbeat.
        handleSocketTimeout();
    }
}

Channel会使用已经准备好的AMQCommand处理Frame,并未下一个Frame准备新的AMQCommand。

public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}

AMQCommad会使用CommandAssembler依次从Frame的payload中检出对应的Method、Header和Body。如果检出了Body,整个Frame会被检出完成。如过未完成,会进入主循环再次处理直至完成。

public synchronized boolean handleFrame(Frame f) throws IOException
{
    switch (this.state) {
      case EXPECTING_METHOD:          consumeMethodFrame(f); break;
      case EXPECTING_CONTENT_HEADER:  consumeHeaderFrame(f); break;
      case EXPECTING_CONTENT_BODY:    consumeBodyFrame(f);   break;
      default:
          throw new AssertionError("Bad Command State " + this.state);
    }
    return isComplete();
}

Frame被检出完后,会根据Method的类型进入不同的异步处理流程。

Method在channel打开和关闭的情况下会以下的可能:

Channel打开:Basic.Deliver, Basic.Return, Basic.Flow, Basic.Ack, Basic.Nack, Basic.RecoveryOk, Basic.Cancel

Channel关闭:Channel.CloseOk

生产和消费

生产

调用Channel.basicPublish()方法,指定exchange、routingKey等信息,消息属性、消息体。封装成Baisc.Publish,放入AMQCommand,最后调用transmit方法完成发送。参考消息发送

public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
    throws IOException
{
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }
    BasicProperties useProps = props;
    if (props == null) {
        useProps = MessageProperties.MINIMAL_BASIC;
    }
    transmit(new AMQCommand(new Basic.Publish.Builder()
                                .exchange(exchange)
                                .routingKey(routingKey)
                                .mandatory(mandatory)
                                .immediate(immediate)
                            .build(),
                            useProps, body));
}

消费

创建QueueingConsumer实例,然后调用Channel.basicConsume方法。

queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume("queue_name", queueingConsumer);
new Thread() {
    @Override
    public void run() {
        while (true) {
            try {
                final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                new Thread() {
                    @Override
                    public void run() {
                        try{
                            delivery.getEnvelope();//消息头
                            delivery.getProperties();//消息属性
                            delivery.getBody();//消息体
                        }finally{
                          //channel.basicAck();
                          //channel.basicNack()
                        }
                    }
                }.start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}.start();

QueueingConsumer实现了Consumer接口。

Channel.basicConsume方法会封装Channel.Consume消息发送到服务端(阻塞式),等待服务端的Channel.ConsumeOk响应(包含了服务端为Consumer分配的ConsumerTag)。然后将QueueingConsumer放入Map中,key为ConsumerTag。consumer是Channel唯一。

当客户端接收到消息,参考消息接收。Basic.Deliver类型的消息(consumerTag、deliveryTag、redelivered、exchange、routingKey)会进入消费处理流程。Channel根据ConsumerTag从Map中获取对应的QueueConsumer实例,由Channel的ConsumerDispatcher通过Connection初始化的WorkService创建新的处理线程,调用QueueConsumer实例的handleDelivery方法。QueueConsumer将消息封装成Delivery对象,放入BlockingQueue中。

消费线程等待新的Delivery(阻塞式),之后创建新的线程完成消息的处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 228,835评论 6 534
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 98,676评论 3 419
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 176,730评论 0 380
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,118评论 1 314
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,873评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,266评论 1 324
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,330评论 3 443
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,482评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,036评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,846评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,025评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,575评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,279评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,684评论 0 26
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 35,953评论 1 289
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,751评论 3 394
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,016评论 2 375

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,789评论 18 139
  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,686评论 0 3
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,377评论 2 34
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,360评论 0 1
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 11,057评论 6 13