关于Java并发框架的鸟瞰视图

写文章

草稿保存中…

邀请预览

发布

为什么问题

几年前,当NoSQL像其他所有团队一样趋势时,我们的团队也热衷于新的和令人兴奋的事情; 我们计划在其中一个应用程序中更改数据库。但是当我们深入了解实现的细节时,我们记得一个聪明人曾经说过的“魔鬼在细节中”,最终,我们意识到NoSQL并不是解决所有问题的灵丹妙药,而且答案是答案NoSQL VS RDMS是:  “这取决于。” 同样,在去年,像RxJava和Spring Reactor这样的并发库正在热情洋溢地发表声明,就像异步,非阻塞方法一样,等等。为了不再犯同样的错误,我们试图评估像ExecutorService,RxJava,Disruptor和Akka这样的并发框架如何彼此不同,以及如何为各个框架确定正确的用例。

本文中使用的术语进行更详细的描述  在这里

用于分析并发框架的示例用例

快速刷新线程配置

在进行并发框架的比较之前,让我们快速回顾一下如何配置最佳线程数以提高并行任务的性能。该理论适用于所有框架,并且在所有框架中使用相同的线程配置来测量性能。

对于内存中任务,线程数大约等于具有最佳性能的内核数,尽管它可以根据相应处理器中的超线程功能进行一些更改。

例如,在8核机器中,如果对应用程序的每个请求必须并行执行四个内存中任务,那么此机器上的负载应保持@ 2 req / sec,其中包含8个线程  ThreadPool。

对于I / O任务,配置的线程数  ExecutorService 应基于外部服务的延迟。

与内存中任务不同,I / O任务中涉及的线程将被阻止,并且它将处于等待状态,直到外部服务响应或超时。因此,当涉及I / O任务时,由于线程被阻塞,应增加线程数以处理并发请求的额外负载。

应该以保守的方式增加I / O任务的线程数,因为处于活动状态的许多线程带来了上下文切换的成本,这将影响应用程序性能。为避免这种情况,应该按照I / O任务中涉及的线程的等待时间成比例地增加该机器的确切线程数和负载数。

参考:http//baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/

表现结果

性能测试在GCP中运行 - >处理器型号名称:Intel(R)Xeon(R)CPU @ 2.30GHz; 架构:x86_64; 核心数量:8(注意:这些结果对于这个用例是主观的,并不意味着一个框架比另一个框架更好)。

标签请求数I / O任务的线程池大小以ms为单位的平均延迟(50 req / sec)所有操作都按顺序排列〜10000NA〜2100使用Executor Service并行化IO任务,并使用HTTP线程进行内存中任务〜1000016〜1800使用Executor Service并行化IO任务(Completable Future)并使用HTTP线程进行内存中任务〜1000016〜1800使用ExecutorService并行化所有任务,并用于@Suspended AsyncResponse response以非阻塞方式发送响应〜1000016〜3500使用Rx-Java执行所有任务并使用@Suspended AsyncResponse response以非阻塞方式发送响应〜10000NA〜2300使用Disruptor框架并行化所有任务(Http线程将被阻止)〜1000011〜3000使用Disruptor框架并行化所有任务,并用于@Suspended AsyncResponse response以非阻塞方式发送响应〜1000012〜3500使用Akka框架并行化所有任务(Http线程将被阻止)〜10000〜3000

使用Executor Service并行化IO任务

什么时候用?

如果应用程序部署在多个节点中,并且如果每个节点中的req / sec小于可用核心数,则  ExecutorService 可以使用它来并行化任务并更快地执行代码。

何时不使用?

如果应用程序部署在多个节点中,并且每个节点中的req / sec远远高于可用核心数,那么使用  ExecutorService 进一步并行化只会使事情变得更糟。

当外部服务的延迟增加到400毫秒时,性能结果(8核心机器中的请求率@ 50 req / sec)。

标签请求数I / O任务的线程池大小以ms为单位的平均延迟(50 req / sec)所有操作都按顺序排列〜3000NA〜2600使用Executor Service并行化IO任务,并使用HTTP线程进行内存中任务〜300024〜3000

示例在按顺序执行所有任务时:

// I/O tasks : invoke external services

String posts = JsonService.getPosts();

String comments = JsonService.getComments();

String albums = JsonService.getAlbums();

String photos = JsonService.getPhotos();

// merge the response from external service

// (in-memory tasks will be performed as part this operation)

int userId = new Random().nextInt(10) + 1;

String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);

String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// build the final response to send it back to client

