Smack源码解析

  • [Smack]
  • [1 初始化]
    • [1.1 初始化启动类]
  • [2 连接与登录]
    • [2.1 创建连接类]
    • [2.2 连接]
    • [2.3 登录]
  • [3 消息的发送]
  • [4 消息的接收]

本文分析Smack源码版本为4.4.0-alpha6

Smack简介

Smack是一个开源,易于使用的XMPP(jabber)客户端类库。
Smack是用Java编写的开放源代码,高度模块化,易于使用的XMPP客户端库,用于Java SE兼容JVM和Android。它是纯Java库,可以嵌入到您的应用程序中,以创建从完整的XMPP即时消息客户端到简单的XMPP集成(例如发送通知消息和支持状态的设备)的任何内容。Smack和XMPP允许您在(M2M,IoT,…)之间以多种方式轻松交换数据,例如“发后不管”,“发布订阅”。

Smack官方资料:

官网地址:https://www.igniterealtime.org/
Github:https://github.com/igniterealtime/Smack/tree/master
文档:https://download.igniterealtime.org/smack/docs/latest/documentation/
Javadoc: http://www.igniterealtime.org/builds/smack/dailybuilds/javadoc/

Smack Modules:

Smack可以轻松地嵌入到任何现有的Java应用程序中。该库作为几个模块提供,可为应用程序所需的功能提供更大的灵活性:

  • smack-core-提供核心XMPP功能。包括XMPP RFC中的所有XMPP功能。
  • smack-im -提供RFC 6121(XMPP-IM)中定义的功能,例如名册。
  • smack-tcp-支持基于TCP的XMPP。包括您通常要使用的XMPPTCPConnection类
  • smack-extensions-支持XMPP标准基金会定义的许多扩展(XEP),包括多用户聊天,文件传输,用户搜索等。这些扩展记录在扩展手册中。
  • smack-experimental-支持XMPP标准基金会定义的实验扩展(XEP)。这些扩展的API和功能应视为不稳定。
  • smack-legacy -支持XMPP标准基金会定义的旧版扩展(XEP)。
  • smack-bosh-支持BOSH(XEP-0124)。此代码应视为Beta。
  • smack-resolver-minidns-支持在MiniDNS的帮助下解析DNS SRV记录。不支持javax.naming API的平台的理想选择。还支持DNSSEC。
  • smack-resolver-dnsjava -支持在dnsjava的帮助下解析DNS SRV记录。
  • smack-resolver-javax -支持使用javax名称空间API解析DNS SRV记录。
  • smack-debug-用于协议流量的增强型GUI调试器。在类路径中找到并启用调试后,它将自动使用它。
  1. 初始化

  2. 1初始化启动类

1.1.1 SmackInitialization类

配置文件:

初始化是通过配置文件完成的。在smack-core模块org.jivesoftware.smack / smack-config.xml。默认情况下,Smack将把此文件打入jar中。此特定配置包含要加载的初始化程序类的列表。此初始化器列表中包含所有需要初始化的管理器类型类。

smack-config.xml

<?xml version="1.0"?>
<!-- Smack configuration file. -->
<smack>
    <!-- Classes that will be loaded when Smack starts -->
    <startupClasses>
        <className>org.jivesoftware.smack.initializer.VmArgInitializer</className>
        <className>org.jivesoftware.smack.ReconnectionManager</className>
    </startupClasses>

    <optionalStartupClasses>
        <className>org.jivesoftware.smack.util.dns.javax.JavaxResolver</className>
        <className>org.jivesoftware.smack.util.dns.minidns.MiniDnsResolver</className>
        <className>org.jivesoftware.smack.util.dns.dnsjava.DNSJavaResolver</className>
        <className>org.jivesoftware.smack.extensions.ExtensionsInitializer</className>
        <className>org.jivesoftware.smack.experimental.ExperimentalInitializer</className>
        <className>org.jivesoftware.smack.legacy.LegacyInitializer</className>
        <className>org.jivesoftware.smack.tcp.TCPInitializer</className>
        <className>org.jivesoftware.smack.sasl.javax.SASLJavaXSmackInitializer</className>
        <className>org.jivesoftware.smack.sasl.provided.SASLProvidedSmackInitializer</className>
        <className>org.jivesoftware.smack.android.AndroidSmackInitializer</className>
        <className>org.jivesoftware.smack.java7.Java7SmackInitializer</className>
        <className>org.jivesoftware.smack.im.SmackImInitializer</className>
        <className>org.jivesoftware.smackx.omemo.OmemoInitializer</className>
        <className>org.jivesoftware.smackx.ox.util.OpenPgpInitializer</className>
    </optionalStartupClasses>
</smack>

然而配置文件是哪里加载的呢?其实是在SmackInitialization类,他在静态代码块中进行的加载,这里贴出关键代码:

private static final String DEFAULT_CONFIG_FILE = "org.jivesoftware.smack/smack-config.xml";
//...
static {
    //....
    //....
    
    InputStream configFileStream;
    try {
        //1.在这里得到文件的输入流
        configFileStream = FileUtils.getStreamForClasspathFile(DEFAULT_CONFIG_FILE, null);
    }
    catch (Exception e) {
        throw new IllegalStateException("Could not load Smack configuration file", e);
    }

    try {
        //2.
        processConfigFile(configFileStream, null);
    }
    catch (Exception e) {
        throw new IllegalStateException("Could not parse Smack configuration file", e);
    }

    // Add the Java7 compression handler first, since it's preferred
    SmackConfiguration.addCompressionHandler(new Java7ZlibInputOutputStream());
    
    //....
    //....
}

我们在看看第二步中processConfigFile做了什么操作。

public static void processConfigFile(InputStream cfgFileStream,
                Collection<Exception> exceptions) throws Exception {
    processConfigFile(cfgFileStream, exceptions, SmackInitialization.class.getClassLoader());
}
public static void processConfigFile(InputStream cfgFileStream,
                Collection<Exception> exceptions, ClassLoader classLoader) throws Exception {
    XmlPullParser parser = PacketParserUtils.getParserFor(cfgFileStream);
    XmlPullParser.Event eventType = parser.getEventType();
    do {
        if (eventType == XmlPullParser.Event.START_ELEMENT) {
            //2.
            if (parser.getName().equals("startupClasses")) {
                parseClassesToLoad(parser, false, exceptions, classLoader);
            }
            else if (parser.getName().equals("optionalStartupClasses")) {
                parseClassesToLoad(parser, true, exceptions, classLoader);
            }
        }
        eventType = parser.next();
    }
    //1.判断是否是节点结束 循环解析
    while (eventType != XmlPullParser.Event.END_DOCUMENT);
    CloseableUtil.maybeClose(cfgFileStream, LOGGER);
}

可以看见它把流通过XmlPullParser解析出startupClasses、optionalStartupClasses两个节点中的内容。也就是我们之前 smack-config.xml中的节点。
并且都是调用parseClassesToLoad方法,区别只在于第二个参数的boolen不一样,其实这个boolen关系不大,在后面我们就知道了。

继续看它的parseClassesToLoad方法做了什么:

