Rails多线程和DB connection pool

在rails的多线程编程中,connection pool(连接池)是一种常见解决方案,在一些IO操作中,connection pool能够降低多线程操作的并发成本,提高并发能力。常见的有开源项目:https://github.com/mperham/connection_pool,比如在rails项目中使用redis就可以设置连接池:

$redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.connect }
$redis.with do |conn|
  conn.sadd('foo', 1)
  conn.smembers('foo')
end

还有一个比较常见的连接池使用是在rails项目的database配置中,config/database.yml:

production:
  url:  <%= ENV["DATABASE_URL"] %>
  pool: <%= ENV["DB_POOL"] || ENV['MAX_THREADS'] || 5 %>

刚好我们的一个项目在这个Database的connection pool上,碰到了一些问题,这里稍微做一些探讨。

问题

我们的项目中有一个并行的需求,因为要对某一时间段数据做大量的查询和聚合运算,查询次数会比较大,对数据库单个SQL做优化后查询时间还是比较长,鉴于是DB IO阻塞操作,所以这里使用了多线程并行查询,优化整体请求时间,我们用了多线程之后,请求处理时间得到优化,但是并发能力反倒下降,常常遇到报错:

ActiveRecord::ConnectionTimeoutError - could not obtain a database connection within 5 seconds. The max pool size is currently 5; consider increasing it

技术栈

这里先对项目做个大致的介绍,该项目使用的框架是Ruby on Rails,数据库是postgreSQL,model层则使用的原生的ActiveRecord,多线程并行部分是在一个model中使用,业务是对一些日期中每天的每个设备的不同指标的读数做一些运算放到数组中,并行则是在这个库:https://github.com/grosser/parallel 的帮助下实现:

Parallel.each(dates, in_threads: 10) do |day| # 第一层循环:对dates中的每天循环执行,并且调用Parallel开启10线程
  date = Date.parse day
  time_s, time_e = generate_time(day, from, to)
  averages = devices.map do |device_id| #第二层循环:对devices循环
     average_by_time_range(device_id, time_s, time_e) #调用查询方法
  end
  daily_average_results[day] = {
    date: day,
    data: averages
  }
end


def average_by_time_range(device_id, time_s, time_e)
  average = {device_id: device_id, four_hours_lost: false, data_complete: data_complete}
  average_sql = "SELECT AVG(pm2p5) \"pm2p5\", AVG(co2) \"co2\", AVG(tvoc) \"tvoc\"  FROM monitor_hourly_readings WHERE device_id = #{device_id}  AND reading_time BETWEEN '#{time_s}' AND '#{time_e}' "
  results = ActiveRecord::Base.connection.execute(average_sql)[0] #调用sql
  [:pm2p5, :co2, :tvoc].each do |indicator_name|
    average_reading = results[indicator_name.to_s]
    if !average_reading
      average[indicator_name] = nil
    elsif check_four_hours_lost?(device_id, indicator_name ,time_s, time_e) #调用判断方法
      average[indicator_name] = nil
      average[:four_hours_lost] = true
    else
      average[indicator_name] = average_reading.to_f
    end

  end
  average
end

def check_four_hours_lost?(device_id, indicator_name ,time_s, time_e)
  four_hours_failed_sql = ["SELECT COUNT(*) FROM ( SELECT *, reading_time - LAG(reading_time)
    OVER (order by reading_time) AS change FROM monitor_hourly_readings WHERE device_id = ?
    AND  reading_time >= ? AND reading_time <= ? AND #{indicator_name} IS NOT NULL) AS inner_table WHERE change >= interval '4 hours'",
    device_id, time_s, time_e]
  count_by_sql(four_hours_failed_sql) > 0#调用sql查询
end

这一段代码开始用parallel实现10个thread来对一段时间中每天的结果做查询和计算,具体每天的查询则是在方法 average_by_time_range 中实现,即ActiveRecord::Base.connection.execute() 执行了一段SQL。同时, average_by_time_range 中调用了 check_four_hours_lost? 方法完成一段SQL查询的判断, check_four_hours_lost? 中使用了ActiveRecord内建的 count_by_sql 方法。所以这里在两个方法中分别调用了数据库,并且嵌在了dates和devices两层循环之中,所以查询次数相对较大。

解决

面对如上问题做了一些简单的尝试去修改代码和测试,按照思路依次做了如下修改:

  1. 修改config
    在发现ActiveRecord::ConnectionTimeoutError报错常出现之后,最暴力的解决方案就是加大了config/database.yml中的pool数字,因为是内部工具项目,对并发的要求不是很高,一开始database.yml中的pool数字是10,这里修改到常见的25,测试之下并发能力有提升但是有限。如此小的并发下继续会报错,所以觉得最大的问题还是在并行使用中的错误用法了。
  2. 降低pool

