MQ!Rabbit-client Channel

MQ!Rabbit-client Channel

继续看建立链接的源码。 建立链接的源码

AMQChannel

看看conn的启动,下面这俩段代码,这部分也是是补充 MQ!Rabbit-client command里面最后的尾巴的

public void start() throws IOException, TimeoutException {
   ...
   _channel0.enqueueRpc(connStartBlocker);
   ...
   Method serverResponse = 
                    _channel0.rpc(method, handshakeTimeout/2).getMethod();
      
}    

首先要知道这个 _channel0是个什么东西。查看AMQConnection类(start方法就是这个类的)

private final AMQChannel _channel0;

先看下AMQChannel的成员变量

// 对象锁
protected final Object _channelMutex = new Object();
// AMQConnection 对象
private final AMQConnection _connection;
// 通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
private final int _channelNumber;
// 内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
// 当前未处理完的rpc请求
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;

那么来看下这个_channel0.enqueueRpc(connStartBlocker)对应源码干了啥

public void enqueueRpc(RpcContinuation k){
    // 根据传入的 RpcContinuation构造RpcContinuationRpcWrapper对象的一个方法
    doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
}


private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
    synchronized (_channelMutex) {
        boolean waitClearedInterruptStatus = false;
        while (_activeRpc != null) {
            try {
                _channelMutex.wait();
            } catch (InterruptedException e) {
                waitClearedInterruptStatus = true;
            }
        }
        if (waitClearedInterruptStatus) {
            Thread.currentThread().interrupt();
        }
        // RpcContinuationRpcWrapper 获取对象赋值给 _activeRpc
        // 属性: private RpcWrapper _activeRpc = null;
        _activeRpc = rpcWrapperSupplier.get();
    }
}

可以看到,最后是将rpc对象交给当前对象。当前对象是AMQChannel的实例即_channel0,所以

// 将rpc任务放入channel
_channel0.enqueueRpc(connStartBlocker);

在看_channel0.rpc(method, handshakeTimeout/2).getMethod();干了什么

这里先把调用链写出来(都是AMQChannel的方法),敲黑板😶,注意看最后一个方法

  • rpc(Method m, int timeout)
  • privateRpc(Method m, int timeout)
  • ensureIsOpen(); quiescingRpc(m, k);
  • enqueueRpc(k); quiescingTransmit(m);
  • quiescingTransmit(new AMQCommand(m));
  • transmit(AMQChannel channel) throws IOException

最后一个方法就是上一篇 MQ!Rabbit-client command说的核心方法.源码都不想贴了,还是贴上凑字数吧

public AMQCommand rpc(Method m, int timeout)
    throws IOException, ShutdownSignalException, TimeoutException {
    return privateRpc(m, timeout);
}

// 获取 command
private AMQCommand privateRpc(Method m, int timeout)
            throws IOException, ShutdownSignalException, TimeoutException {
    SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
    rpc(m, k);

    try {
        return k.getReply(timeout);
    } catch (TimeoutException e) {
        cleanRpcChannelState();
        throw e;
    }
}

public void rpc(Method m, RpcContinuation k) throws IOException {
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingRpc(m, k);
    }
}


public void quiescingRpc(Method m, RpcContinuation k)
        throws IOException {
    synchronized (_channelMutex) {
        enqueueRpc(k);
        quiescingTransmit(m);
    }
}

public void quiescingTransmit(Method m) throws IOException {
    synchronized (_channelMutex) {
        quiescingTransmit(new AMQCommand(m));
    }
}

public void quiescingTransmit(AMQCommand c) throws IOException {
        synchronized (_channelMutex) {
            if (c.getMethod().hasContent()) {
                while (_blockContent) {
                    try {
                        _channelMutex.wait();
                    } catch (InterruptedException ignored) {}

                    // This is to catch a situation when the thread wakes up during
                    // shutdown. Currently, no command that has content is allowed
                    // to send anything in a closing state.
                    ensureIsOpen();
                }
            }
            c.transmit(this);
        }
}

// 将Command转换为多个Frame并且发送
public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        connection.writeFrame(m.toFrame(channelNumber));
        if (m.hasContent()) {
            byte[] body = this.assembler.getContentBody();

            connection.writeFrame(this.assembler.getContentHeader()
                                  .toFrame(channelNumber, body.length));

            int frameMax = connection.getFrameMax();
            int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
                - EMPTY_FRAME_SIZE;

            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                int remaining = body.length - offset;

                int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                    : bodyPayloadMax;
                Frame frame = Frame.fromBodyFragment(channelNumber, body,
                                                     offset, fragmentLength);
                connection.writeFrame(frame);
            }
        }
    }

    connection.flush();
}

看了上面的源码,我们再来分析下创建链接start方法中的一段源码

...
// connStart处理这里不展示了
AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;    
...

