1.Channel
1.1 AbstractChannel
AbstractChannel基本没做什么
public abstract class AbstractChannel extends AbstractPeer implements Channel {
public AbstractChannel(URL url, ChannelHandler handler) {
super(url, handler);
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (isClosed()) {
throw new RemotingException(this, "Failed to send message "
+ (message == null ? "" : message.getClass().getName()) + ":" + message
+ ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
}
}
@Override
public String toString() {
return getLocalAddress() + " -> " + getRemoteAddress();
}
}
1.2 NettyChannel
其内部实现由io.netty.channel.Channel完成,attribute则由attributes完成
final class NettyChannel extends AbstractChannel {
private static final Logger logger = LoggerFactory.getLogger(NettyChannel.class);
private final Channel channel;
private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
}
2.Server
2.1 AbstractServer
AbstractServer实现了默认的Server接口,并从URL中获取了很多必要的信息,构造函数就是一个模板方法,将doOpen方法留给子类实现
public abstract class AbstractServer extends AbstractEndpoint implements Server {
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
ExecutorService executor;
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
private int idleTimeout = 600; //600 seconds
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
}
2.2 NettyServer
NettyServer是真正启动Netty的Server端的流程
public class NettyServer extends AbstractServer implements Server {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private Map<String, Channel> channels; // <ip:port, channel>
private ServerBootstrap bootstrap;
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
3.Client
3.1 AbstractClient
类似AbstractServer,AbstractClient也是一个模板类, doOpen方法留给子类实现
public abstract class AbstractClient extends AbstractEndpoint implements Client {
protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
private final Lock connectLock = new ReentrantLock();
private final boolean send_reconnect;
private final AtomicInteger reconnect_count = new AtomicInteger(0);
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period;
private final long shutdown_timeout;
protected volatile ExecutorService executor;
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
// the last successed connected time
private long lastConnectedTime = System.currentTimeMillis();
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
}
3.2 NettyClient
同样NettyClient也是Netty的Client实现
public class NettyClient extends AbstractClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
}
4.相关适配器
dubbo定义了很多的适配器,都是通过构造函数传入参数,内部实现均是代理实现
public class ClientDelegate implements Client {
private transient Client client;
public ClientDelegate() {
}
public ClientDelegate(Client client) {
setClient(client);
}
public Client getClient() {
return client;
}
public void setClient(Client client) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
}
@Override
public void reset(URL url) {
client.reset(url);
}
@Override
@Deprecated
public void reset(com.alibaba.dubbo.common.Parameters parameters) {
reset(getUrl().addParameters(parameters.getParameters()));
}
@Override
public URL getUrl() {
return client.getUrl();
}
@Override
public InetSocketAddress getRemoteAddress() {
return client.getRemoteAddress();
}
@Override
public void reconnect() throws RemotingException {
client.reconnect();
}
}
...