String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

return response;

代码示例当与ExecutorService并行执行I / O任务时

// add I/O Tasks

List<Callable<String>> ioCallableTasks = new ArrayList<>();

ioCallableTasks.add(JsonService::getPosts);

ioCallableTasks.add(JsonService::getComments);

ioCallableTasks.add(JsonService::getAlbums);

ioCallableTasks.add(JsonService::getPhotos);

// Invoke all parallel tasks

ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);

List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);

// get results of I/O operation (blocking call)

String posts = futuresOfIOTasks.get(0).get();

String comments = futuresOfIOTasks.get(1).get();

String albums = futuresOfIOTasks.get(2).get();

String photos = futuresOfIOTasks.get(3).get();

// merge the response (in-memory tasks will be part of this operation)

String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);

String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

//build the final response to send it back to client

return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

使用Executor服务并行化IO任务(CompletableFuture)

这类似于上面的情况:处理传入请求的HTTP线程将被阻止,并  CompletableFuture 用于处理并行任务

什么时候用?

如果没有  AsyncResponse,性能与  ExecutorService. 如果多个API调用必须是异步的,并且必须链接,这种方法更好(这类似于Node中的Promises)。

ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);

// I/O tasks

CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);

CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,

    ioExecutorService);

CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,

    ioExecutorService);

CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,

    ioExecutorService);

CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();

// get response from I/O tasks (blocking call)

String posts = postsFuture.get();

String comments = commentsFuture.get();

String albums = albumsFuture.get();

String photos = photosFuture.get();

// merge response (in-memory tasks will be part of this operation)

String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);

String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// Build final response to send it back to client

return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

并行化所有任务 ExecutorService

ExecutorService 使用和并行化所有任务,  并@Suspended AsyncResponse response以非阻塞方式发送响应。

[io vs nio]

图片来自  http://tutorials.jenkov.com/java-nio/nio-vs-io.html

传入请求将通过事件池处理,请求将传递到Executor池进行进一步处理,当所有任务完成后,来自事件池的另一个HTTP线程将响应发送回客户端。(异步和非阻塞)。

性能下降的原因:

在同步通信中,尽管I / O任务中涉及的线程被阻止,但只要有额外的线程来处理并发请求的加载,该进程仍将处于运行状态。

因此,以非阻塞方式保持线程所带来的好处非常少,并且以这种模式处理请求所涉及的成本似乎很高。

通常,对于我们在此讨论的用例使用异步非阻塞方法会降低应用程序性能。

什么时候用?

如果用例类似于服务器端聊天应用程序,其中线程无需保持连接,直到客户端响应,则异步,非阻塞方法可优先于同步通信; 在这些用例中,不仅仅是等待,系统资源可以通过异步,非阻塞方法更好地使用。

// submit parallel tasks for Async execution

ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);

CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);

CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,

ioExecutorService);

CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,

ioExecutorService);

CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,

ioExecutorService);

// When /posts API returns a response, it will be combined with the response from /comments API

// and as part of this operation, some in-memory tasks will be performed

CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,

(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),

ioExecutorService);

// When /albums API returns a response, it will be combined with the response from /photos API

// and as part of this operation, some in-memory tasks will be performed

CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,

(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),

ioExecutorService);

// Build the final response and resume the http-connection to send the response back to client.

postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {

LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());

String response = s1 + s2;

asyncHttpResponse.resume(response);

}, ioExecutorService);

RxJava / RxNetty

RxJava / RxNetty组合的主要区别在于,它可以通过使I / O任务完全无阻塞来处理带有事件池的传入和传出请求。

此外,RxJava提供了更好的DSL以流畅的方式编写代码,这个例子可能看不到。

性能优于处理并行任务 CompletableFuture

什么时候用?

如果异步,非阻塞方法适合用例,则可以优选RxJava或任何反应库。它具有额外的功能,如背压,可以平衡生产者和消费者之间的负荷。

// non blocking API call from Application - getPosts() 

HttpClientRequest<ByteBuf, ByteBuf> request = HttpClient.newClient(MOCKY_IO_SERVICE, 80)

  .createGet(POSTS_API).addHeader("content-type", "application/json; charset=utf-8");

rx.Observable<String> rx1ObservableResponse = request.flatMap(HttpClientResponse::getContent)

  .map(buf -> getBytesFromResponse(buf))

  .reduce(new byte[0], (acc, bytes) -> reduceAndAccumulateBytes(acc, bytes))

  .map(bytes -> getStringResponse(bytes, "getPosts", startTime));