private static void parseClassesToLoad(XmlPullParser parser, boolean optional,
                Collection<Exception> exceptions, ClassLoader classLoader)
                throws Exception {
    final String startName = parser.getName();
    XmlPullParser.Event eventType;
    outerloop: do {//1.循环解析出className
        eventType = parser.next();
        if (eventType == XmlPullParser.Event.START_ELEMENT && "className".equals(parser.getName())) {
            String classToLoad = parser.nextText();
            if (SmackConfiguration.isDisabledSmackClass(classToLoad)) {
                continue outerloop;
            }

            try {
                //2.方法中进行反射加载类 参数classToLoad为节点中className的内容。
                loadSmackClass(classToLoad, optional, classLoader);
            } catch (Exception e) {
                // Don't throw the exception if an exceptions collection is given, instead
                // record it there. This is used for unit testing purposes.
                if (exceptions != null) {
                    exceptions.add(e);
                } else {
                    throw e;
                }
            }
        }
    }
    while (!(eventType == XmlPullParser.Event.END_ELEMENT && startName.equals(parser.getName())));
}

可以看到第一步循环出className节点的内容,第二步把className内容String传给了loadSmackClass方法进行反射加载。

调用loadSmackClass方法:

    private static void loadSmackClass(String className, boolean optional, ClassLoader classLoader) throws Exception {
        Class<?> initClass;
        try {
            // Attempt to load and initialize the class so that all static initializer blocks of
            // class are executed
            
            //1.反射类的class
            initClass = Class.forName(className, true, classLoader);
        }
        catch (ClassNotFoundException cnfe) {
            Level logLevel;
            if (optional) {
                logLevel = Level.FINE;
            }
            else {
                logLevel = Level.WARNING;
            }
            LOGGER.log(logLevel, "A startup class '" + className + "' could not be loaded.");
            if (!optional) {
                throw cnfe;
            } else {
                return;
            }
        }
        //2.判断传入class类型是否是SmackInitializer类型或者其实现类
        if (SmackInitializer.class.isAssignableFrom(initClass)) {
            //3.创建实例对象
            SmackInitializer initializer = (SmackInitializer) initClass.getConstructor().newInstance();
            //4.调用接口initialize方法,其实也就是调用实例的initialize方法
            List<Exception> exceptions = initializer.initialize();
            if (exceptions == null || exceptions.size() == 0) {
                LOGGER.log(Level.FINE, "Loaded SmackInitializer " + className);
            } else {
                for (Exception e : exceptions) {
                    LOGGER.log(Level.SEVERE, "Exception in loadSmackClass", e);
                }
            }
        } else {
            LOGGER.log(Level.FINE, "Loaded " + className);
        }
    }

第一步:反射类的class。
第二步:判断传入class类型是否是SmackInitializer类型或者其实现类
第三步:创建实例对象
第四步:调用接口initialize方法,其实也就是调用实例的initialize方法

我们现在看一下SmackInitializer类,其实是一个接口。

package org.jivesoftware.smack.initializer;

import java.util.List;

import org.jivesoftware.smack.SmackConfiguration;

/**
 * Defines an initialization class that will be instantiated and invoked by the {@link SmackConfiguration} class during initialization.
 *
 * <p>
 * Any implementation of this class MUST have a default constructor.
 *
 * @author Robin Collier
 *
 */
public interface SmackInitializer {
    List<Exception> initialize();
}

现在可以看出第二步判断里的逻辑为:SmackInitializer接口的实现类调用initialize()方法。
好,这时候我们在回过头看看我们smack-config.xml中配置的内容。

VmArgInitializer类

public class VmArgInitializer extends UrlInitializer {

    protected String getFilePath() {
        return System.getProperty("smack.provider.file");
    }

    @Override
    public List<Exception> initialize() {
        if (getFilePath() != null) {
            super.initialize();
        }
        return Collections.emptyList();
    }
}

父类UrlInitializer

public abstract class UrlInitializer implements SmackInitializer {
    //...
       @Override
    public List<Exception> initialize() {
        //...
        //...
    }
    //...
}

可以看见这个类是实现了SmackInitializer接口的。

在看看[smack-config.xml]中的第二个配置类
ReconnectionManager

public final class ReconnectionManager {
    //...
    //...
    static {
        XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
            @Override
            public void connectionCreated(XMPPConnection connection) {
                if (connection instanceof AbstractXMPPConnection) {
                    ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection);
                }
            }
        });
    }
    //...
    //...
}

可以看到这个类并没有实现SmackInitializer接口,只是类中有静态代码块执行而已。

在看看[smack-config.xml]中的第三个配置类
JavaxResolver

public class JavaxResolver extends DNSResolver implements SmackInitializer {
    //...
    //...
    static {
        try {
            Hashtable<String, String> env = new Hashtable<>();
            env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
            dirContext = new InitialDirContext(env);
        } catch (NamingException e) {
            LOGGER.log(Level.SEVERE, "Could not construct InitialDirContext", e);
        }
    
        // Try to set this DNS resolver as primary one
        setup();
    }

    public static void setup() {
        DNSUtil.setDNSResolver(getInstance());
    }

    //...
    //...

    @Override
    public List<Exception> initialize() {
        setup();
        return null;
    }
    //...
    //...
}

这个类也实现了SmackInitializer 接口。

然而实现类方法到底执行了什么呢,这里我们举个栗子,其他的就不一一给大家分析了。
看好,就是这个栗子:SmackImInitializer

public class SmackImInitializer extends UrlInitializer {

    @Override
    protected String getProvidersUri() {
        return "classpath:org.jivesoftware.smack.im/smackim.providers";
    }

    @Override
    protected String getConfigUri() {
        return "classpath:org.jivesoftware.smack.im/smackim.xml";
    }

}

父类UrlInitializer

public abstract class UrlInitializer implements SmackInitializer {
    private static final Logger LOGGER = Logger.getLogger(UrlInitializer.class.getName());

    //1.接口的实现方法
    @Override
    public List<Exception> initialize() {
        InputStream is = null;
        final ClassLoader classLoader = this.getClass().getClassLoader();
        final List<Exception> exceptions = new LinkedList<Exception>();
        
        //2.getProvidersUri() 有些子类对其进行了方法重载,所以具体获取要看子类是否重载
        final String providerUriString = getProvidersUri();
        if (providerUriString != null) {
            try {
                final URI providerUri = URI.create(providerUriString);
                is = FileUtils.getStreamForUri(providerUri, classLoader);

                LOGGER.log(Level.FINE, "Loading providers for providerUri [" + providerUri + "]");
                //3.此方法中进行类加载到内存操作
                ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
                ProviderManager.addLoader(pfl);
                exceptions.addAll(pfl.getLoadingExceptions());
            }
            catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Error trying to load provider file " + providerUriString, e);
                exceptions.add(e);
            } finally {
                maybeClose(is);
            }
        }

