【Python】多进程队列问题

最近在使用Python多进程时,遇到一个偶现队列死锁问题:
process1 是生产者,进程池中有两个进程process2_1,process2_2是消费者。通过两个队列Q1和Q2来传输数据,具体问题如下:

用的Queue包是:

from multiprocessing import Queue

Process1代码如下:

    def run(self):
      thread.start_new_thread(self.__recv_msg_from_main,("recv_main_process", "11"))
      # start a video
      video_name = "video" + str(ConfigManager.get_client_key())
      video_capture = WebcamVideoStream(ConfigManager.get_sources(),
                                        ConfigManager.get_width(),
                                        ConfigManager.get_height()).start()
      while True:
          if self.__gate_open:
              time.sleep(0.04)
              ret, origin_frame = video_capture.read()

              if not ret:
                  logger.warn("capture failed!")
                  continue

              logger.debug("video capture __input_q.size: %d, __output_q.size: %d", self.__input_queue.qsize(), self.__output_queue.qsize())

              # frame pre handle
              frame = self.__frame_pre_handle(origin_frame)

              self.__in_queue.put((self.__operation_id, self.__frame_id, frame))
              self.__frame_id += 1

              if ConfigManager.get_show_video() == 1:
                  try:
                      frame_id, output_frame = self.__out_queue.get(False, 1)
                  except Empty:
                      logger.debug("video capture output queue is empty")
                      continue

                  cv2.imshow(video_name, output_frame)
                  if cv2.waitKey(1) & 0xFF == ord('q'):
                      logger.warn("exit for waitKey!")
                      break

          else:
              time.sleep(1)
              logger.fatal("video capture process exit abnomal !!!")

Process2_1,Process2_2代码如下:

def worker(input_q, detect_q, update_q):
  # Load a (frozen) Tensorflow model into memory.
  detection_graph = tf.Graph()
  with detection_graph.as_default():
      od_graph_def = tf.GraphDef()
      with tf.gfile.GFile(PathManager.get_ckpt_path(), 'rb') as fid:
          serialized_graph = fid.read()
          od_graph_def.ParseFromString(serialized_graph)
          tf.import_graph_def(od_graph_def, name='')

      sess = tf.Session(graph=detection_graph)

  category_index = ComponentManager.get_category_index()
  width = ConfigManager.get_width()
  height = ConfigManager.get_height()

  while True:
      operation_id, frame_id, frame = input_q.get()
      frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      logger.debug("video parse in_queue_size: %d", n, input_q.qsize())

      updated_frame, score, classes, boxes = parse_origin_video_frame(frame_rgb,
                                                                      sess,
                                                                      detection_graph,
                                                                      category_index)
      items = item_detect(score, classes, boxes, category_index)
      if items:
          valid_frame = Frame(operation_id, frame_id)
          for item in items:
              logger.debug("frame_id: %d, item[%d %s %d (%d %d)]", frame_id, item.get_id(),
                           item.get_name(), item.get_score(), width - item.get_x(), height - item.get_y())
              valid_frame.add_item(item)
          detect_q.put(valid_frame)

      # for imshow
      if ConfigManager.get_show_video() == 1:
          output_rgb = cv2.cvtColor(updated_frame, cv2.COLOR_RGB2BGR)
          update_q.put((frame_id, output_rgb))
  sess.close()

class VideoParserProcess(object):
  def __init__(self, in_queue, detected_queue, update_queue):
      self.__video_name = 'Video' + str(ConfigManager.get_sources())
      self.__in_queue = in_queue
      self.__detected_queue = detected_queue
      self.__update_queue = update_queue

  def run(self):
      self.__pool = Pool(ConfigManager.get_worker_num(), worker, (self.__in_queue, self.__detected_queue, self.__update_queue))

  def destory(self):
      self.__pool.terminate()

上面代码中用到了Queue类的三个方法:put,get,qsize
源码如下:

  def put(self, obj, block=True, timeout=None):
      assert not self._closed
      if not self._sem.acquire(block, timeout):
          raise Full

      self._notempty.acquire()
      try:
          if self._thread is None:
              self._start_thread()
          self._buffer.append(obj)
          self._notempty.notify()
      finally:
          self._notempty.release()

  def get(self, block=True, timeout=None):
      if block and timeout is None:
          self._rlock.acquire()
          try:
              res = self._recv()
              self._sem.release()
              return res
          finally:
              self._rlock.release()

      else:
          if block:
              deadline = time.time() + timeout
          if not self._rlock.acquire(block, timeout):
              raise Empty
          try:
              if block:
                  timeout = deadline - time.time()
                  if not self._poll(timeout):
                      raise Empty
              elif not self._poll():
                  raise Empty
              res = self._recv()
              self._sem.release()
              return res
          finally:
              self._rlock.release()

  def qsize(self):
      # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
      return self._maxsize - self._sem._semlock._get_value()

几个接口都有锁,实际把打印qsize的地方,都去掉,问题就没复现。。 待继续看看源码。初入python,要学的还很多。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352

推荐阅读更多精彩内容

  • Python 面向对象Python从设计之初就已经是一门面向对象的语言,正因为如此,在Python中创建一个类和对...
    顺毛阅读 4,212评论 4 16
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,987评论 25 707
  • 进程和线程 进程线程的区别1、进程是什么?是具有一定独立功能的程序、它是系统进行资源分配和调度的一个独立单位,重点...
    HeartGo阅读 1,204评论 0 4
  • 人,真的要学着承担。当自己一天天的成长,我们所扮演的角色也愈来愈多。其中也难免会有些自己不擅长、不喜欢,甚至不习惯...
    909354b7e446阅读 144评论 0 0
  • 我风尘仆仆来朝拜您, 站立在您的牌坊下, 徜徉在您的街路里。 引马石上留下的凹坑深深浅浅, 客栈院里支撑的朽木斑驳...
    SS中阅读 318评论 9 10