MQ!Rabbit-client建立链接源码分析

MQ!Rabbit-client建立链接源码分析

参考地址:https://www.rabbitmq.com/api-guide.html


版本: rabbitmq-amqp-client:5.1.2

创建链接示例代码:

// 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置rabbitmq 服务端所在地址 我这里在本地就是本地
connectionFactory.setHost("127.0.0.1");
// 设置端口号,连接用户名,虚拟地址等
connectionFactory.setPort(5672);
connectionFactory.setUsername("jimmy");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
return connectionFactory.newConnection();

源码分析

  • ConnectionFactory
  • Connection

准备工作

connectionFactory.newConnection()分析示例中代码的涉及到创建链接的主要节点

AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
-> 
this.cf.newConnection()
->
FrameHandler frameHandler = factory.create(addr, connectionName());
-> 
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
conn.start();

涉及代码量还是比较多的。这里为了方便分析需要先了解下 ConnectionFactoryConnection的源码(当然不是一下子都能掌握,先找几个后面能用到的🤪)。

  • ConnectionFactory
// 几个基本信息 
private String username                       = DEFAULT_USER;
private String password                       = DEFAULT_PASS;
private String virtualHost                    = DEFAULT_VHOST;
private String host                           = DEFAULT_HOST;
private int port                              = USE_DEFAULT_PORT;

// 一些超时、最大值限制
private int requestedChannelMax               = DEFAULT_CHANNEL_MAX;
private int requestedFrameMax                 = DEFAULT_FRAME_MAX;
private int requestedHeartbeat                = DEFAULT_HEARTBEAT;
private int connectionTimeout                 = DEFAULT_CONNECTION_TIMEOUT;
private int handshakeTimeout                  = DEFAULT_HANDSHAKE_TIMEOUT;
private int shutdownTimeout                   = DEFAULT_SHUTDOWN_TIMEOUT;


private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
private SocketFactory socketFactory           = null;
private SaslConfig saslConfig                 = DefaultSaslConfig.PLAIN;
private ExecutorService sharedExecutor;
private ThreadFactory threadFactory           = Executors.defaultThreadFactory();
private ExecutorService shutdownExecutor;
// 心跳线程池
private ScheduledExecutorService heartbeatExecutor;
private SocketConfigurator socketConf         = new DefaultSocketConfigurator();
private ExceptionHandler exceptionHandler     = new DefaultExceptionHandler();
// 是否自动重连
private boolean automaticRecovery             = true;

private boolean topologyRecovery              = true;
private long networkRecoveryInterval          = DEFAULT_NETWORK_RECOVERY_INTERVAL;
private RecoveryDelayHandler recoveryDelayHandler;

// 提供的扩展点,提供了部分节点的时候的处理方法。
private MetricsCollector metricsCollector;

// 是否采用nio模式
private boolean nio = false;

// frameHandler工厂
private FrameHandlerFactory frameHandlerFactory;

private NioParams nioParams = new NioParams();

// 安全工厂
private SslContextFactory sslContextFactory;
private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;
private boolean channelShouldCheckRpcResponseType = false;
  • AutorecoveringConnection(示例中的代码后续创建的connection实例对象)
// 重要的属性,具体怎么叫等我掌握整体流程的时候或许可以稍微取一个名字🐹
private final RecoveryAwareAMQConnectionFactory cf;

// con的channel集合
private final Map<Integer, AutorecoveringChannel> channels;
// 链接信息
private final ConnectionParams params;
private volatile RecoveryAwareAMQConnection delegate;

private final List<ShutdownListener> shutdownHooks  = Collections.synchronizedList(new ArrayList<ShutdownListener>());
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>());
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>());

// Records topology changes
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>());
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>());
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());

// Used to block connection recovery attempts after close() is invoked.
private volatile boolean manuallyClosed = false;

// This lock guards the manuallyClosed flag and the delegate connection.  Guarding these two ensures that a new connection can never
// be created after application code has initiated shutdown.  
private final Object recoveryLock = new Object();
  • RecoveryAwareAMQConnectionFactory(比较重要所以拿出来看下里面有什么东东)
private final ConnectionParams params;
private final FrameHandlerFactory factory;
private final AddressResolver addressResolver;
private final MetricsCollector metricsCollector;

