作为一个消息中间件,有客户端和服务端两部分代码,这次的源码解析系列主要从客户端的代码入手,分成建立连接、消息发送、消息消费三个部分。趁着我昨天弄明白了源码编译的兴奋劲头还没过去,今天研究一下建立连接的部分。
如果读起来吃力,代码部分可以略过,我把主要的功能点给加粗。
通常来说,客户端使用MQ的API建立时,可以分成两个步骤:
1. 对于连接的配置,比如服务器IP地址,用户名和密码等等
2. 建立连接并启动
客户端示例代码:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
ActiveMQConnection connection = connectionFactory.createConnection();
connection.start();
可以看到主要的方法是ActiveMQConnectionFactory的构造函数,和createConnection(),以及connection中的start()方法。
ActiveMQConnectionFactory中的createConnection
构造函数比较简单,直接把传入的用户名密码和url放在变量里
public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
setUserName(userName);
setPassword(password);
setBrokerURL(brokerURL.toString());
}
createConnection方法指向了createActiveMQConnection方法,该方法中主要做的事情有三个:
1. 建立Transport和通过Transport建立Connection
2. 配置Connection,建立好的Transport对象会被放到Connection对象中
3. 启动Transport
//建立Transport和通过Transport建立Connection
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);
//配置
connection.setUserName(userName);
connection.setPassword(password);
configureConnection(connection);
//启动Transport
transport.start();
configureConnection(connection);
这个方法的作用是对实例化出的ActiveMQConnetion对象中的参数的一系列配置,代码有点长就不上了。
对于我们来说其实主要想看的是连接是如何建立起来的,也就是
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);
createTransport();
方法中包含了对客户端传入的url的初步校验,主要是验证URL的合法性,而后调用工厂类TransportFactory.connection(url)来进行连接的建立。
我们客户端在建立连接的时候,有可能有TCP、UDP等等协议,AMQ实现了简单工厂类FactoryFinder,在TransportFactory.connection(url)
方法中,先是通过FactoryFinder根据用户输入的url(比如tcp://192.168.0.1)来找到使用的协议工厂TcpTransportFactory,然后使用TcpTransportFactory中的类来进行连接的建立。这个过程从代码上来看有点曲折:
1. TransportFactory的connect()调用findTransportFactory方法
2. findTransportFactory调用FactoryFinder类的newInstance方法
3. newInstance调用ObjectFactory类的create方法
4. ObejctFactory是一个接口类,实现类是StandaloneObjectFactory,其中的create方法调用自身的loadClass方法
5. loadClass方法中最终找到正确的类,返回至TransportFactory中
6. 如果是tcp连接,最终得到的就是一个实例化的TcpTransportFactory类
public abstact class TransportFactory {
……
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
public static Transport connect(URI location) throws Exception {
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location);
}
public static TransportFactory findTransportFactory(URI location) throws IOException {
//拆分url
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + location + "]");
}
TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
// 调用FactoryFinder找到正确的TransportFactory
try {
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
}
}
return tf;
}
……
}
public class FactoryFinder {
……
//通过ObjectFactory来找到正确的TransportFactory
public Object newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
return objectFactory.create(path+key);
}
……
}
ObjectFactory的设计也是很有趣的。AMQ在代码中的说法是之所以这么实现是因为这样如果用户想自己写一个ObjectFactory,也可以支持。
/**
* The strategy that the FactoryFinder uses to find load and instantiate Objects
* can be changed out by calling the
* {@link org.apache.activemq.util.FactoryFinder#setObjectFactory(org.apache.activemq.util.FactoryFinder.ObjectFactory)}
* method with a custom implementation of ObjectFactory.
*
* The default ObjectFactory is typically changed out when running in a specialized container
* environment where service discovery needs to be done via the container system. For example,
* in an OSGi scenario.
*/
public interface ObjectFactory {
/**
* @param path the full service path
* @return
*/
public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;
}
Anyway,我们现在通过这么曲折的过程得到了一个实例化的TcpTransportFactory对象,下一步应该是调用doConnect(url)方法进行连接的建立了。因为TcpTransportFactory继承了TransportFactory类,doConnect方法仍然是在TransportFactory中的:
public Transport doConnect(URI location) throws Exception {
try {
//把url里关于Transport的配置提取出来,WireFormat基本都可以看成是url的配置。
//如果使用Openwire(默认协议),那么WireFormat就是openwire的相关配置。
//见http://activemq.apache.org/configuring-wire-formats.html
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
if( !options.containsKey("wireFormat.host") ) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = createWireFormat(options);
//建立socket连接
Transport transport = createTransport(location, wf);
//装饰者模式,在连接上包装上MutexTransportFilter、WireFormatNegotiator、InactivityMonitor、ResponseCorrelator四个功能
Transport rc = configure(transport, wf, options);
//remove auto
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
这个方法中主要有三个重要功能:
1. 配置wireformat
2. 建立TcpTransport连接
3. 在连接上包装四大辅助功能
其中配置WireFormat可以不看,建立TcpTransport其实是在调用createTransport(location, wf);
时引用了TcpTransport的构造函数,代码如下:
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
URI localLocation) throws UnknownHostException, IOException {
this.wireFormat = wireFormat;
this.socketFactory = socketFactory;
try {
this.socket = socketFactory.createSocket();
} catch (SocketException e) {
this.socket = null;
}
this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
this.initBuffer = null;
setDaemon(false);
}
这里直接调用了socketFactory.createSocket();
,使用的是默认的方法,所以无法指定本地网卡建立连接。我看了下其实可以用socketFactory.createSocket(host, port, localHost, localPort)
来改写,改写后就可以指定本地IP和端口了。
此外,查了下网络上的资料,四大辅助后续再看:
MutexTransportFilter类实现了对每个请求的同步锁,同一时间只允许发送一个请求,如果有第二个请求需要等待第一个请求发送完毕才可继续发送。
WireFormatNegotiator类实现了在客户端连接broker的时候先发送数据解析相关的协议信息,例如解析版本号,是否使用缓存等信息。
InactivityMonitor类实现了连接成功后启动心跳检查机制,客户端每10秒发送一次心跳信息,服务端每30秒读一次心跳信息,如果没有读到则会断开连接,心跳检测是相互的,客户端也会每30秒读取服务端发送来的心跳信息,如果没有读到也一样会断开连接。
ResponseCorrelator类实现了异步请求但需要获取响应信息否则就会阻塞等待功能。
ActiveMQConnection的Start()
在使用AMQ的过程中,很多用户问我为什么Connection需要start(),不能在createConnection的时候直接start了吗?而且不调用start方法其实不影响发送,但是会影响接收。抱着这样的疑惑,我们来看一下源码。
/**
* Starts (or restarts) a connection's delivery of incoming messages. A call
* to <CODE>start</CODE> on a connection that has already been started is
* ignored.
*
* @throws JMSException if the JMS provider fails to start message delivery
* due to some internal error.
* @see javax.jms.Connection#stop()
*/
@Override
public void start() throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
if (started.compareAndSet(false, true)) {
for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
ActiveMQSession session = i.next();
session.start();
}
}
}
源码里直接对start方法加了注释,说明start就是启动connection可以接收消息的功能。其实源码里可以很明显看出来start里包含了几个步骤:
1. 检查连接是否关闭或失效
2. 确认客户端的ConnectionInfo是否被送到服务器
3. 启动这个Connection中的每一个Session
我好奇的是第二步,看看源码
/**
* Send the ConnectionInfo to the Broker
*
* @throws JMSException
*/
protected void ensureConnectionInfoSent() throws JMSException {
synchronized(this.ensureConnectionInfoSentMutex) {
// Can we skip sending the ConnectionInfo packet??
if (isConnectionInfoSentToBroker || closed.get()) {
return;
}
//TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
info.setClientId(clientIdGenerator.generateId());
}
syncSendPacket(info.copy(), getConnectResponseTimeout());
this.isConnectionInfoSentToBroker = true;
// Add a temp destination advisory consumer so that
// We know what the valid temporary destinations are on the
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
if (watchTopicAdvisories) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
}
}
从源码里还能看到讨论和待办……我觉得我已经深入核心了……这个方法里做了两件事,
- 发送ConnectionInfo的数据包到服务端,如果info里没有用户自己设定的clientID,还会自动帮忙生成一个。发送的时候调用的是syncSendPacket方法,很明显是个同步发送,需要服务端同步返回response才算发送成功,我理解这里应该是一个试探连接的步骤。
- 建立一个通往临时目的地的消费者。所以其实每一个ActiveMQConnection的连接中都自动包含了一个消费者。我临时写了个客户端试了下,的确是存在的。
奇葩的是我就算不调用connection.start()方法,直接发送消息,这个临时消费者也是存在的,所以肯定是在消息发送的时候的哪个地方调用了connection的start方法。
至于为什么不调用start()方法就无法消费,我看了下session的start方法:
/**
* Start this Session.
*
* @throws JMSException
*/
protected void start() throws JMSException {
started.set(true);
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer c = iter.next();
c.start();
}
executor.start();
}
原来是在session的start方法里启动了这个session里的consumer,想想session的建立过程的确是不需要调用session.start方法的,但是我们一般是先调用start方法,而后建立consumer,这个逻辑顺序还是有点错乱……
等下一次研究接收端的源码时再深入吧。
本次发现的源码优化点
1. socket建立时,使用不同的createSocket方法,指定本机IP和端口。
2. 项目用到了advisory message,每当agent建立/断开连接的时候,ActiveMQ.Advisory.Connection中会生成一条消息,这个消息中带上了ConnectionInfo。项目就是使用这个来即时检测agent的在线和离线状态的。因此如果我们改一下ConnectionInfo,加上agent的一些重要信息,比如agent版本号,操作系统类型,真实IP地址等等,会在获取agent信息的即时性上得到很大的提高。
我真的去试了一下……在ConnectionInfo里添加了一条test=test,然后重新编译服务端和客户端的依赖jar包,开启MQ的logging plugins,并且用客户端去监听了一下ActiveMQ.Advisory.Connection,得到了这样的结果。
ConnectionInfo {commandId = 1,
responseRequired = true,
connectionId = ID:Air.local-51230-1502000963732-1:1,
clientId = ID:Air.local-51230-1502000963732-0:1,
clientIp = tcp://127.0.0.1:51231,
userName = null, password = *****,
test = test,
brokerPath = null,
brokerMasterConnector = false,
manageable = true,
clientMaster = true,
faultTolerant = true,
failoverReconnect = false}
成功!