ActiveMQ源码解析(一)建立连接

作为一个消息中间件,有客户端和服务端两部分代码,这次的源码解析系列主要从客户端的代码入手,分成建立连接、消息发送、消息消费三个部分。趁着我昨天弄明白了源码编译的兴奋劲头还没过去,今天研究一下建立连接的部分。

如果读起来吃力,代码部分可以略过,我把主要的功能点给加粗。

通常来说,客户端使用MQ的API建立时,可以分成两个步骤:
1. 对于连接的配置,比如服务器IP地址,用户名和密码等等
2. 建立连接并启动
客户端示例代码:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);  
ActiveMQConnection connection = connectionFactory.createConnection();  
connection.start();

可以看到主要的方法是ActiveMQConnectionFactory的构造函数,和createConnection(),以及connection中的start()方法。

ActiveMQConnectionFactory中的createConnection

构造函数比较简单,直接把传入的用户名密码和url放在变量里

public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
     setUserName(userName);
     setPassword(password);
     setBrokerURL(brokerURL.toString());
}

createConnection方法指向了createActiveMQConnection方法,该方法中主要做的事情有三个:
1. 建立Transport和通过Transport建立Connection
2. 配置Connection,建立好的Transport对象会被放到Connection对象中
3. 启动Transport

//建立Transport和通过Transport建立Connection
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);            
//配置
connection.setUserName(userName);            
connection.setPassword(password);            
configureConnection(connection);
//启动Transport
transport.start();

configureConnection(connection);这个方法的作用是对实例化出的ActiveMQConnetion对象中的参数的一系列配置,代码有点长就不上了。
对于我们来说其实主要想看的是连接是如何建立起来的,也就是

Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);      

createTransport();方法中包含了对客户端传入的url的初步校验,主要是验证URL的合法性,而后调用工厂类TransportFactory.connection(url)来进行连接的建立。

我们客户端在建立连接的时候,有可能有TCP、UDP等等协议,AMQ实现了简单工厂类FactoryFinder,在TransportFactory.connection(url)方法中,先是通过FactoryFinder根据用户输入的url(比如tcp://192.168.0.1)来找到使用的协议工厂TcpTransportFactory,然后使用TcpTransportFactory中的类来进行连接的建立。这个过程从代码上来看有点曲折:
1. TransportFactory的connect()调用findTransportFactory方法
2. findTransportFactory调用FactoryFinder类的newInstance方法
3. newInstance调用ObjectFactory类的create方法
4. ObejctFactory是一个接口类,实现类是StandaloneObjectFactory,其中的create方法调用自身的loadClass方法
5. loadClass方法中最终找到正确的类,返回至TransportFactory中
6. 如果是tcp连接,最终得到的就是一个实例化的TcpTransportFactory类

public abstact class TransportFactory {
……
    private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");

    public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }

    public static TransportFactory findTransportFactory(URI location) throws IOException {
        //拆分url
        String scheme = location.getScheme();
        if (scheme == null) {
            throw new IOException("Transport not scheme specified: [" + location + "]");
        }
        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
        if (tf == null) {
            // 调用FactoryFinder找到正确的TransportFactory
            try {
                tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
                TRANSPORT_FACTORYS.put(scheme, tf);
            } catch (Throwable e) {
                throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
            }
        }
        return tf;
    }
……
}
public class FactoryFinder {
……
    //通过ObjectFactory来找到正确的TransportFactory
    public Object newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
        return objectFactory.create(path+key);
    }
……
}

ObjectFactory的设计也是很有趣的。AMQ在代码中的说法是之所以这么实现是因为这样如果用户想自己写一个ObjectFactory,也可以支持。

    /**
     * The strategy that the FactoryFinder uses to find load and instantiate Objects
     * can be changed out by calling the
     * {@link org.apache.activemq.util.FactoryFinder#setObjectFactory(org.apache.activemq.util.FactoryFinder.ObjectFactory)}
     * method with a custom implementation of ObjectFactory.
     *
     * The default ObjectFactory is typically changed out when running in a specialized container
     * environment where service discovery needs to be done via the container system.  For example,
     * in an OSGi scenario.
     */
    public interface ObjectFactory {
        /**
         * @param path the full service path
         * @return
         */
        public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;

    }