        //4.getConfigUri() 有些子类对其进行了方法重载,所以具体获取要看子类是否重载
        final String configUriString = getConfigUri();
        if (configUriString != null) {
            try {
                final URI configUri = URI.create(configUriString);
                is = FileUtils.getStreamForUri(configUri, classLoader);
                //5.调用加载类操作
                SmackInitialization.processConfigFile(is, exceptions, classLoader);
            }
            catch (Exception e) {
                exceptions.add(e);
            } finally {
                maybeClose(is);
            }
        }
        return exceptions;
    }

    protected String getProvidersUri() {
        return null;
    }

    protected String getConfigUri() {
        return null;
    }

    private static void maybeClose(InputStream is) {
        CloseableUtil.maybeClose(is, LOGGER);
    }
}

第一步:SmackImInitializer实例会调用父类UrlInitializer的initialize方法。
第二步:getProvidersUri()方法,因为SmackImInitializer对其进行了重载,所以这里看SmackImInitializer中getProvidersUri()的方法

 protected String getProvidersUri() {
        return "classpath:org.jivesoftware.smack.im/smackim.providers";
 }

可以看见是一个String路径,这个路径文件其实是在SmackImInitializer类所在的smack-im moudle下,我们看一下内容:
smackim.providers

<?xml version="1.0"?> 
<smackProviders>

    <!-- RFC-6121: Extensible Messaging and Presence Protocol (XMPP): Instant Messaging and Presence -->
    <iqProvider>
        <elementName>query</elementName>
        <namespace>jabber:iq:roster</namespace>
        <className>org.jivesoftware.smack.roster.provider.RosterPacketReNameProvider</className>
    </iqProvider>
    <streamFeatureProvider>
        <elementName>sub</elementName>
        <namespace>urn:xmpp:features:pre-approval</namespace>
        <className>org.jivesoftware.smack.roster.provider.SubscriptionPreApprovalStreamFeatureProvider</className>
    </streamFeatureProvider>

    <!-- XEP-0237: Roster Versioning -->
    <streamFeatureProvider>
        <elementName>ver</elementName>
        <namespace>urn:xmpp:features:rosterver</namespace>
        <className>org.jivesoftware.smack.roster.provider.RosterVerStreamFeatureProvider</className>
    </streamFeatureProvider>

</smackProviders>

第三步:ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
把smackim.providers配置文件的输入流传入。

第四步:这里在看看ProviderFileLoader中做了什么?

public class ProviderFileLoader implements ProviderLoader {

    @SuppressWarnings("unchecked")
    public ProviderFileLoader(InputStream providerStream, ClassLoader classLoader) {
        //1.这里省略部分代码
        //其实就是把InputStream通过XmlPullParser拿到className节点信息.
        //...
        try {
            //2.加载class
            final Class<?> provider = classLoader.loadClass(className);
            switch (typeName) {
            case "iqProvider":
                //3.节点为iqProvider 创建实例类并加入内存iqProviders
                if (IqProvider.class.isAssignableFrom(provider)) {
                    IqProvider<IQ> iqProvider = (IqProvider<IQ>) provider.getConstructor().newInstance();
                    iqProviders.add(new IQProviderInfo(elementName, namespace, iqProvider));
                }
                else {
                    exceptions.add(new IllegalArgumentException(className + " is not a IQProvider"));
                }
                break;
            case "extensionProvider":
                //...
                //...
                break;
            case "streamFeatureProvider":
                //4.节点为streamFeatureProvider创建实例类并加入内存sfProviders
                ExtensionElementProvider<ExtensionElement> streamFeatureProvider = (ExtensionElementProvider<ExtensionElement>) provider.getConstructor().newInstance();
                sfProviders.add(new StreamFeatureProviderInfo(elementName,
                                namespace,
                                streamFeatureProvider));
                break;
            default:
                LOGGER.warning("Unknown provider type: " + typeName);
            }
        }
        catch (ClassNotFoundException cnfe) {
            LOGGER.log(Level.SEVERE, "Could not find provider class", cnfe);
            exceptions.add(cnfe);
        }
        catch (InstantiationException ie) {
            LOGGER.log(Level.SEVERE, "Could not instanciate " + className, ie);
            exceptions.add(ie);
        }
        
        //...
        //...
    }
    
}

1.把传入的配置文件流InputStream通过XmlPullParser拿到className节点信息.
2.加载配置文件中的class。
3.通过switch判断是iqProvider节点的加入到iqProviders中,是streamFeatureProvider节点的加入到sfProviders中。

其实smackim.providers文件跟 smack-config.xml文件一样都是配置文件,只不过smackim.providers是属于一个moudle中的配置文件,而smack-config.xml是核心smack-core Moudle中总得配置文件而已(及Smack配置文件)。

这里梳理一下:所有初始化都是在SmackInitialization的静态代码块中执行的。也就是说那些地方创建获取调用了此类方法,此类的静态代码块就被会执行。

最后看一下那些地方调用了这个方法:
SmackConfiguration类中调用了他获取版本的方法。


SmackConfiguration

在看看那些调用了SmackConfiguration.getVersion();


SmackConfigurationGetVersion

可以看到很多类中都有调用,并且关键的AbstractXMPPConnection连接类静态代码块中也调用了此方法。
所以我们得出结论:只要你创建对象进行连接操作,框架内部就会帮你初始化好配置。

1.1.2 SmackConfiguration类

这里截图这个类的部分代码

public final class SmackConfiguration {
    //...
    //...

    public static boolean DEBUG = false;

    private static SmackDebuggerFactory DEFAULT_DEBUGGER_FACTORY = ReflectionDebuggerFactory.INSTANCE;

    /**
     * The default parsing exception callback is {@link ExceptionThrowingCallback} which will
     * throw an exception and therefore disconnect the active connection.
     */
    private static ParsingExceptionCallback defaultCallback = new ExceptionThrowingCallbackWithHint();

    private static HostnameVerifier defaultHostnameVerififer;

    /**
     * Returns the Smack version information, eg "1.3.0".
     *
     * @return the Smack version information.
     */
    public static String getVersion() {
        return SmackInitialization.SMACK_VERSION;
    }
    
    //...
    //...
}

可以看到这个类是final 并且里面全都是静态方法和静态成员变量。其实这个类作用就是在你使用smack前可以进行静态值得配置。比如:SmackConfiguration.DEBUG = true;

小结:
其实Smack的初始化操作都是在smack-core moudle下SmackInitialization类的静态代码中完成的,并且配置文件在同一moudle下的smack-config.xml文件。

SmackConfiguration类getVersion()时,SmackInitialization的静态代码块就会执行。并且关键的AbstractXMPPConnection连接类静态代码块中也调用了此方法,所以只要你创建对象进行连接操作,框架内部就会帮你初始化好配置。

  1. 连接与登录

  2. 1创建连接类
    第一步:创建连接配置

configuration = XMPPTCPConnectionConfiguration.builder()
            .setXmppDomain(myibc_domain)
            .setHostAddress(InetAddress.getByName(mXmppHost))
            .setPort(mXmppPort)
            .setSendPresence(false)
            .setSecurityMode(ConnectionConfiguration.SecurityMode.required)
            .setConnectTimeout(30 * 1000)
            .setCompressionEnabled(false)
            .setCustomSSLContext(SSLUtils.getSSLCAContext(""))
            .setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER)
            .build();

