python 函数超时停止装饰器

转载自https://draapho.github.io/2016/11/28/1622-python-time/

实用的例子

time.sleep 单线程阻塞延时

import time

def time_sleep():
 for i in range(10):
 print i
 time.sleep(1)       # delay 1s, not that accurate

if __name__ == "__main__":
 start = time.time()
 time_sleep()
 end = time.time()
 print "run time: {}".format(end - start)

time.time 单线程非阻塞延时/超时

通过比较时间戳实现, 多用于循环中的延时/超时判断

import time

def time_compare():
 timeout = time.time() + 10  # 10s delay
 for i in range(20):
 print i
 time.sleep(1)
 if timeout < time.time(): # compare the timestamps
 break
 print "time out !"

if __name__ == "__main__":
 start = time.time()
 time_compare()
 end = time.time()
 print "run time: {}".format(end - start)

threading.Timer 多线程非阻塞延时

这个例子中, 会先执行完 threading_main. 5s后, 才会执行 threading_sub
子线程函数可以带参 threading.Timer(interval, function, args=[], kwargs={})

import threading

def threading_main():
 print "main thread: start"
 thrd = threading.Timer(5.0, threading_sub, args = ["sub thread"])
 thrd.start()
 print "main thread: end"

def threading_sub(name):
 print name + ": hello"

if __name__ == "__main__":
 start = time.time()
 threading_main()
 end = time.time()
 print "run time: {}".format(end - start)

threading.Timer + threading.join 多线程阻塞延时

使用 join 语句, 让主线程等待子线程完成后才继续执行
子线程函数可以带参 threading.Timer(interval, function, args=[], kwargs={})

import threading

def threading_main():
 print "main thread: start"
 thrd = threading.Timer(5.0, threading_sub, args = ["sub thread"])
 thrd.start()
 print "main thread: wait"
 thrd.join()     # add this line
 # thrd.join(timeout=2)  # just wait 2s then continue
 print "main thread: end"

def threading_sub(name):
 print name + ": hello"

if __name__ == "__main__":
 start = time.time()
 threading_main()
 end = time.time()
 print "run time: {}".format(end - start)

装饰器

装饰器, 使用KThread,.localtrace结束线程. (通用性最好, 性能较低)

import threading

class Timeout(Exception):
 """function run timeout"""

class KThread(threading.Thread):

 def __init__(self, *args, **kwargs):
 threading.Thread.__init__(self, *args, **kwargs)
 self.killed = False

 def start(self):
 """Start the thread."""
 self.__run_backup = self.run
 # Force the Thread to install our trace.
 self.run = self.__run
 threading.Thread.start(self)

 def __run(self):
 """Hacked run function, which installs the trace."""
 sys.settrace(self.globaltrace)
 self.__run_backup()
 self.run = self.__run_backup

 def globaltrace(self, frame, why, arg):
 if why == 'call':
 return self.localtrace
 else:
 return None

 def localtrace(self, frame, why, arg):
 if self.killed:
 if why == 'line':
 raise SystemExit()
 return self.localtrace

 def kill(self):
 self.killed = True

def timeout(timeout, default=None, try_except=False):
 """Timeout decorator, parameter in timeout."""
 def timeout_decorator(func):
 def new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs):
 result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))

 """Wrap the original function."""
 def func_wrapper(*args, **kwargs):
 result = []
 # create new args for _new_func, because we want to get the func
 # return val to result list
 new_kwargs = {
 'oldfunc': func,
 'result': result,
 'oldfunc_args': args,
 'oldfunc_kwargs': kwargs
 }

 thd = KThread(target=new_func, args=(), kwargs=new_kwargs)
 thd.start()
 thd.join(timeout)
 # timeout or finished?
 isAlive = thd.isAlive()
 thd.kill()

 if isAlive:
 if try_except is True:
 raise Timeout("{} Timeout: {} seconds.".format(func, timeout))
 return default
 else:
 return result[0]

 func_wrapper.__name__ = func.__name__
 func_wrapper.__doc__ = func.__doc__
 return func_wrapper

 return timeout_decorator