开始分析

connectionFactory.newConnection() 跟进去之后最后调用的方式connectionFactory.newConnection( executor, addressResolver, clientProvidedName)

首先这个方法是要创建一个connection即获取一个connection实例。而获取这个实例是要从工厂类RecoveryAwareAMQConnectionFactory中获取的。就好比现在我们跟神龙许愿一样,要是想要神龙满足我们的愿望,就要凑齐七颗龙珠(🐍❤🧡💛💙💚💜🖤)。那这条龙的龙珠是什么呢?这就要找到RecoveryAwareAMQConnectionFactory生成实例的方法查看下如何创建的(其实还是点进去看而已🐾)。

下面就是创建实例的方法RecoveryAwareAMQConnectionFactory.createConnection(ConnectionParams,FrameHandler,MetricsCollector ) (太长了凑合看,主要看参数就好)。

那这里的七龙珠就分别是 ConnectionParams FrameHandler MetricsCollector

那这回我们回到connectionFactory.newConnection( executor, addressResolver, clientProvidedName),翠花!上酸菜。(这里挑这次要分析的酸菜来,其他的在说).

...
        if(this.metricsCollector == null) {
            this.metricsCollector = new NoOpMetricsCollector();
        }
        // make sure we respect the provided thread factory
        FrameHandlerFactory fhFactory = createFrameHandlerFactory();
        ConnectionParams params = params(executor);
        // set client-provided via a client property
        if (clientProvidedName != null) {
            Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
            properties.put("connection_name", clientProvidedName);
            params.setClientProperties(properties);
        }

        if (isAutomaticRecoveryEnabled()) {
            // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
            AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);

            conn.init();
            return conn;
        } else {
            ...
        }
    }

很显然,前几行代码就是在准备龙珠:

// 七龙珠1号 MetricsCollector
if(this.metricsCollector == null) {
    this.metricsCollector = new NoOpMetricsCollector();
}
// 七龙珠2号生产厂家 FrameHandlerFactory
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
// 七龙珠3号 ConnectionParams
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
    Map<String, Object> properties = new                     HashMap<String,Object(params.getClientProperties());
    properties.put("connection_name", clientProvidedName);
    params.setClientProperties(properties);
}

接下来就是七龙珠合体了。

// 组装神龙  
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory,     addressResolver, metricsCollector);
// 许愿,给我一个connection吧
conn.init();

那来看下rabbotmq是如何组装神龙和许愿的

public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, AddressResolver addressResolver, MetricsCollector metricsCollector) {
        // 生合成conn的工厂(愿望生成器)
        this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver, metricsCollector);
        // 属性参数(密码、账号之类的东西)
        this.params = params;
        // conn的 channel集合
        this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
}

/**
  * Private API.
  * @throws IOException
  * @see    com.rabbitmq.client.ConnectionFactory#newConnection(java.util.concurrent.ExecutorService)
  */
public void init() throws IOException, TimeoutException {
    // 使用愿望生成器获取愿望
    this.delegate = this.cf.newConnection();
    // 自动重连监听器  神龙掉线了就不能实现愿望了,所以要监听它⏱
    this.addAutomaticRecoveryListener(delegate);
}

来看看愿望生成器是如何生成愿望的,生成的时候都干了啥

public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
        Exception lastException = null;
        List<Address> shuffled = shuffle(addressResolver.getAddresses());

        for (Address addr : shuffled) {
            try {
                // 获取frameHandler(通过七龙珠2号生成器生成七龙珠2号)
                // 这里面的源码我不展开读取了,返回了一个包装socket的SocketFrameHandler对象
                FrameHandler frameHandler = factory.create(addr, connectionName());
                // 创建conn (创建愿望)
                RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                // 启动conn (实现愿望)
                conn.start();
                // 扩展点
                metricsCollector.newConnection(conn);
                return conn;
            } catch (IOException e) {
                lastException = e;
            } catch (TimeoutException te) {
                lastException = te;
            }
        }

        if (lastException != null) {
            if (lastException instanceof IOException) {
                throw (IOException) lastException;
            } else if (lastException instanceof TimeoutException) {
                throw (TimeoutException) lastException;
            }
        }
        throw new IOException("failed to connect");
    }

