ActiveMQ源码解析(二)会话是什么

我其实不擅长写线程,主要也是练得少,所以这次在看到层层嵌套的线程写法时,真是一脸懵逼。
越来越认识到读源码真是一个令人头大的过程,非要耐下性子一层层往下探才能了解到设计者的初衷,但是一旦你读到最底层,明白了整个逻辑,这种爽快感也是无可比拟的。

套用一位好友的话:

所有的代码逻辑都是可以放在脑子里跑的,这便是确定性的契机,代码的世界里还没有什么灵异可言,只要冷静分析,总能寻到问题的根源。
所以即使是在做一件未知的事情,也有把握可以在有限的时间内达成。

这也是我耐下性子的原因,因为学得到东西,因为看得到未来,所以也就有了动力。

共勉。
——————————————
继上一篇的连接连接之后(见http://www.jianshu.com/p/d41f32ca22a5),客户端的的下一步操作一般是

ActiveMQSession session = (ActiveMQSession) connection.createSession(false,Session.AUTO_ACKNOWLEDGE);  
// 消息的目的地,消息发送到那个队列  
Destination destination = session.createQueue(QUEUE_NAME);  
// 创建消息发送者  
MessageProducer producer =session.createProducer(destination);  
// 设置是否持久化  
TextMessagemessage = session.createTextMessage(msg);  
// 发送消息到目的地方  
producer.send(message);  

在JMS协议中,所有的发送和消费的操作都是在session(会话)中完成的,一个连接中可以包含多个session,那么session到底是什么?
今天主要来看一下session的建立过程,session的一切都从createSession这个方法开始:

connection.createSession(false,Session.AUTO_ACKNOWLEDGE);  

在ActiveMQConnection类中找到createSession的方法:
1. 确认连接是否正常
2. 确认客户端的连接信息是否被关系
3. 校验事务型session的ack模式是否正确
4. 校验ack模式是否在范围内
5. 用ActiveMQSession的构造函数构造出一个ActiveMQSession

 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException  {  
        //检查连接有没关闭  
        checkClosedOrFailed();  
        //确认ConnectionInfo已经被发送
        ensureConnectionInfoSent();  
        if(!transacted)  
        {     
        //如果transacted为非事务,而acknowledgeMode为事务SESSION_TRANSACTED,抛出异常  
            if(acknowledgeMode == 0)  
                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");  
            //acknowledgeMode不在0-3范围之内,这  
        if(acknowledgeMode < 0 || acknowledgeMode > 4)  
                throw new JMSException((new StringBuilder()).append("invalid acknowledgeMode: ").append(acknowledgeMode).append(". Valid values are Session.AUTO_ACKNOWLEDGE (1), ").append("Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)").toString());  
        }  
        return new ActiveMQSession(this, getNextSessionId(), transacted ? 0 : acknowledgeMode != 0 ? acknowledgeMode : 1, isDispatchAsync(), isAlwaysSessionAsync());  
    }  

构造函数里做了这些事情:
1. 会话参数配置
2. 执行器的建立
3. 加入Connection的session队列并启动会话

    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
        //一般性参数设置,包括开启DEBUG,连接,Ack模式,Async模式
        this.debug = LOG.isDebugEnabled();
        this.connection = connection;
        this.acknowledgementMode = acknowledgeMode;
        this.asyncDispatch = asyncDispatch;
        this.sessionAsyncDispatch = sessionAsyncDispatch;
        //从ConnectionInfo中构造出SessionInfo
        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
        //事物上下文
        setTransactionContext(new TransactionContext(connection));
        stats = new JMSSessionStatsImpl(producers, consumers);
        //异步发送消息设置
        this.connection.asyncSendPacket(info);
        setTransformer(connection.getTransformer());
        //BlobMessage的传输设置
        setBlobTransferPolicy(connection.getBlobTransferPolicy());
        //连接执行器
        this.connectionExecutor=connection.getExecutor();
        //会话执行器
        this.executor = new ActiveMQSessionExecutor(this);
        //在连接中加入这个Session
        connection.addSession(this);
        //启动这个Session(其实就是上一篇中讲过的启动这个session中所有的消费者)
        if (connection.isStarted()) {
            start();
        }
    }

