MQ!Rabbit-client Channel
继续看建立链接的源码。 建立链接的源码
AMQChannel
看看conn的启动,下面这俩段代码,这部分也是是补充 MQ!Rabbit-client command里面最后的尾巴的
public void start() throws IOException, TimeoutException {
...
_channel0.enqueueRpc(connStartBlocker);
...
Method serverResponse =
_channel0.rpc(method, handshakeTimeout/2).getMethod();
}
首先要知道这个 _channel0
是个什么东西。查看AMQConnection
类(start方法就是这个类的)
private final AMQChannel _channel0;
先看下AMQChannel
的成员变量
// 对象锁
protected final Object _channelMutex = new Object();
// AMQConnection 对象
private final AMQConnection _connection;
// 通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
private final int _channelNumber;
// 内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
// 当前未处理完的rpc请求
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
那么来看下这个_channel0.enqueueRpc(connStartBlocker)
对应源码干了啥
public void enqueueRpc(RpcContinuation k){
// 根据传入的 RpcContinuation构造RpcContinuationRpcWrapper对象的一个方法
doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
}
private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
synchronized (_channelMutex) {
boolean waitClearedInterruptStatus = false;
while (_activeRpc != null) {
try {
_channelMutex.wait();
} catch (InterruptedException e) {
waitClearedInterruptStatus = true;
}
}
if (waitClearedInterruptStatus) {
Thread.currentThread().interrupt();
}
// RpcContinuationRpcWrapper 获取对象赋值给 _activeRpc
// 属性: private RpcWrapper _activeRpc = null;
_activeRpc = rpcWrapperSupplier.get();
}
}
可以看到,最后是将rpc对象交给当前对象。当前对象是AMQChannel
的实例即_channel0
,所以
// 将rpc任务放入channel
_channel0.enqueueRpc(connStartBlocker);
在看_channel0.rpc(method, handshakeTimeout/2).getMethod();
干了什么
这里先把调用链写出来(都是AMQChannel的方法),敲黑板😶,注意看最后一个方法
rpc(Method m, int timeout)
privateRpc(Method m, int timeout)
ensureIsOpen(); quiescingRpc(m, k);
enqueueRpc(k); quiescingTransmit(m);
quiescingTransmit(new AMQCommand(m));
transmit(AMQChannel channel) throws IOException
最后一个方法就是上一篇 MQ!Rabbit-client command说的核心方法.源码都不想贴了,还是贴上凑字数吧
public AMQCommand rpc(Method m, int timeout)
throws IOException, ShutdownSignalException, TimeoutException {
return privateRpc(m, timeout);
}
// 获取 command
private AMQCommand privateRpc(Method m, int timeout)
throws IOException, ShutdownSignalException, TimeoutException {
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
rpc(m, k);
try {
return k.getReply(timeout);
} catch (TimeoutException e) {
cleanRpcChannelState();
throw e;
}
}
public void rpc(Method m, RpcContinuation k) throws IOException {
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
}
public void quiescingRpc(Method m, RpcContinuation k)
throws IOException {
synchronized (_channelMutex) {
enqueueRpc(k);
quiescingTransmit(m);
}
}
public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
quiescingTransmit(new AMQCommand(m));
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException ignored) {}
// This is to catch a situation when the thread wakes up during
// shutdown. Currently, no command that has content is allowed
// to send anything in a closing state.
ensureIsOpen();
}
}
c.transmit(this);
}
}
// 将Command转换为多个Frame并且发送
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
connection.writeFrame(m.toFrame(channelNumber));
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
connection.writeFrame(this.assembler.getContentHeader()
.toFrame(channelNumber, body.length));
int frameMax = connection.getFrameMax();
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
- EMPTY_FRAME_SIZE;
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
}
}
connection.flush();
}
看了上面的源码,我们再来分析下创建链接start方法中的一段源码
...
// connStart处理这里不展示了
AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;
...
// 循环处理,直到TuneOk
do {
// StartOK 封装为一个method帧
Method method = (challenge == null)
? new AMQP.Connection.StartOk.Builder()
.clientProperties(_clientProperties)
.mechanism(sm.getName())
.response(response)
.build()
: new AMQP.Connection.SecureOk.Builder().response(response).build();
try {
// StartOK 封装的帧发送给 broker
Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
// 判断返回信息是否是 Tune
if (serverResponse instanceof AMQP.Connection.Tune) {
connTune = (AMQP.Connection.Tune) serverResponse;
} else {
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
response = sm.handleChallenge(challenge, this.username, this.password);
}
} catch (ShutdownSignalException e) {
Method shutdownMethod = e.getReason();
if (shutdownMethod instanceof AMQP.Connection.Close) {
AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
throw new AuthenticationFailureException(shutdownClose.getReplyText());
}
}
throw new PossibleAuthenticationFailureException(e);
}
} while (connTune == null);
...
ChannelManager
建立链接的源码中如下代码:
_channelManager = instantiateChannelManager(channelMax, threadFactory);
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
configureChannelManager(result);
return result;
}
这里初始化了一个ChannelManager
对象_channelManager
。
ChannelManager
主要用来管理Channel
的,channelNumber=0的除外,应为channelNumber=0是留给Connection的特殊的channelNumber。(上面提到的_channel0)。
ChannelManager
的成员变量:
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
private final Object monitor = new Object();
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();
private final IntAllocator channelNumberAllocator;
private final ConsumerWorkService workService;
private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();
/** Maximum channel number available on this connection. */
private final int _channelMax;
private final ThreadFactory threadFactory;
先来看看开发中如何使用channel(ConnectionUtil
部分未贴)
package com.jmmq.load.jim.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;
/**
* 生产者
*/
@Component
public class Producer {
public static void sendByExchange(String message) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
// 声明 exchange
channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, "fanout");
// 交换机和队列绑定
channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "");
channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("发送的信息为:" + message);
channel.close();
connection.close();
}
}
/**
* 消费者
*/
package com.jmmq.load.jim.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void getMessage() throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
DefaultConsumer deliverCallback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
// 消息确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(ConnectionUtil.QUEUE_NAME, deliverCallback);
}
public static void main(String[] args) throws Exception {
Consumer.getMessage();
}
}
可以看到在获取channel的时候都是使用。
connection.createChannel();
我们来看下这里面做了什么。这里直接贴出来需要看的代码
@Override
public Channel createChannel() throws IOException {
ensureIsOpen();
ChannelManager cm = _channelManager;
if (cm == null) return null;
Channel channel = cm.createChannel(this);
metricsCollector.newChannel(channel);
return channel;
}
public Channel createChannel(int channelNumber) throws IOException {
ensureIsOpen();
ChannelManager cm = _channelManager;
if (cm == null) return null;
return cm.createChannel(this, channelNumber);
}
public ChannelN createChannel(AMQConnection connection) throws IOException {
ChannelN ch;
synchronized (this.monitor) {
int channelNumber = channelNumberAllocator.allocate();
if (channelNumber == -1) {
return null;
} else {
ch = addNewChannel(connection, channelNumber);
}
}
ch.open(); // now that it's been safely added
return ch;
}
下面是ChannelManager中关于创建Channel的代码,一个是带了channelNumber的,一个是自动分片channelNumber的,分别对应AMQConnection中的两个方法。最后都调用addNewChannel方法。
public ChannelN createChannel(AMQConnection connection) throws IOException {
ChannelN ch;
synchronized (this.monitor) {
int channelNumber = channelNumberAllocator.allocate();
if (channelNumber == -1) {
return null;
} else {
ch = addNewChannel(connection, channelNumber);
}
}
ch.open(); // now that it's been safely added
return ch;
}
private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {
if (_channelMap.containsKey(channelNumber)) {
// That number's already allocated! Can't do it
// This should never happen unless something has gone
// badly wrong with our implementation.
throw new IllegalStateException("We have attempted to "
+ "create a channel with a number that is already in "
+ "use. This should never happen. "
+ "Please report this as a bug.");
}
ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
_channelMap.put(ch.getChannelNumber(), ch);
return ch;
}
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
return new ChannelN(connection, channelNumber, workService);
}
这里重点看下 ch.open()
/**
* Package method: open the channel.
* This is only called from {@link ChannelManager}.
* @throws IOException if any problem is encountered
*/
public void open() throws IOException {
// wait for the Channel.OpenOk response, and ignore it
exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}
调用了AMQChannel
的rpc方法,向broker发送了一个Channel.Open帧。
addNewChannel
方法实际上是创建了一个ChannelN对象,然后置其于ChannelManager中的_channelMap中,方便管理。
private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
if (_channelMap.containsKey(channelNumber)) {
// That number's already allocated! Can't do it
// This should never happen unless something has gone
// badly wrong with our implementation.
throw new IllegalStateException("We have attempted to "
+ "create a channel with a number that is already in "
+ "use. This should never happen. "
+ "Please report this as a bug.");
}
ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
_channelMap.put(ch.getChannelNumber(), ch);
return ch;
}
channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet
来实现channelNumber的分配。后面有时间会单独写一篇关于BitSet
(java.util.BitSet)🐊。
这里再留一个尾巴🐊,上面我们看到了,使用 ChannelManager
创建的实际都是 ChannelN
对象,后面在进行学习。