轻量级RPC设计与实现第三版

在前两个版本中,每次发起请求一次就新建一个netty的channel连接,如果在高并发情况下就会造成资源的浪费,这时实现异步请求就十分重要,当有多个请求线程时,需要设计一个线程池来进行管理。除此之外,当前方法过于依赖注册中心,在高并发情况下对注册中心造成了压力;另外如果注册中心出现宕机等情况,那么整合系统就崩溃了,为了解决这个问题,添加了一个适合高并发的服务缓存机制。以上为该版本的新增内容。

异步请求和线程池

这里就不具体介绍异步请求的概念了。用一个通俗的例子解释,如你在饭店点餐,当你点好餐后,会得到一个点餐号,但是饭菜并不会立即做好送过,需要你等待一段时间,在这个时间段中,你可以做其他的事情,当饭菜做好后,会根据点餐号进行广播,通知你去拿饭菜。这就是一个典型的异步处理。
在项目中涉及到异步的主要有三个自定义类,即ChannelHolder,LwRequestPoolLwRequestManager
ChannelHolder中定义的变量:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChannelHolder {
    private Channel channel;
    private EventLoopGroup eventLoopGroup;
}

LwRequestManager中的变量:

private static final ConcurrentHashMap<String, ChannelHolder> channelHolderMap = new ConcurrentHashMap<>();
    private static ExecutorService requestExecutor = new ThreadPoolExecutor(30, 100, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(30),
            new BasicThreadFactory.Builder().namingPattern("request-service-connector-%d").build());

    private static LwRequestPool requestPool = SpringBeanFactory.getBean(LwRequestPool.class);

LwRequestPool中定义的变量:

private final ConcurrentHashMap<String, Promise<LwResponse>> requestPool = new ConcurrentHashMap<>();

刚开始在动态代理中会调用send()方法,开始了有关异步调用的内容。通过requestId来确定是哪个请求,利用线程池执行netty客户端的运行,并利用CountDownLatch来先暂停下面代码的运行,如果latch执行了countDown()方法,会再返回这里执行下面的步骤。

  public static void send(LwRequest request, URL url) throws Exception{
        String requestId = request.getRequestId();
        CountDownLatch latch = new CountDownLatch(1);
        requestExecutor.execute(new NettyClient(requestId, url, latch));
        latch.await();
        ChannelHolder channelHolder = channelHolderMap.get(requestId);
        channelHolder.getChannel().writeAndFlush(request);
        log.info("客户端发送消息:{}", channelHolder);
    }

之后运行Netty客户端中的run()方法,如果与服务端连接成功,将该请求id和对应的channel注册到channelHolderMap变量中,并执行submitRequest方法,将请求id和eventLoop注册到变量requestPool中。最后执行了countDown()方法。

 @Override
