多进程
Elixir强大的并发来自其actor并发模型,简而言之就是可以使用大量的进程来实现并发。elixir中的进程依托与erlang虚拟机的存在,这个进程与操作系统的进程不一样,虽然他们可以像原生进程一样在处理器中运行,但是他们比原生进程轻,在普通机器创建十万个进程是轻而易举的事情,甚至比普通语言创建线程还要轻便。下面就看看elixir的多进程是如何work。
使用erlang的timer模块,模拟进程中耗时的操作。定义一个匿名函数,运行函数之后,可以看到iex在两秒之后才打印消息,连续调用5次,则一共耗时10秒:
iex(1)> run_query = fn query_def ->
...(1)> :timer.sleep 2000
...(1)> "#{query_def} result"
...(1)> end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(2)> run_query.("query 1")
"query 1 result"
iex(3)> 1..5 |> Enum.map(&(run_query.("query #{&1}")))
["query 1 result", "query 2 result", "query 3 result", "query 4 result",
"query 5 result"]
创建进程,Elixir创建进程只需要使用spawn宏即可,sqwan/1 接受一个匿名函数,创建另外一个新进程。
spawn(
fn ->
expression_1
...
expression_2
end
)
修改上面的例子,使用spawn创建一个新进程。可以看到,执行spawn之后,马上返回了函数调用的结果,为新进程新pid。两秒后,新进程执行并打印内容返回到iex中。
iex(4)> spawn(fn -> IO.puts run_query.("1") end )
#PID<0.65.0>
1 result
因此可以定义一个异步的查询函数并执行,可以发现每次执行函数都马上返回,而新创建的进程将会在后台运行,并打印最终结果:
iex(5)> async_query = fn query_def ->
...(5)> spawn(fn -> IO.puts run_query.(query_def) end) end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(6)> async_query.("query 1")
#PID<0.71.0>
query 1 result
iex(7)> 1..5 |> Enum.map(&(async_query.("query #{&1}")))
[#PID<0.73.0>, #PID<0.74.0>, #PID<0.75.0>, #PID<0.76.0>, #PID<0.77.0>]
query 1 result
query 2 result
query 3 result
query 4 result
query 5 result
Elixir中,所有代码都运行在一个在进程中,iex也是运行在进程中,并且是shell中的主进程。上面的例子中,进程是并发运行的,因此并不会按照顺序输出结果。每一个进程都是独立的,不同的进程是不能读取对方的数据的,而进程之间想要通信需要通过message
。
进程中的message
通常的语言中,使用多线程进行并发,所有线程共享内存数据。而elixir提供的是aotor的进程模型。进程通过message同步数据。进程A想要让进程B做点事情,需要A给B的mailbox发送异步消息,B进程读取mailbox的消息,解析后执行。因为进程见是无法共享内存的,因此消息发送的时候存在着深拷贝(deep-copied)。
发送信息使用 send函数,接受消息使用 receive do
结构。 send提供两个参数,第一个是进程的标识pid,第二个是所需要发送的数据。receive的结构如下
receive do
pattern_1 -> do_something
pattern_2 -> do_something_else
end
receive 结构和 case 结构十分相似,也支持mailbox的模式匹配。在iex中,self表示当前的进程,下面使用self来对消息做实验:
iex(8)> send(self, "a message")
"a message"
iex(9)> receive do
...(9)> message -> IO.puts message
...(9)> end
a message
:ok
iex(10)> send(self, {:message, 1})
{:message, 1}
iex(11)> receive do
...(11)> {:message, id} -> IO.puts "received message #{id}"
...(11)> end
received message 1
:ok
iex(13)> receive do
...(13)> {_, _, _} ->
...(13)> IO.puts "received"
...(13)> end
send调用之后会返回发送的内容。由于是自身给自身发消息,所以可以在当前的进程中调用receive结构拉取自身的mailbox的message。如果模式匹配失败,当前的进程会被block。可以设置一个after分支,当匹配失败之后,执行after的内容。
iex(2)> receive do
...(2)> {_,_,_} -> 'nonthing'
...(2)> after 3000 -> "message not received"
...(2)> end
"message not received"
iex(3)>
receive结构主要从当前的mailbox中pull数据。如果当前消息模式匹配失败,这个消息将会被放回进程的mailbox之中,仅需读取下一条消息。总结receive的工作流如下:
- 从mailbox中读取第一条消息。
- 尝试使用 receive中模式和消息进行模式匹配,从上到下依次进行匹配。
- 匹配成功则执行对应分支的代码。
- 如果模式匹配失败,将消息放回原处,接着出现下一条消息。
- 如果mailbox的队列没有消息了,则等待下一个消息到达,消息一到达,则重复开始第一步。
- 如果存在after语句,在等到消息未到到或匹配失败之后,执行after的代码分支逻辑。
进程通信
通常send消息都是异步执行的,进程发送消息之后就返回,然后当前进程并不知道子进程的执行状况。通常我们需要子进程的执行过程,然后子进程将会send消息来反馈主进程。创建了子进程将会返回进程标识pid,基于pid和message可以进行进程间的通信。重写async_query 函数并调用:
iex(1)> run_query = fn(query_def) ->
...(1)> :timer.sleep(2000)
...(1)> "#{query_def} result"
...(1)> end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(3)> async_query = fn(query_def) ->
...(3)> caller = self
...(3)> spawn(fn query_def ->
...(3)> send(caller, {:query_result, run_query.(query_def)})
...(3)> end)
...(3)> end
iex(4)> Enum.each(1..5, &async_query.("query #{&1}"))
:ok
新创建的子进程给主进程的mailbox发送了消息,主进程再把这些消息读取出来
iex(7)> get_result = fn ->
...(7)> receive do
...(7)> {:query_result, result} -> result
...(7)> end
...(7)> end
iex(10)> get_result.()
"query 1 result"
iex(11)> get_result.()
"query 2 result"
iex(12)> get_result.()
"query 3 result"
客户端服务端状态
通过message和receive可以实现进程间的通信,通常把主进程当成客户端进程,新创建的进程当成服务端进程。那么cs之间的进程通信会涉及到一下状态(state)的操作。所谓的server process 是指一些长时间监听消息的进程,就像服务器进程一样永远运行,处于一个无限循环当中,监听客户端的消息,处理消息。
receive结构会将它mailbox,模式匹配不成功的时候会block进程。可是一旦mailbox中的messge消费完了,receive的监听也就结束了,进程会结束。因此需要在message消费完毕之后仍然运行进程。使用while结构很容易实现这样的程序逻辑,elixir没有循环,可是有递归。
下面以数据库服务为例来做说明。其基本结构如下:
defmodule DatabaseServer do
def start do
spawn(&loop/0)
end
defp loop do
receive do
# pass
end
loop
end
end
DatabaseServer模块实现了一个服务器循环进程loop,给客户端(主进程,调用者)提供了一个启动入口start函数。这个函数将会创建一个服务端进程,用于监听客户端的发送的消息,韩寒处理返回。有人可能有以为,start和loop都是模块中的函数,分别运行在不同进程中。其实模块和进程本身没有特别的关系,模块就是函数的集合,这些函数可以运行在进程中,仅此而已。后期关于类似的实现,可以用到更高级的gen_server。
接下来实现loop中的逻辑,以及数据库的服务端和客户端的查询方法。
defmodule DatabaseServer do
def start do
spawn(&loop/0)
end
def run_async(server_pid, query_def) do
send(server_pid, {:run_query, self, query_def})
end
defp loop do
receive do
{:run_query, caller, query_def} -> send(caller, {:query_result, run_query(query_def)})
end
loop
end
defp run_query(query_def) do
:timer.sleep(2000)
"#{query_def} result"
end
end
run_query 为服务端的查询方法,run_async为客户端的查询方法,run_async将查询信息和自身的pid发给服务端,服务端匹配之后查询处理,然后再给客户端pid发送查询结果。客户端同样也使用receive结构pull查询结果:
defmodule DatabaseServer do
def start do
spawn(&loop/0)
end
def run_async(server_pid, query_def) do
send(server_pid, {:run_query, self, query_def})
end
def get_result do
receive do
{:query_result, result} -> result
after 5000 ->
{:error, :timeout}
end
end
defp loop do
receive do
{:run_query, caller, query_def} -> send(caller, {:query_result, run_query(query_def)})
end
loop
end
defp run_query(query_def) do
:timer.sleep(2000)
"#{query_def} result"
end
end
运行测试
iex(1)> server_pid = DatabaseServer.start
#PID<0.63.0>
iex(2)> DatabaseServer.run_async(server_pid, "query 1")
{:run_query, #PID<0.61.0>, "query 1"}
iex(3)> DatabaseServer.get_result
"query 1 result"
iex(4)> DatabaseServer.run_async(server_pid, "query 2")
{:run_query, #PID<0.61.0>, "query 2"}
iex(5)> DatabaseServer.get_result
"query 2 result"
iex(6)> DatabaseServer.get_result
{:error, :timeout}
把 timer.sleep 改成 10s后, 进程客户端马上就返回并且监听服务端的返回,可以服务端异步长时间处理,不能马上返回,客户端超时断开了。第二次调用get_result的时候,此时服务端已经处理完毕,并发送结果给客户端的mailbox。
iex(2)> DatabaseServer.run_async(server_pid, "query 1")
{:run_query, #PID<0.61.0>, "query 1"}
iex(3)> DatabaseServer.get_result
{:error, :timeout}
iex(4)> DatabaseServer.get_result
"query 1 result"
服务端进程都是顺序的
尽管实现了服务端进程来处理查询请求,可是服务端进程监听的是自己进程的mailbos,消费消息却是顺序的。如果客户端调用十个查询请求,服务端同样需要执行10秒。为了避免这样的情况,一个简单的处理就是每一个请求实现一个服务端进程,也就是服务端实现一个进程池。面对大量的客户端就能处理了。等等,你一定以为实现进程池是一个夸张的做法,毕竟直觉上进程的创建和销毁十分耗资源。感谢Erlang的并发模式,我们可以在Elixir中轻而易举的创建大量的进程,这个进程和操作系统进程概念不一样,它甚至比操作系统的线程还要轻量级。
下面演示进程池的用法:
iex(1)> pool = 1..100 |> Enum.map(fn _ -> DatabaseServer.start end)
[#PID<0.64.0>, #PID<0.65.0>, #PID<0.66.0>, #PID<0.67.0>, #PID<0.68.0>,
#PID<0.69.0>, #PID<0.70.0>, #PID<0.71.0>, #PID<0.72.0>, #PID<0.73.0>,
#PID<0.74.0>, #PID<0.75.0>, #PID<0.76.0>, #PID<0.77.0>, #PID<0.78.0>,
#PID<0.79.0>, #PID<0.80.0>, #PID<0.81.0>, #PID<0.82.0>, #PID<0.83.0>,
#PID<0.84.0>, #PID<0.85.0>, #PID<0.86.0>, #PID<0.87.0>, #PID<0.88.0>,
#PID<0.89.0>, #PID<0.90.0>, #PID<0.91.0>, #PID<0.92.0>, #PID<0.93.0>,
#PID<0.94.0>, #PID<0.95.0>, #PID<0.96.0>, #PID<0.97.0>, #PID<0.98.0>,
#PID<0.99.0>, #PID<0.100.0>, #PID<0.101.0>, #PID<0.102.0>, #PID<0.103.0>,
#PID<0.104.0>, #PID<0.105.0>, #PID<0.106.0>, #PID<0.107.0>, #PID<0.108.0>,
#PID<0.109.0>, #PID<0.110.0>, #PID<0.111.0>, #PID<0.112.0>, #PID<0.113.0>, ...]
iex(2)> 1..5 |>
...(2)> Enum.each(fn query_def ->
...(2)> server_pid = Enum.at(pool, :random.uniform(100) - 1)
...(2)> DatabaseServer.run_async(server_pid, query_def)
...(2)> end)
:ok
iex(3)> 1..5 |>
...(3)> Enum.map(fn(_) -> DatabaseServer.get_result end)
["3 result", "5 result", "4 result", "1 result", "2 result"]
运行的结果中,并没有超过十秒,而是很快就返回了结果。
状态
设想一下,如果需要跟数据库服务交互的时候,首先当然是需要建立一个连接。连接就必须保持socket能够正确的工作。因此也需要在进程中保持状态,可以修改loop函数实现。
defmodule DatabaseServer do
def start do
spawn(fn ->
connection = :random.uniform(1000)
loop(connection)
end)
end
def run_async(server_pid, query_def) do
send(server_pid, {:run_query, self, query_def})
end
def get_result do
receive do
{:query_result, result} -> result
after 5000 ->
{:error, :timeout}
end
end
defp loop(connection) do
receive do
{:run_query, from_pid, query_def} ->
query_result = run_query(connection, query_def)
send(from_pid, {:query_result, query_result})
end
loop(connection)
end
defp run_query(connection, query_def) do
:timer.sleep(2000)
"Connection #{connetion}: #{query_def} result"
end
end
iex(1)> server_pid = DatabaseServer.start
#PID<0.63.0>
iex(2)> DatabaseServer.run_async(server_pid, "query 1")
{:run_query, #PID<0.61.0>, "query 1"}
iex(3)> DatabaseServer.get_result
"Connection 444: query 1 result"
iex(4)> DatabaseServer.run_async(server_pid, "query 2")
{:run_query, #PID<0.61.0>, "query 2"}
iex(5)> DatabaseServer.get_result
"Connection 444: query 2 result"
start 函数中创建了一些连接,然后loop中把这个状态传递到进程执行代码的地方。从iex的结果可以看出,这个状态一直被保持,两次请求服务,都是同一个连接的状态。实际的服务器环境中,往往状态不是一层不变的。此时我们需要更新状态。一个简单的技巧就是在loop函数中更新状态。
def loop(state) do
new_state = receive do # 捕捉新状态
msg1 -> ...
msg2 -> ...
end
loop(new_state) # 更新状态
end
下面实现一个计算器服务来阐明状态更新技巧。
defmodule Calculator do
def start do
spawn(fn ->loop(0) end)
end
def loop(current_value) do
new_value = receive do
{:value, caller} ->
send(caller, {:response, current_value})
current_value
{:add, value} -> current_value + value
{:sub, value} -> current_value - value
{:mul, value} -> current_value * value
{:div, value} -> current_value / value
invalid_request ->
IO.puts "invalid request #{inspect invalid_request}"
current_value
end
loop(new_value)
end
def value(server_pid) do
send(server_pid, {:value, self})
receive do
{:response, value} -> value
end
end
def add(server_pid, value), do: send(server_pid, {:add, value})
def sub(server_pid, value), do: send(server_pid, {:sub, value})
def mul(server_pid, value), do: send(server_pid, {:mul, value})
def div(server_pid, value), do: send(server_pid, {:div, value})
end
iex(1)> calculator_pid = Calculator.start
#PID<0.63.0>
iex(2)> Calculator.value(calculator_pid)
0
iex(3)> Calculator.add(calculator_pid, 10)
{:add, 10}
iex(4)> Calculator.sub(calculator_pid, 5)
{:sub, 5}
iex(5)> Calculator.mul(calculator_pid, 3)
{:mul, 3}
iex(6)> Calculator.div(calculator_pid, 5)
{:div, 5}
iex(7)> Calculator.value(calculator_pid)
3.0
通常情况下,状态远远比一个数字复杂。不过技术手段都是一样的,只需要在loop函数中操作状态即可。当应用的状态变得复杂的时候,是非有必要对代码进行组织。服务端进程的模块可以剥离出来专注请求的处理。下面针对之前的todo应用,使用多进程进行改下一下:
defmodule TodoServer do
def start do
spawn(fn -> loop(TodoList.new) end)
end
defp loop(todo_list) do
new_todo_list = receive do
message -> process_message(todo_list, message)
end
loop(new_todo_list)
end
def process_message(todo_list, {:add_entry, new_entry}) do
TodoList.add_entry(todo_list, new_entry)
end
def process_message(todo_list, {:entries, caller, date}) do
send(caller, {:todo_entries, TodoList.entries(todo_list, date)})
todo_list
end
def add_entry(todo_server, new_entry) do
send(todo_server, {:add_entry, new_entry})
end
def entries(todo_server, date) do
send(todo_server, {:entries, self, date})
receive do
{:todo_entries, entries} -> entries
after 5000 ->
{:error, :timeout}
end
end
end
调用方式如下:
iex(1)> todo_server = TodoServer.start
#PID<0.66.0>
iex(2)> TodoServer.add_entry(todo_server,
...(2)> %{date: {2013, 12, 19}, title: "Dentist"})
{:add_entry, %{date: {2013, 12, 19}, title: "Dentist"}}
iex(3)> TodoServer.entries(todo_server, {2013, 12, 19})
[%{date: {2013, 12, 19}, id: 1, title: "Dentist"}]
iex(8)> TodoServer.add_entry(todo_server,
...(8)> %{date: {2013, 12, 20}, title: "Shopping"})
{:add_entry, %{date: {2013, 12, 20}, title: "Shopping"}}
iex(9)> TodoServer.entries(todo_server, {2013, 12, 19})
[%{date: {2013, 12, 19}, id: 1, title: "Dentist"}]
iex(10)> TodoServer.entries(todo_server, {2013, 12, 20})
[%{date: {2013, 12, 20}, id: 2, title: "Shopping"}]
iex(11)> TodoServer.add_entry(todo_server,
...(11)> %{date: {2013, 12, 19}, title: "Movies"})
{:add_entry, %{date: {2013, 12, 19}, title: "Movies"}}
iex(12)> TodoServer.entries(todo_server, {2013, 12, 19})
[%{date: {2013, 12, 19}, id: 3, title: "Movies"},
%{date: {2013, 12, 19}, id: 1, title: "Dentist"}]
这样的调用方式,客户端需要知道开启的后台进程号。如果这个过程隐藏在模块中,岂不是更简洁。elixir提供了Process 模块的register函数,实现了针对进程的设置别名的应用。
其用法如下:
iex(1)> Process.register(self, :some_name)
iex(2)> send(:some_name, :msg)
iex(3)> receive do
msg -> IO.puts "received #{msg}"
end
received msg
修改TodoServer如下:
defmodule TodoServer do
def start do
pid = spawn(fn -> loop(TodoList.new) end)
Process.register(pid, :todo_server)
end
defp loop(todo_list) do
new_todo_list = receive do
message -> process_message(todo_list, message)
end
loop(new_todo_list)
end
def process_message(todo_list, {:add_entry, new_entry}) do
TodoList.add_entry(todo_list, new_entry)
end
def process_message(todo_list, {:entries, caller, date}) do
send(caller, {:todo_entries, TodoList.entries(todo_list, date)})
todo_list
end
def add_entry(new_entry) do
send(:todo_server, {:add_entry, new_entry})
end
def entries(date) do
send(:todo_server, {:entries, self, date})
receive do
{:todo_entries, entries} -> entries
after 5000 ->
{:error, :timeout}
end
end
end
调用方式如下:
iex(1)> TodoServer.start
true
iex(2)> TodoServer.add_entry(%{date: {2013, 12, 19}, title: "Dentist"})
{:add_entry, %{date: {2013, 12, 19}, title: "Dentist"}}
iex(3)> TodoServer.add_entry(%{date: {2013, 12, 20}, title: "Shopping"})
{:add_entry, %{date: {2013, 12, 20}, title: "Shopping"}}
iex(4)> TodoServer.add_entry(%{date: {2013, 12, 19}, title: "Movies"})
{:add_entry, %{date: {2013, 12, 19}, title: "Movies"}}
iex(5)> TodoServer.entries({2013, 12, 19})
[%{date: {2013, 12, 19}, id: 3, title: "Movies"},
%{date: {2013, 12, 19}, id: 1, title: "Dentist"}]
可见,后台执行任务的进程,相对客户端被隐藏啦。