问题:可以看到方法里先进行connection.isStarted()的判断才启动线程呢,这其实就是如果不调用connection.start()方法,就无法消费的原因。但是为什么要这么设计?

可以看到最重要的是最后的start()方法,再来回顾一下:
1. Consumer的启动
2. executor的启动

    /**
     * Start this Session.
     *
     * @throws JMSException
     */
    protected void start() throws JMSException {
        started.set(true);
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.start();
        }
        executor.start();
    }

Consumer的启动我们放在后续再看,主要是exectuor,在构造函数里可以看到session的executor就是ActiveMQSessionExecutor的实例,所以又跳到了ActiveMQSessionExecutor这个类里:
1. 构造函数里判断是否支持消费优先级,如果支持就新建SimplePriorityMessageDispatchChannel,否则新建FifoMessageDispatchChannel
2. start()方法中判断如果MessageQueue没启动,就启动messageQueue,如果发现messageQueue中存在未消费的消息,就wakeup
3. wakeup中唤醒一个taskRunner。

     //构造函数
    ActiveMQSessionExecutor(ActiveMQSession session) {
        this.session = session;
        if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
           this.messageQueue = new SimplePriorityMessageDispatchChannel();
        }else {
            this.messageQueue = new FifoMessageDispatchChannel();
        }
    }
    //executor的启动
    synchronized void start() {
        if (!messageQueue.isRunning()) {
            messageQueue.start();
            if (hasUncomsumedMessages()) {
                wakeup();
            }
        }
    }
    //判断是否有未消费的消息
    public boolean hasUncomsumedMessages() {
        return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
    }
    //唤醒
    public void wakeup() {
        if (!dispatchedBySessionPool) {
            if (session.isSessionAsyncDispatch()) {
                try {
                    TaskRunner taskRunner = this.taskRunner;
                    if (taskRunner == null) {
                        synchronized (this) {
                            if (this.taskRunner == null) {
                                if (!isRunning()) {
                                    // stop has been called
                                    return;
                                }
                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
                                        "ActiveMQ Session: " + session.getSessionId());
                            }
                            taskRunner = this.taskRunner;
                        }
                    }
                    taskRunner.wakeup();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                while (iterate()) {
                }
            }
        }
    }

看完有个疑惑,messageQueue是什么?messageQueue的判断贯穿了整个session的建立过程,在构造函数里可以看到messageQueue是FifoMessageDispatchChannel或SimplePriorityMessageDispatchChannel的一个实例,我们先不考虑支持优先级的情况,看看先入先出(FIFO)的messageQueue是什么。

public class FifoMessageDispatchChannel implements MessageDispatchChannel {
    //锁
    private final Object mutex = new Object();
    //存放消息的链表
    private final LinkedList<MessageDispatch> list;
    private boolean closed;
    private boolean running;

    public FifoMessageDispatchChannel() {
        this.list = new LinkedList<MessageDispatch>();
    }
}

MessageDispatch其实可以看做是Message的一层封装。所以messageQueue其实真的是字面意思,先入先出的消息队列。

public class MessageDispatch extends BaseCommand {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;

    protected ConsumerId consumerId;
    protected ActiveMQDestination destination;
    protected Message message;
    protected int redeliveryCounter;

    protected transient long deliverySequenceId;
    protected transient Object consumer;
    protected transient TransmitCallback transmitCallback;
    protected transient Throwable rollbackCause;

看完messageQueue,我们可以想到在初始化的时候,Session中的这个messageQueue其实是没有消息的,而且从源码读来,大部分的功能都取决于messageQueue是否是空(wakeup())。所以我们可以其实可以看到session的主要功能应该是针对消费者的。在有未消费的消息的时候,在wakeup中新建了一个taskRunner,这个对象是从connection中建立的,来看一下这个方法:

    public TaskRunnerFactory getSessionTaskRunner() {
        synchronized (this) {
            if (sessionTaskRunner == null) {
                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
            }
        }
        return sessionTaskRunner;
    }

TaskRunnerFactory工厂类嘛,看看是如何建立TaskRunner的:

    public TaskRunner createTaskRunner(Task task, String name) {
        init();
        ExecutorService executor = executorRef.get();
        if (executor != null) {
            return new PooledTaskRunner(executor, task, maxIterationsPerRun);
        } else {
            return new DedicatedTaskRunner(task, name, priority, daemon);
        }
    }

    public void init() {
        if (!initDone.get()) {
            // If your OS/JVM combination has a good thread model, you may want to
            // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
            //翻译:如果你的操作系统或JVM支持一个优秀的线程模型,你可能不希望使用thread pool,而是使用DedicatedTaskRunner。
            synchronized(this) {
                //need to recheck if initDone is true under the lock
                //判断是否需要使用dedicatedTaskRunner
                if (!initDone.get()) {
                    if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
                        executorRef.set(null);
                    } else {
                        executorRef.compareAndSet(null, createDefaultExecutor());
                    }
                    LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executorRef.get());
                    initDone.set(true);
                }
            }
        }
    }

createTaskRunner整个逻辑是这样的:
1. 初始化,判断是否使用DedicatedTaskRunner,如果不使用,就建立一个DefaultExecutor,此时executorRef不为空。如果使用,则把executorRef置为空。
2. 尝试从executorRef中取出一个线程,如果可以取出来(表示使用了DefaultExecutor),使用PooledTaskRunner来丰富这个线程。如果取不出来,则新建一个DedicatedTaskRunner。

可以看到createTaskRunner里出现了两种类型:DedicatedTaskRunner和DefaultExecutor。研究了一下两种类型的代码,大致可以总结如下:

  1. DefaultExecutor是建立了一个JVM线程池ThreadPoolExecutor
  2. DedicatedTaskRunner是一个单独的线程,我认为应该是依靠操作系统或者JVM的线程管理来实现多个线程的并发

我直接看了看DedicatedTaskRunner的代码,这个类实例化出的对象里才建立了线程,代码不多,我觉得这个类的写法很值得学习一下。

class DedicatedTaskRunner implements TaskRunner {

    private static final Logger LOG = LoggerFactory.getLogger(DedicatedTaskRunner.class);
    private final Task task;
    private final Thread thread;

    private final Object mutex = new Object();
    private boolean threadTerminated;
    private boolean pending;
    private boolean shutdown;

    public DedicatedTaskRunner(final Task task, String name, int priority, boolean daemon) {
        // 这个task其实是实例化ActiveMQSessionExecutor的对象
        this.task = task;
        // 建了个线程,线程里调用runTask()方法
        thread = new Thread(name) {
            @Override
            public void run() {
                try {
                    runTask();
                } finally {
                    LOG.trace("Run task done: {}", task);
                }
            }
        };
        thread.setDaemon(daemon);
        thread.setName(name);
        thread.setPriority(priority);
        thread.start();
    }

    @Override
    public void wakeup() throws InterruptedException {
        synchronized (mutex) {
            if (shutdown) {
                return;
            }
            pending = true;
            mutex.notifyAll();
        }
    }

    /**
     * shut down the task
     *
     * @param timeout
     * @throws InterruptedException
     */
    @Override
    public void shutdown(long timeout) throws InterruptedException {
        LOG.trace("Shutdown timeout: {} task: {}", timeout, task);
        synchronized (mutex) {
            shutdown = true;
            pending = true;
            mutex.notifyAll();

            // Wait till the thread stops ( no need to wait if shutdown
            // is called from thread that is shutting down)
            if (Thread.currentThread() != thread && !threadTerminated) {
                mutex.wait(timeout);
            }
        }
    }