public void run() {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
                        pipeline.addLast(new LwRpcEncoder(LwRequest.class, new HessianSerializer()));
                        pipeline.addLast(new LwRpcDecoder(LwResponse.class, new HessianSerializer()));
                        pipeline.addLast(clientHandler);
                    }
                });
        try {
            ChannelFuture future = bootstrap.connect(url.getHostname(), url.getPort()).sync();
            //连接成功
            if (future.isSuccess()) {
                ChannelHolder channelHolder = ChannelHolder.builder()
                        .channel(future.channel())
                        .eventLoopGroup(group).build();
                LwRequestManager.registerChannelHolder(requestId, channelHolder);
                latch.countDown();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

requestPool.submitRequest(requestId, channelHolder.getChannel().eventLoop());

public void submitRequest(String requestId, EventExecutor executor) {
        requestPool.put(requestId, new DefaultPromise<>(executor));
    }

当执行了countDown()方法,会跳转到原来最初的地方,执行剩下的代码部分,进行请求发送。等待服务端的响应。

ChannelHolder channelHolder = channelHolderMap.get(requestId);
        channelHolder.getChannel().writeAndFlush(request);

当客户端接收到服务端发回的结果信息时,会执行notifyRequest方法。

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LwResponse response) throws Exception {
        lwRequestPool.notifyRequest(response.getRequestId(), response);
    }

notifyRequest方法中,会从变量requestPool中获取到返回的LwResponse变量,并封装在Promise中,最后调用setsuccess()方法。

public void notifyRequest(String requestId, LwResponse response) {
        Promise<LwResponse> promise = requestPool.get(requestId);
        if (promise != null) {
            promise.setSuccess(response);
        }
  }

setsuccess()方法是netty的Promise中的方法。它会通知所有的监听器。在官方解释如下:
Marks this future as a success and notifies all
此时就可以通过fetchResponse根据请求id获取到了服务端发送过来的消息,此时已经执行完毕,需要从requestpool中删除该请求信息。

 LwResponse response = lwRequestPool.fetchResponse(requestId);
 
 public LwResponse fetchResponse(String requestId) throws Exception {
        Promise<LwResponse> promise = requestPool.get(requestId);
        if (promise == null)
            return null;
        LwResponse response = promise.get(10, TimeUnit.SECONDS);
        requestPool.remove(requestId);

        LwRequestManager.destroyChannelHolder(requestId);
        return response;
    }

高并发下的缓存机制

在原来的版本中,每次请求远程服务时,都需要从注册中心获取服务地址,在高并发情况下,会对注册中心造成一定的影响;或者如果注册中心突然宕机,那么就无法获取待服务地址,整个系统就崩溃了。所以设计一个缓存机制,将请求到的服务地址持久化到本地,当下次请求时,就无须再需要注册中心了,直接从持久化文件中获取,减轻了注册中心的压力。

在进行本地缓存时,会先调用saveServices方法,将URL数组信息保存到Properties中,并获取当前version版本号,然后执行doSaveProperties方法来保存到本地。这个步骤支持同步和异步两种方式。

public void saveServices(String serviceName, List<URL> urlList) {
        if (file == null)
            return;
        try {
            StringBuilder buf = new StringBuilder();
            for(URL url : urlList) {
                if (buf.length() > 0) {
                    buf.append(";");
                }
                buf.append(url.getAllInformation());
            }
            properties.setProperty(serviceName, buf.toString());
            long version = lastCacheChanged.incrementAndGet();
            if (syncSaveFile) {
                doSaveProperties(version);
            } else {
                registerCacheExecutor.execute(new SaveProperties(version));
            }

        } catch (Throwable  t) {
            log.warn(t.getMessage(), t);
        }
    }

doSaveProperties方法中,如果传入的版本号不是最新的版本号,说明其他线程已经修改了,内容发生了变化,直接退出。在写入到文件时会添加锁,进一步保证信息的准确性。如果添加失败,会进行重试操作。

private void doSaveProperties(long version) {
        if (version < lastCacheChanged.get())
            return;
        if (file == null)
            return;
        try {
            File lockfile = new File(file.getAbsolutePath() + ".lock");
            if (!lockfile.exists()) {
                lockfile.createNewFile();
            }
            try(RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
            FileChannel channel = raf.getChannel();) {
                FileLock lock = channel.tryLock();
                if (lock == null) {
                    throw new IOException("不能锁住注册的缓存文件");
                }
                try {
                    if (!file.exists()) {
                        file.createNewFile();
                    }
                    try (FileOutputStream outputFile = new FileOutputStream(file)) {
                        properties.store(outputFile, "RPC Server Cache");
                    }
                } finally {
                    lock.release();
                }
            }
        }catch (Throwable e) {
            savePropertiesRetryTimes.incrementAndGet();
            if (savePropertiesRetryTimes.get() > SAVE_MAX_RETRY) {
                log.warn("超过最大重试次数,缓存失败!");
                savePropertiesRetryTimes.set(0);
                return;
            }
            if (version < lastCacheChanged.get()) {
                savePropertiesRetryTimes.set(0);
                return;
            }
            e.printStackTrace();
        }
    }

具体详细代码可以到我的项目中进行查看:轻量级RPC第三版

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,331评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,372评论 3 398
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,755评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,528评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,526评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,166评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,768评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,664评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,205评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,290评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,435评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,126评论 5 349
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,804评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,276评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,393评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,818评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,442评论 2 359