利用协程提高客户端并发性能的尝试

假设服务器端提供无限的服务能力,客户端如何达到最高的QPS?如果使用多线程来提高并发,线程数量会成为瓶颈。是否可以通过协程来提高客户端的并发能力?下面做了一组测试,分别使用同步、异步、协程+同步等方式请求百度的首页,运行10s,对比一下QPS。

测试环境

2017年MBP,2核4线程,16GB内存。

Maven依赖

<properties>
    <kotlin.version>1.3.11</kotlin.version>
</properties>

<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib-jdk8</artifactId>
    <version>${kotlin.version}</version>
</dependency>

<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-test</artifactId>
    <version>${kotlin.version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpasyncclient</artifactId>
    <version>4.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>khttp</groupId>
    <artifactId>khttp</artifactId>
    <version>0.1.0</version>
</dependency>

<dependency>
    <groupId>io.github.rybalkinsd</groupId>
    <artifactId>kohttp</artifactId>
    <version>0.5.0</version>
</dependency>

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>3.12.1</version>
</dependency>

Kotlin多线程同步

使用Kotlin的khttp发起同步请求。

import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger

public class TestHttpSyncClient {

    val api = "https://www.baidu.com"

    val completedCount = AtomicInteger(0)
    var failedCount = AtomicInteger(0)

    @Test
    fun test() {

        for (i in 0..80) {

            Thread() {

                while (true) {

                    try {

                        var response = khttp.get(api)

                        if (response.statusCode == 200) {
                            completedCount.incrementAndGet()
                        } else {
                            failedCount.incrementAndGet()
                        }
                    }catch (e : Exception) {

                        failedCount.incrementAndGet()
                    }
                }
            }.start()
        }

        Thread.sleep(10 * 1000)
        println("completedCount: ${completedCount}, failedCount: ${failedCount}");
        System.exit(0);
    }
}

线程开到80个,QPS可达2600左右。

completedCount: 26089, failedCount: 111

HttpAsyncClients异步

使用HttpAsyncClients发起异步请求。

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;

public class TestHttpAsyncClient implements Runnable {

   public final static AtomicInteger completedCount = new AtomicInteger(0);
   public final static AtomicInteger failedCount = new AtomicInteger(0);
   public final static AtomicInteger canceledCount = new AtomicInteger(0);

   @Test
   public void test() {

       //启动线程
       TestHttpAsyncClient testHttpAsyncClient = new TestHttpAsyncClient();
       new Thread(testHttpAsyncClient).start();

       //测试10s
       try {

           Thread.sleep(10 * 1000);
           System.out.println(String.format("completedCount: %d, failedCount: %d, canceledCount: %d",
                   completedCount.get(), failedCount.get(), canceledCount.get()));

           System.exit(0);
       } catch (Exception e) {

           System.err.println(String.format("System.exit exception: %s", e));
       }
   }

   public void run() {

       CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
               .setMaxConnTotal(128)
               .setMaxConnPerRoute(128)
               .build();
       httpclient.start();

       final HttpGet request = new HttpGet("https://www.baidu.com");

       while (true) {

           httpclient.execute(request, new FutureCallback<HttpResponse>() {

               @Override
               public void completed(HttpResponse httpResponse) {

                   if (httpResponse.getStatusLine().getStatusCode() == 200) {
                       completedCount.incrementAndGet();
                   }else {
                       failedCount.incrementAndGet();
                 }
               }

               @Override
               public void failed(Exception e) {
                   failedCount.incrementAndGet();
               }

               @Override
               public void cancelled() {
                   canceledCount.incrementAndGet();
               }
           });
       }
   }
}

并发设置为128,QPS可以达到2000左右。

completedCount: 19319, failedCount: 125, canceledCount: 0

通过调试可以发现,只需要4个dispatcher线程,就可以满足需要。

image.png

Kotlin协程+khttp

使用Kotlin Coroutine+khttp同步请求测试。

import khttp.responses.Response
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicInteger
import org.junit.Test

public class TestKotlinCoroutine {

    val api = "https://www.baidu.com"
    val completedCount = AtomicInteger(0)
    val failedCount = AtomicInteger(0)

    @Test
    fun test() {

        Thread(){

            while (true) {

                val resp =
                        GlobalScope.launch {

                            val result: Deferred<Response> = async {

                                get()
                            }

                            result.await()
                        }
            }
        }.start()

        Thread.sleep(10 * 1000)
        println("completedCount: ${completedCount}, failedCount: ${failedCount}");
        System.exit(0);
    }

    suspend fun get(): khttp.responses.Response {

        var response = khttp.get(api)

        if (response.statusCode == 200) {
            completedCount.addAndGet(1)
        }else {
            failedCount.addAndGet(1)
        }

        return response;
    }
}

协程版本的性能甚至不如同步版本,QPS只有50。换一种方式,起多个协程,每个线程里面循环请求,QPS也一样只有50左右。

completedCount: 498, failedCount: 0

这个版本性能很差,通过调试可以发现,最多也就启动了4个worker线程,并且khttp又不支持协程阻塞。

image.png

下面是一个优化的版本,将工作放到Dispatchers.IO里面去做,尽量多起一些worker线程。

public class TestKotlinCoroutineClient {

    val api = "https://www.baidu.com"

    val completedCount = AtomicInteger(0)
    var failedCount = AtomicInteger(0)
    val time = 10000

    @Test
    fun test() = runBlocking {

        val start = System.currentTimeMillis()
        val channel = Channel<Int>()

        repeat(time) {
            launch { channel.send(get()) }
        }

        repeat(time) {
            val code = channel.receive()
            if (code == 200) {
                completedCount.incrementAndGet()
            } else {
                failedCount.incrementAndGet()
            }
        }

        val end = System.currentTimeMillis()
        println("completedCount: ${completedCount}, failedCount: ${failedCount}, cost${(end - start) / 1000}");
        System.exit(0);
    }

    suspend fun get() = withContext(Dispatchers.IO) { khttp.get(api) }.statusCode
}

性能可以提升到1200左右。调试可以发现线程数量最多可达68个。

image.png

Kotlin协程+kohttp

kohttp看起来支持协程。使用kohttp请求一万次,花费时间100s左右,QPS为100。

import io.github.rybalkinsd.kohttp.ext.asyncHttpGet
import junit.framework.Assert.assertEquals
import kotlinx.coroutines.runBlocking
import org.junit.Test
import kotlin.system.measureTimeMillis

public class TestKotlinCoroutine {

    val api = "https://www.baidu.com"

    @Test
    fun `many async invokes of httpGet`() {
        measureTimeMillis {
            GlobalScope.launch {
                val tasks = List(10000) {
                    api.asyncHttpGet()
                }
                tasks.map { r ->
                    r.await().also { it.close() }
                }.forEach {
                    assertEquals(200, it.code())
                }
            }
        }.also { println("$it ms") }
    }
}

分析一下kohttp的实现,可以发现它只是封装了okhttp3,使用了异步模式。okhttp3本身不支持协程,所以性能也不会太好。

fun String.asyncHttpGet(client: Call.Factory = defaultHttpClient): Deferred<Response> =
    GlobalScope.async(context = Unconfined) {
        client.suspendCall(Request.Builder().url(this@asyncHttpGet).build())
    }

internal suspend fun Call.Factory.suspendCall(request: Request): Response =
    suspendCoroutine { cont ->
        newCall(request).enqueue(object : Callback {
            override fun onResponse(call: Call, response: Response) {
                cont.resume(response)
            }

            override fun onFailure(call: Call, e: IOException) {
                cont.resumeWithException(e)
            }
        })
    }

Go协程

顺便试一下Go协程的性能。Go协程版本在使用keepalive的情况下,256个协程的QPS可以达到1700左右。注意需要读一下resp.Body,这样keepalive才会生效。参考文章:A brief intro of TCP keep-alive in Go’s HTTP implementation

package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "os"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

var completedCount uint64
var failedCount uint64
var lock = &sync.Mutex{}

func main() {

    runtime.GOMAXPROCS(4)
    for i := 0; i < 256; i++ {
        clt := newQPSClient()
        go clt.test()
    }

    time.Sleep(10 * time.Second)
    fmt.Printf("completedCount: %d, failedCount: %d\n", completedCount, failedCount)
    os.Exit(0)
}

type QPSClient struct {
    clt *http.Client
}

func newQPSClient() *QPSClient {
    return &QPSClient{
        clt: &http.Client{},
    }
}

func (qc *QPSClient) test() {

    for {
        resp, err := qc.clt.Get("https://www.baidu.com")
        if err == nil && (resp.StatusCode == 200) {

            _, err = io.Copy(ioutil.Discard, resp.Body)
            resp.Body.Close()
            atomic.AddUint64(&completedCount, 1)
        } else {

            _, err = io.Copy(ioutil.Discard, resp.Body)
            atomic.AddUint64(&failedCount, 1)
        }
    }
}

结论

请求方式 QPS
Kotlin多线程同步 80个线程,QPS 2600
HttpAsyncClients异步 最大128个连接,QPS 2000
Kotlin协程+khttp 循环起协程+khttp同步,QPS 50
Kotlin协程+kohttp 循环起协程+okhttp3异步,QPS 100
Go协程 256个协程,QPS 1700

上面这些测试,缺乏更精细的设置,比如HttpAsyncClients 128个连接使用了多少个线程?Kotlin和Go的协程版本使用了多少线程?还有没有提升空间?

Coroutine运行有很多受限条件,不能堵塞在操作系统会堵塞线程的地方,需要自己实现对这些堵塞API的处理,在用户态做上下文的保存和恢复。Java生态圈中缺乏对协程支持良好的基础库,导致不能发挥协程真正的威力,使用异步IO是更好的解决办法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,548评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,069评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,985评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,305评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,324评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,030评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,639评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,552评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,081评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,194评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,327评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,004评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,688评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,188评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,307评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,667评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,337评论 2 358

推荐阅读更多精彩内容