Anyway,我们现在通过这么曲折的过程得到了一个实例化的TcpTransportFactory对象,下一步应该是调用doConnect(url)方法进行连接的建立了。因为TcpTransportFactory继承了TransportFactory类,doConnect方法仍然是在TransportFactory中的:

    public Transport doConnect(URI location) throws Exception {
        try {
            //把url里关于Transport的配置提取出来,WireFormat基本都可以看成是url的配置。
            //如果使用Openwire(默认协议),那么WireFormat就是openwire的相关配置。
            //见http://activemq.apache.org/configuring-wire-formats.html
            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
            if( !options.containsKey("wireFormat.host") ) {
                options.put("wireFormat.host", location.getHost());
            }
            WireFormat wf = createWireFormat(options);

            //建立socket连接
            Transport transport = createTransport(location, wf);

            //装饰者模式,在连接上包装上MutexTransportFilter、WireFormatNegotiator、InactivityMonitor、ResponseCorrelator四个功能
            Transport rc = configure(transport, wf, options);

            //remove auto
            IntrospectionSupport.extractProperties(options, "auto.");

            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            }
            return rc;
        } catch (URISyntaxException e) {
            throw IOExceptionSupport.create(e);
        }
    }

这个方法中主要有三个重要功能:
1. 配置wireformat
2. 建立TcpTransport连接
3. 在连接上包装四大辅助功能
其中配置WireFormat可以不看,建立TcpTransport其实是在调用createTransport(location, wf);时引用了TcpTransport的构造函数,代码如下:

    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
                        URI localLocation) throws UnknownHostException, IOException {
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;
        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException e) {
            this.socket = null;
        }
        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        this.initBuffer = null;
        setDaemon(false);
    }

这里直接调用了socketFactory.createSocket();,使用的是默认的方法,所以无法指定本地网卡建立连接。我看了下其实可以用socketFactory.createSocket(host, port, localHost, localPort)来改写,改写后就可以指定本地IP和端口了。

此外,查了下网络上的资料,四大辅助后续再看:

  1. MutexTransportFilter类实现了对每个请求的同步锁,同一时间只允许发送一个请求,如果有第二个请求需要等待第一个请求发送完毕才可继续发送。

  2. WireFormatNegotiator类实现了在客户端连接broker的时候先发送数据解析相关的协议信息,例如解析版本号,是否使用缓存等信息。

  3. InactivityMonitor类实现了连接成功后启动心跳检查机制,客户端每10秒发送一次心跳信息,服务端每30秒读一次心跳信息,如果没有读到则会断开连接,心跳检测是相互的,客户端也会每30秒读取服务端发送来的心跳信息,如果没有读到也一样会断开连接。

  4. ResponseCorrelator类实现了异步请求但需要获取响应信息否则就会阻塞等待功能。

ActiveMQConnection的Start()

在使用AMQ的过程中,很多用户问我为什么Connection需要start(),不能在createConnection的时候直接start了吗?而且不调用start方法其实不影响发送,但是会影响接收。抱着这样的疑惑,我们来看一下源码。

    /**
     * Starts (or restarts) a connection's delivery of incoming messages. A call
     * to <CODE>start</CODE> on a connection that has already been started is
     * ignored.
     *
     * @throws JMSException if the JMS provider fails to start message delivery
     *                 due to some internal error.
     * @see javax.jms.Connection#stop()
     */
    @Override
    public void start() throws JMSException {
        checkClosedOrFailed();
        ensureConnectionInfoSent();
        if (started.compareAndSet(false, true)) {
            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
                ActiveMQSession session = i.next();
                session.start();
            }
        }
    }

源码里直接对start方法加了注释,说明start就是启动connection可以接收消息的功能。其实源码里可以很明显看出来start里包含了几个步骤:
1. 检查连接是否关闭或失效
2. 确认客户端的ConnectionInfo是否被送到服务器
3. 启动这个Connection中的每一个Session

