任务超时退出
使请求超时就不再继续了,直接抛出超时错误,避免等太久。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import functools from concurrent import futures import time
executor = futures.ThreadPoolExecutor(1)
def timeout(seconds): def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): future = executor.submit(func, *args, **kw) return future.result(timeout=seconds) return wrapper return decorator
@timeout(1) def task(a, b): time.sleep(1.2) return a+b
task(2, 3)
|
日志记录
记录部分函数的执行时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import time import functools def log(func): @functools.wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() res = func(*args, **kwargs) end = time.perf_counter() print(f'函数 {func.__name__} 耗时 {(end - start) * 1000} ms') return res return wrapper
@log def now(): print('2021-7-1')
now()
|
缓存
如果经常调用一个函数,而且参数经常会产生重复,如果把结果缓存起来,下次调用同样参数时就会节省处理时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import math import random import time from functools import lru_cache
@lru_cache() def task(x): time.sleep(0.01) return round(math.log(x**3 / 15), 4)
local_time = time.time() for i in range(500): task(random.randrange(5, 10)) print(time.time() - local_time)
|
约束某个函数的可执行次数
有时我们希望程序中的某个函数在整个程序的生命周期中只执行一次或N次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import functools
class allow_count: def __init__(self, count): self.count = count self.i = 0
def __call__(self, func): @functools.wraps(func) def wrapper(*args, **kw): if self.i >= self.count: return self.i += 1 return func(*args, **kw) return wrapper
@allow_count(3) def job(x): x += 1 return x
for i in range(5): print(job(i))
|