    /**
     * shut down the task
     *
     * @throws InterruptedException
     */
    @Override
    public void shutdown() throws InterruptedException {
        shutdown(0);
    }
    // 执行任务的主方法
    final void runTask() {

        try {
            // 用一个死循环来不停执行。
            while (true) {
                // 获取mutex锁,设置线程为非挂起状态并判断是否线程关闭,若关闭则返回
                synchronized (mutex) {
                    pending = false;
                    if (shutdown) {
                        return;
                    }
                }

                LOG.trace("Running task {}", task);
                // 调用ActiveMQSessionExecutor的iterate()方法,见下文
                if (!task.iterate()) {
                    // 获取mutex锁,判断线程是否关闭,如果没关闭,则判断线程是否为非挂起状态,若是则释放CPU,等待线程被唤醒。
                    synchronized (mutex) {
                        if (shutdown) {
                            return;
                        }
                        while (!pending) {
                            mutex.wait();
                        }
                    }
                }

            }

        } catch (InterruptedException e) {
            // Someone really wants this thread to die off.
            Thread.currentThread().interrupt();
        } finally {
            // Make sure we notify any waiting threads that thread
            // has terminated.
            synchronized (mutex) {
                threadTerminated = true;
                mutex.notifyAll();
            }
        }
    }
}

学习一下,下次写线程也可以用这种模式。

其实不管是建立了哪种线程,线程的执行方法都是调用ActiveMQSessionExecutor类中的iterate()方法:

    public boolean iterate() {

        // Deliver any messages queued on the consumer to their listeners.
        // 把排队在消费者上的消息发送到消费者的监听器上
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            if (consumer.iterate()) {
                return true;
            }
        }

        // No messages left queued on the listeners.. so now dispatch messages queued on the session
        // 如果没有消息排队在监听器上,则把排队在session中的消息发送给消费者
        MessageDispatch message = messageQueue.dequeueNoWait();
        if (message == null) {
            return false;
        } else {
            dispatch(message);
            return !messageQueue.isEmpty();
        }
    }
    //消息分发的方法
    void dispatch(MessageDispatch message) {
        // TODO - we should use a Map for this indexed by consumerId
        // 这个方法里对这个session下所有的consumer进行一次循环,根据消息的consumerID来判断消息该往哪发。
        // 在一般的使用场景下,一个session只会用一个consumer,所以看起来没什么问题,但是这个逻辑仍然是低效的。
        // 可以看到TODO里表示应该用一个MAP来直接找到consumer。
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            ConsumerId consumerId = message.getConsumerId();
            if (consumerId.equals(consumer.getConsumerId())) {
                consumer.dispatch(message);
                break;
            }
        }
    }

总结

会话的创建最主要的工作是:
1. 配置会话中的参数,根据连接是否start来判断是否启动所有消费者,启动会话执行器ActiveMQSessionExecutor。
2. 在启动会话执行器时,如果消息分发通道处于未启动状态,则启动消息分发通道(FIFO或者SimplePriority),如果有未消费的消息,唤醒消息执行器。
3. 唤醒主要做的是是通过任务执行工厂TaskRunnerFactory创建执行任务PooledTaskRunner或DedicatedTaskRunner,两种TaskRunner都是ActiveMQSessionExecutor的包装,TaskRunner执行就是执行ActiveMQSessionExecutor的iterate(),这个过程主要是将消息发送到消费者的listener上,或通过遍历消费者,将消息分配给消费者。

简单概括:
一个ActiveMQConnection可对应多个ActiveMQSession
一个ActiveMQSession对应一个ActiveMQSessionExecutor
通过ActiveMQSessionExecutor来进行消息向消费者的分配

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容