这里我们用的是TCP的连接,所以我们使用XMPPTCPConnectionConfiguration来创建我们的配置信息。
其实XMPPTCPConnectionConfiguration继承了ConnectionConfiguration抽象类。并且在构造方法调用父类的构造方法。
配置都保存在了ConnectionConfiguration中

public abstract class ConnectionConfiguration {
    protected ConnectionConfiguration(Builder<?, ?> builder) {
        //略...
        authzid = builder.authzid;
        username = builder.username;
        password = builder.password;
        callbackHandler = builder.callbackHandler;
        //略...
    }
}

第二步:创建用于连接服务器的核心类XMPPTCPConnection

//将连接配置对象当作参数传入给XMPPTCPConnection构造函数
XMPPTCPConnection connection = new XMPPTCPConnection(configuration);

配置信息传入到XMPPTCPConnection和父类AbstractXMPPConnection中各个保存了一份。

  1. 2 连接
    用XMPPTCPConnection实例进行连接
//进行连接
connection.connect();

首先我们了解一下XMPPTCPConnection的继承关系


XMPPTCPConnectionExtends

connect()方法是在AbstractXMPPConnection抽象类中进行实现的。

public abstract class AbstractXMPPConnection implements XMPPConnection {
    //...
    
    public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException {
        // 检测是否已连接。
        throwAlreadyConnectedExceptionIfAppropriate();

        // 重置连接状态
        initState();
        closingStreamReceived = false;
        streamId = null;

        try {
            //1.关键,执行连接的方法
            connectInternal();

            //判断TLS是否认证过
            if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
                throw new SecurityRequiredByClientException();
            }
        } catch (SmackException | IOException | XMPPException | InterruptedException e) {
            instantShutdown();
            throw e;
        }

        // 回调已连接监听
        connected = true;
        callConnectionConnectedListener();

        return this;
    }
    
    //...

    //2.
    protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException;
}

关键方法是调用connectInternal();此方法是AbstractXMPPConnection类的抽象方法,具体要查看他的实现类。
我们实例的实现类为:
XMPPTCPConnection

public class XMPPTCPConnection extends AbstractXMPPConnection {

    @Override
    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
        //1.连接
        connectUsingConfiguration();

        //2.连接成功后,通过流初始化writer Reader。
        initConnection();

        // TLS handled will be true either if TLS was established, or if it was not mandatory.
        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");

        // Wait with SASL auth until the SASL mechanisms have been received
        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
    }


   private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
        //略...
        SocketFactory socketFactory = config.getSocketFactory();
        
        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
            Iterator<? extends InetAddress> inetAddresses;
            String host = endpoint.getHost().toString();
            UInt16 portUint16 = endpoint.getPort();
            int port = portUint16.intValue();
            if (proxyInfo == null) {//无代理
                SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
                final InetAddress inetAddress = inetAddresses.next();
                final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
                LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
                //1.这里进行异步的连接
                socketFuture.connectAsync(inetSocketAddress, timeout);
                try {
                    //2.获取socket
                    socket = socketFuture.getOrThrow();
                } catch (IOException e) {
                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
                                    endpoint, inetAddress, e);
                    connectionExceptions.add(rce);
                    if (inetAddresses.hasNext()) {
                        continue innerloop;
                    } else {
                        break innerloop;
                    }
                }
            } else {
                //略...
                return;
            }
        }

        // There are no more host addresses to try
        // throw an exception and report all tried
        // HostAddresses in the exception
        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
    }
    
}

connectUsingConfiguration连接方法
可以看见通过用户传入的配置信息,获取到SocketFactory对象,并调用connectAsync()进行异步连接。

主要步骤为两步:
1.异步的连接
socketFactory.connectAsync()
2.获取socket
socket = socketFuture.getOrThrow();

socketFactory.connectAsync() :

   public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {

    private final Socket socket;

    private final Object wasInterruptedLock = new Object();

    private boolean wasInterrupted;

    public SocketFuture(SocketFactory socketFactory) throws IOException {
        socket = socketFactory.createSocket();
    }
    
    public void connectAsync(final SocketAddress socketAddress, final int timeout) {
        AbstractXMPPConnection.asyncGo(new Runnable() {
            @Override
            public void run() {
                try {
                    //在这里进行socket的连接操作
                    socket.connect(socketAddress, timeout);
                }
                catch (IOException e) {
                    setException(e);
                    return;
                }
                synchronized (wasInterruptedLock) {
                    if (wasInterrupted) {
                        closeSocket();
                        return;
                    }
                }
                setResult(socket);
            }
        });
    }
}

到此连接是已经建立了,在初始化写入写出流在看看我们之前XMPPTCPConnection类中connectInternal方法的第二步
initConnection()也是XMPPTCPConnection类中的方法。

private void initConnection() throws IOException, InterruptedException {
    compressionHandler = null;

    //1.初始化reader和writer实例对象
    initReaderAndWriter();

    //2.初始化 启动 写入流 线程 
    packetWriter.init();
    //3.启动读线程。startup()方法将阻塞,直到从服务器获取打开的流数据包
    packetReader.init();
}

initReaderAndWriter()

private void initReaderAndWriter() throws IOException {
    InputStream is = socket.getInputStream();
    OutputStream os = socket.getOutputStream();
    if (compressionHandler != null) {
        is = compressionHandler.getInputStream(is);
        os = compressionHandler.getOutputStream(os);
    }
    // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
    writer = new OutputStreamWriter(os, "UTF-8");
    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));

    // If debugging is enabled, we open a window and write out all network traffic.
    initDebugger();
}

这里很明白,就是通过socket获取输入输出流,创建writer、reader。

2.packetWriter.init();
PacketWriter对象是在XMPPTCPConnection类的成员变量中创建的

protected final PacketWriter packetWriter = new PacketWriter();

protected final PacketReader packetReader = new PacketReader();
protected class PacketWriter {
    //队列
    private final ArrayBlockingQueueWithShutdown<Element> queue = 
            new ArrayBlockingQueueWithShutdown<>(QUEUE_SIZE, true);

    //略...
    void init() {
        shutdownTimestamp = null;

        if (unacknowledgedStanzas != null) {
            // It's possible that there are new stanzas in the writer queue that
            // came in while we were disconnected but resumable, drain those into
            // the unacknowledged queue so that they get resent now
            drainWriterQueueToUnacknowledgedStanzas();
        }

        queue.start();
        running = true;
        //1.开启线程
        Async.go(new Runnable() {
            @Override
            public void run() {
                LOGGER.finer(threadName + " start");
                try {
                    //2.写入报文,报文是在queue队列中获取的
                    writePackets();
                } finally {
                    LOGGER.finer(threadName + " exit");
                    running = false;
                    notifyWaitingThreads();
                }
            }
        }, threadName);
    }

    //...
    
}

//1.开启线程
//2.写入报文,报文是在queue队列中获取的
这里就不贴出writePackets代码了。

packetReader.init()
我们在看看之前packetReader.init()又做了些什么


PacketReader

可以看到它主要执行的代码为parsePackets() 方法。


