手写RPC框架(6)-使用Netty改写后遇到的各种坑你是否清楚

手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理
3、手写RPC框架(3)-引入Hessian序列化工具
4、手写RPC框架(4)-重写服务治理,开启1000个线程看看netty的执行调用情况
5、手写RPC框架(5)-Netty入门了解和实践

在上一篇 手写RPC框架(5)-Netty入门了解和实践 已经简单介绍了Netty怎么用的,这次通过实际代码编写中中遇到的各种问题为切入点介绍使用情况。具体内容如下:

  • 客户端还未等到netty连接上就发起请求(肯定失败)
  • 序列化和反序列化中,netty的粘包、拆包处理器错误使用遇到的问题
  • 客户端同步阻塞等待结果返回
  • 客户端对channel的管理
  • 负载均衡的修改

目录结构

开启1000个线程执行的效果图

image

1、客户端对长连接的管理

每当客户端连接上一个有效的服务端之后,就保持这个channel长连接不断,后续两台机器的数据传递就可以通过这个channel就行,而不需要每次都进行套接字连接一般还需进行3次握手操作,这样可以极大的提高效率。

在这个RPC代码中,在服务端有多台机器的情况下,客户端就需要连接多台机器,对这个channel的管理就非常必要,再合适的添加管理,在channel无效的时候则进行移除操作。

ClientHandler类部分代码

/**
 * ip  ==> channel
 */
private Map<String, Channel> ipMap = new ConcurrentHashMap<>();

/**
 * interface ==>  list(ipA, ipB, ...)
 */
private Map<String, List<String>> interfaceMap = new ConcurrentHashMap<>();
    
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    logger.info("断开 channel:{}", ctx.channel());
    // 需要移除操作
    removeChannel(ctx.channel());
}
    
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.channel().close();
    removeChannel(ctx.channel());
}
    
public void addChannel(Channel channel, String interfaceName, String ip) {
    ipMap.put(ip, channel);
    List<String> ipList = interfaceMap.get(interfaceName);
    if (ipList == null) {
        ipList = new ArrayList<>();
        interfaceMap.put(interfaceName, ipList);
    }
    ipList.add(ip);
}
    
public void removeChannel(Channel channel) {
    Iterator<Map.Entry<String, Channel>> iterator = ipMap.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, Channel> entry = iterator.next();
        if (entry.getValue() == channel) {
            iterator.remove();
        }
    }
}

在上面的代码中,有两个map,一个是ip->channel,另一个是接口名称->ip列表,因为一个channel是关联一个服务机器和一个客户端机器的,此外在负载均衡中也是从众多有效channel选择一个使用,故使用了这两个map存储channel。

Netty断开channel时需要及时移除掉,防止客户端使用无效的channel而出现错误,而上面的addChannel在哪里被调用着呢?那肯定是在Netty进行链接操作完成之后进行的。

ClientConnection类connection方法部分代码

CountDownLatch countDownLatch = new CountDownLatch(1);
ChannelFuture channelFuture = bootstrap.connect(address);
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            // channel连接完成
            clientHandler.addChannel(channelFuture.channel(), interfaceName, ip);
            // countdownlatch 计时器锁减1操作
            countDownLatch.countDown();
        }
    }
});

这代码看上去会有些莫名其妙,这主要是截取了关键的代码块,在Netty通过bootstrap.connect(address)异步连接后,会立即返回一个Future对象,通过添加监听器的做法获取到连接成功后的channel,后面再添加到channel管理容器中。可能有人需要问了,Netty的handler除了channelInactive事件之外,还有channelActive激活成功,同样可以异步的获取到有效的channel,为什么不选择呢?

答案也很简单,channelActive调用时,无法知道当前连接的业务接口名称等信息,故只能是在connection方法中采取这种方案获取channel和业务接口的关联关系了

更新:写代码期间并未想到使用Netty的Option透传业务数据,类似于ctx.channel().config().getOptions()就可以获取在boostrap链接时设置的数据,CountDownLatch也就不需要了,也不需要进行异步的调用

2、对粘包、拆包的处理

起初使用了hessian序列化工具,但是在调试中一直出现各种各样数据问题(就是拆包和粘包问题,现在已彻底解决),为了方便调试,就放弃了序列化,直接使用了POJO->toString()的操作。