所以在加大pool数字效果不明显的情况下考虑是否应该降低并行线程的数量,顺便也做了一些简单测试,发现在服务器上10线程相对4线程数据库IO效率提高有限,这里就将thread降低到了4:

Parallel.each(dates, in_threads: 4) do |day| 
  ...
end

做相应测试,确实并发能力提高不少,但是依旧不能满意,在使用时候还是会发生ActiveRecord::ConnectionTimeoutError

  1. 使用with_conneciton
    查看log,看问题发生的位置,是在手动调用sql的那句上:
 results = ActiveRecord::Base.connection.execute(average_sql)[0] #调用sql

感觉在对parallel的使用上还是存在问题,或者 ActiveRecord::Base.connection的使用会有特殊要求,回到parallel的文档上,查看有没有在parallel中使用ActiveRecord的具体说明,果然,还是要认真看文档的, parallel中给出了使用如何搭配使用ActiveRecord的sample:

# reproducibly fixes things (spec/cases/map_with_ar.rb)
Parallel.each(User.all, in_processes: 8) do |user|
  user.update_attribute(:some_attribute, some_value)
end
User.connection.reconnect!

# maybe helps: explicitly use connection pool
Parallel.each(User.all, in_threads: 8) do |user|
  ActiveRecord::Base.connection_pool.with_connection do
    user.update_attribute(:some_attribute, some_value)
  end
end

# maybe helps: reconnect once inside every fork
Parallel.each(User.all, in_processes: 8) do |user|
  @reconnected ||= User.connection.reconnect! || true
  user.update_attribute(:some_attribute, some_value)
end

这段sample里可以看到,parallel使用时候还是要对ActiveRecord的connection做特殊处理的,比如 reconnect! 方法或者放在 with_connection 方法。因此考虑原因,是否因为在使用ActiveRecord::Base.connection的时候得手动关闭connection,查看ActiveRecord的connection_pool实现的源码。
ActiveRecord没有使用之前提到的开源的connection pool,而是自己实现了一套connection pool,class的层级结构大致如下:

  • class ConnectionPool
    • class Queue
    • class Reaper

这里的Queue 主要功能是用来存放链接池中创建的connections

def initialize(lock = Monitor.new)
  @lock = lock
  @cond = @lock.new_cond
  @num_waiting = 0
  @queue = []
end

可以看到,Queue使用Monitor作为线程锁来保证多线程的thread safe,使用数组来存放connections,以数组的push和slice方法实现先进先出的存储堆结构,在此之上实现了add, clear, remove等功能方法,其中值得注意的是实现了基于timeout的轮询机制:

def poll(timeout = nil)
  synchronize do
    if timeout
      no_wait_poll || wait_poll(timeout)
    else
      no_wait_poll
    end
  end
end


# Waits on the queue up to +timeout+ seconds, then removes and
# returns the head of the queue.
def wait_poll(timeout)
  @num_waiting += 1

  t0 = Time.now
  elapsed = 0
  loop do
    @cond.wait(timeout - elapsed)

    return remove if any?

    elapsed = Time.now - t0
    if elapsed >= timeout
      msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
        [timeout, elapsed]
      raise ConnectionTimeoutError, msg
    end
  end
ensure
  @num_waiting -= 1
end

然后Reaper这个类的作用就比较简单了,会在每个connection_pool实例化的时候另起一个线程,用来定时清除未被正常释放的connection,整个实现才一二十行:

  # Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
  # A reaper instantiated with a nil frequency will never reap the
  # connection pool.
  #
  # Configure the frequency by setting "reaping_frequency" in your
  # database yaml file.
  class Reaper
    attr_reader :pool, :frequency

    def initialize(pool, frequency)
      @pool      = pool
      @frequency = frequency
    end

    def run
      return unless frequency
      Thread.new(frequency, pool) { |t, p|
        while true
          sleep t
          p.reap
        end
      }
    end
  end

这里比较关键的是 frequency 变量,会在connection_pool实例化的时候传入,默认数值是5 seconds,所以对于没有正确释放的connection会在5秒后自动回收。

ConnectionPool类相对比较复杂,里面值得注意的几个方法

def connection
  # this is correctly done double-checked locking
  # (ThreadSafe::Cache's lookups have volatile semantics)
  @reserved_connections[current_connection_id] || synchronize do
    @reserved_connections[current_connection_id] ||= checkout
  end