parsePackets

通过XmlPullParser parser 解析器来读取收到的内容。

这里有一个问题XmlPullParser 哪里来的呢?我们现在看看他赋值的代码。
在AbstractXMPPConnection类中

private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
    sendStreamOpen();
    resetParser();
}
private void resetParser() throws IOException {
        try {
            packetReader.parser = SmackXmlParser.newXmlParser(reader);
        } catch (XmlPullParserException e) {
            throw new IOException(e);
        }
   }

看到reader对象了还记得我们之前连接成功后initReaderAndWriter()方法的赋值吗?

到这里我们整个连接过程就算完成了。

小结:
1.连接首先会调用AbstractXMPPConnection的connect()方法。
2.方法会调用实现类XMPPTCPConnection的connectInternal().

  • connectInternal方法做2步操作

    • connectUsingConfiguration:
      • 通过config创建SocketFactory并得到Socket。
      • Socket进行异步连接。

    -连接成功
    -initConnection:
    -初始化reader和writer实例对象
    -初始化packetWriter 写入也是在报文队列中,实际通过writer对象写入。
    -初始化packetReader,报文在队列中有数据时则会读取。实际会通过reader读取。

  1. 3 登录
mConnection.login(loginUserId, loginPassword, resource);

调用其实是抽象类AbstractXMPPConnection中的login()方法。

public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException,SmackException, IOException, InterruptedException {
    if (!config.allowNullOrEmptyUsername) {
        StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty");
    }
    throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?");
    throwAlreadyLoggedInExceptionIfAppropriate();
    usedUsername = username != null ? username.toString() : null;
    usedPassword = password;
    usedResource = resource;
    loginInternal(usedUsername, usedPassword, usedResource);
}

继续调用其实是AbstractXMPPConnection的一个抽象方法

protected abstract void loginInternal(String username, String password, Resourcepart resource)
                throws XMPPException, SmackException, IOException, InterruptedException;

我们这里的实现类是XMPPTCPConnection

Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                SmackException, IOException, InterruptedException {
    // 使用SASL进行身份验证
    SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

    streamFeaturesAfterAuthenticationReceived = false;
    //1.进行身份认证
    authenticate(username, password, config.getAuthzid(), sslSession);

    //在身份验证之后等待流特性。
    waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
    
    //如果启用了压缩,则请求服务器使用流压缩
    maybeEnableCompression();
    
    //略...
    
    //绑定资源
    bindResourceAndEstablishSession(resource);
    
    //略...
    
    //2.登录成功后
    afterSuccessfulLogin(false);
}

这个方法有2个地方需要注意
1.进行身份认证
2.登录成功后的状态发送与监听。

首先我们看看authenticate()方法做了些什么。可以看到这里是调用的方法

protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid,SSLSession sslSession)  {
    //1.认证
    SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession);
    //认证成功
    afterSaslAuthenticationSuccess();
    return saslMechanism;
}

调用的是 saslAuthentication对象authenticate方法。
saslAuthentication其实是在创建连接对象时新建出来的

protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
    saslAuthentication = new SASLAuthentication(this, configuration);   
    //略...
}

我们看看SASLAuthentication类的authenticate方法:

    SASLMechanism authenticate(String username, String password, EntityBareJid authzid, SSLSession sslSession)
                    throws XMPPErrorException, SASLErrorException, IOException,
                    InterruptedException, SmackSaslException, NotConnectedException, NoResponseException {

        //获取相应jid的SASL机制。
        final SASLMechanism mechanism = selectMechanism(authzid);
        final CallbackHandler callbackHandler = configuration.getCallbackHandler();
        final String host = connection.getHost();
        final DomainBareJid xmppServiceDomain = connection.getXMPPServiceDomain();

        synchronized (this) {
            currentMechanism = mechanism;

            //这里我们配置未传入callbackHandler,所以走else
            if (callbackHandler != null) {
                currentMechanism.authenticate(host, xmppServiceDomain, callbackHandler, authzid, sslSession);
            }
            else {
                //(重要),认证。
                currentMechanism.authenticate(username, host, xmppServiceDomain, password, authzid, sslSession);
            }

            final long deadline = System.currentTimeMillis() + connection.getReplyTimeout();
            while (!mechanism.isFinished()) {
                final long now = System.currentTimeMillis();
                if (now >= deadline) break;
                // Wait until SASL negotiation finishes
                wait(deadline - now);
            }
        }

        mechanism.throwExceptionIfRequired();

        return mechanism;
    }

在看看对应SASLMechanism 类中的authenticate方法。

public final void authenticate(String username, String host, DomainBareJid serviceName, String password,
                EntityBareJid authzid, SSLSession sslSession)
                throws SmackSaslException, NotConnectedException, InterruptedException {
    this.authenticationId = username;
    this.host = host;
    this.serviceName = serviceName;
    this.password = password;
    this.authorizationId = authzid;
    this.sslSession = sslSession;
    assert authorizationId == null || authzidSupported();
    authenticateInternal();
    //认证
    authenticate();
}

private void authenticate() throws SmackSaslException, NotConnectedException, InterruptedException {
    //1.通过配置选择的加密方式加密用户账号,密码
    byte[] authenticationBytes = getAuthenticationText();
    String authenticationText;
    if (authenticationBytes != null && authenticationBytes.length > 0) {
        //进行base64编码
        authenticationText = Base64.encodeToString(authenticationBytes);
    } else {
        authenticationText = "=";
    }
    //发送认证到服务器。
    connection.sendNonza(new AuthMechanism(getName(), authenticationText));
}

到这里认证走完了,然后之前在AbstractXMPPConnection类中authenticate方法中还有afterSaslAuthenticationSuccess方法进行执行。

protected void afterSaslAuthenticationSuccess()
                 throws NotConnectedException, InterruptedException, SmackWrappedException {
     sendStreamOpen();
 }



protected void sendStreamOpen() throws NotConnectedException, InterruptedException {

    CharSequence to = getXMPPServiceDomain();
    CharSequence from = null;
    CharSequence localpart = config.getUsername();
    if (localpart != null) {
        from = XmppStringUtils.completeJidFrom(localpart, to);
    }
    String id = getStreamId();

    StreamOpen streamOpen = new StreamOpen(to, from, id, config.getXmlLang(), StreamOpen.StreamContentNamespace.client);
    //发送 流打开报文。
    sendNonza(streamOpen);

    XmlEnvironment.Builder xmlEnvironmentBuilder = XmlEnvironment.builder();
    xmlEnvironmentBuilder.with(streamOpen);
    outgoingStreamXmlEnvironment = xmlEnvironmentBuilder.build();
}

可以看到这里发送了一个流报文。

最后回到XMPPTCPConnection类中loginInternal方法,现在进行第二步,认证成功后。

Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                SmackException, IOException, InterruptedException {
    // 使用SASL进行身份验证
    SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

    streamFeaturesAfterAuthenticationReceived = false;
    //1.进行身份认证
    authenticate(username, password, config.getAuthzid(), sslSession);

    //在身份验证之后等待流特性。
    waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
    
    //如果启用了压缩,则请求服务器使用流压缩
    maybeEnableCompression();
    
    //略...
    
    //绑定资源
    bindResourceAndEstablishSession(resource);
    
    //略...
    
    //2.登录成功后
    afterSuccessfulLogin(false);
}