bootstrap.group(work).channel(NioSocketChannel.class)
   .handler(new ChannelInitializer<SocketChannel>() {
       @Override
       protected void initChannel(SocketChannel socketChannel) throws Exception {
           socketChannel.pipeline()
               .addLast(new LineBasedFrameDecoder(1024))
               .addLast(new StringDecoder())
               .addLast(new StringEncoder())
               .addLast(clientHandler);
       }
});

使用StringDecoder和LineBasedFrameDecoder搭配使用,因为LineBasedFrameDecoder是通过\r\n进行切割的,那么转为String格式的时候也需要再加上分隔符。如下代码块

channel.writeAndFlush(JSON.toJSONString(request) + System.getProperty("line.separator"))

后面再使用hessian序列化,如下设置的处理器,一直出现切割字符出错

.addLast(new LineBasedFrameDecoder(1024))
.addLast(new RpcEncoder(RpcRequest.class, serializeProtocol))
.addLast(new RpcDecoder(RpcResponse.class, serializeProtocol))
.addLast(clientHandler);

使用这段代码的时候,因为考虑到使用LineBasedFrameDecoder进行字符处理,在序列完之后添加了System.getProperty("line.separator"))对应的字节数据,经过调试才发现为什么会出现切割的问题,看下图

image

往bytebuf添加完数据字节后,又添加了System.getProperty("line.separator"))数据字节,可以看到bytebuf中的字节数据明显超过65的(测试中实际是255),继续看下图在服务端进行数据获取

image

从bytebuf中获取有效数据长度却只有65,再回顾上一幅图圈中的内容,可以发现经过序列化的字节数就中在索引值为65的地方是0,而System.getProperty("line.separator"))的字节恰好也是0,所以经过序列化之后的数据实际上是:

XXXXXXXXXXX0XXXXXXXXXX0

字节中包含了多个可切割的字节0,原本应该切最后一个,实际上切成了前面一个,使得一个完整的数据被切割成不完整的段,自然导致后面的反序列化失败

/**
 * A decoder that splits the received {@link ByteBuf}s on line endings.
 * <p>
 * Both {@code "\n"} and {@code "\r\n"} are handled.
 * <p>
 * The byte stream is expected to be in UTF-8 character encoding or ASCII. The current implementation
 * uses direct {@code byte} to {@code char} cast and then compares that {@code char} to a few low range
 * ASCII characters like {@code '\n'} or {@code '\r'}. UTF-8 is not using low range [0..0x7F]
 * byte values for multibyte codepoint representations therefore fully supported by this implementation.
 * <p>
 * For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}.
 */
public class LineBasedFrameDecoder extends ByteToMessageDecoder {

再看LineBasedFrameDecoder类的注释说明,该类适用在UTF-8字符或者ASCII码,而我们序列化生成的字节数据肯定不在其规定的范围内,就导致了数据错误的情况,这也是个误用的典型例子,知其然而不知其所以然

后面改成.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)),使用长度+内容的样式的处理器才顺利解决字符粘包问题。

3、CountDownLatch的使用场景

在调试中屡次发现客户端启动后还未成功链接到服务端就进行服务调用的问题,而且大多出现在客户端启动之后短时间内的调用导致,如下图,此时并没有有效的channel使用,自然就出现错误了,那么如何解决呢?

image

在初次连接使用阻塞模式,必须等到连接完成才认为启动的准备工作完成才可以进行服务调用的操作。

public void connection(String interfaceName, String ip, boolean sync) {
    Channel channel = clientHandler.getChannel(ip);
    if (channel != null) {
        logger.warn("已经连接好了, IP:{}, interface:{}, channel:{}", ip, interfaceName, channel);
        return;
    }
    InetSocketAddress address = CommonUtils.parseIp(ip);

    SerializeProtocol serializeProtocol = rpcClient.getSerializeProtocol();

    EventLoopGroup work = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(work).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
            xxx
            });
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // 使用了一个计时器锁,阻塞操作
    ChannelFuture channelFuture = bootstrap.connect(address);
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                clientHandler.addChannel(channelFuture.channel(), interfaceName, ip);
                countDownLatch.countDown();
                // 释放操作
            }
        }
    });
    if (sync) {
        // 如果传入的sync是true
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
        }
    }
}