我好奇的是第二步,看看源码

    /**
     * Send the ConnectionInfo to the Broker
     *
     * @throws JMSException
     */
    protected void ensureConnectionInfoSent() throws JMSException {
        synchronized(this.ensureConnectionInfoSentMutex) {
            // Can we skip sending the ConnectionInfo packet??
            if (isConnectionInfoSentToBroker || closed.get()) {
                return;
            }
            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
                info.setClientId(clientIdGenerator.generateId());
            }
            syncSendPacket(info.copy(), getConnectResponseTimeout());

            this.isConnectionInfoSentToBroker = true;
            // Add a temp destination advisory consumer so that
            // We know what the valid temporary destinations are on the
            // broker without having to do an RPC to the broker.

            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
            if (watchTopicAdvisories) {
                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
            }
        }
    }

从源码里还能看到讨论和待办……我觉得我已经深入核心了……这个方法里做了两件事,

  1. 发送ConnectionInfo的数据包到服务端,如果info里没有用户自己设定的clientID,还会自动帮忙生成一个。发送的时候调用的是syncSendPacket方法,很明显是个同步发送,需要服务端同步返回response才算发送成功,我理解这里应该是一个试探连接的步骤。
  2. 建立一个通往临时目的地的消费者。所以其实每一个ActiveMQConnection的连接中都自动包含了一个消费者。我临时写了个客户端试了下,的确是存在的。
连接建立时的临时目的地

奇葩的是我就算不调用connection.start()方法,直接发送消息,这个临时消费者也是存在的,所以肯定是在消息发送的时候的哪个地方调用了connection的start方法。

至于为什么不调用start()方法就无法消费,我看了下session的start方法:

    /**
     * Start this Session.
     *
     * @throws JMSException
     */
    protected void start() throws JMSException {
        started.set(true);
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.start();
        }
        executor.start();
    }

原来是在session的start方法里启动了这个session里的consumer,想想session的建立过程的确是不需要调用session.start方法的,但是我们一般是先调用start方法,而后建立consumer,这个逻辑顺序还是有点错乱……
等下一次研究接收端的源码时再深入吧。

本次发现的源码优化点

1. socket建立时,使用不同的createSocket方法,指定本机IP和端口。
2. 项目用到了advisory message,每当agent建立/断开连接的时候,ActiveMQ.Advisory.Connection中会生成一条消息,这个消息中带上了ConnectionInfo。项目就是使用这个来即时检测agent的在线和离线状态的。因此如果我们改一下ConnectionInfo,加上agent的一些重要信息,比如agent版本号,操作系统类型,真实IP地址等等,会在获取agent信息的即时性上得到很大的提高。

我真的去试了一下……在ConnectionInfo里添加了一条test=test,然后重新编译服务端和客户端的依赖jar包,开启MQ的logging plugins,并且用客户端去监听了一下ActiveMQ.Advisory.Connection,得到了这样的结果。

服务器上开启logging后建立连接看到的ConnectionInfo
ConnectionInfo {commandId = 1, 
responseRequired = true, 
connectionId = ID:Air.local-51230-1502000963732-1:1, 
clientId = ID:Air.local-51230-1502000963732-0:1, 
clientIp = tcp://127.0.0.1:51231, 
userName = null, password = *****, 
test = test, 
brokerPath = null, 
brokerMasterConnector = false, 
manageable = true, 
clientMaster = true, 
faultTolerant = true, 
failoverReconnect = false}

成功!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 1.OkHttp源码解析(一):OKHttp初阶2 OkHttp源码解析(二):OkHttp连接的"前戏"——HT...
    隔壁老李头阅读 20,828评论 24 176
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,605评论 18 399
  • 06 我有一个闺密叫曾小姐。 曾小姐属性毒舌,平日里说话的杀伤力特别大,而且还时常嫌弃我,每天不损一下我她的屁股就...
    讲故事的余姑娘阅读 201评论 0 0
  • 雷雷的婚礼十分的冷清,他的家人只来了他妈妈,据说他爸爸已经要跟雷雷断绝关系了。 但是雷雷丝毫不为所动。他略微憔悴的...
    东方辞阅读 276评论 0 3