- [Smack]
- [1 初始化]
- [1.1 初始化启动类]
- [2 连接与登录]
- [2.1 创建连接类]
- [2.2 连接]
- [2.3 登录]
- [3 消息的发送]
- [4 消息的接收]
本文分析Smack源码版本为4.4.0-alpha6
Smack简介
Smack是一个开源,易于使用的XMPP(jabber)客户端类库。
Smack是用Java编写的开放源代码,高度模块化,易于使用的XMPP客户端库,用于Java SE兼容JVM和Android。它是纯Java库,可以嵌入到您的应用程序中,以创建从完整的XMPP即时消息客户端到简单的XMPP集成(例如发送通知消息和支持状态的设备)的任何内容。Smack和XMPP允许您在(M2M,IoT,…)之间以多种方式轻松交换数据,例如“发后不管”,“发布订阅”。
Smack官方资料:
官网地址:https://www.igniterealtime.org/
Github:https://github.com/igniterealtime/Smack/tree/master
文档:https://download.igniterealtime.org/smack/docs/latest/documentation/
Javadoc: http://www.igniterealtime.org/builds/smack/dailybuilds/javadoc/
Smack Modules:
Smack可以轻松地嵌入到任何现有的Java应用程序中。该库作为几个模块提供,可为应用程序所需的功能提供更大的灵活性:
- smack-core-提供核心XMPP功能。包括XMPP RFC中的所有XMPP功能。
- smack-im -提供RFC 6121(XMPP-IM)中定义的功能,例如名册。
- smack-tcp-支持基于TCP的XMPP。包括您通常要使用的XMPPTCPConnection类
- smack-extensions-支持XMPP标准基金会定义的许多扩展(XEP),包括多用户聊天,文件传输,用户搜索等。这些扩展记录在扩展手册中。
- smack-experimental-支持XMPP标准基金会定义的实验扩展(XEP)。这些扩展的API和功能应视为不稳定。
- smack-legacy -支持XMPP标准基金会定义的旧版扩展(XEP)。
- smack-bosh-支持BOSH(XEP-0124)。此代码应视为Beta。
- smack-resolver-minidns-支持在MiniDNS的帮助下解析DNS SRV记录。不支持javax.naming API的平台的理想选择。还支持DNSSEC。
- smack-resolver-dnsjava -支持在dnsjava的帮助下解析DNS SRV记录。
- smack-resolver-javax -支持使用javax名称空间API解析DNS SRV记录。
- smack-debug-用于协议流量的增强型GUI调试器。在类路径中找到并启用调试后,它将自动使用它。
初始化
1初始化启动类
1.1.1 SmackInitialization类
配置文件:
初始化是通过配置文件完成的。在smack-core模块org.jivesoftware.smack / smack-config.xml。默认情况下,Smack将把此文件打入jar中。此特定配置包含要加载的初始化程序类的列表。此初始化器列表中包含所有需要初始化的管理器类型类。
smack-config.xml
<?xml version="1.0"?>
<!-- Smack configuration file. -->
<smack>
<!-- Classes that will be loaded when Smack starts -->
<startupClasses>
<className>org.jivesoftware.smack.initializer.VmArgInitializer</className>
<className>org.jivesoftware.smack.ReconnectionManager</className>
</startupClasses>
<optionalStartupClasses>
<className>org.jivesoftware.smack.util.dns.javax.JavaxResolver</className>
<className>org.jivesoftware.smack.util.dns.minidns.MiniDnsResolver</className>
<className>org.jivesoftware.smack.util.dns.dnsjava.DNSJavaResolver</className>
<className>org.jivesoftware.smack.extensions.ExtensionsInitializer</className>
<className>org.jivesoftware.smack.experimental.ExperimentalInitializer</className>
<className>org.jivesoftware.smack.legacy.LegacyInitializer</className>
<className>org.jivesoftware.smack.tcp.TCPInitializer</className>
<className>org.jivesoftware.smack.sasl.javax.SASLJavaXSmackInitializer</className>
<className>org.jivesoftware.smack.sasl.provided.SASLProvidedSmackInitializer</className>
<className>org.jivesoftware.smack.android.AndroidSmackInitializer</className>
<className>org.jivesoftware.smack.java7.Java7SmackInitializer</className>
<className>org.jivesoftware.smack.im.SmackImInitializer</className>
<className>org.jivesoftware.smackx.omemo.OmemoInitializer</className>
<className>org.jivesoftware.smackx.ox.util.OpenPgpInitializer</className>
</optionalStartupClasses>
</smack>
然而配置文件是哪里加载的呢?其实是在SmackInitialization类,他在静态代码块中进行的加载,这里贴出关键代码:
private static final String DEFAULT_CONFIG_FILE = "org.jivesoftware.smack/smack-config.xml";
//...
static {
//....
//....
InputStream configFileStream;
try {
//1.在这里得到文件的输入流
configFileStream = FileUtils.getStreamForClasspathFile(DEFAULT_CONFIG_FILE, null);
}
catch (Exception e) {
throw new IllegalStateException("Could not load Smack configuration file", e);
}
try {
//2.
processConfigFile(configFileStream, null);
}
catch (Exception e) {
throw new IllegalStateException("Could not parse Smack configuration file", e);
}
// Add the Java7 compression handler first, since it's preferred
SmackConfiguration.addCompressionHandler(new Java7ZlibInputOutputStream());
//....
//....
}
我们在看看第二步中processConfigFile做了什么操作。
public static void processConfigFile(InputStream cfgFileStream,
Collection<Exception> exceptions) throws Exception {
processConfigFile(cfgFileStream, exceptions, SmackInitialization.class.getClassLoader());
}
public static void processConfigFile(InputStream cfgFileStream,
Collection<Exception> exceptions, ClassLoader classLoader) throws Exception {
XmlPullParser parser = PacketParserUtils.getParserFor(cfgFileStream);
XmlPullParser.Event eventType = parser.getEventType();
do {
if (eventType == XmlPullParser.Event.START_ELEMENT) {
//2.
if (parser.getName().equals("startupClasses")) {
parseClassesToLoad(parser, false, exceptions, classLoader);
}
else if (parser.getName().equals("optionalStartupClasses")) {
parseClassesToLoad(parser, true, exceptions, classLoader);
}
}
eventType = parser.next();
}
//1.判断是否是节点结束 循环解析
while (eventType != XmlPullParser.Event.END_DOCUMENT);
CloseableUtil.maybeClose(cfgFileStream, LOGGER);
}
可以看见它把流通过XmlPullParser解析出startupClasses、optionalStartupClasses两个节点中的内容。也就是我们之前 smack-config.xml中的节点。
并且都是调用parseClassesToLoad方法,区别只在于第二个参数的boolen不一样,其实这个boolen关系不大,在后面我们就知道了。
继续看它的parseClassesToLoad方法做了什么:
private static void parseClassesToLoad(XmlPullParser parser, boolean optional,
Collection<Exception> exceptions, ClassLoader classLoader)
throws Exception {
final String startName = parser.getName();
XmlPullParser.Event eventType;
outerloop: do {//1.循环解析出className
eventType = parser.next();
if (eventType == XmlPullParser.Event.START_ELEMENT && "className".equals(parser.getName())) {
String classToLoad = parser.nextText();
if (SmackConfiguration.isDisabledSmackClass(classToLoad)) {
continue outerloop;
}
try {
//2.方法中进行反射加载类 参数classToLoad为节点中className的内容。
loadSmackClass(classToLoad, optional, classLoader);
} catch (Exception e) {
// Don't throw the exception if an exceptions collection is given, instead
// record it there. This is used for unit testing purposes.
if (exceptions != null) {
exceptions.add(e);
} else {
throw e;
}
}
}
}
while (!(eventType == XmlPullParser.Event.END_ELEMENT && startName.equals(parser.getName())));
}
可以看到第一步循环出className节点的内容,第二步把className内容String传给了loadSmackClass方法进行反射加载。
调用loadSmackClass方法:
private static void loadSmackClass(String className, boolean optional, ClassLoader classLoader) throws Exception {
Class<?> initClass;
try {
// Attempt to load and initialize the class so that all static initializer blocks of
// class are executed
//1.反射类的class
initClass = Class.forName(className, true, classLoader);
}
catch (ClassNotFoundException cnfe) {
Level logLevel;
if (optional) {
logLevel = Level.FINE;
}
else {
logLevel = Level.WARNING;
}
LOGGER.log(logLevel, "A startup class '" + className + "' could not be loaded.");
if (!optional) {
throw cnfe;
} else {
return;
}
}
//2.判断传入class类型是否是SmackInitializer类型或者其实现类
if (SmackInitializer.class.isAssignableFrom(initClass)) {
//3.创建实例对象
SmackInitializer initializer = (SmackInitializer) initClass.getConstructor().newInstance();
//4.调用接口initialize方法,其实也就是调用实例的initialize方法
List<Exception> exceptions = initializer.initialize();
if (exceptions == null || exceptions.size() == 0) {
LOGGER.log(Level.FINE, "Loaded SmackInitializer " + className);
} else {
for (Exception e : exceptions) {
LOGGER.log(Level.SEVERE, "Exception in loadSmackClass", e);
}
}
} else {
LOGGER.log(Level.FINE, "Loaded " + className);
}
}
第一步:反射类的class。
第二步:判断传入class类型是否是SmackInitializer类型或者其实现类
第三步:创建实例对象
第四步:调用接口initialize方法,其实也就是调用实例的initialize方法
我们现在看一下SmackInitializer类,其实是一个接口。
package org.jivesoftware.smack.initializer;
import java.util.List;
import org.jivesoftware.smack.SmackConfiguration;
/**
* Defines an initialization class that will be instantiated and invoked by the {@link SmackConfiguration} class during initialization.
*
* <p>
* Any implementation of this class MUST have a default constructor.
*
* @author Robin Collier
*
*/
public interface SmackInitializer {
List<Exception> initialize();
}
现在可以看出第二步判断里的逻辑为:SmackInitializer接口的实现类调用initialize()方法。
好,这时候我们在回过头看看我们smack-config.xml中配置的内容。
VmArgInitializer类
public class VmArgInitializer extends UrlInitializer {
protected String getFilePath() {
return System.getProperty("smack.provider.file");
}
@Override
public List<Exception> initialize() {
if (getFilePath() != null) {
super.initialize();
}
return Collections.emptyList();
}
}
父类UrlInitializer
public abstract class UrlInitializer implements SmackInitializer {
//...
@Override
public List<Exception> initialize() {
//...
//...
}
//...
}
可以看见这个类是实现了SmackInitializer接口的。
在看看[smack-config.xml]中的第二个配置类
ReconnectionManager
public final class ReconnectionManager {
//...
//...
static {
XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
@Override
public void connectionCreated(XMPPConnection connection) {
if (connection instanceof AbstractXMPPConnection) {
ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection);
}
}
});
}
//...
//...
}
可以看到这个类并没有实现SmackInitializer接口,只是类中有静态代码块执行而已。
在看看[smack-config.xml]中的第三个配置类
JavaxResolver
public class JavaxResolver extends DNSResolver implements SmackInitializer {
//...
//...
static {
try {
Hashtable<String, String> env = new Hashtable<>();
env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
dirContext = new InitialDirContext(env);
} catch (NamingException e) {
LOGGER.log(Level.SEVERE, "Could not construct InitialDirContext", e);
}
// Try to set this DNS resolver as primary one
setup();
}
public static void setup() {
DNSUtil.setDNSResolver(getInstance());
}
//...
//...
@Override
public List<Exception> initialize() {
setup();
return null;
}
//...
//...
}
这个类也实现了SmackInitializer 接口。
然而实现类方法到底执行了什么呢,这里我们举个栗子,其他的就不一一给大家分析了。
看好,就是这个栗子:SmackImInitializer
public class SmackImInitializer extends UrlInitializer {
@Override
protected String getProvidersUri() {
return "classpath:org.jivesoftware.smack.im/smackim.providers";
}
@Override
protected String getConfigUri() {
return "classpath:org.jivesoftware.smack.im/smackim.xml";
}
}
父类UrlInitializer
public abstract class UrlInitializer implements SmackInitializer {
private static final Logger LOGGER = Logger.getLogger(UrlInitializer.class.getName());
//1.接口的实现方法
@Override
public List<Exception> initialize() {
InputStream is = null;
final ClassLoader classLoader = this.getClass().getClassLoader();
final List<Exception> exceptions = new LinkedList<Exception>();
//2.getProvidersUri() 有些子类对其进行了方法重载,所以具体获取要看子类是否重载
final String providerUriString = getProvidersUri();
if (providerUriString != null) {
try {
final URI providerUri = URI.create(providerUriString);
is = FileUtils.getStreamForUri(providerUri, classLoader);
LOGGER.log(Level.FINE, "Loading providers for providerUri [" + providerUri + "]");
//3.此方法中进行类加载到内存操作
ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
ProviderManager.addLoader(pfl);
exceptions.addAll(pfl.getLoadingExceptions());
}
catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error trying to load provider file " + providerUriString, e);
exceptions.add(e);
} finally {
maybeClose(is);
}
}
//4.getConfigUri() 有些子类对其进行了方法重载,所以具体获取要看子类是否重载
final String configUriString = getConfigUri();
if (configUriString != null) {
try {
final URI configUri = URI.create(configUriString);
is = FileUtils.getStreamForUri(configUri, classLoader);
//5.调用加载类操作
SmackInitialization.processConfigFile(is, exceptions, classLoader);
}
catch (Exception e) {
exceptions.add(e);
} finally {
maybeClose(is);
}
}
return exceptions;
}
protected String getProvidersUri() {
return null;
}
protected String getConfigUri() {
return null;
}
private static void maybeClose(InputStream is) {
CloseableUtil.maybeClose(is, LOGGER);
}
}
第一步:SmackImInitializer实例会调用父类UrlInitializer的initialize方法。
第二步:getProvidersUri()方法,因为SmackImInitializer对其进行了重载,所以这里看SmackImInitializer中getProvidersUri()的方法
protected String getProvidersUri() {
return "classpath:org.jivesoftware.smack.im/smackim.providers";
}
可以看见是一个String路径,这个路径文件其实是在SmackImInitializer类所在的smack-im moudle下,我们看一下内容:
smackim.providers
<?xml version="1.0"?>
<smackProviders>
<!-- RFC-6121: Extensible Messaging and Presence Protocol (XMPP): Instant Messaging and Presence -->
<iqProvider>
<elementName>query</elementName>
<namespace>jabber:iq:roster</namespace>
<className>org.jivesoftware.smack.roster.provider.RosterPacketReNameProvider</className>
</iqProvider>
<streamFeatureProvider>
<elementName>sub</elementName>
<namespace>urn:xmpp:features:pre-approval</namespace>
<className>org.jivesoftware.smack.roster.provider.SubscriptionPreApprovalStreamFeatureProvider</className>
</streamFeatureProvider>
<!-- XEP-0237: Roster Versioning -->
<streamFeatureProvider>
<elementName>ver</elementName>
<namespace>urn:xmpp:features:rosterver</namespace>
<className>org.jivesoftware.smack.roster.provider.RosterVerStreamFeatureProvider</className>
</streamFeatureProvider>
</smackProviders>
第三步:ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
把smackim.providers配置文件的输入流传入。
第四步:这里在看看ProviderFileLoader中做了什么?
public class ProviderFileLoader implements ProviderLoader {
@SuppressWarnings("unchecked")
public ProviderFileLoader(InputStream providerStream, ClassLoader classLoader) {
//1.这里省略部分代码
//其实就是把InputStream通过XmlPullParser拿到className节点信息.
//...
try {
//2.加载class
final Class<?> provider = classLoader.loadClass(className);
switch (typeName) {
case "iqProvider":
//3.节点为iqProvider 创建实例类并加入内存iqProviders
if (IqProvider.class.isAssignableFrom(provider)) {
IqProvider<IQ> iqProvider = (IqProvider<IQ>) provider.getConstructor().newInstance();
iqProviders.add(new IQProviderInfo(elementName, namespace, iqProvider));
}
else {
exceptions.add(new IllegalArgumentException(className + " is not a IQProvider"));
}
break;
case "extensionProvider":
//...
//...
break;
case "streamFeatureProvider":
//4.节点为streamFeatureProvider创建实例类并加入内存sfProviders
ExtensionElementProvider<ExtensionElement> streamFeatureProvider = (ExtensionElementProvider<ExtensionElement>) provider.getConstructor().newInstance();
sfProviders.add(new StreamFeatureProviderInfo(elementName,
namespace,
streamFeatureProvider));
break;
default:
LOGGER.warning("Unknown provider type: " + typeName);
}
}
catch (ClassNotFoundException cnfe) {
LOGGER.log(Level.SEVERE, "Could not find provider class", cnfe);
exceptions.add(cnfe);
}
catch (InstantiationException ie) {
LOGGER.log(Level.SEVERE, "Could not instanciate " + className, ie);
exceptions.add(ie);
}
//...
//...
}
}
1.把传入的配置文件流InputStream通过XmlPullParser拿到className节点信息.
2.加载配置文件中的class。
3.通过switch判断是iqProvider节点的加入到iqProviders中,是streamFeatureProvider节点的加入到sfProviders中。
其实smackim.providers文件跟 smack-config.xml文件一样都是配置文件,只不过smackim.providers是属于一个moudle中的配置文件,而smack-config.xml是核心smack-core Moudle中总得配置文件而已(及Smack配置文件)。
这里梳理一下:所有初始化都是在SmackInitialization的静态代码块中执行的。也就是说那些地方创建获取调用了此类方法,此类的静态代码块就被会执行。
最后看一下那些地方调用了这个方法:
SmackConfiguration类中调用了他获取版本的方法。
在看看那些调用了SmackConfiguration.getVersion();
可以看到很多类中都有调用,并且关键的AbstractXMPPConnection连接类静态代码块中也调用了此方法。
所以我们得出结论:只要你创建对象进行连接操作,框架内部就会帮你初始化好配置。
1.1.2 SmackConfiguration类
这里截图这个类的部分代码
public final class SmackConfiguration {
//...
//...
public static boolean DEBUG = false;
private static SmackDebuggerFactory DEFAULT_DEBUGGER_FACTORY = ReflectionDebuggerFactory.INSTANCE;
/**
* The default parsing exception callback is {@link ExceptionThrowingCallback} which will
* throw an exception and therefore disconnect the active connection.
*/
private static ParsingExceptionCallback defaultCallback = new ExceptionThrowingCallbackWithHint();
private static HostnameVerifier defaultHostnameVerififer;
/**
* Returns the Smack version information, eg "1.3.0".
*
* @return the Smack version information.
*/
public static String getVersion() {
return SmackInitialization.SMACK_VERSION;
}
//...
//...
}
可以看到这个类是final 并且里面全都是静态方法和静态成员变量。其实这个类作用就是在你使用smack前可以进行静态值得配置。比如:SmackConfiguration.DEBUG = true;
小结:
其实Smack的初始化操作都是在smack-core moudle下SmackInitialization类的静态代码中完成的,并且配置文件在同一moudle下的smack-config.xml文件。
SmackConfiguration类getVersion()时,SmackInitialization的静态代码块就会执行。并且关键的AbstractXMPPConnection连接类静态代码块中也调用了此方法,所以只要你创建对象进行连接操作,框架内部就会帮你初始化好配置。
连接与登录
1创建连接类
第一步:创建连接配置
configuration = XMPPTCPConnectionConfiguration.builder()
.setXmppDomain(myibc_domain)
.setHostAddress(InetAddress.getByName(mXmppHost))
.setPort(mXmppPort)
.setSendPresence(false)
.setSecurityMode(ConnectionConfiguration.SecurityMode.required)
.setConnectTimeout(30 * 1000)
.setCompressionEnabled(false)
.setCustomSSLContext(SSLUtils.getSSLCAContext(""))
.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER)
.build();
这里我们用的是TCP的连接,所以我们使用XMPPTCPConnectionConfiguration来创建我们的配置信息。
其实XMPPTCPConnectionConfiguration继承了ConnectionConfiguration抽象类。并且在构造方法调用父类的构造方法。
配置都保存在了ConnectionConfiguration中
public abstract class ConnectionConfiguration {
protected ConnectionConfiguration(Builder<?, ?> builder) {
//略...
authzid = builder.authzid;
username = builder.username;
password = builder.password;
callbackHandler = builder.callbackHandler;
//略...
}
}
第二步:创建用于连接服务器的核心类XMPPTCPConnection
//将连接配置对象当作参数传入给XMPPTCPConnection构造函数
XMPPTCPConnection connection = new XMPPTCPConnection(configuration);
配置信息传入到XMPPTCPConnection和父类AbstractXMPPConnection中各个保存了一份。
- 2 连接
用XMPPTCPConnection实例进行连接
//进行连接
connection.connect();
首先我们了解一下XMPPTCPConnection的继承关系
connect()方法是在AbstractXMPPConnection抽象类中进行实现的。
public abstract class AbstractXMPPConnection implements XMPPConnection {
//...
public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException {
// 检测是否已连接。
throwAlreadyConnectedExceptionIfAppropriate();
// 重置连接状态
initState();
closingStreamReceived = false;
streamId = null;
try {
//1.关键,执行连接的方法
connectInternal();
//判断TLS是否认证过
if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
throw new SecurityRequiredByClientException();
}
} catch (SmackException | IOException | XMPPException | InterruptedException e) {
instantShutdown();
throw e;
}
// 回调已连接监听
connected = true;
callConnectionConnectedListener();
return this;
}
//...
//2.
protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException;
}
关键方法是调用connectInternal();此方法是AbstractXMPPConnection类的抽象方法,具体要查看他的实现类。
我们实例的实现类为:
XMPPTCPConnection
public class XMPPTCPConnection extends AbstractXMPPConnection {
@Override
protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
//1.连接
connectUsingConfiguration();
//2.连接成功后,通过流初始化writer Reader。
initConnection();
// TLS handled will be true either if TLS was established, or if it was not mandatory.
waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
// Wait with SASL auth until the SASL mechanisms have been received
waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
}
private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
//略...
SocketFactory socketFactory = config.getSocketFactory();
for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
Iterator<? extends InetAddress> inetAddresses;
String host = endpoint.getHost().toString();
UInt16 portUint16 = endpoint.getPort();
int port = portUint16.intValue();
if (proxyInfo == null) {//无代理
SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
final InetAddress inetAddress = inetAddresses.next();
final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
//1.这里进行异步的连接
socketFuture.connectAsync(inetSocketAddress, timeout);
try {
//2.获取socket
socket = socketFuture.getOrThrow();
} catch (IOException e) {
RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
endpoint, inetAddress, e);
connectionExceptions.add(rce);
if (inetAddresses.hasNext()) {
continue innerloop;
} else {
break innerloop;
}
}
} else {
//略...
return;
}
}
// There are no more host addresses to try
// throw an exception and report all tried
// HostAddresses in the exception
throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
}
}
connectUsingConfiguration连接方法
可以看见通过用户传入的配置信息,获取到SocketFactory对象,并调用connectAsync()进行异步连接。
主要步骤为两步:
1.异步的连接
socketFactory.connectAsync()
2.获取socket
socket = socketFuture.getOrThrow();
socketFactory.connectAsync() :
public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {
private final Socket socket;
private final Object wasInterruptedLock = new Object();
private boolean wasInterrupted;
public SocketFuture(SocketFactory socketFactory) throws IOException {
socket = socketFactory.createSocket();
}
public void connectAsync(final SocketAddress socketAddress, final int timeout) {
AbstractXMPPConnection.asyncGo(new Runnable() {
@Override
public void run() {
try {
//在这里进行socket的连接操作
socket.connect(socketAddress, timeout);
}
catch (IOException e) {
setException(e);
return;
}
synchronized (wasInterruptedLock) {
if (wasInterrupted) {
closeSocket();
return;
}
}
setResult(socket);
}
});
}
}
到此连接是已经建立了,在初始化写入写出流在看看我们之前XMPPTCPConnection类中connectInternal方法的第二步
initConnection()也是XMPPTCPConnection类中的方法。
private void initConnection() throws IOException, InterruptedException {
compressionHandler = null;
//1.初始化reader和writer实例对象
initReaderAndWriter();
//2.初始化 启动 写入流 线程
packetWriter.init();
//3.启动读线程。startup()方法将阻塞,直到从服务器获取打开的流数据包
packetReader.init();
}
initReaderAndWriter()
private void initReaderAndWriter() throws IOException {
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
if (compressionHandler != null) {
is = compressionHandler.getInputStream(is);
os = compressionHandler.getOutputStream(os);
}
// OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
writer = new OutputStreamWriter(os, "UTF-8");
reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
// If debugging is enabled, we open a window and write out all network traffic.
initDebugger();
}
这里很明白,就是通过socket获取输入输出流,创建writer、reader。
2.packetWriter.init();
PacketWriter对象是在XMPPTCPConnection类的成员变量中创建的
protected final PacketWriter packetWriter = new PacketWriter();
protected final PacketReader packetReader = new PacketReader();
protected class PacketWriter {
//队列
private final ArrayBlockingQueueWithShutdown<Element> queue =
new ArrayBlockingQueueWithShutdown<>(QUEUE_SIZE, true);
//略...
void init() {
shutdownTimestamp = null;
if (unacknowledgedStanzas != null) {
// It's possible that there are new stanzas in the writer queue that
// came in while we were disconnected but resumable, drain those into
// the unacknowledged queue so that they get resent now
drainWriterQueueToUnacknowledgedStanzas();
}
queue.start();
running = true;
//1.开启线程
Async.go(new Runnable() {
@Override
public void run() {
LOGGER.finer(threadName + " start");
try {
//2.写入报文,报文是在queue队列中获取的
writePackets();
} finally {
LOGGER.finer(threadName + " exit");
running = false;
notifyWaitingThreads();
}
}
}, threadName);
}
//...
}
//1.开启线程
//2.写入报文,报文是在queue队列中获取的
这里就不贴出writePackets代码了。
packetReader.init()
我们在看看之前packetReader.init()又做了些什么
可以看到它主要执行的代码为parsePackets() 方法。
通过XmlPullParser parser 解析器来读取收到的内容。
这里有一个问题XmlPullParser 哪里来的呢?我们现在看看他赋值的代码。
在AbstractXMPPConnection类中
private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
sendStreamOpen();
resetParser();
}
private void resetParser() throws IOException {
try {
packetReader.parser = SmackXmlParser.newXmlParser(reader);
} catch (XmlPullParserException e) {
throw new IOException(e);
}
}
看到reader对象了还记得我们之前连接成功后initReaderAndWriter()方法的赋值吗?
到这里我们整个连接过程就算完成了。
小结:
1.连接首先会调用AbstractXMPPConnection的connect()方法。
2.方法会调用实现类XMPPTCPConnection的connectInternal().
-
connectInternal方法做2步操作
- connectUsingConfiguration:
- 通过config创建SocketFactory并得到Socket。
- Socket进行异步连接。
-连接成功
-initConnection:
-初始化reader和writer实例对象
-初始化packetWriter 写入也是在报文队列中,实际通过writer对象写入。
-初始化packetReader,报文在队列中有数据时则会读取。实际会通过reader读取。 - connectUsingConfiguration:
- 3 登录
mConnection.login(loginUserId, loginPassword, resource);
调用其实是抽象类AbstractXMPPConnection中的login()方法。
public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException,SmackException, IOException, InterruptedException {
if (!config.allowNullOrEmptyUsername) {
StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty");
}
throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?");
throwAlreadyLoggedInExceptionIfAppropriate();
usedUsername = username != null ? username.toString() : null;
usedPassword = password;
usedResource = resource;
loginInternal(usedUsername, usedPassword, usedResource);
}
继续调用其实是AbstractXMPPConnection的一个抽象方法
protected abstract void loginInternal(String username, String password, Resourcepart resource)
throws XMPPException, SmackException, IOException, InterruptedException;
我们这里的实现类是XMPPTCPConnection
Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
SmackException, IOException, InterruptedException {
// 使用SASL进行身份验证
SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
streamFeaturesAfterAuthenticationReceived = false;
//1.进行身份认证
authenticate(username, password, config.getAuthzid(), sslSession);
//在身份验证之后等待流特性。
waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
//如果启用了压缩,则请求服务器使用流压缩
maybeEnableCompression();
//略...
//绑定资源
bindResourceAndEstablishSession(resource);
//略...
//2.登录成功后
afterSuccessfulLogin(false);
}
这个方法有2个地方需要注意
1.进行身份认证
2.登录成功后的状态发送与监听。
首先我们看看authenticate()方法做了些什么。可以看到这里是调用的方法
protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid,SSLSession sslSession) {
//1.认证
SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession);
//认证成功
afterSaslAuthenticationSuccess();
return saslMechanism;
}
调用的是 saslAuthentication对象authenticate方法。
saslAuthentication其实是在创建连接对象时新建出来的
protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
saslAuthentication = new SASLAuthentication(this, configuration);
//略...
}
我们看看SASLAuthentication类的authenticate方法:
SASLMechanism authenticate(String username, String password, EntityBareJid authzid, SSLSession sslSession)
throws XMPPErrorException, SASLErrorException, IOException,
InterruptedException, SmackSaslException, NotConnectedException, NoResponseException {
//获取相应jid的SASL机制。
final SASLMechanism mechanism = selectMechanism(authzid);
final CallbackHandler callbackHandler = configuration.getCallbackHandler();
final String host = connection.getHost();
final DomainBareJid xmppServiceDomain = connection.getXMPPServiceDomain();
synchronized (this) {
currentMechanism = mechanism;
//这里我们配置未传入callbackHandler,所以走else
if (callbackHandler != null) {
currentMechanism.authenticate(host, xmppServiceDomain, callbackHandler, authzid, sslSession);
}
else {
//(重要),认证。
currentMechanism.authenticate(username, host, xmppServiceDomain, password, authzid, sslSession);
}
final long deadline = System.currentTimeMillis() + connection.getReplyTimeout();
while (!mechanism.isFinished()) {
final long now = System.currentTimeMillis();
if (now >= deadline) break;
// Wait until SASL negotiation finishes
wait(deadline - now);
}
}
mechanism.throwExceptionIfRequired();
return mechanism;
}
在看看对应SASLMechanism 类中的authenticate方法。
public final void authenticate(String username, String host, DomainBareJid serviceName, String password,
EntityBareJid authzid, SSLSession sslSession)
throws SmackSaslException, NotConnectedException, InterruptedException {
this.authenticationId = username;
this.host = host;
this.serviceName = serviceName;
this.password = password;
this.authorizationId = authzid;
this.sslSession = sslSession;
assert authorizationId == null || authzidSupported();
authenticateInternal();
//认证
authenticate();
}
private void authenticate() throws SmackSaslException, NotConnectedException, InterruptedException {
//1.通过配置选择的加密方式加密用户账号,密码
byte[] authenticationBytes = getAuthenticationText();
String authenticationText;
if (authenticationBytes != null && authenticationBytes.length > 0) {
//进行base64编码
authenticationText = Base64.encodeToString(authenticationBytes);
} else {
authenticationText = "=";
}
//发送认证到服务器。
connection.sendNonza(new AuthMechanism(getName(), authenticationText));
}
到这里认证走完了,然后之前在AbstractXMPPConnection类中authenticate方法中还有afterSaslAuthenticationSuccess方法进行执行。
protected void afterSaslAuthenticationSuccess()
throws NotConnectedException, InterruptedException, SmackWrappedException {
sendStreamOpen();
}
protected void sendStreamOpen() throws NotConnectedException, InterruptedException {
CharSequence to = getXMPPServiceDomain();
CharSequence from = null;
CharSequence localpart = config.getUsername();
if (localpart != null) {
from = XmppStringUtils.completeJidFrom(localpart, to);
}
String id = getStreamId();
StreamOpen streamOpen = new StreamOpen(to, from, id, config.getXmlLang(), StreamOpen.StreamContentNamespace.client);
//发送 流打开报文。
sendNonza(streamOpen);
XmlEnvironment.Builder xmlEnvironmentBuilder = XmlEnvironment.builder();
xmlEnvironmentBuilder.with(streamOpen);
outgoingStreamXmlEnvironment = xmlEnvironmentBuilder.build();
}
可以看到这里发送了一个流报文。
最后回到XMPPTCPConnection类中loginInternal方法,现在进行第二步,认证成功后。
Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
SmackException, IOException, InterruptedException {
// 使用SASL进行身份验证
SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
streamFeaturesAfterAuthenticationReceived = false;
//1.进行身份认证
authenticate(username, password, config.getAuthzid(), sslSession);
//在身份验证之后等待流特性。
waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
//如果启用了压缩,则请求服务器使用流压缩
maybeEnableCompression();
//略...
//绑定资源
bindResourceAndEstablishSession(resource);
//略...
//2.登录成功后
afterSuccessfulLogin(false);
}
查看afterSuccessfulLogin方法首先调用的是XMPPTCPConnection类中的方法然后调用AbstractXMPPConnection抽象类
afterSuccessfulLogin。
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
// Reset the flag in case it was set
disconnectedButResumeable = false;
super.afterSuccessfulLogin(resumed);
}
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
if (!resumed) {
authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis();
}
this.authenticated = true;
if (debugger != null) {
debugger.userHasLogged(user);
}
//1.回调认证成功的监听。
callConnectionAuthenticatedListener(resumed);
//2.如果配置了出席状态,认证成功后发送available状态
if (config.isSendPresence() && !resumed) {
Presence availablePresence = getStanzaFactory()
.buildPresenceStanza()
.ofType(Presence.Type.available)
.build();
sendStanza(availablePresence);
}
}
可以看见它主要就做了这2步操作:
1.回调认证成功的监听。
2.如果配置了出席状态,认证成功后发送available状态
protected void callConnectionAuthenticatedListener(boolean resumed) {
for (ConnectionListener listener : connectionListeners) {
try {
listener.authenticated(this, resumed);
} catch (Exception e) {
// Catch and print any exception so we can recover
// from a faulty listener and finish the shutdown process
LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e);
}
}
}
到这里整个登录流程就全部走完了。
小结:
其实整个认证过程首先会通过用户的Jid得到SASLMechanism,认证主要是在SASLMechanism类authenticate中进行的,它会通过getAuthenticationText()加密拼接你的用户名密码,这个方法是个抽象,加密的方式就是你之前设置的方式。然后发送认证的报文给服务器。
这时候认证成功了,然后: 1.发送了一个流打开的报文。2.回调认证成功的Listener。3.如果配置了发送状态,认证成功后还是发送一个出席状态的报文。
<h1 id="3">3. 消息的发送</h1>
第一步:创建ChatManager
这里是通过smack-extensions Moudle中ChatManager进行聊天的管理。
chatManager = ChatManager.getInstanceFor(xmppConnection);
public final class ChatManager extends Manager {
private static final Map<XMPPConnection, ChatManager> INSTANCES = new WeakHashMap<>();
public static synchronized ChatManager getInstanceFor(XMPPConnection connection) {
ChatManager chatManager = INSTANCES.get(connection);
if (chatManager == null) {
chatManager = new ChatManager(connection);
INSTANCES.put(connection, chatManager);
}
return chatManager;
}
//略...
}
这里可以看见ChatManager 是继承与Manager抽象类。在通过单例获取ChatManager对象的时候会缓存到静态HashMap内存中,下次如果内存中有就直接获取。
这里贴出Manager类代码。
public abstract class Manager {
final WeakReference<XMPPConnection> weakConnection;
public Manager(XMPPConnection connection) {
Objects.requireNonNull(connection, "XMPPConnection must not be null");
weakConnection = new WeakReference<>(connection);
}
protected final XMPPConnection connection() {
return weakConnection.get();
}
protected final XMPPConnection getAuthenticatedConnectionOrThrow() throws NotLoggedInException {
XMPPConnection connection = connection();
if (!connection.isAuthenticated()) {
throw new NotLoggedInException();
}
return connection;
}
protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit) {
return schedule(runnable, delay, unit, ScheduledAction.Kind.NonBlocking);
}
protected static final ScheduledAction scheduleBlocking(Runnable runnable, long delay, TimeUnit unit) {
return schedule(runnable, delay, unit, ScheduledAction.Kind.Blocking);
}
protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
return AbstractXMPPConnection.SMACK_REACTOR.schedule(runnable, delay, unit, scheduledActionKind);
}
}
第二步:创建Chat对象
Chat chat = chatManager.chatWith(toUserJid);
private final Map<EntityBareJid, Chat> chats = new ConcurrentHashMap<>();
public Chat chatWith(EntityBareJid jid) {
Chat chat = chats.get(jid);
if (chat == null) {
synchronized (chats) {
// Double-checked locking.
chat = chats.get(jid);
if (chat != null) {
return chat;
}
chat = new Chat(connection(), jid);
chats.put(jid, chat);
}
}
return chat;
}
在ChatManager中通过chatWith方法得到Chat对象,如果内存缓存中没有就进行创建。
第二步:发送消息。
Chat chat = chatManager.chatWith(toUserJid);
Message msg = new Message();
msg.setType(Message.Type.chat);
chat.send(msg);
public void send(Message message) throws NotConnectedException, InterruptedException {
switch (message.getType()) {
case normal:
case chat:
break;
default:
throw new IllegalArgumentException("Message must be of type 'normal' or 'chat'");
}
Jid to = lockedResource;
if (to == null) {
to = jid;
}
message.setTo(to);
connection().sendStanza(message);
}
可以看见这里通过我们传入的connection调用sendStanza发送消息。
我们在看一下调用的sendStanza方法是在哪里执行的。
其实sendStanza是XMPPConnection接口的方法,我们用到的XMPPTCPConnection中没实现,在抽象类AbstractXMPPConnection中进行了实现
Override
public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
Objects.requireNonNull(stanza, "Stanza must not be null");
assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ;
throwNotConnectedExceptionIfAppropriate();
switch (fromMode) {
case OMITTED:
stanza.setFrom((Jid) null);
break;
case USER:
stanza.setFrom(getUser());
break;
case UNCHANGED:
default:
break;
}
//设置拦截器
Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza);
//发送Stanza
sendStanzaInternal(stanzaAfterInterceptors);
}
可以看到这个方法主要做了2个重要事情:1.设置拦截器。2.发送Stanza。我们关注下sendStanzaInternal方法。
sendStanzaInternal是AbstractXMPPConnection类的一个抽象方法,我们用的是XMPPTCPConnection,所以具体实现查看XMPPTCPConnection中的sendStanzaInternal方法。
protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException;
Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
//消息写入
packetWriter.sendStreamElement(packet);
if (isSmEnabled()) {
for (StanzaFilter requestAckPredicate : requestAckPredicates) {
if (requestAckPredicate.accept(packet)) {
requestSmAcknowledgementInternal();
break;
}
}
}
}
看到packetWriter是不是似曾相识?是的,这个就是我们之前讲解的连接成功后初始化在内存的那个packetWriter。
我们看看packetWriter中的sendStreamElement做了什么操作。
private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
QUEUE_SIZE, true);
protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
try {
//存入队列
queue.put(element);
}
catch (InterruptedException e) {
// put() may throw an InterruptedException for two reasons:
// 1. If the queue was shut down
// 2. If the thread was interrupted
// so we have to check which is the case
throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
// If the method above did not throw, then the sending thread was interrupted
throw e;
}
}
可以看见我们发送的报文存在了队列中,最终写入的方法是在writePackets里。writePackets方法中会从队列中获取packet,最终其实是PacketWriter类中writer进行了写入操作。PacketWriter和writer在前面smack连接的时候已讲解过,这里不再细述,下面贴出writePackets代码:
小结:
消息发送我们是用了Smack扩展中的ChatManager进行的,在ChatManager中会保存Chat对象,在我们发送消息时根据Jid来获取到chat对象,在消息发送时调用chat.send(),其实最终的实现是在我们的实现类XMPPTCPConnection sendStanzaInternal中,sendStanzaInternal会调用packetWriter的sendStreamElement方法,最终在PacketWriter中sendStreamElement方法会把报文写入自己的内存队列中,在writePackets方法中取出进行发送,最后是PacketWriter类中的writer流进行的写入操作。
<h1 id="4">4. 消息的接收</h1>
第一步: 添加监听IncomingChatMessageListener
这里监听设置在smack-extensions Moudle中ChatManager中
chatManager.addIncomingListener(messageListener);
public interface IncomingChatMessageListener {
void newIncomingMessage(EntityBareJid from, Message message, Chat chat);
}
public final class ChatManager extends Manager {
private final Set<IncomingChatMessageListener> incomingListeners = new CopyOnWriteArraySet<>();
private final AsyncButOrdered<Chat> asyncButOrdered = new AsyncButOrdered<>();
private ChatManager(final XMPPConnection connection) {
super(connection);
//1.设置XMPPConnection 的监听
connection.addSyncStanzaListener(new StanzaListener() {
@Override
public void processStanza(Stanza stanza) {
final Message message = (Message) stanza;
if (!shouldAcceptMessage(message)) {
return;
}
final Jid from = message.getFrom();
final EntityFullJid fullFrom = from.asEntityFullJidOrThrow();
final EntityBareJid bareFrom = fullFrom.asEntityBareJid();
final Chat chat = chatWith(bareFrom);
chat.lockedResource = fullFrom;
//2.这里所有chat都是保存在asyncButOrdered中,监听是在队列里排队回调。
asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
@Override
public void run() {
//3.回调incomingListeners
for (IncomingChatMessageListener listener : incomingListeners) {
listener.newIncomingMessage(bareFrom, message, chat);
}
}
});
}
}, INCOMING_MESSAGE_FILTER);
//略...
}
//...
}
我们首先分析里面第二步:
监听到时执行的runnable。asyncButOrdered.performAsyncButOrdered,asyncButOrdered源码这里就不贴出来了。
每个chat对象回调监听时都会在AsyncButOrdered 类中存储一个Map<K, Queue<Runnable>>,这里 K 为 chat。
然后进行包装存储在成员变量 Map<K, Handler> threadActiveMap = new HashMap<>() 中。
这里的Handle 是AsyncButOrdered的一个内部类,然后执行run方法。
private class Handler implements Runnable {
private final Queue<Runnable> keyQueue;
private final K key;
Handler(Queue<Runnable> keyQueue, K key) {
this.keyQueue = keyQueue;
this.key = key;
}
@Override
public void run() {
//略。。。
runnable.run();
}
}
最后回到第三步:
asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
@Override
public void run() {
//3.回调incomingListeners
for (IncomingChatMessageListener listener : incomingListeners) {
listener.newIncomingMessage(bareFrom, message, chat);
}
}
});
这里不难看出为什么要这么做,因为上层可能会有很多监听过来,回调的调用其实也消息同样,量大。并且会有很多不同的chat对象监听返回,smack还进行了区分保存不同的chat回调,并在回调中第三个参数中返回给调用者了。
并且这个回调是异步的,smack在内部维护了一个线程池进行处理。
好,现在我们在回到第一步监听是如何在connection中回调的。
第一步:设置XMPPConnection 的监听
XMPPConnection
void addSyncStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter);
AbstractXMPPConnection
private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>();
@Override
public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
if (packetListener == null) {
throw new NullPointerException("Packet listener is null.");
}
ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
synchronized (syncRecvListeners) {
//添加Listener
syncRecvListeners.put(packetListener, wrapper);
}
}
最终实现是在AbstractXMPPConnection类中实现的。
可以看到保存的是一个ListenerWrapper包装类,第一个参数是回调监听,第二个参数是我们传入的监听过滤器。
现在我们需要看的是那些地方调用了成员变量syncRecvListeners中的回调。
我们这里检索查看是在AbstractXMPPConnection类中invokeStanzaCollectorsAndNotifyRecvListeners方法中进行了调用
invokeStanzaCollectorsAndNotifyRecvListeners()
protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {
listenersToNotify.clear();
//1.如果用户设置过滤器,则通过过滤消息类型,返回上层监听
extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);
//2.
ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
@Override
public void run() {
Iterator<StanzaListener> it = listenersToNotify.iterator();
synchronized (syncRecvListeners) {
while (it.hasNext()) {
StanzaListener stanzaListener = it.next();
if (!syncRecvListeners.containsKey(stanzaListener)) {
it.remove();
}
}
}
for (StanzaListener listener : listenersToNotify) {
try {
listener.processStanza(packet);
} catch (NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
break;
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
}
});
}
第一步:如果用户设置过滤器,则通过过滤消息类型,返回上层监听
extractMatchingListeners
private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners,
Collection<StanzaListener> listenersToNotify) {
synchronized (listeners) {
for (ListenerWrapper listenerWrapper : listeners.values()) {
if (listenerWrapper.filterMatches(stanza)) {//过滤
//加入外部。
listenersToNotify.add(listenerWrapper.getListener());
}
}
}
}
private final StanzaFilter packetFilter;
public boolean filterMatches(Stanza packet) {
return packetFilter == null || packetFilter.accept(packet);
}
这里过滤调用的是包装类传入的那个过滤器。并通过accept(packet)接口进行过滤。好,我们现在在回过头看看这个过滤器传入的啥?
还记得我们之前在ChatManager addSyncStanzaListener() 方法的第二个参数吗?
private static final StanzaFilter INCOMING_MESSAGE_FILTER = new AndFilter(
MESSAGE_FILTER,
FromTypeFilter.ENTITY_FULL_JID
);
注意:这个过滤器是一层一层的,可以层层过滤,所以它的构造过程也是一层一层的,接下来不要眨眼。
public class AndFilter extends AbstractListFilter implements StanzaFilter {
public AndFilter() {
super();
}
public AndFilter(StanzaFilter... filters) {
super(filters);
}
@Override
public boolean accept(Stanza packet) {
for (StanzaFilter filter : filters) {
if (!filter.accept(packet)) {
return false;
}
}
return true;
}
}
可以看见这里传入的构造传入的参数是一个可变参数,他的accept方法调用了StanzaFilter 过滤器的accept方法。
StanzaFilter 其实是一个接口accept是接口 方法,所以具体实现是在其实现类中
我们这里传入了2过滤器,这里只拿出每个构造的第一个作为讲解。
private static final StanzaFilter MESSAGE_FILTER = new AndFilter(
MessageTypeFilter.NORMAL_OR_CHAT,
new OrFilter(MessageWithBodiesFilter.INSTANCE, new StanzaExtensionFilter(XHTMLExtension.ELEMENT, XHTMLExtension.NAMESPACE))
);
public static final StanzaFilter NORMAL_OR_CHAT = new OrFilter(NORMAL, CHAT);
public static final StanzaFilter NORMAL = new MessageTypeFilter(Type.normal);
最后定格在MessageTypeFilter中。
public final class MessageTypeFilter extends FlexibleStanzaTypeFilter<Message> {
private final Message.Type type;
private MessageTypeFilter(Message.Type type) {
super(Message.class);
this.type = type;
}
@Override
protected boolean acceptSpecific(Message message) {
return message.getType() == type;
}
@Override
public String toString() {
return getClass().getSimpleName() + ": type=" + type;
}
}
没有看见accept方法啊?客官,别急嘛!看看他的父类中是否有。
public abstract class FlexibleStanzaTypeFilter<S extends Stanza> implements StanzaFilter {
protected final Class<S> stanzaType;
public FlexibleStanzaTypeFilter(Class<S> packetType) {
this.stanzaType = Objects.requireNonNull(packetType, "Type must not be null");
}
@SuppressWarnings("unchecked")
public FlexibleStanzaTypeFilter() {
stanzaType = (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
@Override
@SuppressWarnings("unchecked")
public final boolean accept(Stanza packet) {
if (stanzaType.isInstance(packet)) {//1.验证消息class是否匹配
return acceptSpecific((S) packet);//2.实现类自己的过滤
}
return false;
}
protected abstract boolean acceptSpecific(S packet);
@Override
public String toString() {
return getClass().getSimpleName() + ": " + stanzaType.toString();
}
}
果然在父类中看见了accept方法。
1.处:调用了stanzaType.isInstance(packet),这里可以看见stanzaType就是传入的泛型类型。我们在MessageTypeFilter 这里传的是super(Message.class); 所以这个过滤就是过滤到不是Message消息的报文。
2.处:进行2次过滤acceptSpecific()方法,这里他的子类实现是MessageTypeFilter。
可以看见条件是message.getType() == type,所以消息Type也要与传入的type进行匹配。
这里过滤器是如何工作的就讲解完了。
好了,现在可以眨眼了。
回过头我们继续看过滤后的监听操作 [ invokeStanzaCollectorsAndNotifyRecvListeners]
protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {
listenersToNotify.clear();
//1.如果用户设置过滤器,则通过过滤消息类型,返回上层监听
extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);
//2.
ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
@Override
public void run() {
Iterator<StanzaListener> it = listenersToNotify.iterator();
synchronized (syncRecvListeners) {
while (it.hasNext()) {
StanzaListener stanzaListener = it.next();
if (!syncRecvListeners.containsKey(stanzaListener)) {
it.remove();
}
}
}
//3.回调监听
for (StanzaListener listener : listenersToNotify) {
try {
listener.processStanza(packet);
} catch (NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
break;
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
}
});
}
extractMatchingListeners方法传入的packet 通过过滤后得到符合条件的监听listenersToNotify 对他们进行回调。
2.处:ASYNC_BUT_ORDERED.performAsyncButOrdered
这里就不在做讲解了,跟之前ChatManager回调监听时处理是一样的,只不过这里hashMap中存储的key为XMPPConnection。
3.处:进行回调的监听。
好,现在监听回调的调用地方算是已经找到了,但是我们想继续看看invokeStanzaCollectorsAndNotifyRecvListeners是在哪里调用过来的。现在就一步一步往上面推。
检索发有2地方进行调用,这里我们的实现类是继承AbstractXMPPConnection类的,所以查看AbstractXMPPConnection类中调用invokeStanzaCollectorsAndNotifyRecvListeners的方法
protected void processStanza(final Stanza stanza) throws InterruptedException {
assert stanza != null;
final SmackDebugger debugger = this.debugger;
if (debugger != null) {
debugger.onIncomingStreamElement(stanza);
}
lastStanzaReceived = System.currentTimeMillis();
// 将传入的数据包传递给侦听器
invokeStanzaCollectorsAndNotifyRecvListeners(stanza);
}
然后在向上检索在AbstractXMPPConnection类中
protected void parseAndProcessStanza(XmlPullParser parser)
throws XmlPullParserException, IOException, InterruptedException {
ParserUtils.assertAtStartTag(parser);
int parserDepth = parser.getDepth();
Stanza stanza = null;
try {
stanza = PacketParserUtils.parseStanza(parser, incomingStreamXmlEnvironment);
}
catch (XmlPullParserException | SmackParsingException | IOException | IllegalArgumentException e) {
CharSequence content = PacketParserUtils.parseContentDepth(parser,
parserDepth);
UnparseableStanza message = new UnparseableStanza(content, e);
ParsingExceptionCallback callback = getParsingExceptionCallback();
if (callback != null) {
callback.handleUnparsableStanza(message);
}
}
ParserUtils.assertAtEndTag(parser);
if (stanza != null) {
//1.调用
processStanza(stanza);
}
}
可以看到此方法把XmlPullParser解析的内容封装成stanza并调用了processStanza方法。
然后在向上面推,看哪里调用的parseAndProcessStanza方法。
检索发现有多处调用,这里我们的实现是XMPPTCPConnection类。
还记得我们之前连接成功后初始化的XMPPTCPConnection的内部类PacketReader吗?
对的,在他的parsePackets方法中,解析了报文的节点,然后进行了调用过程parseAndProcessStanza()
小结:
其他监听其实也是一样的,比如:群聊的监听。
当消息过来时,都是通过用户设置的过滤器过滤符合用户需要的监听进行回调。他们在处理过滤时的方法都是在 invokeStanzaCollectorsAndNotifyRecvListeners方法中进行的处理
到此,本文smack简单的解析就结束了,这里也是自己的一些小的见解,可能会有不完善和不正确的地方,望大家觉得有不准确的地方,欢迎指出并一起探讨。