使用了一个计时器锁CountDownLatch,在最后面如果传入的sync为true则会进行阻塞等到,直到在完成链接后进行countDownLatch.countDown()操作,从而释放锁释,继续执行,如果sync是false,就是非阻塞的模式,通过这sync参数兼容两种模式,在后面客户端阻塞等到获取数据的时候也采用了类似的方案,如下代码

RpcResponseFuture类

public class RpcResponseFuture {
    private RpcResponse response;
    private CountDownLatch countDownLatch;
    public RpcResponseFuture() {
        this.countDownLatch = new CountDownLatch(1);
    }

    public RpcResponse getResponse() throws InterruptedException {
        // 阻塞在这里,直到得到了数据
        this.countDownLatch.await();
        return this.response;
    }

    public void setResponse(RpcResponse response) {
        this.response = response;
        this.countDownLatch.countDown();
    }
}

意图也很明显,直到setResponse方法被调用完成后,getResponse的阻塞方法才被放行

4、负载均衡完善

负载均衡此前是针对IP进行一个选择操作,此方法效率低下而且存在不可用的情况,在上面完善了客户端长连接管理方法的工作后,负载均衡也由对IP的选择修改成对channel的选择。如下代码块,基本无改变,就是把String改成了Channel而已。

public abstract class AbstractLoadBalance implements LoadBalance {

    @Override
    public Channel balance(List<Channel> channelList) {
        if (channelList == null || channelList.isEmpty()) {
            return null;
        }
        if (channelList.size() == 1) {
            return channelList.get(0);
        }
        return doLoad(channelList);
    }

    abstract Channel doLoad(List<Channel> addressList);
}

那么为什么使用IP是,效率低下而且不可用呢?

  • 1、每次获取IP节点后都需要进行三次握手的链接操作,效率低下,无法发挥长连接的功效
  • 2、服务端的IP数据存在在zk节点的,容易出现IP数据变更了,但是客户端还未及时感知,依旧尝试连接,继而出现错误,而channel则可以很高效的感知到是否可用

最后贴一个负载均衡的场景效果图,服务端1启动后,客户端启动,成功的完成一定的任务后,服务端2启动,这时候客户端的负载均衡就会生效,会随机的从服务端1和服务端2选择一个进行调用操作

public static void main(String[] args) {
    RpcClient rpcClient = new RpcClient();
    rpcClient.subscribe(Calculate.class);
    rpcClient.start();

    Calculate<Integer> calculateProxy = rpcClient.getInstance(Calculate.class);

    for(int i=0;i<10;i++) {
        new Thread(() -> { test(calculateProxy);}).start();
    }

    try {
        Thread.sleep(1000 * 20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    for(int i=0;i<20;i++) {
        new Thread(() -> { test(calculateProxy); }).start();
    }
}

private static void test(Calculate<Integer> calculateProxy) {
    int s1 = new Random().nextInt(100);
    int s2 = new Random().nextInt(100);
    long start = System.currentTimeMillis();
    int s3 = calculateProxy.add(s1, s2);
    logger.info("[" + Thread.currentThread().getName() + "]a: " + s1 + ", b:" + s2 + ", c=" + s3 + ", 耗时:" + (System.currentTimeMillis() - start));
}
image

如圈住的图显示,在执行完第一批10个任务后,新启动了一个服务端的机器,客户端通过zk及时感知到该数据变化后进行连接操作,再紧接着第二批20个任务执行中,就成功的使用了负载均衡的逻辑,从10001和10002两个服务端随机调用,如下图服务端1和服务端2的请求记录。

image

13.17分执行完成后,直到第二波请求在13.18分来到,顺利执行

image

13.17.52启动的新服务的,也在13.18顺利接收到客户端请求,通过负载均衡就可以实现灵活的上下线处理

至此本篇学习笔记就算是完成了,还剩下了心跳检测、断开重连等功能,由于现在设置的请求体格式并不是很完善,故下次改写请求头内容,以便于更好的支持完善心跳检测和断开重连等操作,再后面netty完成就剩下结合spring了。在实际代码编写中由于某些点掌握理解不透,也是经过大量的测试,踩坑不少才勉强做到这样的,代码还有诸多问题,继续学习和完善~

如代码存在的问题欢迎提出~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容