ruby并发编程

ruby并发编程一般使用Thread实现,但是Thread默认使用时通过共享内存的使用的,即在子线程和主线程(或其他子线程)是get/set同一套变量的,不使用锁则会因数据竞争导致执行不可控,执行结果不正确:

counter = 0

threads = 10.times.map do
  Thread.new do
    1000.times do
      counter += 1  # 非原子操作:读、加、写
    end
  end
end

threads.each(&:join)
puts counter  # 期望是 10000,但结果可能小于 10000

但ruby的GIL保证只有一个线程执行,我运行了很久都没遇到小于10000的情况(但不能保证不会遇到)

使用锁又会导致性能低下,以及死锁等问题(多个共享资源、多个锁的情况);并发执行多个且有分支控制时,也会导致代码逻辑过于复杂,容易出bug且难以调试。

concurrent-ruby是一个并发编程的工具集,可以使用其提供的并发原语,方便地实现多线程编程。

Concurrent::Async

Async模块是一种将简单但强大的异步功能混合到任何普通的旧式 Ruby 对象或类中的方法,将每个对象变成一个简单的 Actor。方法调用在后台线程上处理。调用者可以在后台进行处理时自由执行其他操作。

require 'concurrent-ruby'
class A
  # 引入Async module
  include Concurrent::Async

  def say_it(word)
    raise 'a is wrong' if word == 'a'

    sleep(1)
    puts "say #{word}"
    true
  end
end
# 异步方式调用
a = A.new.async.say_it('a') #异步执行
# value可传递timeout参数->超时秒数,执行到此时如果异步任务执行完,则返回结果,正在执行,则等待(阻塞)
puts a.value # 调用失败时 value为nil,如果是value!方法直接抛出异常
puts a.reason # error

b = A.new.async.say_it('b')
puts b.value # true
puts b.reason # 错误为nil

# 阻塞方式调用
c = A.new.await.say_it('c')
puts c

执行的结果是一个IVar对象,能够检查执行状态,返回结果等

Concurrent::ScheduledTask & Concurrent::TimerTask

ScheduledTask为在指定的延迟后执行

require 'concurrent-ruby'
task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' } # 2秒延时
puts task.class
puts task.state         #=> :unscheduled
task.execute # 开始执行
puts task.state         #=> pending

# wait for it...
sleep(3)

puts task.unscheduled? #=> false
puts task.pending?     #=> false
puts task.fulfilled?   #=> true
puts task.rejected?    #=> false
puts task.value        #=> 'What does the fox say?'

TimerTask运行一个定期执行任务

require 'concurrent-ruby'

# execution_interval 执行间隔(秒数),默认60秒
# interval_type 间隔方式默认fixed_delay
#               fixed_delay: 上一次执行结束和下一次执行开始之间的间隔
#               fixed_rate:上一次执行开始到下一次开始执行的间隔(如果执行时间超过间隔,则下一次执行将在前一次执行完成后立即开始。不会并发运行)
#                 
task = Concurrent::TimerTask.new(execution_interval: 1, interval_type: :fixed_rate){ t =  Time.now.sec; raise "aaa" if t % 5 == 0 ; puts t; t }

# 观察者类
class TaskObserver
  # time 执行时间
  # result 执行结果
  # ex 异常
  def update(time, result, ex)
    if result
      print "(#{time}) Execution successfully returned #{result}\n"
    else
      print "(#{time}) Execution failed with error #{ex}\n"
    end
  end
end

# 添加观察者
task.add_observer(TaskObserver.new) # 调用其update方法

task.execute # 开始执行

# 异步执行,防止主线程结束
gets
# 关闭
task.shutdown

Concurrent::Promises

提供优雅的方式实现处理异步计算和任务链式执行。

require 'concurrent-ruby'

x =  Concurrent::Promises.
    future(2) { |v| raise 'aa' }.
    # 顺序执行
    then(&:succ).then{|x| x -= 1}.
    # 异常时
    rescue { |error| 999 }
puts x.result.inspect # 3



# 分支执行
head    = Concurrent::Promises.fulfilled_future(-1) # 
branch1 = head.then(&:abs).then(&:succ) # 分支1(绝对值 -> +1)
branch2 = head.then(&:succ).then(&:abs) # 分支2(+1 -> 绝对值)

puts head.value # -1
puts branch1.value # 2
puts branch2.value # 0

# 压缩分支
puts (branch1 & branch2).value.inspect # [2, 0]
# 任意分支
puts (branch1.then{raise 'a'} | branch2).value.inspect # 0



线程池:限制线程数量

require 'concurrent-ruby'

pool = Concurrent::FixedThreadPool.new(5) # 最大5 threads

# 可以指定最大,最小线程数,回收空闲时间,
# pool = Concurrent::ThreadPoolExecutor.new(
#    min_threads: 5, # 最小线程数
#    max_threads: 5, # 最大线程数
#    idletime: 60, # 回收空闲时间
#    max_queue: 0 # 最大队列大小
#    fallback_policy: :abort # 等待队列满时策略:abort(异常),discard(丢弃),caller_runs(调用者线程执行)
# )
# 使用线程池执行异步任务
promises = (1..10).map do |i|
  # 在pool上执行
  Concurrent::Promises.future_on(pool, i) do |i|
    sleep 1 # 模拟耗时操作
    raise "x" if i %2 == 0
    puts "Task #{i} completed by #{Thread.current.object_id}"
    i * 2
  end