查看afterSuccessfulLogin方法首先调用的是XMPPTCPConnection类中的方法然后调用AbstractXMPPConnection抽象类
afterSuccessfulLogin。

protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
    // Reset the flag in case it was set
    disconnectedButResumeable = false;
    super.afterSuccessfulLogin(resumed);
}
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
    if (!resumed) {
        authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis();
    }

    this.authenticated = true;


    if (debugger != null) {
        debugger.userHasLogged(user);
    }
    
    //1.回调认证成功的监听。
    callConnectionAuthenticatedListener(resumed);

    //2.如果配置了出席状态,认证成功后发送available状态
    if (config.isSendPresence() && !resumed) {
        Presence availablePresence = getStanzaFactory()
                        .buildPresenceStanza()
                        .ofType(Presence.Type.available)
                        .build();
        sendStanza(availablePresence);
    }
}

可以看见它主要就做了这2步操作:
1.回调认证成功的监听。
2.如果配置了出席状态,认证成功后发送available状态

  protected void callConnectionAuthenticatedListener(boolean resumed) {
       for (ConnectionListener listener : connectionListeners) {
           try {
               listener.authenticated(this, resumed);
           } catch (Exception e) {
               // Catch and print any exception so we can recover
               // from a faulty listener and finish the shutdown process
               LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e);
           }
       }
   }

到这里整个登录流程就全部走完了。

小结:
其实整个认证过程首先会通过用户的Jid得到SASLMechanism,认证主要是在SASLMechanism类authenticate中进行的,它会通过getAuthenticationText()加密拼接你的用户名密码,这个方法是个抽象,加密的方式就是你之前设置的方式。然后发送认证的报文给服务器。

这时候认证成功了,然后: 1.发送了一个流打开的报文。2.回调认证成功的Listener。3.如果配置了发送状态,认证成功后还是发送一个出席状态的报文。

<h1 id="3">3. 消息的发送</h1>
第一步:创建ChatManager
这里是通过smack-extensions Moudle中ChatManager进行聊天的管理。

chatManager = ChatManager.getInstanceFor(xmppConnection);
public final class ChatManager extends Manager {
    private static final Map<XMPPConnection, ChatManager> INSTANCES = new WeakHashMap<>();

    public static synchronized ChatManager getInstanceFor(XMPPConnection connection) {
        ChatManager chatManager = INSTANCES.get(connection);
        if (chatManager == null) {
            chatManager = new ChatManager(connection);
            INSTANCES.put(connection, chatManager);
        }
        return chatManager;
    }
    
    //略...
}

这里可以看见ChatManager 是继承与Manager抽象类。在通过单例获取ChatManager对象的时候会缓存到静态HashMap内存中,下次如果内存中有就直接获取。

这里贴出Manager类代码。

public abstract class Manager {

    final WeakReference<XMPPConnection> weakConnection;

    public Manager(XMPPConnection connection) {
        Objects.requireNonNull(connection, "XMPPConnection must not be null");

        weakConnection = new WeakReference<>(connection);
    }

    protected final XMPPConnection connection() {
        return weakConnection.get();
    }

    protected final XMPPConnection getAuthenticatedConnectionOrThrow() throws NotLoggedInException {
        XMPPConnection connection = connection();
        if (!connection.isAuthenticated()) {
            throw new NotLoggedInException();
        }
        return connection;
    }

    protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit) {
        return schedule(runnable, delay, unit, ScheduledAction.Kind.NonBlocking);
    }

    protected static final ScheduledAction scheduleBlocking(Runnable runnable, long delay, TimeUnit unit) {
        return schedule(runnable, delay, unit, ScheduledAction.Kind.Blocking);
    }

    protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
        return AbstractXMPPConnection.SMACK_REACTOR.schedule(runnable, delay, unit, scheduledActionKind);
    }
}

第二步:创建Chat对象

Chat chat = chatManager.chatWith(toUserJid);

private final Map<EntityBareJid, Chat> chats = new ConcurrentHashMap<>();

public Chat chatWith(EntityBareJid jid) {
    Chat chat = chats.get(jid);
    if (chat == null) {
        synchronized (chats) {
            // Double-checked locking.
            chat = chats.get(jid);
            if (chat != null) {
                return chat;
            }
            chat = new Chat(connection(), jid);
            chats.put(jid, chat);
        }
    }
    return chat;
}

在ChatManager中通过chatWith方法得到Chat对象,如果内存缓存中没有就进行创建。

第二步:发送消息。

Chat chat = chatManager.chatWith(toUserJid);
Message msg = new Message();
msg.setType(Message.Type.chat);
chat.send(msg);
public void send(Message message) throws NotConnectedException, InterruptedException {
    switch (message.getType()) {
    case normal:
    case chat:
        break;
    default:
        throw new IllegalArgumentException("Message must be of type 'normal' or 'chat'");
    }

    Jid to = lockedResource;
    if (to == null) {
        to = jid;
    }
    message.setTo(to);

    connection().sendStanza(message);
}

可以看见这里通过我们传入的connection调用sendStanza发送消息。

我们在看一下调用的sendStanza方法是在哪里执行的。
其实sendStanza是XMPPConnection接口的方法,我们用到的XMPPTCPConnection中没实现,在抽象类AbstractXMPPConnection中进行了实现

Override
public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
    Objects.requireNonNull(stanza, "Stanza must not be null");
    assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ;

    throwNotConnectedExceptionIfAppropriate();
    switch (fromMode) {
    case OMITTED:
        stanza.setFrom((Jid) null);
        break;
    case USER:
        stanza.setFrom(getUser());
        break;
    case UNCHANGED:
    default:
        break;
    }
    
    //设置拦截器
    Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza);
    //发送Stanza
    sendStanzaInternal(stanzaAfterInterceptors);
}

可以看到这个方法主要做了2个重要事情:1.设置拦截器。2.发送Stanza。我们关注下sendStanzaInternal方法。

sendStanzaInternal是AbstractXMPPConnection类的一个抽象方法,我们用的是XMPPTCPConnection,所以具体实现查看XMPPTCPConnection中的sendStanzaInternal方法。

protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException;
Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
    //消息写入
    packetWriter.sendStreamElement(packet);
    if (isSmEnabled()) {
        for (StanzaFilter requestAckPredicate : requestAckPredicates) {
            if (requestAckPredicate.accept(packet)) {
                requestSmAcknowledgementInternal();
                break;
            }
        }
    }
}

看到packetWriter是不是似曾相识?是的,这个就是我们之前讲解的连接成功后初始化在内存的那个packetWriter。

我们看看packetWriter中的sendStreamElement做了什么操作。

private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
                        QUEUE_SIZE, true);

protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
    throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
    try {
        //存入队列
        queue.put(element);
    }
    catch (InterruptedException e) {
        // put() may throw an InterruptedException for two reasons:
        // 1. If the queue was shut down
        // 2. If the thread was interrupted
        // so we have to check which is the case
        throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
        // If the method above did not throw, then the sending thread was interrupted
        throw e;
    }
}

可以看见我们发送的报文存在了队列中,最终写入的方法是在writePackets里。writePackets方法中会从队列中获取packet,最终其实是PacketWriter类中writer进行了写入操作。PacketWriter和writer在前面smack连接的时候已讲解过,这里不再细述,下面贴出writePackets代码:


writePackets

小结:
消息发送我们是用了Smack扩展中的ChatManager进行的,在ChatManager中会保存Chat对象,在我们发送消息时根据Jid来获取到chat对象,在消息发送时调用chat.send(),其实最终的实现是在我们的实现类XMPPTCPConnection sendStanzaInternal中,sendStanzaInternal会调用packetWriter的sendStreamElement方法,最终在PacketWriter中sendStreamElement方法会把报文写入自己的内存队列中,在writePackets方法中取出进行发送,最后是PacketWriter类中的writer流进行的写入操作。

<h1 id="4">4. 消息的接收</h1>

第一步: 添加监听IncomingChatMessageListener
这里监听设置在smack-extensions Moudle中ChatManager中

chatManager.addIncomingListener(messageListener);
public interface IncomingChatMessageListener {

    void newIncomingMessage(EntityBareJid from, Message message, Chat chat);

}
public final class ChatManager extends Manager {

    private final Set<IncomingChatMessageListener> incomingListeners = new CopyOnWriteArraySet<>();

    private final AsyncButOrdered<Chat> asyncButOrdered = new AsyncButOrdered<>();

    private ChatManager(final XMPPConnection connection) {
        super(connection);
        
        //1.设置XMPPConnection 的监听
        connection.addSyncStanzaListener(new StanzaListener() {
            @Override
            public void processStanza(Stanza stanza) {
                final Message message = (Message) stanza;
                if (!shouldAcceptMessage(message)) {
                    return;
                }

                final Jid from = message.getFrom();
                final EntityFullJid fullFrom = from.asEntityFullJidOrThrow();
                final EntityBareJid bareFrom = fullFrom.asEntityBareJid();
                final Chat chat = chatWith(bareFrom);
                chat.lockedResource = fullFrom;
                
                //2.这里所有chat都是保存在asyncButOrdered中,监听是在队列里排队回调。
                asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
                    @Override
                    public void run() {
                        //3.回调incomingListeners
                        for (IncomingChatMessageListener listener : incomingListeners) {
                            listener.newIncomingMessage(bareFrom, message, chat);
                        }
                    }
                });

            }
        }, INCOMING_MESSAGE_FILTER);
        
        //略...
    }
    
    //...
}

我们首先分析里面第二步:
监听到时执行的runnable。asyncButOrdered.performAsyncButOrdered,asyncButOrdered源码这里就不贴出来了。

每个chat对象回调监听时都会在AsyncButOrdered 类中存储一个Map<K, Queue<Runnable>>,这里 K 为 chat。
然后进行包装存储在成员变量 Map<K, Handler> threadActiveMap = new HashMap<>() 中。
这里的Handle 是AsyncButOrdered的一个内部类,然后执行run方法。

private class Handler implements Runnable {
        private final Queue<Runnable> keyQueue;
        private final K key;
        
        Handler(Queue<Runnable> keyQueue, K key) {
            this.keyQueue = keyQueue;
            this.key = key;
        }

        @Override
        public void run() {
            //略。。。
            runnable.run();
        }
}

最后回到第三步:

asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
    @Override
    public void run() {
        //3.回调incomingListeners
        for (IncomingChatMessageListener listener : incomingListeners) {
            listener.newIncomingMessage(bareFrom, message, chat);
        }
    }
});

这里不难看出为什么要这么做,因为上层可能会有很多监听过来,回调的调用其实也消息同样,量大。并且会有很多不同的chat对象监听返回,smack还进行了区分保存不同的chat回调,并在回调中第三个参数中返回给调用者了。
并且这个回调是异步的,smack在内部维护了一个线程池进行处理。

好,现在我们在回到第一步监听是如何在connection中回调的。

第一步:设置XMPPConnection 的监听
XMPPConnection

void addSyncStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter);

AbstractXMPPConnection

private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>();

@Override
public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
    if (packetListener == null) {
        throw new NullPointerException("Packet listener is null.");
    }
    ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
    synchronized (syncRecvListeners) {
        //添加Listener
        syncRecvListeners.put(packetListener, wrapper);
    }
}

最终实现是在AbstractXMPPConnection类中实现的。
可以看到保存的是一个ListenerWrapper包装类,第一个参数是回调监听,第二个参数是我们传入的监听过滤器。
现在我们需要看的是那些地方调用了成员变量syncRecvListeners中的回调。

我们这里检索查看是在AbstractXMPPConnection类中invokeStanzaCollectorsAndNotifyRecvListeners方法中进行了调用

invokeStanzaCollectorsAndNotifyRecvListeners()

protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {

    listenersToNotify.clear();
    //1.如果用户设置过滤器,则通过过滤消息类型,返回上层监听
    extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);

    //2.
    ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
        @Override
        public void run() {

            Iterator<StanzaListener> it = listenersToNotify.iterator();
            synchronized (syncRecvListeners) {
                while (it.hasNext()) {
                    StanzaListener stanzaListener = it.next();
                    if (!syncRecvListeners.containsKey(stanzaListener)) {
                        it.remove();
                    }
                }
            }
            for (StanzaListener listener : listenersToNotify) {
                try {
                    listener.processStanza(packet);
                } catch (NotConnectedException e) {
                    LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
                    break;
                } catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
                }
            }
        }
    });

}

第一步:如果用户设置过滤器,则通过过滤消息类型,返回上层监听
extractMatchingListeners

private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners,
                Collection<StanzaListener> listenersToNotify) {
    synchronized (listeners) {
        for (ListenerWrapper listenerWrapper : listeners.values()) {
            if (listenerWrapper.filterMatches(stanza)) {//过滤
                //加入外部。
                listenersToNotify.add(listenerWrapper.getListener());
            }
        }
    }
}
private final StanzaFilter packetFilter;

public boolean filterMatches(Stanza packet) {
       return packetFilter == null || packetFilter.accept(packet);
}

这里过滤调用的是包装类传入的那个过滤器。并通过accept(packet)接口进行过滤。好,我们现在在回过头看看这个过滤器传入的啥?
还记得我们之前在ChatManager addSyncStanzaListener() 方法的第二个参数吗?

private static final StanzaFilter INCOMING_MESSAGE_FILTER = new AndFilter(
                    MESSAGE_FILTER,
                    FromTypeFilter.ENTITY_FULL_JID
                    );

注意:这个过滤器是一层一层的,可以层层过滤,所以它的构造过程也是一层一层的,接下来不要眨眼。

public class AndFilter extends AbstractListFilter implements StanzaFilter {


    public AndFilter() {
        super();
    }


    public AndFilter(StanzaFilter... filters) {
        super(filters);
    }