最后在来看看conn的启动(😵太长了,不想看😭)

public void start() throws IOException, TimeoutException {
    // 初始化消费服务,用于处理channel
    initializeConsumerWorkService();
    // 初始化心跳
    initializeHeartbeatSender();
    this._running = true;
    // Make sure that the first thing we do is to send the header,
    // which should cause any socket errors to show up for us, rather
    // than risking them pop out in the MainLoop
    AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
        new AMQChannel.SimpleBlockingRpcContinuation();
    // We enqueue an RPC continuation here without sending an RPC
    // request, since the protocol specifies that after sending
    // the version negotiation header, the client (connection
    // initiator) is to wait for a connection.start method to
    // arrive.
    // 将rpc任务放入channel(阻塞),最后放入工作线程
    _channel0.enqueueRpc(connStartBlocker);
    try {
        // The following two lines are akin to AMQChannel's
        // transmit() method for this pseudo-RPC.
        // 设置超时时间发送header,这里是将一些数据写入到outputStream
        _frameHandler.setTimeout(handshakeTimeout);
        _frameHandler.sendHeader();
    } catch (IOException ioe) {
        _frameHandler.close();
        throw ioe;
    }
    // 开启主线程 MainLoop
    this._frameHandler.initialize(this);

    AMQP.Connection.Start connStart;
    AMQP.Connection.Tune connTune = null;
    try {
        // Connection.Start
        connStart = (AMQP.Connection.Start)connStartBlocker.getReply(handshakeTimeout/2).getMethod();
        // 服务器信息
        _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
        Version serverVersion =
                new Version(connStart.getVersionMajor(),connStart.getVersionMinor());

        if (!Version.checkVersion(clientVersion, serverVersion)) {
            throw new ProtocolVersionMismatchException(clientVersion,serverVersion);
        }

        String[] mechanisms = connStart.getMechanisms().toString().split(" ");
        SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
        if (sm == null) {
            throw new IOException("No compatible authentication mechanism found - " +
                                          "server offered [" + connStart.getMechanisms() + "]");
        }

        LongString challenge = null;
        LongString response = sm.handleChallenge(null, this.username, this.password);

        do {
            Method method = (challenge == null)
                                    ? new AMQP.Connection.StartOk.Builder()
                                              .clientProperties(_clientProperties)
                                              .mechanism(sm.getName())
                                              .response(response)
                                              .build()
                                    : new AMQP.Connection.SecureOk.Builder().response(response).build();

            try {
                // Connection.Tune
                Method serverResponse = 
                    _channel0.rpc(method, handshakeTimeout/2).getMethod();
                if (serverResponse instanceof AMQP.Connection.Tune) {
                    connTune = (AMQP.Connection.Tune) serverResponse;
                } else {
                    challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                    response = sm.handleChallenge(challenge, this.username,this.password);
                }
            } catch (ShutdownSignalException e) {
                Method shutdownMethod = e.getReason();
                if (shutdownMethod instanceof AMQP.Connection.Close) {
                    AMQP.Connection.Close shutdownClose = 
                        (AMQP.Connection.Close) shutdownMethod;
                    if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                        throw new AuthenticationFailureException(shutdownClose.getReplyText());
                    }
                }
                throw new PossibleAuthenticationFailureException(e);
            }
        } while (connTune == null);
    } catch (TimeoutException te) {
        _frameHandler.close();
        throw te;
    } catch (ShutdownSignalException sse) {
        _frameHandler.close();
        throw AMQChannel.wrap(sse);
    } catch(IOException ioe) {
        _frameHandler.close();
        throw ioe;
    }

    try {
        // channelMax
        int channelMax =
            negotiateChannelMax(this.requestedChannelMax,connTune.getChannelMax());
        // ChannelManager
        _channelManager = instantiateChannelManager(channelMax, threadFactory);
        // frameMax
        int frameMax =
            negotiatedMaxValue(this.requestedFrameMax,connTune.getFrameMax());
        this._frameMax = frameMax;
        
        int heartbeat =
            negotiatedMaxValue(this.requestedHeartbeat,connTune.getHeartbeat());
        
        // 启动心跳
        setHeartbeat(heartbeat);

        _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                            .channelMax(channelMax)
                            .frameMax(frameMax)
                            .heartbeat(heartbeat)
                          .build());
        _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                  .virtualHost(_virtualHost)
                                .build());
    } catch (IOException ioe) {
        _heartbeatSender.shutdown();
        _frameHandler.close();
        throw ioe;
    } catch (ShutdownSignalException sse) {
        _heartbeatSender.shutdown();
        _frameHandler.close();
        throw AMQChannel.wrap(sse);
    }

    // We can now respond to errors having finished tailoring the connection
    this._inConnectionNegotiation = false;
}