int userId = new Random().nextInt(10) + 1;

// Submit parallel I/O tasks for each incoming request.

Observable<String> postsObservable = Observable.just(userId).flatMap(o -> NonBlockingJsonService.getPosts());

Observable<String> commentsObservable = Observable.just(userId)

  .flatMap(o -> NonBlockingJsonService.getComments());

Observable<String> albumsObservable = Observable.just(userId).flatMap(o -> NonBlockingJsonService.getAlbums());

Observable<String> photosObservable = Observable.just(userId).flatMap(o -> NonBlockingJsonService.getPhotos());

// When /posts API returns a response, it will be combined with the response from /comments API

// and as part of this operation, some in-memory tasks will be performed

Observable<String> postsAndCommentsObservable = Observable.zip(postsObservable, commentsObservable,

                                                              (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments));

// When /albums API returns a response, it will be combined with the response from /photos API

// and as part of this operation, some in-memory tasks will be performed

Observable<String> albumsAndPhotosObservable = Observable.zip(albumsObservable, photosObservable,

                                                              (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos));

// build final response

Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)

  .subscribe((response) -> asyncResponse.resume(response), e -> {

    LOG.error("Error", e);

    asyncResponse.resume("Error");

  });

干扰器

[队列与RingBuffer]

图1:http:  //tutorials.jenkov.com/java-concurrency/blocking-queues.html图2:https:  //www.baeldung.com/lmax-disruptor-concurrency

在此示例中,HTTP线程将被阻塞,直到破坏程序完成任务并且  CountDownLatch 已用于将HTTP线程与来自的线程同步  ExecutorService。

该框架的主要特点是在没有任何锁的情况下处理线程间通信; 在  ExecutorService,生产者和消费者之间的数据将通过队列传递,并且  Lock 在生产者和消费者之间的数据传输期间涉及。Disruptor框架在没有任何Locks 借助称为Ring Buffer的数据结构的情况下处理此Producer-Consumer通信  ,该数据结构是循环阵列队列的扩展版本。

该库不适用于我们在此讨论的用例。它只是出于好奇而加入。

什么时候用?

当与事件驱动的体系结构模式一起使用时,以及当有一个生产者和多个消费者时,Disruptor框架的表现更好,主要关注内存中的任务。

static {

int userId = new Random().nextInt(10) + 1;

// Sample Event-Handler; count down latch is used to synchronize the thread with http-thread

  EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {

  event.posts = JsonService.getPosts();

  event.countDownLatch.countDown();

  };

  // Disruptor set-up to handle events

  DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)

  .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)

  .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)

  .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);

  DISRUPTOR.start();

}

// for each request, publish an event in RingBuffer:

Event event = null;

RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();

long sequence = ringBuffer.next();

CountDownLatch countDownLatch = new CountDownLatch(6);

try {

event = ringBuffer.get(sequence);

event.countDownLatch = countDownLatch;

event.startTime = System.currentTimeMillis();

} finally {

ringBuffer.publish(sequence);

}

try {

event.countDownLatch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

阿卡

图片来源:https:  //blog.codecentric.de/en/2015/08/introduction-to-akka-actors/

Akka库的主要优点是它具有构建分布式系统的本机支持。

它运行在一个名为Actor System的系统上,它抽象出Threads的概念,Actor系统中的Actors通过异步消息进行通信,这类似于Producer和Consumer之间的通信。

这种额外的抽象级别有助于Actor系统提供Fault ToleranceLocation Transparency等功能。

使用正确的Actor-to-Thread策略,可以优化此框架,使其性能优于上表中显示的结果。虽然它无法与单个节点上的传统方法的性能相匹配,但仍然可以优先考虑其构建分布式和弹性系统的能力。

示例代码

// from controller :

Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());

// handler :

public Receive createReceive() {

    return receiveBuilder().match(Request.class, request -> {

    Event event = request.event; // Ideally, immutable data structures should be used here.

    request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());

    request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());

    request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());

    request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());

    }).match(Event.class, e -> {

    if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {

    int userId = new Random().nextInt(10) + 1;

    String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,

    e.comments);

    String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,

    e.photos);

    String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

    e.response = response;

    e.countDownLatch.countDown();

    }

    }).build();

}

另外本人从事在线教育多年,将自己的资料整合建了一个公众号(yunxijava)对于有兴趣一起交流学习java,这里面有大神会给予解答,也会有许多的资源可以供大家学习分享,欢迎大家前来一起学习进步!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容