if __name__ == "__main__":
 import time

 @timeout(5)
 def count(name):
 for i in range(10):
 print("{}: {}".format(name, i))
 time.sleep(1)
 return "finished"

 try:
 print count("thread1")
 print count("thread2")
 except Timeout as e:
 print e

将上面的例子, 改为函数调用模式, 这样timeout参数可灵活设置!

import threading

class Timeout(Exception):
 """function run timeout"""

class KThread(threading.Thread):

 def __init__(self, *args, **kwargs):
 threading.Thread.__init__(self, *args, **kwargs)
 self.killed = False

 def start(self):
 """Start the thread."""
 self.__run_backup = self.run
 # Force the Thread to install our trace.
 self.run = self.__run
 threading.Thread.start(self)

 def __run(self):
 """Hacked run function, which installs the trace."""
 sys.settrace(self.globaltrace)
 self.__run_backup()
 self.run = self.__run_backup

 def globaltrace(self, frame, why, arg):
 if why == 'call':
 return self.localtrace
 else:
 return None

 def localtrace(self, frame, why, arg):
 if self.killed:
 if why == 'line':
 raise SystemExit()
 return self.localtrace

 def kill(self):
 self.killed = True

def timeout_call(timeout, func, args=(), kwargs=None, default=None, try_except=False):
 def new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs):
 result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))

 result = []
 kwargs = {} if kwargs is None else kwargs
 # create new args for _new_func, because we want to get the func
 # return val to result list
 new_kwargs = {
 'oldfunc': func,
 'result': result,
 'oldfunc_args': args,
 'oldfunc_kwargs': kwargs
 }

 thd = KThread(target=new_func, args=(), kwargs=new_kwargs)
 thd.start()
 thd.join(timeout)
 # timeout or finished?
 isAlive = thd.isAlive()
 thd.kill()

 if isAlive:
 if try_except is True:
 raise Timeout("{} Timeout: {} seconds.".format(func, timeout))
 return default
 else:
 return result[0]

if __name__ == "__main__":
 import time

 def count(name):
 for i in range(10):
 print("{}: {}".format(name, i))
 time.sleep(1)
 return "finished"

 try:
 print timeout_call(5, count, ["thread1"])
 print timeout_call(5, count, ["thread2"])
 except Timeout as e:
 print e

装饰器, 使用thread.interrupt_main()结束线程. (仅可用于主线程)

import threading

def timeout_quit(fn_name):
 thread.interrupt_main()     # raises KeyboardInterrupt

def timeout(s):
 '''
 use as decorator to exit process if
 function takes longer than s seconds
 '''
 def outer(fn):
 def inner(*args, **kwargs):
 timer = threading.Timer(s, timeout_quit, args=[fn.__name__])
 timer.start()
 try:
 result = fn(*args, **kwargs)
 finally:
 timer.cancel()
 return result
 return inner
 return outer

if __name__ == "__main__":
 import time

 @timeout(5)
 def processNum(num):
 time.sleep(2)
 return num

 try:
 print processNum(1)
 except KeyboardInterrupt:
 print "timeout"

学习过程中的例子

threading.Timer + threading.join 多线程阻塞延时

使用 join 语句, 让主线程等待子线程完成后才继续执行
子线程函数可以带参 threading.Timer(interval, function, args=[], kwargs={})

import threading

def threading_main():
 print "main thread: start"
 thrd = threading.Timer(5.0, threading_sub, args = ["sub thread"])
 thrd.start()
 print "main thread: wait"
 thrd.join()     # add this line
 # thrd.join(timeout=2)  # just wait 2s then continue
 print "main thread: end"

def threading_sub(name):
 print name + ": hello"

if __name__ == "__main__":
 start = time.time()
 threading_main()
 end = time.time()
 print "run time: {}".format(end - start)

join(timeout=10) 多进程超时判断

multiprocessing的本质是进程, 但是提供了类似于threading的一系列方法.
使用 multiprocessing.terminate 语句, 让主线程可以杀死子线程
子进程函数可以带参 multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
multiprocessing 没有 Timer() 方法的, 无法方便的延时执行.

