线程是每个应用都必须关系的事情,毕竟任何服务器的资源都是有限的,服务线程过少的容易发生阻塞,服务线程过多的话上下文切换的开销又会影响效率,所以合适的线程模型对于一个高性能的应用来说必不可少。Dubbo作为一个带有服务治理功能的RPC框架,在线程模型上也有自己的处理,今天就让我们一起来看一下Dubbo的线程模型。
下面我们要看一下默认情况下的线程模型:
首先明确一个基本概念:IO线程和业务线程的区别
- IO线程:配置在netty连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据打交道的事件。
- 业务线程:用于处理具体业务逻辑的线程,可以理解为自己在provider上写的代码所执行的线程环境。
Dubbo默认采用的是长链接的方式,即默认情况下一个consumer和一个provider之间只会建立一条链接,这种情况下IO线程的工作就是编码和解码数据,监听具体的数据请求,直接通过Channel发布数据等等;二业务线程就是处理IO线程处理之后的数据,业务线程并不知道任何跟网络相关的内容,只是纯粹的处理业务逻辑,在业务处理逻辑的时候往往存在复杂的逻辑,所以业务线程池的配置往往都要比IO线程池的配置大很多。
Dubbo中线程相关参数的含义
iothreads:指定IO线程池(worker)的线程数量,默认情况下为CPU个数+1,因为这个线程的工作内容比较简单,所以一般情况下我们不会去配置这个值,除非IO线程的响应速度明显拖慢了整个工程的响应,IO线程的默认类型是CacheThreadPool,一分钟的线程死亡时间。
threadpool:业务线程的具体线程类型,默认采用的fixed线程池,即线程数量一定的线程池,这种线程池的好处就是不会频繁创建线程线程,适合线业务比较密集的应用。因为这个数据只管关系到服务的并发情况,所以在需要的时候可以适当调整该数量来增加工程的并发。
threads:该参数就是业务线程池的核心线程数配置,默认情况下为200。如果空间有条件的话可以适当地提升该数量,例如提升至400或者500都是可以的。
queues:该数量指定来在初始化业务线程池时候是否需要排队队列,如果不设置的话,业务线程池的排队队列是SynchronousQueue,即不允许业务事件排队,如果线程池没有空闲线程之后会直接排除异常信息。但是如果配置来queues之后则会使用LinkedBlockingQueue作为排队队列,queues则代表队列的初始队列。因为queues的配置直接关系到排队,所以在一般情况下建议不要配置,因为线程池满的情况下一般期望是直接失败,然后调用其他的机器,而不是再次队列继续等待,继续等待不仅可能会拉低响应时间,而且很有可能会超时。
acceptes:我们知道threadpool,threads和queues都是控制业务线程池的字段,而acceptes就是控制IO线程池的字段。这个字段标示着服务端可接受的最大长连接数,默认情况下为不限制,但是有时候为来保护服务器防止连接数过多导致请求失败率过高,则可以考虑设置该字段为一个定值。
connections:既然服务端可以设置最大接收的连接数,那么客户端也可以设置与服务端建立的连接数。connections可以配置在reference上表示要同对应的服务器建立的长链接数量,默认为只建立一条链接,如果配置来connections的话则会建立N条长链接以提供消费者的吞吐量。但是有一点需要注意是如果conenctions的数量配置大于服务端的accepts的话,超出的部分会直接报错,表示不支持更多的链接,该值不宜配置过多,因为如果多个消费者都配置来该值的话很容易到值服务端的accepts超过预期数量而报错。
-
dispatcher:这个字段代表的是IO线程池和业务线程池的边界,具体有这么几种类型,下面我们一一详细看看:
- all:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。对应的是AllChannelHandler(具体这个Handler的处理位置以及他的作用,见前几篇博客,这里不再强调)
public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //链接事件通过线程池处理 public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); }catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t); } } //链接断开事件通过线程池处理 public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED)); }catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t); } } //数据接收事件通过线程池处理 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //异常事件通过线程池梳理 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception)); }catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t); } } private ExecutorService getExecutorService() { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; } }
- direct:所有消息都不派发到线程池,全部在IO线程上直接执行。(这种做法在绝大多数情况下都不合理,因为毕竟业务逻辑相关对IO事件都是复杂的)。具体的实现方式就是在装饰者的层级上直接下调,不再包装线程池。
- message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO线程上执行。
/** * 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO线程上执行 */ public class MessageOnlyChannelHandler extends WrappedChannelHandler { public MessageOnlyChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //接收到消息时候触发,无论是服务端接收到请求数据还是客户端接收到返回数据 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } }
具体的做法其实很简单,相对于all来说只不过只会会将received事件在线程池中处理,其他的一概以默认方式处理(IO线程池)。
- execution:官方的说法是:只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在IO线程上执行。相对于message来说,限制的更死了,也就是只有服务端的业务逻辑才会执行在业务线程池中执行。消费端如果收到的消息之后,处理逻辑还是IO线程上执行。但是实际情况是我看到的代码显示execution与all的处理逻辑几乎一样,并没有体现出官方的说法。 具体实现如下:
public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //处理链接建立事件 public void connected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); } //处理链接断开事件 public void disconnected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED)); } //处理数据收到的事件 public void received(Channel channel, Object message) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } //处理异常事件 public void caught(Channel channel, Throwable exception) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception)); } }
有可能是我的理解还不到位,对于其其他的用处没有理解到,如果这里有问题的话还请大家指出。
- connection:在IO线程上,将连接建立以及断开事件放入队列,有序逐个执行,其它消息派发到线程池。具体实现如下:
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit ; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME); //初始化一个单独处理链接建立和断开的无界队列连接池 connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); //预警排队数量 queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } //连接建立事件直接在单独的线程池中处理 public void connected(Channel channel) throws RemotingException { try{ checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); }catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t); } } //连接断开事件直接在单独的线程池中处理 public void disconnected(Channel channel) throws RemotingException { try{ checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED)); }catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass()+" error when process disconnected event ." , t); } } //数据接收事件还是在业务线程池中处理 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //异常事件还是在业务线程池中处理 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception)); }catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t); } } //检查排队数量是否大于预警数量(默认为1000),如果炒过的话就打WARNING日志 private void checkQueueLength(){ if (connectionExecutor.getQueue().size() > queuewarninglimit){ logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit)); } } }
这种dispatcher的意义就在于将将连接事件与IO线程池和业务线程池分开处理,是其不会相互干扰。假如在网络不稳定的环境下,不会因为频繁的网络抖动影响实际的业务处理效率。
关于dubbo线程模型的内容应该都已经讲完了,具体怎么配置还要根据实际的业务场景。