end
puts pool.running? # true
# 等待所有任务完成并获取结果
results = promises.map(&:value) # [2, nil, 6, nil, 10, nil, 14, nil, 18, nil]
puts pool.running? # true
puts "Results: #{results}"

# pool.wait_for_termination # 等待执行完关闭(但测试时报错,未知原因)
puts pool.running? # true
pool.shutdown # 需要手动关闭

go风格channel

实验版本edge才有的功能。

require 'concurrent-edge'
# 输入channel,容量为2
in_c = Concurrent::Channel.new(capacity: 2)
# 输出channel
out_c = Concurrent::Channel.new(capacity: 2)

# 写入数据
Concurrent::Channel.go do
  10.times do |i|
    in_c.put(i)
  end
  in_c.close
end


Concurrent::Channel.go do
  loop do
    # 读取channel数据
    msg = ~ in_c
    break if msg.nil? # close时发送的nil数据

    out_c << (msg ** 2)
  end
  # 等效写法,each(忽略掉close的)
  # in_c.each do |msg|
  #  out_c << msg ** 2
  # end
  out_c.close # 关闭channel
end

# 读取
loop do
  v = ~out_c
  break if v.nil?
  puts v
end

Ractor

上面说的花里胡哨的,但是因为ruby的 GIL (Global Interpreter Lock),多线程其实在同一时间,只有一个在执行。
这就使多线程仅在IO阻塞时起到作用,多CPU其实是用不到的。
Ractor 是 Ruby 3 新引入的特性。Ractor 顾名思义是 Ruby 和 Actor 的组合词。Actor 模型是一个基于通讯的、非锁同步的并发模型。

NUM = 100000
THREAD_NUM = 4
BATCH_SIZE = NUM / THREAD_NUM 
def ractor_run(num)
  Ractor.new(num) do |start_index|
    sum = (start_index...start_index + BATCH_SIZE).inject(0) do |sum, i|
      sum += i ** 2
    end
    Ractor.yield(sum)
  end
end

def thread_run(num)
  Thread.new(num) do |start_index|
    (start_index...start_index+BATCH_SIZE).inject(0) do |sum, i|
      sum += i ** 2
    end
  end
end

def ractor_sum
  THREAD_NUM.times.map do |i|
    ractor_run(i * BATCH_SIZE)
  end.map(&:take).sum
end

def thread_sum
  THREAD_NUM.times.map do |i|
    thread_run(i * BATCH_SIZE)
  end.map{|t| t.join.value}.sum
end

def normal_sum
  (0...NUM).inject(0) do |sum, i|
    sum += i ** 2
  end
end

puts thread_sum

puts ractor_sum

puts normal_sum
require 'benchmark'

Benchmark.bmbm do |x|
  # sequential version
  x.report('normal'){ 100.times{normal_sum} }

  # parallel version with thread
  x.report('thread'){ 100.times{thread_sum}}
 
  # parallel version with ractors
  x.report('ractor'){ 100.times{ractor_sum} }
end

image.png

我们看到 ractor对比thread和normal提升超过三倍,而thread对比normal甚至稍慢。

但Ractor也是有代价的,Ractor之间时数据隔离的:

  • 只能通过消息传递(send 和 receive)进行通信。
  • 数据传递时,必须是 深拷贝 或 不可变 的对象(如数字、符号等)。

ractor内发送消息(from 1),最后发送4

r1 = Ractor.new {Ractor.yield 'from 1';puts 'go on';4}
 r1.take # from 1
# print go on
r1.take # get 4

ractor内接收消息,最后发送5

r1 = Ractor.new {msg = Ractor.receive;puts msg;5}
r1.send('ok') 
# print ok
r1.take # get 5

take未能接收到消息时,阻塞

可共享的object_id是一样的

one = '3'.freeze
r = Ractor.new(one) do |one|
  one
end

two = r.take
puts one.object_id # 60
puts two.object_id # 60

不可共享的object_id就不一样了(复制了一份)

one = []
r = Ractor.new(one) do |one|
  one
end

two = r.take
puts one.object_id # 80
puts two.object_id # 100

move 移动,这会将 对象移动到接收方,使发送方无法访问它。

one = []
r = Ractor.new do
  x = Ractor.receive
  x
end
r.send(one, move: true)

two = r.take
puts two.object_id # 60
# 移动了,不能再使用
puts one.object_id #  `method_missing': can not send any methods to a moved object (Ractor::MovedError)

结语

  • Concurrent::Async,可以以简单的方式实现异步调用(或修改已有代码为异步方式),
  • Concurrent::ScheduledTask & Concurrent::TimerTask能够制定延时执行和定期执行
  • Concurrent::Promises 实现链式调用、分支处理
  • thread pool 实现对并发的数量控制
  • go风格channel实现消息机制的异步调用
  • Ractor实现真正的并行。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容