接下来在看下mainLoop线程中做了啥。

public void run() {
    try {
        while (_running) {
            Frame frame = _frameHandler.readFrame();
            // 处理帧
            readFrame(frame);
        }
    } catch (Throwable ex) {
        handleFailure(ex);
    } finally {
        doFinalShutdown();
    }
}

// readFrame(frame)内容
private void readFrame(Frame frame) throws IOException {
    if (frame != null) {
        _missedHeartbeats = 0;
        // 如果是心跳帧则忽略
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            // Ignore it: we've already just reset the heartbeat counter.
        } else {
            if (frame.channel == 0) { // the special channel
                // 特殊帧
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    // If we're still _running, but not isOpen(), then we
                    // must be quiescing, which means any inbound frames
                    // for non-zero channels (and any inbound commands on
                    // channel zero that aren't Connection.CloseOk) must
                    // be discarded.
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        ChannelN channel;
                        try {
                            channel = cm.getChannel(frame.channel);
                        } catch(UnknownChannelException e) {
                            // this can happen if channel has been closed,
                            // but there was e.g. an in-flight delivery.
                            // just ignoring the frame to avoid closing the whole connection
                            LOGGER.info("Received a frame on an unknown channel, ignoring it");
                            return;
                        }
                        // 处理
                        channel.handleFrame(frame);
                    }
                }
            }
        }
    } else {
        // Socket timeout waiting for a frame.
        // Maybe missed heartbeat.
        handleSocketTimeout();
    }
}

// channel.handleFrame  后面在进行分析
public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}

总结:

connectionFactory.newConnection() - AutorecoveringConnection.init() - RecoveryAwareAMQConnectionFactory.newConnection() - RecoveryAwareAMQConnection.start()

步骤描述:

  • connectionFactory组装FrameHandlerFactoryMetricsCollectorConnectionParams

  • 构造AutorecoveringConnection实例这个实例中持有FrameHandlerFactory ConnectionParams(这个参数能构造RecoveryAwareAMQConnectionFactory实例cf

    metricsCollector以及连接地址信息。

  • 通过AutorecoveringConnection.init()来初始化A-conn(这里用A-表示是AutorecoveringConnection,下同)

    • 使用 A-conn持有的cf来创建 RecoveryAwareAMQConnection实例delegate
      • 获取每一帧数据的操作句柄FrameHandler
      • 将链接信息ConnectionParams 、操作句柄FrameHandler、 扩展组件metricsCollector传入RecoveryAwareAMQConnection的构造方法获取相应实例R-conn
      • R-conn.start()启动链接然后返回
    • 设置自动重连监听

后记:读起来还是比较费劲的,这个源码估计以后我还得在看2遍才能吃透一部分。目前就是先梳理一部分把,肯定会有遗漏。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 1.消息模型 根据官方文档得知,RabbitMQ有七种消息模型: 1.1 Hello World消息模型 1.1...
    秃头猿猿阅读 266评论 0 5
  • 在企业应用系统领域,会面对不同系统之间的通信、集成与整合,尤其当面临异构系统时,这种分布式的调用与通信变得越发重要...
    寒剑飘零阅读 497评论 0 0
  • 什么叫消息队列? 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复...
    Agile_dev阅读 2,391评论 0 24
  • 消息队列之 RabbitMQ 转载自:消息队列值RabbitMQ 关于消息队列,从前年开始断断续续看了些资料,想写...
    哥本哈根月光阅读 581评论 0 0
  • 夜莺2517阅读 127,761评论 1 9