ruby并发编程一般使用Thread实现,但是Thread默认使用时通过共享内存
的使用的,即在子线程和主线程(或其他子线程)是get/set同一套变量的,不使用锁则会因数据竞争
导致执行不可控,执行结果不正确:
counter = 0
threads = 10.times.map do
Thread.new do
1000.times do
counter += 1 # 非原子操作:读、加、写
end
end
end
threads.each(&:join)
puts counter # 期望是 10000,但结果可能小于 10000
但ruby的GIL保证只有一个线程执行,我运行了很久都没遇到小于10000的情况(但不能保证不会遇到)
使用锁又会导致性能低下,以及死锁等问题(多个共享资源、多个锁的情况);并发执行多个且有分支控制时,也会导致代码逻辑过于复杂,容易出bug且难以调试。
concurrent-ruby
是一个并发编程的工具集,可以使用其提供的并发原语,方便地实现多线程编程。
Concurrent::Async
Async模块是一种将简单但强大的异步功能混合到任何普通的旧式 Ruby 对象或类中的方法,将每个对象变成一个简单的 Actor。方法调用在后台线程上处理。调用者可以在后台进行处理时自由执行其他操作。
require 'concurrent-ruby'
class A
# 引入Async module
include Concurrent::Async
def say_it(word)
raise 'a is wrong' if word == 'a'
sleep(1)
puts "say #{word}"
true
end
end
# 异步方式调用
a = A.new.async.say_it('a') #异步执行
# value可传递timeout参数->超时秒数,执行到此时如果异步任务执行完,则返回结果,正在执行,则等待(阻塞)
puts a.value # 调用失败时 value为nil,如果是value!方法直接抛出异常
puts a.reason # error
b = A.new.async.say_it('b')
puts b.value # true
puts b.reason # 错误为nil
# 阻塞方式调用
c = A.new.await.say_it('c')
puts c
执行的结果是一个IVar对象,能够检查执行状态,返回结果等
Concurrent::ScheduledTask & Concurrent::TimerTask
ScheduledTask
为在指定的延迟后执行
require 'concurrent-ruby'
task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' } # 2秒延时
puts task.class
puts task.state #=> :unscheduled
task.execute # 开始执行
puts task.state #=> pending
# wait for it...
sleep(3)
puts task.unscheduled? #=> false
puts task.pending? #=> false
puts task.fulfilled? #=> true
puts task.rejected? #=> false
puts task.value #=> 'What does the fox say?'
TimerTask
运行一个定期执行任务
require 'concurrent-ruby'
# execution_interval 执行间隔(秒数),默认60秒
# interval_type 间隔方式默认fixed_delay
# fixed_delay: 上一次执行结束和下一次执行开始之间的间隔
# fixed_rate:上一次执行开始到下一次开始执行的间隔(如果执行时间超过间隔,则下一次执行将在前一次执行完成后立即开始。不会并发运行)
#
task = Concurrent::TimerTask.new(execution_interval: 1, interval_type: :fixed_rate){ t = Time.now.sec; raise "aaa" if t % 5 == 0 ; puts t; t }
# 观察者类
class TaskObserver
# time 执行时间
# result 执行结果
# ex 异常
def update(time, result, ex)
if result
print "(#{time}) Execution successfully returned #{result}\n"
else
print "(#{time}) Execution failed with error #{ex}\n"
end
end
end
# 添加观察者
task.add_observer(TaskObserver.new) # 调用其update方法
task.execute # 开始执行
# 异步执行,防止主线程结束
gets
# 关闭
task.shutdown
Concurrent::Promises
提供优雅的方式实现处理异步计算和任务链式执行。
require 'concurrent-ruby'
x = Concurrent::Promises.
future(2) { |v| raise 'aa' }.
# 顺序执行
then(&:succ).then{|x| x -= 1}.
# 异常时
rescue { |error| 999 }
puts x.result.inspect # 3
# 分支执行
head = Concurrent::Promises.fulfilled_future(-1) #
branch1 = head.then(&:abs).then(&:succ) # 分支1(绝对值 -> +1)
branch2 = head.then(&:succ).then(&:abs) # 分支2(+1 -> 绝对值)
puts head.value # -1
puts branch1.value # 2
puts branch2.value # 0
# 压缩分支
puts (branch1 & branch2).value.inspect # [2, 0]
# 任意分支
puts (branch1.then{raise 'a'} | branch2).value.inspect # 0
线程池:限制线程数量
require 'concurrent-ruby'
pool = Concurrent::FixedThreadPool.new(5) # 最大5 threads
# 可以指定最大,最小线程数,回收空闲时间,
# pool = Concurrent::ThreadPoolExecutor.new(
# min_threads: 5, # 最小线程数
# max_threads: 5, # 最大线程数
# idletime: 60, # 回收空闲时间
# max_queue: 0 # 最大队列大小
# fallback_policy: :abort # 等待队列满时策略:abort(异常),discard(丢弃),caller_runs(调用者线程执行)
# )
# 使用线程池执行异步任务
promises = (1..10).map do |i|
# 在pool上执行
Concurrent::Promises.future_on(pool, i) do |i|
sleep 1 # 模拟耗时操作
raise "x" if i %2 == 0
puts "Task #{i} completed by #{Thread.current.object_id}"
i * 2
end
end
puts pool.running? # true
# 等待所有任务完成并获取结果
results = promises.map(&:value) # [2, nil, 6, nil, 10, nil, 14, nil, 18, nil]
puts pool.running? # true
puts "Results: #{results}"
# pool.wait_for_termination # 等待执行完关闭(但测试时报错,未知原因)
puts pool.running? # true
pool.shutdown # 需要手动关闭
go风格channel
实验版本edge才有的功能。
require 'concurrent-edge'
# 输入channel,容量为2
in_c = Concurrent::Channel.new(capacity: 2)
# 输出channel
out_c = Concurrent::Channel.new(capacity: 2)
# 写入数据
Concurrent::Channel.go do
10.times do |i|
in_c.put(i)
end
in_c.close
end
Concurrent::Channel.go do
loop do
# 读取channel数据
msg = ~ in_c
break if msg.nil? # close时发送的nil数据
out_c << (msg ** 2)
end
# 等效写法,each(忽略掉close的)
# in_c.each do |msg|
# out_c << msg ** 2
# end
out_c.close # 关闭channel
end
# 读取
loop do
v = ~out_c
break if v.nil?
puts v
end
Ractor
上面说的花里胡哨的,但是因为ruby的 GIL (Global Interpreter Lock),多线程其实在同一时间,只有一个在执行。
这就使多线程仅在IO阻塞时起到作用,多CPU其实是用不到的。
Ractor 是 Ruby 3 新引入的特性。Ractor 顾名思义是 Ruby 和 Actor 的组合词。Actor 模型是一个基于通讯的、非锁同步的并发模型。
NUM = 100000
THREAD_NUM = 4
BATCH_SIZE = NUM / THREAD_NUM
def ractor_run(num)
Ractor.new(num) do |start_index|
sum = (start_index...start_index + BATCH_SIZE).inject(0) do |sum, i|
sum += i ** 2
end
Ractor.yield(sum)
end
end
def thread_run(num)
Thread.new(num) do |start_index|
(start_index...start_index+BATCH_SIZE).inject(0) do |sum, i|
sum += i ** 2
end
end
end
def ractor_sum
THREAD_NUM.times.map do |i|
ractor_run(i * BATCH_SIZE)
end.map(&:take).sum
end
def thread_sum
THREAD_NUM.times.map do |i|
thread_run(i * BATCH_SIZE)
end.map{|t| t.join.value}.sum
end
def normal_sum
(0...NUM).inject(0) do |sum, i|
sum += i ** 2
end
end
puts thread_sum
puts ractor_sum
puts normal_sum
require 'benchmark'
Benchmark.bmbm do |x|
# sequential version
x.report('normal'){ 100.times{normal_sum} }
# parallel version with thread
x.report('thread'){ 100.times{thread_sum}}
# parallel version with ractors
x.report('ractor'){ 100.times{ractor_sum} }
end
我们看到 ractor对比thread和normal提升超过三倍,而thread对比normal甚至稍慢。
但Ractor也是有代价的,Ractor之间时数据隔离的:
- 只能通过消息传递(send 和 receive)进行通信。
- 数据传递时,必须是 深拷贝 或 不可变 的对象(如数字、符号等)。
ractor内发送消息(from 1),最后发送4
r1 = Ractor.new {Ractor.yield 'from 1';puts 'go on';4}
r1.take # from 1
# print go on
r1.take # get 4
ractor内接收消息,最后发送5
r1 = Ractor.new {msg = Ractor.receive;puts msg;5}
r1.send('ok')
# print ok
r1.take # get 5
take未能接收到消息时,阻塞
可共享的object_id是一样的
one = '3'.freeze
r = Ractor.new(one) do |one|
one
end
two = r.take
puts one.object_id # 60
puts two.object_id # 60
不可共享的object_id就不一样了(复制了一份)
one = []
r = Ractor.new(one) do |one|
one
end
two = r.take
puts one.object_id # 80
puts two.object_id # 100
move 移动,这会将 对象移动到接收方,使发送方无法访问它。
one = []
r = Ractor.new do
x = Ractor.receive
x
end
r.send(one, move: true)
two = r.take
puts two.object_id # 60
# 移动了,不能再使用
puts one.object_id # `method_missing': can not send any methods to a moved object (Ractor::MovedError)
结语
-
Concurrent::Async
,可以以简单的方式实现异步调用(或修改已有代码为异步方式), -
Concurrent::ScheduledTask & Concurrent::TimerTask
能够制定延时执行和定期执行 -
Concurrent::Promises
实现链式调用、分支处理 - thread pool 实现对并发的数量控制
- go风格channel实现消息机制的异步调用
- Ractor实现真正的并行。