It may be a mistake to use `async/await` or coroutines to execute low-level I/O.

I have spent several years working with Node.js and libuv, and I have come to feel strongly that callback functions, Promises, and async/await in Node.js work well together. The callback functions are responsible for the low-level I/O, where data is read and written chunk by chunk. Promises wrap multiple callback functions into one atomic I/O operation. And async/await is responsible only for composing the logical workflow.

I also have two to three years of experience with asyncdispatch, asyncnet, and asynchttpserver in Nim, and I have written several asynchronous modules. I am now writing an asynchronous MySQL client/connector in pure Nim. However, I have come to feel that it is not appropriate to rely on async/await for everything.

Suppose that we use a MySQL client to execute some queries. Here is some pseudocode:

var conn = newAsyncSocket()

proc query(sql: string): Future[Stream] {.async.} =
  await conn.send(mysql_format_function(sql))
  let data = await conn.recv_mysql_function()
  mysql_parse_function(data)
  return newStream(conn)  # the stream reads the remaining data from the shared conn

proc main() {.async.} =
  let stream1 = await query("select 100; select 200;")
  while true:
    let data = await stream1.read()
    echo data  # 100, 200
    if data == "":
      break

  let stream2 = await query("select 300;  select 400;")
  while true:
    let data = await stream2.read()
    echo data  # 300, 400
    if data == "":
      break

waitFor main()

This code works fine as long as stream2 is only started after stream1 has been read to the end. But what if it is not?

proc main() {.async.} =
  let stream1 = await query("select 100; select 200;")
  let stream2 = await query("select 300;  select 400;")

  while true:
    let data = await stream1.read()
    echo data  # 100, 200
    if data == "":
      break

  while true:
    let data = await stream2.read()
    echo data  # 300, 400
    if data == "":
      break

or

proc do1() {.async.} =
  let stream1 = await query("select 100; select 200;")
  while true:
    let data = await stream1.read()
    echo data  # 100, 200
    if data == "":
      break

proc do2() {.async.} =
  let stream2 = await query("select 300;  select 400;")
  while true:
    let data = await stream2.read()
    echo data  # 300, 400
    if data == "":
      break

proc main() {.async.} =
  asyncCheck do1()
  asyncCheck do2()

What happens then? The main function blocks at the `await` on stream2, because stream1 has never finished. What does "finished" mean? stream1 is finished once it has received all of the data that belongs to the query `select 100; select 200;`. Both streams share one connection, so stream2's data only arrives behind stream1's:

     stream1 data           stream2 data
|......................|....................|
|                      |                    |
v                      V                    V
stream1 begin       stream1 finished        stream2 finished
                    stream2 begin
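
To make the diagram concrete, here is a small simulation sketch. `FakeConn` and `FakeStream` are hypothetical in-memory stand-ins that I made up for illustration; they are not a real MySQL API, and the "wire" is just a queue that keeps responses in request order. A read on stream2 that starts too early stays pending until stream1 has been drained:

```nim
import std/[asyncdispatch, deques]

type
  Chunk = tuple[queryId: int, data: string]
  FakeConn = ref object          # hypothetical stand-in for one connection
    wire: Deque[Chunk]           # responses arrive strictly in request order
    lastId: int
  FakeStream = ref object
    conn: FakeConn
    id: int

proc query(conn: FakeConn, rows: seq[string]): FakeStream =
  ## Pretend the server already answered: queue the rows, then "" as end marker.
  inc conn.lastId
  for row in rows:
    conn.wire.addLast((queryId: conn.lastId, data: row))
  conn.wire.addLast((queryId: conn.lastId, data: ""))
  result = FakeStream(conn: conn, id: conn.lastId)

proc read(s: FakeStream): Future[string] {.async.} =
  ## A stream may only consume the chunk at the head of the shared wire.
  while s.conn.wire.len == 0 or s.conn.wire.peekFirst.queryId != s.id:
    await sleepAsync(1)
  return s.conn.wire.popFirst.data

proc demo(): Future[seq[string]] {.async.} =
  let conn = FakeConn(wire: initDeque[Chunk]())
  let stream1 = conn.query(@["100", "200"])
  let stream2 = conn.query(@["300", "400"])
  var log: seq[string]

  let pending = stream2.read()       # started before stream1 is drained
  await sleepAsync(20)
  log.add("stream2 ready: " & $pending.finished)

  while true:                        # drain stream1 completely
    let data = await stream1.read()
    if data == "":
      break
    log.add("stream1: " & data)

  let first = await pending          # only now can stream2 make progress
  log.add("stream2: " & first)
  return log

echo waitFor demo()
```

If nothing ever drains stream1, the pending read on stream2 never completes, which is the hang described above.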

One solution is to use a lock or a queued cache. This can solve the second problem (concurrent tasks sharing the connection), but it also introduces the possibility of data races:

proc do1() {.async.} =
  await conn.lock()
  let stream1 = await query("select 100; select 200;")
  # ...
  conn.release()

proc do2() {.async.} =
  await conn.lock()
  let stream2 = await query("select 300;  select 400;")
  # ...
  conn.release()
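
The `conn.lock()` and `conn.release()` calls above are pseudocode. As a rough sketch of what such a lock could look like when built only from Futures, here is a minimal FIFO lock. `AsyncLock`, `acquire`, `release`, and `worker` are hypothetical names of my own, and the `sleepAsync` call merely stands in for reading a whole result stream:

```nim
import std/[asyncdispatch, deques]

type AsyncLock = ref object          # hypothetical helper, written for this sketch
  locked: bool
  waiters: Deque[Future[void]]

proc newAsyncLock(): AsyncLock =
  AsyncLock(waiters: initDeque[Future[void]]())

proc acquire(mtx: AsyncLock): Future[void] =
  result = newFuture[void]("AsyncLock.acquire")
  if mtx.locked:
    mtx.waiters.addLast(result)      # wait in FIFO order
  else:
    mtx.locked = true
    result.complete()

proc release(mtx: AsyncLock) =
  if mtx.waiters.len > 0:
    mtx.waiters.popFirst().complete()  # hand the lock to the next waiter
  else:
    mtx.locked = false

proc worker(mtx: AsyncLock, name: string, log: ref seq[string]) {.async.} =
  await mtx.acquire()
  log[].add(name & " begin")
  await sleepAsync(10)               # stands in for query + reading the stream
  log[].add(name & " end")
  mtx.release()

proc demo(): Future[seq[string]] {.async.} =
  let mtx = newAsyncLock()
  let log = new seq[string]
  let a = worker(mtx, "do1", log)    # both tasks start concurrently
  let b = worker(mtx, "do2", log)
  await a
  await b
  return log[]

echo waitFor demo()
```

With the lock in place, the two tasks no longer interleave on the connection: the second one starts only after the first has released it.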

And it cannot prevent programmers from writing the first kind of code:

proc main() {.async.} =
  let stream1 = await query("select 100; select 200;")
  let stream2 = await query("select 300;  select 400;")

So each of your asynchronous functions must be atomic; otherwise, there are traps hidden in them. Being atomic means that you cannot build streaming interfaces, because every call has to return all of its data at once:

proc main() {.async.} =
  let all_data1 = await query("select 100; select 200;")
  let all_data2 = await query("select 300;  select 400;")

This is very bad for transferring large amounts of data, and it is a serious issue whenever you use async/await to write a program.


Ideally, you use callback functions to execute the low-level I/O, use Futures to wrap those callback functions into atomic operations, and use async/await to construct the final logical workflow. Then you get the perfect solution:

# `Future` wrapper
proc query(sql: string, fileStream: FileStream): Future[void] =
  var retFuture = newFuture[void]()

  let stream = conn.send(mysql_format_function(sql))  # I/O interface

  stream.onBegin() do (data: string):                 # I/O interface
    fileStream.write(data)

  stream.onFinished() do ():                          # I/O interface
    complete(retFuture)

  return retFuture

# logical workflow
proc main() {.async.} =
  let fileStream = newFileStream("result.txt", fmWrite)  # placeholder file name
  await query("select 100; select 200;", fileStream)  # write to file
  await query("select 300; select 400;", fileStream)  # write to file
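
The pseudocode above depends on a callback-style stream that does not exist yet, so here is a self-contained sketch of the same three layers that can actually run. Everything in it is an assumption made for illustration: `FakeResultSet` and `pump` are hypothetical stand-ins for the low-level callback I/O, the chunks are hard-coded instead of coming from a socket, and a `ref seq[string]` replaces the file stream so the result is easy to inspect:

```nim
import std/asyncdispatch

type
  DataCb = proc (data: string) {.closure, gcsafe.}
  DoneCb = proc () {.closure, gcsafe.}
  FakeResultSet = ref object     # hypothetical callback-style I/O source
    chunks: seq[string]
    pos: int
    onData: DataCb
    onFinished: DoneCb

proc pump(rs: FakeResultSet) {.gcsafe.} =
  ## Low-level layer: deliver one chunk per scheduled callback.
  if rs.pos < rs.chunks.len:
    rs.onData(rs.chunks[rs.pos])
    inc rs.pos
    callSoon(proc () {.gcsafe.} = pump(rs))
  else:
    rs.onFinished()

proc query(chunks: seq[string], sink: ref seq[string]): Future[void] =
  ## Future layer: wrap the callbacks into one atomic operation.
  let retFuture = newFuture[void]("query")
  let rs = FakeResultSet(chunks: chunks)
  rs.onData = proc (data: string) {.closure, gcsafe.} = sink[].add(data)
  rs.onFinished = proc () {.closure, gcsafe.} = retFuture.complete()
  callSoon(proc () {.gcsafe.} = pump(rs))
  return retFuture

proc main(): Future[seq[string]] {.async.} =
  ## Workflow layer: async/await only sequences whole operations.
  let sink = new seq[string]
  await query(@["100", "200"], sink)
  await query(@["300", "400"], sink)
  return sink[]

echo waitFor main()
```

The data still moves chunk by chunk inside `query`, but the caller never sees a half-read stream, so the second query cannot start until the first Future has completed.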