1 FastThreadLocalThread
- FastThreadLocalThread继承Thread,对于原理的hash code或散键的方式获得指定情况采用一个索引的方式获取数据,利用空间的优势换取速度
Summary:
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
简介:
Netty是一个NIO客户端服务器框架,支持快速、简单地开发协议服务器和客户端等网络应用程序。它大大简化和流线网络编程,如TCP和UDP套接字服务器。
2、ThreadLocal
- ThreadLocal 常用的就是用于线程上下文传递信息,常用的方法为get和set,get方法从ThreadLocal中取出数据,与之对应的set将数据存储到ThreadLocal
- 主要是通过当前thread,获得到ThreadLocalMap对象,如果对象之前存在那么将其添加,map.set 这个调用如果有冲突那么在后面追加,如果不存在将其创建。ThreadLocalMap 本身也是ThreadLocal 的一个静态内部类,该类中主要包括一个Entry,Entry类继承了WeakReference,这也是为了在一定程度上防止内存泄漏。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
2、 ChannelPipeline
- ChannelPipeline本身是一个与ChannelContext有关的容器的对象存放一个又一个ChannelContext的ChannelHandler,而ChannelHandlerContext维护这对应的ChannelHandler对象,内置存放着一个又一个的ChannelHandler对象,而ChannelHandler不是我们自己创建的,而是Netty帮我们创建的,因此通过ChannelHandlerContext是连接,然而ChannelHandler与ChannelHandlerContext的关系。(难点在初始化连接时)
3、DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
//...略
/**
* 当AbstractChannel注册的时候被设置为true,设置之后以后就不会被改变。
*/
private boolean registered;
//...略
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);//判断是否已经添加过。
newCtx = newContext(group, filterName(name, handler), handler);//重要方法,创建有一个context
addLast0(newCtx);//最后添加到Pipeline的handlers集合里边的对象,准确的说不是handler,而是context。
//如果注册的是false,这意味着通道还没有在eventloop上注册。
//在本例中,我们将上下文添加到管道中,并添加一个将调用的任务
// ChannelHandler.handlerAdded(…)一旦注册了通道。
//如果registered是false,意味着channel没有在事件循环组中注册过,
//这种情况下我们将context添加到pipeline 当中,并且添加一个回调任务,当channel 被注册的时候,回调任务会执行
//ChannelHandler.handlerAdded(...)方法。
if (!registered) {
newCtx.setAddPending();//将当前context挂起。
callHandlerCallbackLater(newCtx, true);//建议一个线程任务稍后执行。
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//我们自己重写的handler的handlerAdded方法会被执行。
callHandlerAdded0(newCtx);
return this;
}
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {//不是共享的,并且被添加过直接抛出异常
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;//设置added 标志位为true
}
}
//创建一个context,this是DefaultChannelPipeline,group为null,
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
//如果name为空,生成一个名字
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
//判断名字是否重复
checkDuplicateName(name);
return name;
}
//生成名字的私有方法,nameCaches是一个FastThreadLocal(ThreadLocal原生ThreadLocal的封装,区别在于ThreadLocal是使用hash散列的
方式,而FastThreadLocal使用的是数组,用的索引定位,比ThreadLocal性能上稍微快了一些,可以看到netty对性能要求非常高。)
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
//用户不太可能在同一类型中放置多个处理程序,但是一定要避免
//任何名称冲突。注意,我们不缓存这里生成的名称。
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
//添加一个context到pipline操作(pipline默认只有tail和head2个节点),其实就是双向 链表的添加节点的操作。
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
//建立一个稍后执行的任务。
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
//将新建的任务添加到链表里边
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
//context被添加到pipline之后调用callHandlerAdded0,我们自己写的handler的handlerAdded方法会被执行,这也是handlerAdded
//为什么会被首先执行的原因。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
...略
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
...略
4、DefaultChannelHandlerContext
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;//持有Handler的引用,从这里可以看出一个context对应一个Handler。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
//获取持有的Handler
public ChannelHandler handler() {
return handler;
}
//入栈处理器是ChannelInboundHandler的实现
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
//出栈处理器是ChannelOutboundHandler的实现
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
5、DefaultChannelHandlerContext的super构造器结构:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;//赋值pipeline( private final DefaultChannelPipeline pipeline;)
//DefaultChannelPipeline 持有Channel的引用
this.executor = executor;
this.inbound = inbound;//入栈处理器
this.outbound = outbound;//出栈处理器
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
- 可以看到DefaultChannelHandlerContext持有pipeline 、handler 、channel(DefaultChannelPipeline的接口ChannelPipeline有 Channel channel();方法),Context是这三者的一个桥梁,并且pipline里边添加的对象准确的说不是handler而是Context,而Context持有handler 对象,到此为止我们已经非常清楚的知道addlast方法的逻辑是什么样子了。
我们回到ServerBootstrap的init方法看一下ChannelInitializer:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
ChannelInitializer
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
....略
/**
*当initChannel方法被调用完毕返回的时候,当前ChannelInitializer对象会被从pipline里边删除掉。
*/
protected abstract void initChannel(C ch) throws Exception;
/**
* {@inheritDoc} If override this method ensure you call super!
* 如果重写,确保调用父类的方法。
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
//对于当前的DefaultChannelPipeline实现,这应该总是正确的。
//在handlerAdded(…)中调用initChannel(…)的好处是没有订单
//如果一个通道初始化器将添加另一个通道初始化器,会让人感到惊讶。这是所有的处理程序
//将按预期顺序添加。
initChannel(ctx);
}
}
//
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
//初始化。
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
掉完initChannel之后从pipline删除当前对象
remove(ctx);
}
return true;
}
return false;
}
//删除逻辑,首先拿到ChannelPipeline ,然后remove掉
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
//同时删除对应的context
initMap.remove(ctx);
}
}
- ChannelInitializer的使命就是对handlers的一个暂时的封装处理,把所有的handler添加到pipline之后,他的使命就完成了,所以调用完initChannel之后会被清除掉。
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8192));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}