注意, 这里没有办法使用 threading 类来实现. 因为没有 terminate() 方法,
而如果用signal方法来结束线程, 有两个限制. 1, windows不支持. 2, 子线程不支持

import multiprocessing
import logging

def processing_main():
 print "main process: start"
 prcs = multiprocessing.Process(
 target=processing_sub, args=["sub process"])
 prcs.start()
 print "main process: wait"
 prcs.join(timeout=10)

 # If thread is still active
 if prcs.is_alive():
 print "main process: kill"
 prcs.terminate()
 prcs.join()
 print "main process: end"

def processing_sub(name):
 for i in range(100):
 # if use print, can not show immediately in the console.
 logging.error("{}: {}".format(name, i))
 time.sleep(1)

if __name__ == "__main__":
 start = time.time()
 processing_main()
 end = time.time()
 print "run time: {}".format(end - start)

multiprocessing.pool 实现超时判断

说说python下的 thread 和 process.
thread, 提供了signal结束方式, 但是windows不支持, 仅主线程可用! 换句话说, 终止线程很繁琐
process, 提供了terminate结束方式, 但是参数传递限制条件很多, (必须可以是pickle的…)

下面的代码是有问题的!!!

import functools

def timeout(timeout, default=None, try_except=False):
 """Timeout decorator, parameter in seconds."""
 def timeout_decorator(item):
 """Wrap the original function."""
 @functools.wraps(item)
 def func_wrapper(*args, **kwargs):
 """Closure for function."""
 pool = multiprocessing.pool.ThreadPool(processes=1)
 # pool = multiprocessing.pool.Pool(processes=1) ## raise error about pickle problem!!!
 try:
 async_result = pool.apply_async(item, args, kwargs)
 val = async_result.get(timeout)
 except multiprocessing.TimeoutError:
 pool.terminate() ## not work here, because it is acutally thread, not process!!!
 val = default
 if try_except is True:
 raise multiprocessing.TimeoutError
 else:
 pool.close()
 pool.join()
 return val
 return func_wrapper
 return timeout_decorator

if __name__ == "__main__":
 import time

 @timeout(5)
 def count(name):
 for i in range(10):
 print("{}: {}".format(name, i))
 time.sleep(1)
 return "finished"

 start = time.time()
 print count("thread1")
 print count("thread2")  ## you can find problem here, thread1 is still running...
 end = time.time()
 print "run time: {}".format(end - start)

def timeout_call(timeout, func, args=(), kwargs=None, default=None, try_except=False):
 kwargs = {} if kwargs is None else kwargs
 pool = multiprocessing.Pool(processes=1)
 try:
 async_result = pool.apply_async(func, args, kwargs)
 val = async_result.get(timeout)
 except multiprocessing.TimeoutError:
 pool.terminate()
 val = default
 if try_except is True:
 raise multiprocessing.TimeoutError
 else:
 pool.close()
 pool.join()
 return val

################### example ##########
import logging
import time

def count(name):
 for i in range(10):
 logging.error("{}: {}".format(name, i))
 time.sleep(1)
 return "finished"

if __name__ == "__main__":
 ## if count function is here, will raise error!!!

 start = time.time()
 print timeout_call(5, count, ["process1"])
 print timeout_call(5, count, ["process2"])
 end = time.time()
 print "run time: {}".format(end - start)

第三方方案

  • timeoutcontext 1.1.1
    • 基于signal实现, 不支持windows系统, 不支持子线程
  • timeout-decorator 0.3.2
    • signal或Multithreading可选
    • 使用signal时, 不支持windows, 不支持子线程
    • 使用Multithreading时, 无法返回不能pickle的数据(因为需要通过pickle来跨进程交换数据)
  • stopit 1.1.1
    • threading或signal可选
    • 计时误差太大, 不可接受(翻倍的误差)

使用gevent协程

参考 gevent程序员指南之超时
参考 gevent 延时、定时、超时、io等待、动态添加任务

from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

参考资料

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容