我其实不擅长写线程,主要也是练得少,所以这次在看到层层嵌套的线程写法时,真是一脸懵逼。
越来越认识到读源码真是一个令人头大的过程,非要耐下性子一层层往下探才能了解到设计者的初衷,但是一旦你读到最底层,明白了整个逻辑,这种爽快感也是无可比拟的。
套用一位好友的话:
所有的代码逻辑都是可以放在脑子里跑的,这便是确定性的契机,代码的世界里还没有什么灵异可言,只要冷静分析,总能寻到问题的根源。
所以即使是在做一件未知的事情,也有把握可以在有限的时间内达成。
这也是我耐下性子的原因,因为学得到东西,因为看得到未来,所以也就有了动力。
共勉。
——————————————
继上一篇的连接连接之后(见http://www.jianshu.com/p/d41f32ca22a5),客户端的的下一步操作一般是
ActiveMQSession session = (ActiveMQSession) connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 消息的目的地,消息发送到那个队列
Destination destination = session.createQueue(QUEUE_NAME);
// 创建消息发送者
MessageProducer producer =session.createProducer(destination);
// 设置是否持久化
TextMessagemessage = session.createTextMessage(msg);
// 发送消息到目的地方
producer.send(message);
在JMS协议中,所有的发送和消费的操作都是在session(会话)中完成的,一个连接中可以包含多个session,那么session到底是什么?
今天主要来看一下session的建立过程,session的一切都从createSession这个方法开始:
connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
在ActiveMQConnection类中找到createSession的方法:
1. 确认连接是否正常
2. 确认客户端的连接信息是否被关系
3. 校验事务型session的ack模式是否正确
4. 校验ack模式是否在范围内
5. 用ActiveMQSession的构造函数构造出一个ActiveMQSession
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
//检查连接有没关闭
checkClosedOrFailed();
//确认ConnectionInfo已经被发送
ensureConnectionInfoSent();
if(!transacted)
{
//如果transacted为非事务,而acknowledgeMode为事务SESSION_TRANSACTED,抛出异常
if(acknowledgeMode == 0)
throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
//acknowledgeMode不在0-3范围之内,这
if(acknowledgeMode < 0 || acknowledgeMode > 4)
throw new JMSException((new StringBuilder()).append("invalid acknowledgeMode: ").append(acknowledgeMode).append(". Valid values are Session.AUTO_ACKNOWLEDGE (1), ").append("Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)").toString());
}
return new ActiveMQSession(this, getNextSessionId(), transacted ? 0 : acknowledgeMode != 0 ? acknowledgeMode : 1, isDispatchAsync(), isAlwaysSessionAsync());
}
构造函数里做了这些事情:
1. 会话参数配置
2. 执行器的建立
3. 加入Connection的session队列并启动会话
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
//一般性参数设置,包括开启DEBUG,连接,Ack模式,Async模式
this.debug = LOG.isDebugEnabled();
this.connection = connection;
this.acknowledgementMode = acknowledgeMode;
this.asyncDispatch = asyncDispatch;
this.sessionAsyncDispatch = sessionAsyncDispatch;
//从ConnectionInfo中构造出SessionInfo
this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
//事物上下文
setTransactionContext(new TransactionContext(connection));
stats = new JMSSessionStatsImpl(producers, consumers);
//异步发送消息设置
this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer());
//BlobMessage的传输设置
setBlobTransferPolicy(connection.getBlobTransferPolicy());
//连接执行器
this.connectionExecutor=connection.getExecutor();
//会话执行器
this.executor = new ActiveMQSessionExecutor(this);
//在连接中加入这个Session
connection.addSession(this);
//启动这个Session(其实就是上一篇中讲过的启动这个session中所有的消费者)
if (connection.isStarted()) {
start();
}
}
问题:可以看到方法里先进行connection.isStarted()的判断才启动线程呢,这其实就是如果不调用connection.start()方法,就无法消费的原因。但是为什么要这么设计?
可以看到最重要的是最后的start()方法,再来回顾一下:
1. Consumer的启动
2. executor的启动
/**
* Start this Session.
*
* @throws JMSException
*/
protected void start() throws JMSException {
started.set(true);
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer c = iter.next();
c.start();
}
executor.start();
}
Consumer的启动我们放在后续再看,主要是exectuor,在构造函数里可以看到session的executor就是ActiveMQSessionExecutor的实例,所以又跳到了ActiveMQSessionExecutor这个类里:
1. 构造函数里判断是否支持消费优先级,如果支持就新建SimplePriorityMessageDispatchChannel,否则新建FifoMessageDispatchChannel
2. start()方法中判断如果MessageQueue没启动,就启动messageQueue,如果发现messageQueue中存在未消费的消息,就wakeup
3. wakeup中唤醒一个taskRunner。
//构造函数
ActiveMQSessionExecutor(ActiveMQSession session) {
this.session = session;
if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
this.messageQueue = new SimplePriorityMessageDispatchChannel();
}else {
this.messageQueue = new FifoMessageDispatchChannel();
}
}
//executor的启动
synchronized void start() {
if (!messageQueue.isRunning()) {
messageQueue.start();
if (hasUncomsumedMessages()) {
wakeup();
}
}
}
//判断是否有未消费的消息
public boolean hasUncomsumedMessages() {
return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
}
//唤醒
public void wakeup() {
if (!dispatchedBySessionPool) {
if (session.isSessionAsyncDispatch()) {
try {
TaskRunner taskRunner = this.taskRunner;
if (taskRunner == null) {
synchronized (this) {
if (this.taskRunner == null) {
if (!isRunning()) {
// stop has been called
return;
}
this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: " + session.getSessionId());
}
taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
while (iterate()) {
}
}
}
}
看完有个疑惑,messageQueue是什么?messageQueue的判断贯穿了整个session的建立过程,在构造函数里可以看到messageQueue是FifoMessageDispatchChannel或SimplePriorityMessageDispatchChannel的一个实例,我们先不考虑支持优先级的情况,看看先入先出(FIFO)的messageQueue是什么。
public class FifoMessageDispatchChannel implements MessageDispatchChannel {
//锁
private final Object mutex = new Object();
//存放消息的链表
private final LinkedList<MessageDispatch> list;
private boolean closed;
private boolean running;
public FifoMessageDispatchChannel() {
this.list = new LinkedList<MessageDispatch>();
}
}
MessageDispatch其实可以看做是Message的一层封装。所以messageQueue其实真的是字面意思,先入先出的消息队列。
public class MessageDispatch extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;
protected ConsumerId consumerId;
protected ActiveMQDestination destination;
protected Message message;
protected int redeliveryCounter;
protected transient long deliverySequenceId;
protected transient Object consumer;
protected transient TransmitCallback transmitCallback;
protected transient Throwable rollbackCause;
看完messageQueue,我们可以想到在初始化的时候,Session中的这个messageQueue其实是没有消息的,而且从源码读来,大部分的功能都取决于messageQueue是否是空(wakeup())。所以我们可以其实可以看到session的主要功能应该是针对消费者的。在有未消费的消息的时候,在wakeup中新建了一个taskRunner,这个对象是从connection中建立的,来看一下这个方法:
public TaskRunnerFactory getSessionTaskRunner() {
synchronized (this) {
if (sessionTaskRunner == null) {
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
}
}
return sessionTaskRunner;
}
TaskRunnerFactory工厂类嘛,看看是如何建立TaskRunner的:
public TaskRunner createTaskRunner(Task task, String name) {
init();
ExecutorService executor = executorRef.get();
if (executor != null) {
return new PooledTaskRunner(executor, task, maxIterationsPerRun);
} else {
return new DedicatedTaskRunner(task, name, priority, daemon);
}
}
public void init() {
if (!initDone.get()) {
// If your OS/JVM combination has a good thread model, you may want to
// avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
//翻译:如果你的操作系统或JVM支持一个优秀的线程模型,你可能不希望使用thread pool,而是使用DedicatedTaskRunner。
synchronized(this) {
//need to recheck if initDone is true under the lock
//判断是否需要使用dedicatedTaskRunner
if (!initDone.get()) {
if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
executorRef.set(null);
} else {
executorRef.compareAndSet(null, createDefaultExecutor());
}
LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executorRef.get());
initDone.set(true);
}
}
}
}
createTaskRunner整个逻辑是这样的:
1. 初始化,判断是否使用DedicatedTaskRunner,如果不使用,就建立一个DefaultExecutor,此时executorRef不为空。如果使用,则把executorRef置为空。
2. 尝试从executorRef中取出一个线程,如果可以取出来(表示使用了DefaultExecutor),使用PooledTaskRunner来丰富这个线程。如果取不出来,则新建一个DedicatedTaskRunner。
可以看到createTaskRunner里出现了两种类型:DedicatedTaskRunner和DefaultExecutor。研究了一下两种类型的代码,大致可以总结如下:
- DefaultExecutor是建立了一个JVM线程池ThreadPoolExecutor
- DedicatedTaskRunner是一个单独的线程,我认为应该是依靠操作系统或者JVM的线程管理来实现多个线程的并发
我直接看了看DedicatedTaskRunner的代码,这个类实例化出的对象里才建立了线程,代码不多,我觉得这个类的写法很值得学习一下。
class DedicatedTaskRunner implements TaskRunner {
private static final Logger LOG = LoggerFactory.getLogger(DedicatedTaskRunner.class);
private final Task task;
private final Thread thread;
private final Object mutex = new Object();
private boolean threadTerminated;
private boolean pending;
private boolean shutdown;
public DedicatedTaskRunner(final Task task, String name, int priority, boolean daemon) {
// 这个task其实是实例化ActiveMQSessionExecutor的对象
this.task = task;
// 建了个线程,线程里调用runTask()方法
thread = new Thread(name) {
@Override
public void run() {
try {
runTask();
} finally {
LOG.trace("Run task done: {}", task);
}
}
};
thread.setDaemon(daemon);
thread.setName(name);
thread.setPriority(priority);
thread.start();
}
@Override
public void wakeup() throws InterruptedException {
synchronized (mutex) {
if (shutdown) {
return;
}
pending = true;
mutex.notifyAll();
}
}
/**
* shut down the task
*
* @param timeout
* @throws InterruptedException
*/
@Override
public void shutdown(long timeout) throws InterruptedException {
LOG.trace("Shutdown timeout: {} task: {}", timeout, task);
synchronized (mutex) {
shutdown = true;
pending = true;
mutex.notifyAll();
// Wait till the thread stops ( no need to wait if shutdown
// is called from thread that is shutting down)
if (Thread.currentThread() != thread && !threadTerminated) {
mutex.wait(timeout);
}
}
}
/**
* shut down the task
*
* @throws InterruptedException
*/
@Override
public void shutdown() throws InterruptedException {
shutdown(0);
}
// 执行任务的主方法
final void runTask() {
try {
// 用一个死循环来不停执行。
while (true) {
// 获取mutex锁,设置线程为非挂起状态并判断是否线程关闭,若关闭则返回
synchronized (mutex) {
pending = false;
if (shutdown) {
return;
}
}
LOG.trace("Running task {}", task);
// 调用ActiveMQSessionExecutor的iterate()方法,见下文
if (!task.iterate()) {
// 获取mutex锁,判断线程是否关闭,如果没关闭,则判断线程是否为非挂起状态,若是则释放CPU,等待线程被唤醒。
synchronized (mutex) {
if (shutdown) {
return;
}
while (!pending) {
mutex.wait();
}
}
}
}
} catch (InterruptedException e) {
// Someone really wants this thread to die off.
Thread.currentThread().interrupt();
} finally {
// Make sure we notify any waiting threads that thread
// has terminated.
synchronized (mutex) {
threadTerminated = true;
mutex.notifyAll();
}
}
}
}
学习一下,下次写线程也可以用这种模式。
其实不管是建立了哪种线程,线程的执行方法都是调用ActiveMQSessionExecutor类中的iterate()方法:
public boolean iterate() {
// Deliver any messages queued on the consumer to their listeners.
// 把排队在消费者上的消息发送到消费者的监听器上
for (ActiveMQMessageConsumer consumer : this.session.consumers) {
if (consumer.iterate()) {
return true;
}
}
// No messages left queued on the listeners.. so now dispatch messages queued on the session
// 如果没有消息排队在监听器上,则把排队在session中的消息发送给消费者
MessageDispatch message = messageQueue.dequeueNoWait();
if (message == null) {
return false;
} else {
dispatch(message);
return !messageQueue.isEmpty();
}
}
//消息分发的方法
void dispatch(MessageDispatch message) {
// TODO - we should use a Map for this indexed by consumerId
// 这个方法里对这个session下所有的consumer进行一次循环,根据消息的consumerID来判断消息该往哪发。
// 在一般的使用场景下,一个session只会用一个consumer,所以看起来没什么问题,但是这个逻辑仍然是低效的。
// 可以看到TODO里表示应该用一个MAP来直接找到consumer。
for (ActiveMQMessageConsumer consumer : this.session.consumers) {
ConsumerId consumerId = message.getConsumerId();
if (consumerId.equals(consumer.getConsumerId())) {
consumer.dispatch(message);
break;
}
}
}
总结
会话的创建最主要的工作是:
1. 配置会话中的参数,根据连接是否start来判断是否启动所有消费者,启动会话执行器ActiveMQSessionExecutor。
2. 在启动会话执行器时,如果消息分发通道处于未启动状态,则启动消息分发通道(FIFO或者SimplePriority),如果有未消费的消息,唤醒消息执行器。
3. 唤醒主要做的是是通过任务执行工厂TaskRunnerFactory创建执行任务PooledTaskRunner或DedicatedTaskRunner,两种TaskRunner都是ActiveMQSessionExecutor的包装,TaskRunner执行就是执行ActiveMQSessionExecutor的iterate(),这个过程主要是将消息发送到消费者的listener上,或通过遍历消费者,将消息分配给消费者。
简单概括:
一个ActiveMQConnection可对应多个ActiveMQSession
一个ActiveMQSession对应一个ActiveMQSessionExecutor
通过ActiveMQSessionExecutor来进行消息向消费者的分配