// 循环处理,直到TuneOk
do {
    // StartOK 封装为一个method帧
    Method method = (challenge == null)
        ? new AMQP.Connection.StartOk.Builder()
            .clientProperties(_clientProperties)
            .mechanism(sm.getName())
            .response(response)
            .build()
        : new AMQP.Connection.SecureOk.Builder().response(response).build();

     try {
         // StartOK 封装的帧发送给 broker
         Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
         // 判断返回信息是否是 Tune
         if (serverResponse instanceof AMQP.Connection.Tune) {
             connTune = (AMQP.Connection.Tune) serverResponse;
         } else {
             challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
             response = sm.handleChallenge(challenge, this.username, this.password);
          }
        } catch (ShutdownSignalException e) {
         Method shutdownMethod = e.getReason();
         if (shutdownMethod instanceof AMQP.Connection.Close) {
             AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
             if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                 throw new AuthenticationFailureException(shutdownClose.getReplyText());
             }
         }
         throw new PossibleAuthenticationFailureException(e);
     }
} while (connTune == null);
...

ChannelManager

建立链接的源码中如下代码:

_channelManager = instantiateChannelManager(channelMax, threadFactory);

protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
        ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
        configureChannelManager(result);
        return result;
    }

这里初始化了一个ChannelManager对象_channelManager

ChannelManager主要用来管理Channel的,channelNumber=0的除外,应为channelNumber=0是留给Connection的特殊的channelNumber。(上面提到的_channel0)。

ChannelManager的成员变量:

/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
private final Object monitor = new Object();
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();
private final IntAllocator channelNumberAllocator;

private final ConsumerWorkService workService;

private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();

/** Maximum channel number available on this connection. */
private final int _channelMax;
private final ThreadFactory threadFactory;

先来看看开发中如何使用channel(ConnectionUtil部分未贴)

package com.jmmq.load.jim.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;

/**
 * 生产者
 */
@Component
public class Producer {
    public static void sendByExchange(String message) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
        // 声明 exchange
        channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, "fanout");
        // 交换机和队列绑定
        channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "");
        channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("发送的信息为:" + message);
        channel.close();
        connection.close();
    }

}

/**
 * 消费者
 */
package com.jmmq.load.jim.mq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {

    public static void getMessage() throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
//        channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
        DefaultConsumer deliverCallback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body, "UTF-8"));
                // 消息确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(ConnectionUtil.QUEUE_NAME, deliverCallback);
    }

    public static void main(String[] args) throws Exception {
        Consumer.getMessage();
    }
}

可以看到在获取channel的时候都是使用。

connection.createChannel();

我们来看下这里面做了什么。这里直接贴出来需要看的代码

@Override
public Channel createChannel() throws IOException {
    ensureIsOpen();
    ChannelManager cm = _channelManager;
    if (cm == null) return null;
    Channel channel = cm.createChannel(this);
    metricsCollector.newChannel(channel);
    return channel;
}

public Channel createChannel(int channelNumber) throws IOException {
    ensureIsOpen();
    ChannelManager cm = _channelManager;
    if (cm == null) return null;
    return cm.createChannel(this, channelNumber);
}

public ChannelN createChannel(AMQConnection connection) throws IOException {
    ChannelN ch;
    synchronized (this.monitor) {
        int channelNumber = channelNumberAllocator.allocate();
        if (channelNumber == -1) {
            return null;
        } else {
            ch = addNewChannel(connection, channelNumber);
        }
    }
    ch.open(); // now that it's been safely added
    return ch;
}

下面是ChannelManager中关于创建Channel的代码,一个是带了channelNumber的,一个是自动分片channelNumber的,分别对应AMQConnection中的两个方法。最后都调用addNewChannel方法。

public ChannelN createChannel(AMQConnection connection) throws IOException {
    ChannelN ch;
    synchronized (this.monitor) {
        int channelNumber = channelNumberAllocator.allocate();
        if (channelNumber == -1) {
            return null;
        } else {
            ch = addNewChannel(connection, channelNumber);
        }
    }
    ch.open(); // now that it's been safely added
    return ch;
}

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {
    if (_channelMap.containsKey(channelNumber)) {
        // That number's already allocated! Can't do it
        // This should never happen unless something has gone
        // badly wrong with our implementation.
        throw new IllegalStateException("We have attempted to "
                + "create a channel with a number that is already in "
                + "use. This should never happen. "
                + "Please report this as a bug.");
    }
    ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
    _channelMap.put(ch.getChannelNumber(), ch);
    return ch;
}

protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
    return new ChannelN(connection, channelNumber, workService);
}

这里重点看下 ch.open()

/**
 * Package method: open the channel.
 * This is only called from {@link ChannelManager}.
 * @throws IOException if any problem is encountered
 */
public void open() throws IOException {
    // wait for the Channel.OpenOk response, and ignore it
    exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}

调用了AMQChannel的rpc方法,向broker发送了一个Channel.Open帧。

addNewChannel方法实际上是创建了一个ChannelN对象,然后置其于ChannelManager中的_channelMap中,方便管理。

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
        if (_channelMap.containsKey(channelNumber)) {
            // That number's already allocated! Can't do it
            // This should never happen unless something has gone
            // badly wrong with our implementation.
            throw new IllegalStateException("We have attempted to "
                    + "create a channel with a number that is already in "
                    + "use. This should never happen. "
                    + "Please report this as a bug.");
        }
        ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
        _channelMap.put(ch.getChannelNumber(), ch);
        return ch;
}

channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet来实现channelNumber的分配。后面有时间会单独写一篇关于BitSet(java.util.BitSet)🐊。

这里再留一个尾巴🐊,上面我们看到了,使用 ChannelManager 创建的实际都是 ChannelN对象,后面在进行学习。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容