使用并发工具实现 RPC 调用流量控制

前言

RPC 服务中,每个服务的容量都是有限的,即资源有限,只能承受住给定的网络请求,所以,在设计 RPC 框架的时候,一定要考虑流量控制这个问题。而 Java 中,实现流量控制有很多中方式,今天说 2 种。

Semaphore 实现流控

代码:

  static Semaphore semaphore = new Semaphore(100);

  public static void main(String[] args) {

    Executor timeTask = Executors.newScheduledThreadPool(1);
    ((ScheduledExecutorService) timeTask).scheduleAtFixedRate(
        () -> semaphore.release(100 - semaphore.availablePermits()), 1000, 1000,
        TimeUnit.MILLISECONDS);

    Executor pool = Executors.newFixedThreadPool(100);

    for (int i = 0; i < 100; i++) {
      final int num = i;
      pool.execute(() -> {
        for (; ; ) {
          for (int j = 0; j < 200; j++) {
            if (semaphore.tryAcquire()) {
              callRpc(num, j);
            } else {
              System.err.println("call fail");
            }
          }
        }
      });
    }
  }

  private static void callRpc(int num, int j) {
    System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), num, j));
  }

代码中,我们模拟了 100 个线程,每个线程无限调用 RPC。

同时使用另一个定时任务,定时更新 Semaphore 可用许可为 100。

客户端线程调用时,会尝试获取信号量,当获取成功时,才会调用调用 RPC,反之,打印失败。

这个小程序实现了每秒钟限制 100 个请求的 RPC 的流量控制。

AtomicInteger 实现流控

代码:

  static AtomicInteger count = new AtomicInteger();

  public static void main(String[] args) {
    Executor timeTask = Executors.newScheduledThreadPool(1);
    ((ScheduledExecutorService) timeTask).scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        count.getAndSet(100);
      }
    }, 1000, 1000, TimeUnit.MILLISECONDS);

    Executor pool = Executors.newFixedThreadPool(100);

    for (int i = 0; i < 100; i++) {
      final int num = i;
      pool.execute(() -> {
        for (; ; ) {
          for (int j = 0; j < 200; j++) {
            if (count.get() >= 0) {// 快速判断,否则大量的 CAS 操作将会定时任务更新计数器 count
              if (count.decrementAndGet() >= 0) {
                callRpc(num, j);
              }
            }
          }
        }
      });
    }
  }

  private static void callRpc(int num, int j) {
    System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), num, j));
  }

这段代码和上面的类似,只是使用的 API 不同,这里使用的是 CAS。通过对 CAS 递减,达到流控的目的。

注意,这里有一个双重判断,先判断 count.get() >= 0,为什么呢?

如果直接使用 decrementAndGet 方法,则会使用 CAS,100 个线程并发使用 CAS ,将会导致定时任务的 CAS 操作不够及时。

所以,先判断,是否小于0 ,如果小于0了,就不必尝试 CAS,避免影响定时任务。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,976评论 25 708
  • 掀开你的红盖头 惊现一摊黑色漩涡 怎知劲头愈加强烈 一步一步靠近 恋上她的味道 闭眼回味无穷 脑里尽现缠绕画面 有...
    courage9869阅读 245评论 0 0
  • 深夜一个人抱着平板躺在自己的小窝里,看完了2011年法国票房冠军《无法触碰》。 看完以后的感想就是,我还没有理解为...
    柚子粒阅读 3,177评论 3 3
  • 经过了无休止的噩梦循环,六点半我决定不再睡了。最近一段时间真的是鲜有的困难时期,第一是因为家里唯一的爷们失业,女儿...
    人生何处惹尘埃阅读 88评论 0 0