end

def with_connection
  connection_id = current_connection_id
  fresh_connection = true unless active_connection?
  yield connection
ensure
  release_connection(connection_id) if fresh_connection
end

def checkout
  synchronize do
    conn = acquire_connection
    conn.lease
    checkout_and_verify(conn)
  end
end

def checkin(conn)
  synchronize do
    conn.run_callbacks :checkin do
      conn.expire
    end

    release conn

    @available.add conn
  end
end

def acquire_connection
  if conn = @available.poll
    conn
  elsif @connections.size < @size
    checkout_new_connection
  else
    @available.poll(@checkout_timeout)
  end
end


def current_connection_id #:nodoc:
  Base.connection_id ||= Thread.current.object_id
end

从pool里获取可用connection的三个公开方法: connection, with_connection, checkout,而checkin 则是回收该线程的connection,每个connection是用所属线程的object_id来标记的,见current_connection_id方法。
这几个获取connection方法有细微区别,connection方法是获取当前线程的connection,没有则通过checkout创建,with_connection则是获取或者创建新后讲connection传入内block执行,并且用ensure语句保证在执行完成之后release这个connectioncheckout 方法则是通过调用私有方法:acquire_connection 获取新连接。
所以我们回到之前我们自己的业务代码,就发现我们使用的results = ActiveRecord::Base.connection.execute(average_sql)[0]是不会主动释放connection的,这样需要等到Reaper在五秒之后自动释放回收,所以对于多线程情况,我们应该将代码放在with_connection方法的block中,来保证 connection 及时的release,增加并行能力。所以我们将:

results = ActiveRecord::Base.connection.execute(average_sql)[0] 

修改为

ActiveRecord::Base.with_connection do |conn|
  results = conn.execute(average_sql)[0]
end
  1. count_by_sql方法
    通过上述修改,再次测试,发现在同样并行压力下依然会报错,但是报错位置从results = ActiveRecord::Base.connection.execute(average_sql)[0] 跳到 count_by_sql(four_hours_failed_sql) > 0 这个位置,这就奇怪了,本来觉得是因为手动调用ActiveRecord::Base.connection.execute()链接没有释放的原因,修改后怎么并发没有提升呢,就翻看了ActiveRecord的源码:
# ==== Parameters
#
# * +sql+ - An SQL statement which should return a count query from the database, see the example below.
#
#   Product.count_by_sql "SELECT COUNT(*) FROM sales s, customers c WHERE s.customer_id = c.id"
def count_by_sql(sql)
  sql = sanitize_conditions(sql)
  connection.select_value(sql, "#{name} Count").to_i
end

原来这里使用的也是connection方法,所以如上条,我们得把这里的查询也放进with_connection的block里面,所以最终代码是:

def average_by_time_range(device_id, time_s, time_e)
  average = {device_id: device_id, four_hours_lost: false, data_complete: data_complete}
  average_sql = "SELECT AVG(pm2p5) \"pm2p5\", AVG(co2) \"co2\", AVG(tvoc) \"tvoc\"  FROM monitor_hourly_readings WHERE device_id = #{device_id}  AND reading_time BETWEEN '#{time_s}' AND '#{time_e}' "
  ActiveRecord::Base.with_connection.do |conn|
    results = conn.execute(average_sql)[0] #调用sql
    [:pm2p5, :co2, :tvoc].each do |indicator_name|
      average_reading = results[indicator_name.to_s]
      if !average_reading
        average[indicator_name] = nil
      elsif check_four_hours_lost?(device_id, indicator_name ,time_s, time_e) #调用判断方法
        average[indicator_name] = nil
        average[:four_hours_lost] = true
      else
        average[indicator_name] = average_reading.to_f
      end

    end
  end
  average
end

经测试,确实有效提高了并发能力。

探讨

以上的测试因为没有完整定量测试报告,所以不给出具体数据了,然后这里想为什么ActiveRecord里面的find_by_sql, count_by_sql等默认使用connection方法而不是with_connection呢? 想一想可能也是为了优化操作效率,因为ActiveRecord的并行场景不常规,然后如果使用with_connection则反倒会增加每次的调用成本。然后另外一个问题是config/databse.yml中pool size该怎么设置呢,按照什么原则?比如server使用的puma,则可能对应于设置的thread数量,同时也得考虑sidekiq的concurrency数量。但是仅仅这样,为什么不尽量设置大一点的数字呢?可能是内存等资源的压力之间的平衡问题了,相必对于这个问题得再做深入的探讨和一些测试。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容