    @Override
    public boolean accept(Stanza packet) {
        for (StanzaFilter filter : filters) {
            if (!filter.accept(packet)) {
                return false;
            }
        }
        return true;
    }

}

可以看见这里传入的构造传入的参数是一个可变参数,他的accept方法调用了StanzaFilter 过滤器的accept方法。
StanzaFilter 其实是一个接口accept是接口 方法,所以具体实现是在其实现类中

我们这里传入了2过滤器,这里只拿出每个构造的第一个作为讲解。

    private static final StanzaFilter MESSAGE_FILTER = new AndFilter(
                    MessageTypeFilter.NORMAL_OR_CHAT,
                    new OrFilter(MessageWithBodiesFilter.INSTANCE, new StanzaExtensionFilter(XHTMLExtension.ELEMENT, XHTMLExtension.NAMESPACE))
                    );

public static final StanzaFilter NORMAL_OR_CHAT = new OrFilter(NORMAL, CHAT);
public static final StanzaFilter NORMAL = new MessageTypeFilter(Type.normal);

最后定格在MessageTypeFilter中。

public final class MessageTypeFilter extends FlexibleStanzaTypeFilter<Message> {

    private final Message.Type type;

    private MessageTypeFilter(Message.Type type) {
        super(Message.class);
        this.type = type;
    }

    @Override
    protected boolean acceptSpecific(Message message) {
        return message.getType() == type;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + ": type=" + type;
    }
}

没有看见accept方法啊?客官,别急嘛!看看他的父类中是否有。

public abstract class FlexibleStanzaTypeFilter<S extends Stanza> implements StanzaFilter {

    protected final Class<S> stanzaType;

    public FlexibleStanzaTypeFilter(Class<S> packetType) {
        this.stanzaType = Objects.requireNonNull(packetType, "Type must not be null");
    }

    @SuppressWarnings("unchecked")
    public FlexibleStanzaTypeFilter() {
        stanzaType = (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    @Override
    @SuppressWarnings("unchecked")
    public final boolean accept(Stanza packet) {
        if (stanzaType.isInstance(packet)) {//1.验证消息class是否匹配
            return acceptSpecific((S) packet);//2.实现类自己的过滤
        }
        return false;
    }

    protected abstract boolean acceptSpecific(S packet);

    @Override
    public String toString() {
        return getClass().getSimpleName() + ": " + stanzaType.toString();
    }
}

果然在父类中看见了accept方法。
1.处:调用了stanzaType.isInstance(packet),这里可以看见stanzaType就是传入的泛型类型。我们在MessageTypeFilter 这里传的是super(Message.class); 所以这个过滤就是过滤到不是Message消息的报文。
2.处:进行2次过滤acceptSpecific()方法,这里他的子类实现是MessageTypeFilter。
可以看见条件是message.getType() == type,所以消息Type也要与传入的type进行匹配。

这里过滤器是如何工作的就讲解完了。
好了,现在可以眨眼了。

回过头我们继续看过滤后的监听操作 [ invokeStanzaCollectorsAndNotifyRecvListeners]

protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {

    listenersToNotify.clear();
    //1.如果用户设置过滤器,则通过过滤消息类型,返回上层监听
    extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);

    //2.
    ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
        @Override
        public void run() {

            Iterator<StanzaListener> it = listenersToNotify.iterator();
            synchronized (syncRecvListeners) {
                while (it.hasNext()) {
                    StanzaListener stanzaListener = it.next();
                    if (!syncRecvListeners.containsKey(stanzaListener)) {
                        it.remove();
                    }
                }
            }
            //3.回调监听
            for (StanzaListener listener : listenersToNotify) {
                try {
                    listener.processStanza(packet);
                } catch (NotConnectedException e) {
                    LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
                    break;
                } catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
                }
            }
        }
    });

}

extractMatchingListeners方法传入的packet 通过过滤后得到符合条件的监听listenersToNotify 对他们进行回调。

2.处:ASYNC_BUT_ORDERED.performAsyncButOrdered
这里就不在做讲解了,跟之前ChatManager回调监听时处理是一样的,只不过这里hashMap中存储的key为XMPPConnection。
3.处:进行回调的监听。

好,现在监听回调的调用地方算是已经找到了,但是我们想继续看看invokeStanzaCollectorsAndNotifyRecvListeners是在哪里调用过来的。现在就一步一步往上面推。

检索发有2地方进行调用,这里我们的实现类是继承AbstractXMPPConnection类的,所以查看AbstractXMPPConnection类中调用invokeStanzaCollectorsAndNotifyRecvListeners的方法

protected void processStanza(final Stanza stanza) throws InterruptedException {
    assert stanza != null;

    final SmackDebugger debugger = this.debugger;
    if (debugger != null) {
        debugger.onIncomingStreamElement(stanza);
    }

    lastStanzaReceived = System.currentTimeMillis();
    // 将传入的数据包传递给侦听器
    invokeStanzaCollectorsAndNotifyRecvListeners(stanza);
}

然后在向上检索在AbstractXMPPConnection类中

protected void parseAndProcessStanza(XmlPullParser parser)
                throws XmlPullParserException, IOException, InterruptedException {
    ParserUtils.assertAtStartTag(parser);
    int parserDepth = parser.getDepth();
    Stanza stanza = null;
    try {
        stanza = PacketParserUtils.parseStanza(parser, incomingStreamXmlEnvironment);
    }
    catch (XmlPullParserException | SmackParsingException | IOException | IllegalArgumentException e) {
        CharSequence content = PacketParserUtils.parseContentDepth(parser,
                        parserDepth);
        UnparseableStanza message = new UnparseableStanza(content, e);
        ParsingExceptionCallback callback = getParsingExceptionCallback();
        if (callback != null) {
            callback.handleUnparsableStanza(message);
        }
    }
    ParserUtils.assertAtEndTag(parser);
    if (stanza != null) {
        //1.调用
        processStanza(stanza);
    }
}

可以看到此方法把XmlPullParser解析的内容封装成stanza并调用了processStanza方法。

然后在向上面推,看哪里调用的parseAndProcessStanza方法。
检索发现有多处调用,这里我们的实现是XMPPTCPConnection类。


parsePackets

还记得我们之前连接成功后初始化的XMPPTCPConnection的内部类PacketReader吗?
对的,在他的parsePackets方法中,解析了报文的节点,然后进行了调用过程parseAndProcessStanza()

小结:
其他监听其实也是一样的,比如:群聊的监听。
当消息过来时,都是通过用户设置的过滤器过滤符合用户需要的监听进行回调。他们在处理过滤时的方法都是在 invokeStanzaCollectorsAndNotifyRecvListeners方法中进行的处理

到此,本文smack简单的解析就结束了,这里也是自己的一些小的见解,可能会有不完善和不正确的地方,望大家觉得有不准确的地方,欢迎指出并一起探讨。

邮箱:soarsy@163.com

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,295评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,928评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,682评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,209评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,237评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,965评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,586评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,487评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,016评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,136评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,271评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,948评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,619评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,139评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,252评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,598评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,267评论 2 358

推荐阅读更多精彩内容