# 4. 🔑 使用框架的各种代码示例(学习funboost用法最重要的章节) 框架极其简单并且自由,只有一个boost装饰器的参数学习, 实际上这个章节所有的例子都是调整了一下boost的参数而已。 有一点要说明的是框架的消息中间件的ip 端口 密码 等配置是在你第一次随意运行代码时候,在你当前项目的根目录下生成的 funboost_config.py 按需设置。 所有例子的发布和消费都没必须写在同一个py文件,(除了使用python 自带语言queue),因为是使用中间件解耦的消息,好多人误以为发布和消费必须在同一个python文件。 ## 4.0 框架最重要的@boost装饰器的入参格式说明 ### 4.0.1 老的 @booost 直接传入多个参数方式 以下是老的入参方式: @boost(queue_test_f01', qps=0.2,broker_kind=BrokerEnum.REDIS_ACK_ABLE,) 40.0版本之前是老的入参方式,直接在@booost传很多个参数,40.0版本之后你仍然可以这么传参,但是不太推荐,因为不能代码补全函数入参了. ### 4.0.2 新的@ boost 只传入一个 pydantic Model BoostParams 类型或子类 的入参 新的@ boost 只传入一个 BoostParams 类型或子类 的入参 ,入参类型是 非常流行的 pydantic包的 model类型. @boost(BoosterParams(queue_name='queue_test_f01', qps=0.2,broker_kind=BrokerEnum.REDIS_ACK_ABLE,)) 因为采用pydantic,可以在框架开发时候,减少很多一大推重复入参声明,因为作者很注重代码补全, 作者不想直接 *args **kwargs暴露给用户,这种会导致用户不知道应该传参什么,pycharm也无法补全,所以需要大量重复的声明,每次加入参,都需要很多地方去修改, 所以改为使用流行的 pydantic 包来实现入参,fastapi的入参声明就是使用pydantic,非常棒. ### 4.0.3 pydantic model 的 BoosterParams 的入参pycharm下自动补全 ``` 因为BoosterParams这个pydantic model 类没有 __init__(self,一堆参数) ,而是把类变量,转化成实例变量, 所以直接对BoostParams传参是无法代码补全的,需要用户在pycharm的Plugins安装一个pydantic的插件,这样就能敲击入参自动补全入参名字了. pydatinc pycharm编程代码补全,请安装 pydantic插件, 在pycharm的 file -> settings -> Plugins -> 输入 pydantic 搜索, 点击安装 pydantic 插件. ```  ### 4.0.4 关于很多funboost 例子的@boost 使用直接入参,没有使用 pydantic Model类型的BoostParams 因为是兼容老的写法的,老的直接入参仍然可以正常运行,所以例子中没有修改成 @boost(BoosterParams(...)) 入参方式, 用户知道就行. ### 4.0.5 自定义子类继承 BoosterParams,使得每次少传参 ```python import logging import time from funboost import boost, BrokerEnum, BoosterParams class BoosterParamsMy(BoosterParams): # 传这个类就可以少每次都亲自指定使用rabbitmq作为消息队列,和重试改为4次,和消费发布日志写入自定义.log文件。 broker_kind : str = BrokerEnum.RABBITMQ max_retry_times : int =4 log_level :int = logging.DEBUG log_filename : str ='自定义.log' @boost(boost_params=BoosterParamsMy(queue_name='task_queue_name1d', qps=3,)) def task_fun(x, y): print(f'{x} + {y} = {x + y}') time.sleep(3) # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 3 次 这个 task_fun 函数的目的。 @boost(boost_params=BoosterParamsMy(queue_name='task_queue_name1d', qps=10,)) def task_fun2(x, y): print(f'{x} - {y} = {x - y}') time.sleep(3) # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 10 次 这个 task_fun 函数的目的。 if __name__ == "__main__": task_fun.consume() # 消费者启动循环调度并发消费任务 task_fun2.consume() for i in range(10): task_fun.push(i, y=i * 2) # 发布者发布任务 task_fun2.push(i,i*10) ``` ## 4.1 装饰器方式调度函数 ```python from funboost import boost, BrokerEnum,BoosterParams # qps可以指定每秒运行多少次,可以设置0.001到10000随意。 # broker_kind 指定使用什么中间件,如果用redis,就需要在 funboost_config.py 设置redis相关的配置。 @boost(BoosterParams(queue_name='queue_test_f01', qps=0.2, broker_kind=BrokerEnum.REDIS_ACK_ABLE,)) # qps 0.2表示每5秒运行一次函数,broker_kind=2表示使用redis作中间件。 def add(a, b): print(a + b) if __name__ == '__main__': for i in range(10, 20): add.publish(dict(a=i, b=i * 2)) # 使用add.publish 发布任务 add.push(i, b=i * 2) # 使用add.push 发布任务 add.consume() # 使用add.consume 消费任务 # add.multi_process_consume(4) # 这是开启4进程 叠加 细粒度(协程/线程)并发,速度更强。 ``` ## 4.2c 在函数内部无限次按照队列名动态生成booster(消费者、生产者) (非装饰器方式) BoostersManager.build_booster 方法。 用法: booster = BoostersManager.build_booster(BoosterParams(queue_name=queue_name, qps=0.2, consuming_function=add)) 1)如果是在函数内部按照不同的queue_name无限次动态生成booster,不能按照以下写代码 ```python from funboost import boost,Booster,ConcurrentModeEnum,BoosterParams def add(a, b): print(a + b) def my_push(quue_name,a,b): booster = boost(BoosterParams(queue_name=quue_name, qps=0.2,concurrent_mode= ConcurrentModeEnum.THREADING))(add) # type: Booster # 上面这行代码太惨了,在push函数里面无数次创建生产者、消费者和消息队列连接,造成cpu 内存和消息队列服务端压力巨大。 booster.push(a,b) for i in range(1000000): queue_namx = f'queue_{i%10}' my_push(queue_namx,i,i*2) ``` ``` 看到有的人这样写代码,这样太惨了,会使python内存和cpu高,会对消息队列服务器产生巨大压力。调用100万次push函数,生成100万次消费者 生产者,对消息队列中间件创建100万次连接。 这样写代码太惨了,会发生悲剧。 4.2b的代码例子是在全局变量里面只生成了一次booster,性能没问题。而上面这个代码是在push函数里面实例化100万次 Consumer和Publisher,太悲催了。 如果你需要动态按照队列名生成生产者消费者,根据入参发布到不同的队列名中,可以自己写个字典判断队列名对应的booster有没有创建过,也可以使用框架提供的 build_booster 函数。 ``` 2)如果是在函数内部无限次动态生成booster,应该使用 BoostersManager.build_booster ``` build_booster 是创建或者直接从全局变量字典中获取booster对象。 如果当前进程没有这个queue_name对应的booster对象就创建,有则直接使用已创建的booster对象。 下面假设动态生成10个队列名的booster对象,发布100万次消息不需要对消息队列中间件创建100万次连接。 ``` ```python from funboost import Booster, BoostersManager,BoosterParams def add(a, b): print(a + b) def my_push(queue_name, a, b): booster = BoostersManager.build_booster(BoosterParams(queue_name=queue_name, qps=0.2, consuming_function=add)) # type: Booster # build_booster 这种就不会无数次去创建 消息队列连接了。有则直接使用,没有则创建。 booster.push(a, b) if __name__ == '__main__': for i in range(1000000): queue_namex = f'queue_{i % 10}' # 动态的发布消息到 queue_0 queue_1 queue_2 queue_3 .... queue_9 队列中。 my_push(queue_namex, i, i * 2) for j in range(10): # 启动 queue_0 queue_1 queue_2 queue_3 .... queue_9 队列的消费者进行消费。 booster = BoostersManager.build_booster(BoosterParams(queue_name=f'queue_{j}', qps=0.2, consuming_function=add)) # type: Booster booster.consume() ``` ## 4.2d 框架的 BoostersManager boosters管理介绍 虽然funboost没有显式的需要你实例化一个app对象,但背后有BoostersManager来登记了所有booster 例如用户可以通过 BoostersManager 来知道你声明了哪些队列名. ``` 所有@boost的或者 BoostersManager.build_booster 创建的booster都会登记到 BoostersManager.pid_queue_name__booster_map这里来 用户可以看到声明了哪些队列名 BoosterDiscovery(project_root_path: typing.Union[PathLike, str], booster_dirs: typing.List[typing.Union[PathLike, str]], max_depth=1, py_file_re_str: str = None).auto_discovery() 可以扫描python文件夹自动导入模块,找到@boost函数 BoostersManager.get_or_create_booster_by_queue_name 可以根据队列名创建或者获得booster ``` ### 4.2d.2 使用 BoostersManager 一次性启动所有队列消费, (无需亲自 fun1.consume() fun2.consume() fun100.consume()) 假设: 代码文件夹结构如下:  具体完整代码可见: [https://github.com/ydf0509/funboost/tree/master/test_frame/test_boosters_manager](https://github.com/ydf0509/funboost/tree/master/test_frame/test_boosters_manager) mod1.py和mod2.py 文件一共有3个消费函数,如果用户不想亲自使用如下方式按需一个个函数的亲自启动消费,而是想粗暴的启动所有消费函数.那么可以使用 BoostersManager的 consume_all 或者 BoostersManager.mp_consume_all(3) 这样启动. ```python mod1.fun1.consume() mod2.fun2a.consume() mod2.fun2b.consume() ``` ```python from pathlib import Path import queue_names from funboost import BoostersManager, BoosterDiscovery # import mod1, mod2 # 这个是必须导入的,可以不用,但必须导入,这样BoostersManager才能知道相关模块中的@boost装饰器,或者用下面的 BoosterDiscovery.auto_discovery()来自动导入m1和m2模块. if __name__ == '__main__': """ 有的人不想这样写代码,一个个的函数亲自 .consume() 来启动消费,可以使用BoostersManager相关的方法来启动某些队列或者启动所有队列. mod1.fun1.consume() mod2.fun2a.consume() mod2.fun2b.consume() """ BoosterDiscovery(project_root_path=Path(__file__).parent.parent.parent, booster_dirs=[Path(__file__).parent]).auto_discovery() # 这个放在main里面运行,防止无限懵逼死循环 # 选择启动哪些队列名消费 # BoostersManager.consume(queue_names.q_test_queue_manager1,queue_names.q_test_queue_manager2a) # 选择启动哪些队列名消费,每个队列设置不同的消费进程数量 # BoostersManager.mp_consume(**{queue_names.q_test_queue_manager1: 2, queue_names.q_test_queue_manager2a: 3}) # 启动所有队列名消费,在同一个进程内消费 BoostersManager.consume_all() # 启动所有队列名消费,每个队列启动单独的n个进程消费 # BoostersManager.mp_consume_all(2) ``` ### 4.2d.3 使用 BoostersManager ,通过 consume_group 启动一组消费函数 `BoostersManager.consume_group(booster_group)` 通过 `booster_group`消费分组, 启动消费 例如一组函数装饰器都写 `BoosterParams(booster_group='group1')` ,那么 `BoostersManager.consume_group('group1')` 会启动这组函数消费。 主要是取代用户手动写 `f1.consume()` `f2.consume()` 这样需要多次亲自手写逐个启动相关消费函数; 也避免了 `BoostersManager.consume_all()` 太过于粗暴,会启动不相关的多余消费函数. 也避免了 `BoostersManager.consume('q1', 'q2', ...)` 亲自写queue_name列表来启动消费函数. booster_group 代码演示: ```python """ 演示使用 BoostersManager.consume_group($booster_group) 启动消费组 BoostersManager.consume_group(booster_group=GROUP1_NAME) 相当于是内部执行了 f1.consume() f2.consume() 这种分多次启动消费函数, 因为f1和f2的booster_group都是GROUP1_NAME,所以会被启动消费组, f3的booster_group是None,所以不会被启动消费 """ import time from funboost import boost, BoosterParams, BoostersManager, ConcurrentModeEnum from funboost.utils.ctrl_c_end import ctrl_c_recv GROUP1_NAME = "my_group1" # 自定义的参数类,继承BoosterParams,用于减少每个消费函数装饰器的重复相同入参个数, # 不用相关函数都重复写 booster_group 入参. class MyGroup1BoosterParams(BoosterParams): concurrent_mode: str = ConcurrentModeEnum.SINGLE_THREAD booster_group: str = GROUP1_NAME # 指定消费分组名字 @boost( MyGroup1BoosterParams( # 使用了自定义类 MyGroup1BoosterParams ,所以 f1的booster_group 会自动指定为 GROUP1_NAME queue_name="queue_test_consume_gq1", ) ) def f1(x): time.sleep(2) print(f"f1 {x}") @boost( MyGroup1BoosterParams( # 使用了自定义类 MyGroup1BoosterParams ,所以 f2的booster_group 会自动指定为 GROUP1_NAME queue_name="queue_test_consume_gq2", ) ) def f2(x): time.sleep(2) print(f"f2 {x}") @boost( BoosterParams( # 没有使用自定义类 MyGroup1BoosterParams 而是直接传入 BoosterParams,所以 f3 的booster_group 是 None queue_name="queue_test_consume_gq3", concurrent_mode=ConcurrentModeEnum.SINGLE_THREAD, ) ) def f3(x): time.sleep(2) print(f"f3 {x}") if __name__ == "__main__": for i in range(10): f1.push(i) f2.push(i) f3.push(i) # f1.consume() # 分多次启动消费函数,如果嫌麻烦觉得需要一个一个启动有关函数,可以 BoostersManager.consume_group 一次启动一个分组的所有函数的消费 # f2.consume() BoostersManager.consume_group( GROUP1_NAME ) # 当前进程内启动消费组 GROUP1_NAME , 内部相当于是执行了 f1.consume() f2.consume() # BoostersManager.multi_process_consume_group(GROUP1_NAME,2) # 多进程启动消费组 ctrl_c_recv() ``` **重点说明**: `BoostersManager.consume_group($booster_group)`启动消费一组函数,如果@boost的函数是在多个不同的文件模块中,需要先人工import一下这些模块,或者用 BoosterDiscovery().auto_discovery() 来自动导入这些模块,不然无法自动知道 $booster_group 组 有哪些booster/消费函数。 ### 4.2d.4 使用 BoostersManager ,通过queue_name 得到 booster对象 BoostersManager.get_booster(queue_name) 通过queue_name 获取 booster(被@boost装饰的函数) ## 4.2e funboost 支持实例方法、类方法、静态方法、普通函数 4种类型,作为消费函数的例子 funboost 在 2024年6月新增支持了实例方法、类方法作为消费函数 ,见文档4.32章节 ## 4.3a 演示如何解决多个步骤的消费函数 看这个例子,step1函数中不仅可以给step2发布任务,也可以给step1自身发布任务。 qps规定了step1每2秒执行一次,step2每秒执行3次。 ```python import time from funboost import boost, BrokerEnum,BoosterParams @boost(BoosterParams(queue_name='queue_test_step1', qps=0.5, broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)) def step1(x): print(f'x 的值是 {x}') if x == 0: for i in range(1, 300): step1.pub(dict(x=x + i)) for j in range(10): step2.push(x * 100 + j) # push是直接发送多个参数,pub是发布一个字典 time.sleep(10) @boost(BoosterParams(queue_name='queue_test_step2', qps=3, broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)) def step2(y): print(f'y 的值是 {y}') time.sleep(10) if __name__ == '__main__': # step1.clear() step1.push(0) # 给step1的队列推送任务。 step1.consume() # 可以连续启动两个消费者,因为conusme是启动独立线程里面while 1调度的,不会阻塞主线程,所以可以连续运行多个启动消费。 step2.consume() ``` ## 4.3.b 演示多个函数消费者使用同一个线程池 ```python from funboost import boost,BoosterParams from funboost.concurrent_pool.flexible_thread_pool import FlexibleThreadPool """ 这个是演示多个不同的函数消费者,使用同一个全局的并发池。 如果一次性启动的函数过多,使用这种方式避免每个消费者创建各自的并发池,减少线程/协程资源浪费。 """ # 总共那个有5种并发池,用户随便选。 pool = FlexibleThreadPool(300) # 指定多个消费者使用同一个线程池, # @boost('test_f1_queue', specify_concurrent_pool=pool, qps=3) # 旧写法,直接在@boost传各种参数 @boost(BoosterParams(queue_name='test_f1_queue', specify_concurrent_pool=pool, qps=3)) # 新写法在BoosterParams传各种参数 def f1(x): print(f'x : {x}') @boost(BoosterParams(queue_name='test_f2_queue', specify_concurrent_pool=pool, qps=2)) def f2(y): print(f'y : {y}') @boost(BoosterParams(queue_name='test_f3_queue', specify_concurrent_pool=pool)) def f3(m, n): print(f'm : {m} , n : {n}') if __name__ == '__main__': for i in range(1000): f1.push(i) f2.push(i) f3.push(i, 1 * 2) f1.consume() f2.consume() f3.consume() ``` ## 4.3c 演示清空消息队列和获取消息队列中的消息数量 ``` f2.clear() 清空消息队列 f2.get_message_count() 获取消息队中的消息数量, 不能使用 f2.get_message_count() =0 来判断消息队列没任务了以为该函数的所有消息被消费完成了,本地内存队列存储了 一部分消息和正在执行的也有一部分消息,如果要判断消费完成了,应该使用4.17章节的 判断函数运行完所有任务,再执行后续操作。 ``` ```python @boost(BoosterParams(queue_name='test_queue77g', log_level=10, broker_kind=BrokerEnum.REDIS_ACK_ABLE, qps=5, create_logger_file=False, is_show_message_get_from_broker=True, concurrent_mode=ConcurrentModeEnum.SINGLE_THREAD # specify_concurrent_pool= pool2, # concurrent_mode=ConcurrentModeEnum.SINGLE_THREAD, concurrent_num=3,is_send_consumer_heartbeat_to_redis=True,function_timeout=10, # function_result_status_persistance_conf=FunctionResultStatusPersistanceConfig(True,True) )) def f2(a, b): time.sleep(10) print(a, b) return a - b if __name__ == '__main__': f2.clear() for i in range(8): f2.push(i, i * 5) print(f2.get_message_count()) f2.clear() for i in range(20): f2.push(i, i * 2) print(f2.get_message_count()) ``` ## 4.4 演示funboost定时运行例子 ApsJobAdder(消费函数, job_store_kind='redis').add_push_job(....) 来添加定时任务. ApsJobAdder实例化时候,会默认自动启动定时器,用户可以设置实例化时候是否 顺便 is_auto_paused 暂停执行 定时任务.
警告!!! ApsJobAdder(消费函数, job_store_kind='redis').add_push_job(....) 实际上是做了2件事情, 分别是 启动定时器 aps_obj.start() 和 添加定时任务 aps_obj.add_job(). 不要以为只是添加定时任务 所以如果是 添加定时任务和启动消费是分开部署的, 一定记得要在消费脚本中加上启动定时器 启动消费中加上 ApsJobAdder(消费函数, job_store_kind='redis') 这样实例化就顺便启动了定时器. 如果你不启动定时器,那么即使你之前已经加到redis job_store的定时任务,也没有定时器来触发.定时运行消费演示,定时方式入参用法可以百度 apscheduler 定时包。 定时的语法和入参与本框架无关系,不是本框架发明的定时语法,具体的需要刻苦学习 最知名的 apscheduler 定时包 ,所有对定时使用或报错感到疑惑的都是因为用户不愿意刻苦学习 apscheduler 官方文档造成的, 和funboost框架毫无关系。 要想玩好定时请务必苦学 apscheduler 3.x 官方文档: [https://apscheduler.readthedocs.io/en/3.x/](https://apscheduler.readthedocs.io/en/3.x/) ### 4.4.0 funboost定时任务最基本原理说明 **funboost中的定时任务原理是:自动定时发布消息到消息队列,而非直接执行函数本身** ``` 定时执行funboost发送函数入参到消息队列,然后funboost框架持续消费消息队列中的任务,从而达到执行消费函数的目的。 而不是在当前程序定时执行消费函数本身。 例如funboost中 add_push_job 添加一个每隔3秒运行fun消费函数的, 本质是每隔3秒自动运行 fun.push() , 而不是每隔3秒运行 fun() 本身, 理解这点至关重要。 如果你理解了这个原理,那么funboost的定时任务就非常简单,你可以自己使用apscheudler原生包来添加定时任务, 而不是非得使用funboost框架的ApsJobAdder的add_push_job来添加定时任务。 如果你直接使用官方的 apscheduler对象, 假设 fun是@boost装饰的消费函数, apscheduler对象.add_job(fun,args=(1,2)) 你这是错误写法,除非你期望就是在当前程序执行add_job函数本身, 你应该写的是,要多加一个发送函数 def fun_push_msg(x,y): fun.push(x,y) apscheduler对象.add_job(fun_push_msg,args=(1,2)) 所以如果你用 apscheduler原生,那就要自己写一个发布消息的函数,add_job调用你定义的发布消息的函数。 有人认为为什么不能写成 apscheduler对象.add_job(fun.push,args=(1,2)) ,那就可以少写一个fun_push_msg函数, 你太年轻了,没实践采坑过就不知道。 要说明的是 fun.push 他是一个实例方法,他不是一个函数,方法是和对象绑定的,对象上面是有属性的不一定可序列化,所以apscheduler.add_job 是函数定时,没给你说是实例方法能定时啊。 所以apscheduler对象.add_job第一个入参必须是函数或者静态方法,不能是实例方法啊。 所以apscheduler对象.add_job(fun.push,args=(1,2))这种写法当然不行了。 而使用 funboost封装的 ApsJobAdder().add_push_job 就是为了帮你自动节约少写一个 fun_push_msg 这种函数。 ``` ### 4.4.1 funboost定时任务代码演示: ```python """ 2025年后定时任务现在推荐使用 ApsJobAdder 写法 ,用户不需要亲自选择使用 apscheduler对象来添加定时任务。 使用对apscheduler封装的ApsJobAdder比 直接使用 apscheduler 包的好处如下: 1、ApsJobAdder.add_push_job 来添加定时发布到消息队列的任务, 可以让用户少写一个 push_xx_fun_to_broker 的函数,用户不需要 apscheduler.add_job(push_xx_fun_to_broker,args=(1,)) , 而是 ApsJobAdder.add_push_job(xx_fun,args=(1,)) 2.ApsJobAdder在redis作为job_store时候,每个消费函数使用单独的 jobs_key ,每个消费函数使用独立的 apscheduler对象, 避免扫描定时任务互相干扰。 例如你只想启动fun1的定时任务,而不像启动fun2的定时任务,更能单独控制。 3. ApsJobAdder在redis作为job_store时候 ,_process_jobs 使用了 redis分布式锁, 解决经典头疼的 apschduler实例建议 只在一个进程启动一次, 现在可以在多机器多进程随意反复启动多次 apscheduler对象,不会造成定时任务执行重复。 """ from funboost import boost, BrokerEnum,ctrl_c_recv,BoosterParams,ApsJobAdder # 定义任务处理函数 @boost(BoosterParams(queue_name='sum_queue5', broker_kind=BrokerEnum.REDIS)) def sum_two_numbers(x, y): result = x + y print(f'The sum of {x} and {y} is {result}') @boost(BoosterParams(queue_name='data_queue5', broker_kind=BrokerEnum.REDIS)) def show_msg(data): print(f'data: {data}') if __name__ == '__main__': # 启动消费者 sum_two_numbers.consume() show_msg.consume() # 发布任务 sum_two_numbers.push(3, 5) sum_two_numbers.push(10, 20) show_msg.push('hello world') # 使用ApsJobAdder添加定时任务, 里面的定时语法,和apscheduler是一样的,用户需要自己熟悉知名框架apscheduler的add_job定时入参 # ApsJobAdder 类可以多次重复实例化,内部对每一个消费函数使用一个单独的apscheduler对象,避免扫描与当前关心的消费函数不相干的redis jobstore中的定时任务 # 方式1:指定日期执行一次, # ApsJobAdder(sum_two_numbers, job_store_kind='redis').aps_obj.start(paused=False) ApsJobAdder(sum_two_numbers, job_store_kind='redis').add_push_job( trigger='date', run_date='2025-01-17 23:25:40', args=(7, 8), replace_existing=True, # 如果写个id,就不能重复添加相同id的定时任务了,要使用replace_existing来替换之前的定时任务id id='date_job1' ) # 方式2:固定间隔执行,使用内存作为apscheduler的 job_store ApsJobAdder(sum_two_numbers, job_store_kind='redis').add_push_job( trigger='interval', seconds=5, args=(4, 6), id='interval_job1', replace_existing=True ) # 方式3:使用cron表达式定时执行 ApsJobAdder(sum_two_numbers, job_store_kind='redis').add_push_job( trigger='cron', day_of_week='*', hour=23, minute=49, second=50, kwargs={"x":50,"y":60}, replace_existing=True, id='cron_job1') # 延时使用内存作为apscheduler的 job_store ,因为是内存,这种定时任务计划就不能持久化。 ApsJobAdder(show_msg, job_store_kind='memory').add_push_job( trigger='interval', seconds=20, args=('hi python',) ) ctrl_c_recv() # 这个是阻止代码主线程结束,这在background类型的apscheduler很重要,否则会报错提示主线程已退出。 当然,你也可以在末尾加 time.sleep 来阻止主线结束。 ``` funboost 定时语法说明: ``` funboost中定时任务用法是: ApsJobAdder(消费函数,).add_push_job(trigger='interval',.....) 原生 apscheduler 添加定时任务用法例子是: scheduler.add_job(my_job, 'interval', seconds=3, id='my_interval_job') 相比较而言,funboost中定时推荐你使用 ApsJobAdder(消费函数,).add_push_job , 注意是 add_push_job 而非 add_job , 并且add_push_job去掉了第一个入参func , add_push_job 其他入参和 apscheduler的add_job一模一样。 用户有兴趣可以看 ApsJobAdder 源码,他只是基于 apscheduler 的一个非常简单易懂的包装而已。 ```
python3.9及以上 定时任务报错 RuntimeError: cannot schedule new futures after interpreter shutdown 我们是需要使得主线程其他任务不结束, 看10.2章节文档,在你脚本的最后一行加个 while 1: time.sleep(100) , 阻止主线程退出就好了。 或者结尾加个 ctrl_c_recv() (先from funboost import ctrl_c_recv)### 4.4.2 ApsJobAdder对象.aps_obj 核心对象说明 ApsJobAdder对象.aps_obj 是官方 apscheduler.BackgroundScheduler类型对象 你可以亲自使用 ApsJobAdder(sum_two_numbers, job_store_kind='redis',is_auto_start=False,).aps_obj 来精细化操作这个对象。 **例如删除一个定时任务:** ApsJobAdder(sum_two_numbers, job_store_kind='redis',is_auto_start=False,).aps_obj.remove_job('你指定的job_id') **删除所有定时任务:** ApsJobAdder(sum_two_numbers, job_store_kind='redis',is_auto_start=False,).aps_obj.remove_all_jobs() **例如查看所有定时计划** ApsJobAdder(sum_two_numbers, job_store_kind='redis',is_auto_start=False,).aps_obj.get_jobs() **aps_obj的其他方法我不再啰嗦了,都是知名三方包apscheduler的Scheduler类型的方法,本质原因是你不愿意苦学apscheduler,这些用法功能和funboost自身源码毫无关系。** funboost的对apscheduler包的轻度二次包装,是为了简化添加 “push到消息队列” 的定时任务,用户完全可以直接使用原生 apscheduler。 但你要把”发布消息到消息队列“作为定时任务,而不是把“执行函数本身逻辑”作为定时任务。 ### 4.4.3 演示在python web中定时任务的添加 (添加和执行定时任务分在不同的py脚本中) web中去添加和修改定时任务,web单独部署一次。 funboost后台异步任务单独部署一次,建议定时器需要随着消费一起启动。 **在web接口代码中添加定时任务计划,但可以不执行定时任务,设置is_auto_paused=True** **在启动消费的脚本中,要明确启动定时器,ApsJobAdder(fun_sum,job_store_kind='redis',) ,ApsJobAdder类默认就是 is_auto_start=True is_auto_paused=False** web中添加定时任务demo连接: [https://github.com/ydf0509/funboost/tree/master/test_frame/funboost_aps_web_demo](https://github.com/ydf0509/funboost/tree/master/test_frame/funboost_aps_web_demo) ``` 演示在python web中定时任务的添加,添加定时任务的脚本和启动消费的脚本不在同一个py文件中,一定要注意务必要启动定时任务apschduler对象,这是最关键的。 web中需要启动定时器,is_auto_start=True,但你可以选择暂停执行定时任务is_auto_paused=True,在启动消费的那里去写启动和执行定时任务. ApsJobAdder(消费函数, job_store_kind='redis', is_auto_start=True,is_auto_paused=True) (ps:当然可以多个地方都 is_auto_start=True,is_auto_paused=True 这样启动定时器, 因为funboost已经继承优化了原生apscheduler类,不怕你多次重复部署apscheduler定时器造成重复执行相同的定时任务 有兴趣的用户可以看 https://github.com/ydf0509/funboost/blob/master/funboost/timing_job/apscheduler_use_redis_store.py 的 FunboostBackgroundSchedulerProcessJobsWithinRedisLock 的 _process_jobs 方法, 这个是防止apscheduler多次部署导致重复执行定时任务的根本核心解决. ) ``` ###### 4.4.3.2 web_app.py 是web应用,负责添加定时任务到redis中。此处使用flask框架演示, django fastapi同理,不需要我一一举例子。 ``` 因为funboost是自由无拘无束的,不需要 django-funboost flask-funboost fastapi-funboost 插件。 只有坑爹难用的celery才需要django-celery flask-celery fastapi-celery 三方插件来帮助用户简化适配各种web框架使用, funboost压根不需要这种适配各种web框架的插件。 ``` ###### 4.4.3.3 run_consume.py 是启动消费 和 启动apschduler定时器的脚本 ```python ApsJobAdder(fun_sum,job_store_kind='redis',) #负责启动apschduler对象,apschduler对象会扫描redis中的定时任务,并执行定时任务,定时任务的功能就是定时push消息到消息队列中。 fun_sum.consume() # 启动消费消息队列中的消息 ```
警告!!!你不要只启动fun_sum.consume() 而不启动apschduler对象, 否则apschduler对象不会扫描redis中已添加好的定时任务,就不会自动定时的push消息到消息队列中。## 4.4.4 新增的 支持 aps_obj.add_job 添加定时任务(可以不用 add_push_job) (2025-08) **第一性原理:** 由于 Booster 对象在2025-08,新增支持了pickle 序列化和反序列化,所以可以支持 ```aps_obj.add_job($booster对象.push,...)``` 这种写法. 具体看下面代码中的文字注释: ```python """ 此脚本是演示,由于2025-08 Booster 对象支持了 pickle序列化后, 可以支持 aps_obj_sum_two_numbers2.add_job( sum_two_numbers2.push,...) 这种写法. 这样用户可以了解定时任的本质是 push到消息队列,而不是直接执行函数自身. 用户看这个脚本主要是需要对比 add_push_job 和 add_job 的区别. """ from funboost import boost, BrokerEnum,ctrl_c_recv,BoosterParams,ApsJobAdder # 定义任务处理函数 @boost(BoosterParams(queue_name='sum_queue552', broker_kind=BrokerEnum.REDIS)) def sum_two_numbers2(x, y): result = x + y print(f'The sum of {x} and {y} is {result}') if __name__ == '__main__': # ApsJobAdder(sum_two_numbers2, job_store_kind='redis').add_push_job( # trigger='interval', # seconds=5, # args=(4, 6), # replace_existing=True, # id='interval_job501', # ) aps_job_adder_sum_two_numbers2 = ApsJobAdder(sum_two_numbers2, job_store_kind='redis',is_auto_paused=False) aps_obj_sum_two_numbers2 =aps_job_adder_sum_two_numbers2.aps_obj aps_obj_sum_two_numbers2.remove_all_jobs() # 可选,删除sum_two_numbers2所有已添加的定时任务 # 原来推荐的添加定时任务方式 add_push_job aps_job_adder_sum_two_numbers2.add_push_job( # ApsJobAdder.add_push_job 不需要传递第一个入参func,job函数 trigger='interval', seconds=5, args=(4, 6), replace_existing=True, id='interval_job503', ) """ 2025-08后 现在可以直接使用用户熟悉的 add_job ,第一个入参func传递 $消费函数.push 当使用redis 这种数据库而非memory作为 apscheduler 的 jobstore 时候,apscheduler.add_job 需要pickle序列化 第一个入参 func, sum_two_numbers2.push 是一个实例方法, Booster对象属性链路上有 threading.Lock 和socket 这些类型, 导致不可pickle序列化,所以原来需要使用 ApsJobAdder.add_push_job 曲线救国. 由于现在新增添加了 booster 支持pickle 序列化和反序列化,所以可以支持 sum_two_numbers2.push 实例方法 作为job函数. (ps:有兴趣的可以看 funboost/core/booster.py 的 Booster 的 __getstate__ 和 __setstate__ 的实现方式,是怎么支持pickle的,很巧妙) """ aps_obj_sum_two_numbers2.add_job( func = sum_two_numbers2.push, # aps_obj.add_job 是 原生的,需要传递第一个入参func ,sum_two_numbers2.push trigger='interval', seconds=5, args=(40, 60), replace_existing=True, id='interval_job504', ) ctrl_c_recv() ``` ## 4.5 多进程并发 + 多线程/协程,代码例子。 ff.multi_process_start(2) 就是代表启动2个独立进程并发 + 叠加 asyncio、gevent、eventlet、threding 、single_thread 细粒度并发,
完整的除了asyncio并发,包括 aio_push 和 asyncio 来等待获取结果,请看4b.3 章节
p> [ funboost + 全asyncio 编程生态演示](https://funboost.readthedocs.io/zh-cn/latest/articles/c4b.html#b-3-funboost-asyncio) ``` 因为 async_result= fun.push() ,默认返回的是 AsyncResult 类型对象,里面的方法都是同步语法。 async_result.result 是一个耗时的函数, 解释一下result, 是property装饰的所以不用 async_result.result() 有的人直接在async def 的异步函数里面 print (async_result.result),如果消费函数消费需要耗时5秒, 那么意味rpc获取结果至少需要5秒才能返回,你这样写代码会发生灭顶之灾,asyncio生态流程里面一旦异步需要处处异步。 所以新增了 AioAsyncResult 类,和用户本来的asyncio编程生态更好的搭配。 ``` 服务端求和脚本还是4.6.1 两数求和不变,这里演示asyncio生态下的获取rpc结果脚本 ```python import asyncio from funboost import AioAsyncResult from test_frame.test_rpc.test_consume import add async def process_result(status_and_result: dict): """ :param status_and_result: 一个字典包括了函数入参、函数结果、函数是否运行成功、函数运行异常类型 """ await asyncio.sleep(1) print(status_and_result) async def test_get_result(i): async_result = add.push(i, i * 2) aio_async_result = AioAsyncResult(task_id=async_result.task_id) # 这里要使用asyncio语法的类,更方便的配合asyncio异步编程生态 print(await aio_async_result.result) # 注意这里有个await,如果不await就是打印一个协程对象,不会得到结果。这是asyncio的基本语法,需要用户精通asyncio。 print(await aio_async_result.status_and_result) await aio_async_result.set_callback(process_result) # 你也可以编排任务到loop中 if __name__ == '__main__': loop = asyncio.get_event_loop() for j in range(100): loop.create_task(test_get_result(j)) loop.run_forever() ``` ```text async_result = add.push(i, i * 2) async_result 的类型是AsyncResult,是同步场景下的类。这个Async不是指的asyncio语法异步,是生产者消费者模式整体大的概念上的异步,不是指的python asyncio语法异步。 aio_async_result = AioAsyncResult(task_id=async_result.task_id) ,这个是asyncio语法类AioAsyncResult,这个类里面的耗时io的方法全都是async def的, 这种更好的配合用户当前已经是 asyncio 编程生态。因为在asyncio编程生态中,在一个loop里面,要全部异步,只要一个是同步阻塞的方法,整个loop中其他协程任务完了个蛋, 也就是常说的一旦异步要处处异步,不要在一串流程中一会儿调用asyncio的耗时函数,一会调用同步耗时函数,这样是个悲剧,懂的都懂这句话。 ``` ### 4.6.7 从mongo中获取函数执行结果 首先 boost装饰器中设置函数状态结果持久化后,会保存函数的状态和结果到mongodb中。 function_result_status_persistance_conf=FunctionResultStatusPersistanceConfig(is_save_status=True,is_save_result=True,expire_seconds=500000) 使用 ResultFromMongo 类获取函数结果 ```python # 以非等待方式获取mongo中函数的结果。 import time from funboost import ResultFromMongo async_result = add.push(10, 20) task_id = async_result.task_id time.sleep(2) print(ResultFromMongo(task_id).get_status_and_result()) print(ResultFromMongo('test_queue77h6_result:764a1ba2-14eb-49e2-9209-ac83fc5db1e8').get_status_and_result()) print(ResultFromMongo('test_queue77h6_result:5cdb4386-44cc-452f-97f4-9e5d2882a7c1').get_result()) ``` ## 4.7 演示qps控频 演示框架的qps控频功能 此框架对函数耗时随机波动很大的控频精确度达到96%以上 此框架对耗时恒定的函数控频精确度达到99.9% 在指定rate_limit 超过 20/s 时候,celery对耗时恒定的函数控频精确度60%左右,下面的代码会演示这两个框架的控频精准度对比。此框架针对不同指定不同qps频次大小的时候做了不同的三种处理方式。 框架的控频是直接基于代码计数,而非使用redis 的incr计数,因为python操作一次redis指令要消耗800行代码左右, 如果所有任务都高频率incr很消耗python脚本的cpu也对redis服务端产生灾难压力。 例如假设有50个不同的函数,分别都要做好几千qps的控频,如果采用incr计数,光是incr指令每秒就要操作10万次redis, 所以如果用redis的incr计数控频就是个灾难,redis incr的计数只适合 1到10大小的qps,不适合 0.01 qps 和 1000 qps这样的任务。 同时此框架也能很方便的达到 5万 qps的目的,装饰器设置qps=50000 和 is_using_distributed_frequency_control=True, 然后只需要部署很多个进程 + 多台机器,框架通过redis统计活跃消费者数量,来自动调节每台机器的qps,框架的分布式控频开销非常十分低, 因为分布式控频使用的仍然不是redis的incr计数,而是基于每个消费者的心跳来统计活跃消费者数量,然后给每个进程分配qps的,依然基于本地代码计数。 例如部署100个进程(如果机器是128核的,一台机器足以,或者20台8核机器也可以) 以20台8核机器为例子,如果把机器减少到15台或增加机器到30台,随便减少部署的机器数量或者随便增加机器的数量, 代码都不需要做任何改动和重新部署,框架能够自动调节来保持持续5万次每秒来执行函数,不用担心部署多了30台机器,实际运行qps就变成了10几万。 (前提是不要把机器减少到10台以下,因为这里假设这个函数是一个稍微耗cpu耗时的函数,要保证所有资源硬件加起来有实力支撑5万次每秒执行函数) 每台机器都运行 test_fun.multi_process_conusme(8),只要10台以上1000台以下随意随时随地增大减小运行机器数量, 代码不需要做任何修改变化,就能很方便的达到每秒运行5万次函数的目的。### 4.7.1 演示qps控频和自适应扩大和减小并发数量。 ``` 通过不同的时间观察控制台,可以发现无论f2这个函数需要耗时多久(无论是函数耗时需要远小于1秒还是远大于1秒),框架都能精确控制每秒刚好运行2次。 当函数耗时很小的时候,只需要很少的线程就能自动控制函数每秒运行2次。 当函数突然需要耗时很大的时候,智能线程池会自动启动更多的线程来达到每秒运行2次的目的。 当函数耗时从需要耗时很大变成只需要耗时很小的时候,智能线程池会自动缩小线程数量。 总之是围绕qps恒定,会自动变幻线程数量,做到既不多开浪费cpu切换,也不少开造成执行速度慢。 ``` ```python import time import threading from funboost import boost, BrokerEnum, ConcurrentModeEnum, BoosterParams t_start = time.time() @boost(BoosterParams(queue_name='queue_test2_qps', qps=2, broker_kind=BrokerEnum.PERSISTQUEUE, concurrent_mode=ConcurrentModeEnum.THREADING, concurrent_num=600)) def f2(a, b): """ 这个例子是测试函数耗时是动态变化的,这样就不可能通过提前设置参数预估函数固定耗时和搞鬼了。看看能不能实现qps稳定和线程池自动扩大自动缩小 要说明的是打印的线程数量也包含了框架启动时候几个其他的线程,所以数量不是刚好和所需的线程计算一样的。 """ result = a + b sleep_time = 0.01 if time.time() - t_start > 60: # 先测试函数耗时慢慢变大了,框架能不能按需自动增大线程数量 sleep_time = 7 if time.time() - t_start > 120: sleep_time = 30 if time.time() - t_start > 240: # 最后把函数耗时又减小,看看框架能不能自动缩小线程数量。 sleep_time = 0.8 if time.time() - t_start > 300: sleep_time = None print( f'{time.strftime("%H:%M:%S")} ,当前线程数量是 {threading.active_count()}, {a} + {b} 的结果是 {result}, sleep {sleep_time} 秒') if sleep_time is not None: time.sleep(sleep_time) # 模拟做某事需要阻塞n秒种,必须用并发绕过此阻塞。 return result if __name__ == '__main__': f2.clear() for i in range(1000): f2.push(i, i * 2) f2.consume() ``` ### 4.7.2 此框架对固定耗时的任务,持续控频精确度高于99.9% 4.7.2 此框架对固定耗时的任务,持续控频精确度高于99.9%,远超celery的rate_limit 60%控频的精确度。 ``` 对于耗时恒定的函数,此框架精确控频率精确度达到99.9%,celery控频相当不准确,最多到达60%左右,两框架同样是做简单的加法然后sleep0.7秒,都设置500并发100qps。 ``` ```python @boost(BoosterParams(queue_name='test_queue66', broker_kind=BrokerEnum.REDIS, qps=100)) def f(x, y): print(f''' {int(time.time())} 计算 {x} + {y} = {x + y}''') time.sleep(0.7) return x + y @celery_app.task(name='求和啊', rate_limit='100/s') def add(a, b): print(f'{int(time.time())} 计算 {a} + {b} 得到的结果是 {a + b}') time.sleep(0.7) return a + b ``` ```text # 在pycahrm控制台搜索 某一秒的时间戳 + 计算 作为关键字查询,分布式函数调度框架启动5秒后,以后持续每一秒都是100次,未出现过99和101的现象。 在pycahrm控制台搜索 某一秒的时间戳 + 计算 作为关键字查询,celery框架,每一秒求和次数都是飘忽不定的,而且是六十几徘徊, 如果celery能控制到95至105次每秒徘徊波动还能接受,类似现象还有celery设置rate_limit=50/s,实际32次每秒徘徊, 设置rate_limit=30/s,实际18-22次每秒徘徊,可见celery的控频相当差。 设置rate_limit=10/s,实际7-10次每秒徘徊,大部分时候是9次,当rate_limit大于20时候就越来越相差大了,可见celery的控频相当差。 ``` ### 4.7.3 对函数耗时随机性大的控频功能证明 对函数耗时随机性大的控频功能证明,使用外网连接远程broker,持续qps控频。 ``` 设置函数的qps为100,来调度需要消耗任意随机时长的函数,能够做到持续精确控频,频率误差小。 如果设置每秒精确运行超过500000次以上的固定频率,前提是cpu够强机器数量多, 设置qps=50000,并指定is_using_distributed_frequency_control=True(只有这样才是分布式全局控频,默认是基于单个进程的控频),。 如果任务不是很重型很耗cpu,此框架单个消费进程可以控制每秒运行次数的qps 从0.01到1000很容易。 当设置qps为0.01时候,指定的是每100秒运行1次,qps为100指的是每一秒运行100次。 ``` ```python import time import random from funboost import boost, BrokerEnum, BoosterParams @boost(BoosterParams(queue_name='test_rabbit_queue7', broker_kind=BrokerEnum.RABBITMQ_AMQPSTORM, qps=100, log_level=20)) def test_fun(x): # time.sleep(2.9) # sleep时间随机从0.1毫秒到5秒任意徘徊。传统的恒定并发数量的线程池对未知的耗时任务,持续100次每秒的精确控频无能为力。 # 但此框架只要简单设置一个qps就自动达到了这个目的。 random_sleep = random.randrange(1, 50000) / 10000 time.sleep(random_sleep) print(x, random_sleep) if __name__ == '__main__': test_fun.consume() # test_fun.multi_process_consume(3) ``` 分布式函数调度框架对耗时波动大的函数持续控频曲线  ### 4.7.4 分布式全局控频和单个消费者控频区别 ``` @boost中指定 is_using_distributed_frequency_control=True 则启用分布式全局控频,是跨进程跨python解释器跨服务器的全局控频。 否则是基于当前消费者的控频。 例如 你设置的qps是100,如果你不设置全局控频,run_consume.py 脚本中启动 fun.consume() ,如果你反复启动5次这个 run_consume.py, 如果不设置分布式控频,那么5个独立的脚本运行,频率总共会达到 500次每秒,因为你部署了5个脚本。 同理你如果用 fun.multi_process_consume(4)启动了4个进程消费,那么就是4个消费者,总qps也会达到400次每秒。 这个控频方式是看你需求了。 如果设置了 is_using_distributed_frequency_control=True,那就会使用每个消费者发送到redis的心跳来统计总消费者个数。 如果你部署了2次,那么每个消费者会平分qps,每个消费者是变成50qps,总共100qps。 如果你部署了5次,那么每个消费者会平分qps,每个消费者是变成20qps,总共100qps。 如果你中途关闭2个消费者,变成了3个消费者,每个消费者是变成 33.33qps,总共100qps。(框架qps支持小数,0.1qps表示每10秒执行1次) ``` ## 4.8 再次说明qps能做什么,qps为什么流弊?常规并发方式无法完成的需求是什么? 以模拟请求一个flask接口为例子,我要求每一秒都持续精确完成8次请求,即控制台每1秒都持续得到8次flask接口返回的hello world结果。 ### 4.8.1 下面讲讲常规并发手段为什么对8qps控频需求无能为力? 不用框架也可以实现8并发, 例如Threadpool设置8线程很容易,但不用框架实现8qps不仅难而且烦 ``` 虽然 框架能自动实现 单线程 ,多线程, gevent , eventlet ,asyncio ,多进程 并发 , 多进程 + 单线程 ,多进程 + 多线程,多进程 + gevent, 多进程 + eventlet ,多进程 + asyncio 的组合并发 可以说囊括了python界的一切知名的并发模型,能做到这一点就很方便强大了 但是 此框架还额外能实现qps控频,能够实现函数每秒运行次数的精确控制,我觉得这个功能也很实用,甚至比上面的那些并发用起来还实用。 ``` ``` 这个代码是模拟常规并发手段无法达到每秒持续精确运行8次函数(请求flask接口8次)的目的。 下面的代码使用8个线程并发运行 request_flask_api 函数, 当flask_veiw_mock 接口耗时0.1秒时候,在python输出控制台可以看到,10秒钟就运行结束了,控制台每秒打印了80次hello world,严重超频10倍了不符合需求 当flask_veiw_mock 接口耗时刚好精确等于1秒时候,在python输出控制台可以看到,100秒钟运行结束了,控制台每秒打印了8次hello world,只有当接口耗时刚好精确等于1秒时候,并发数量才符合qps需求 当flask_veiw_mock 接口耗时10秒时候,在python输出控制台可以看到,需要1000秒钟运行结束,控制台每隔10秒打印8次hello world,严重不符合持续每秒打印8次的目的。 由此可见,用设置并发数量来达到每秒请求8次flask的目的非常困难,99.99%的情况下服务端没那么巧刚好耗时1秒。 ``` 天真的人会说根据函数耗时大小,来设置并发数量,这可行吗? ``` 有人会说,为了达到8qps目的,当函数里面sleep 0.1 时候他开1线程,那你这样仍然超频啊,你是每1秒钟打印10次了超过了8次。 当sleep 0.01 时候,为了达到8qps目的,就算你开1线程,那不是每1秒钟打印100次hello?你不会想到开0.08个线程个线程来实现控频吧? 当sleep 10秒时候,为了8qps目的,你开80线程,那这样是控制台每隔10秒打印80次hello,我要求的是控制台每一秒都打印8次hello,没告诉你是每隔10秒打印80次hello吧?还是没达到目的。 如果函数里面是sleep 0.005 0.07 0.09 1.3 2.7 7 11 13这些不规则无法整除的数字?请问你是如何一一计算精确开多少线程来达到8qps的? 如果flask网站接口昨天是3秒的响应时间,今天变成了0.1秒的响应时间,你的线程池数量不做变化,代码不进行重启,请问你如何做到自适应无视请求耗时,一直持续8qps的目的? 固定并发数量大小就是定速巡航不够智能前车减速会撞车,qps自适应控频那就是acc自适应巡航了,自动调整极端智能,压根无需提前测算预估函数需要耗时多久(接口响应耗时多久)。 所以综上所述,如果你有控频需求,你想用并发数量来达到控频目的,那是不可能的。 ``` 有些人到现在还没明白并发数量和qps(每秒执行多少次)之间的区别,并发数量只有在函数耗时刚好精确等于1秒时候才等于qps。 ``` ```python import time from concurrent.futures import ThreadPoolExecutor def flask_veiw_mock(x): # time.sleep(0.1) # 通过不同的sleep大小来模拟服务端响应需要消耗的时间 # time.sleep(1) # 通过不同的sleep大小来模拟服务端响应需要消耗的时间 time.sleep(10) # 通过不同的sleep大小来模拟服务端响应需要消耗的时间 return f"hello world {x}" def request_flask_api(x): response = flask_veiw_mock(x) print(time.strftime("%H:%M:%S"), ' ', response) if __name__ == '__main__': with ThreadPoolExecutor(8) as pool: for i in range(800): pool.submit(request_flask_api,i) ``` 截图是当 flask_veiw_mock 耗时为10秒时候,控制台是每隔10秒打印8次 hello world,没达到每一秒都打印8次的目的
完整的除了asyncio并发,包括 aio_push 和 asyncio 来等待获取结果,请看4b.3 章节
p> [ funboost + 全asyncio 编程生态演示](https://funboost.readthedocs.io/zh-cn/latest/articles/c4b.html#b-3-funboost-asyncio) ### 4.12.1 concurrent_mode=ConcurrentModeEnum.ASYNC 运行协程 concurrent_mode=ConcurrentModeEnum.ASYNC是一个loop中真异步运行协程 见7.8的demo介绍, ```python import asyncio from funboost import boost, ConcurrentModeEnum, BoosterParams @boost(BoosterParams(queue_name='async_queue', concurrent_mode=ConcurrentModeEnum.ASYNC)) async def f(): await asyncio.sleep(2) ``` 这种方式是@boost装饰在async def定义的函数上面。 celery不支持直接调度执行async def定义的函数,但此框架是直接支持asyncio并发的。 ### 4.12.2 concurrent_mode=ConcurrentModeEnum.THREADING 运行asyncio协程 concurrent_mode=ConcurrentModeEnum.THREADING 在每个线程都创建独立的loop,每个协程运行在不同的loop中 见7.38的demo介绍, ```python import asyncio from funboost import boost, ConcurrentModeEnum, BoosterParams @boost(BoosterParams(queue_name='threading_async_queue', concurrent_mode=ConcurrentModeEnum.THREADING)) async def f(): await asyncio.sleep(2) ``` 这种方式就是临时为每一个协程创建一个 loop,loop是一次性的。 ``` funboot的asyncio 并发模式是真asyncio , 是在同一个loop中并发的运行多个协程对象。 伪 async 并发是多线程中每个线程临时 loop = asyncio.new_event_loop() ,然后 loop.run_until_complete() , 这种就是假的,每隔协程都运行在不同的loop中。 ``` ConcurrentModeEnum.THREADING 照样可以运行async def的函数。 这种当然是伪asyncio的,是临时 创建一个 loop。 看你咋想的喜欢真asyncio还是假的,这种也可以单进程1秒钟运行2000次asyncio的函数 ## 4.13 跨项目怎么发布任务或者获取函数执行结果(即不定义@boost消费函数就发送消息)? 别的语言项目或者别的python项目手动发布消息到中间件,让分布式函数调度框架消费任务,这个是增加的一种很强大的功能,用户可以自定义发布者和消费者,注册到框架中。boost装饰器就能自动使用你的消费者类和发布者类了。 这个功能很好很强,能彻底解决框架的流程逻辑不符合你的期望时候,用户能够自定义一些细节。需要用户有一定的python语法基础和面向对象 设计模式才能用这个功能。 为什么增加这个功能,是由于总是有不符合用户期望的细节,用户如果要定制就要修改源码这样不方便,现在有了这就可以自由定制扩展了
用户自定义的类可以继承 AbstractConsumer ,这种方式适合扩展支持新的中间件种类。``` register_custom_broker有两个用途 1 是给用户提供一种方式新增消息队列中间件种类,(funboost框架支持了所有知名类型消息队列中间件或模拟中间件,这个用途的可能性比较少) 2 可以对已有中间件类型的消费者 发布者类继承重写符合自己意愿的,这样就不需要修改项目的源代码了,这种用法非常的强大自由,可以满足一切用户的特殊定制想法。 因为用户可以使用到self成员变量和通过重写使用其中的函数内部局部变量,能够做到更精细化的特殊定制。这个用途很强大自由灵活定制。 用法例如 register_custom_broker(BROKER_KIND_LIST, ListPublisher, ListConsumer) # 核心,这就是将自己写的类注册到框架中,框架可以自动使用用户的类,这样用户无需修改框架的源代码了。 ``` 以下为4个扩展或定制的代码例子: [继承AbstractConsumer基类 ,自定义扩展使用list作为消息队列](https://github.com/ydf0509/funboost/tree/master/test_frame/test_custom_broker/test_custom_list_as_broker.py) [继承AbstractConsumer基类 ,自定义扩展使用deque作为消息队列](https://github.com/ydf0509/funboost/tree/master/test_frame/test_custom_broker/test_custom_deque_as_broker.py) [继承AbstractConsumer的子类 ,自定义扩展使用redis实现先进后出 后进先出,总是优先消费最晚发布的消息的例子](https://github.com/ydf0509/funboost/tree/master/test_frame/test_custom_broker/test_custom_redis_consume_latest_publish_msg_broker.py) [继承AbstractConsumer的子类 ,自定义扩展重写消费者最核心控制运行函数的 _run方法的逻辑的例子](https://github.com/ydf0509/funboost/tree/master/test_frame/test_custom_broker/rewrite_run.py) ## 4.21b 【完全自由定制扩展(方式2)】,使用 consumer_override_cls 和 publisher_override_cls 来自定义消费者 发布者。 **方式2,使用装饰器的入参 consumer_override_cls 和 publisher_override_cls** ### 4.21b.1 重写某些方法的例子 下面的例子是自定义一个MyConsumer,传给 consumer_override_cls,MyConsumer可以继承自AbstractConsumer或者他的子类。 这个代码会让用户自定义记录函数的消费结果,可以重写AbstractConsumer的任意所有方法和属性,所以用户完全可以自由定义重写。 同理,通过指定 publisher_override_cls 一个自定义的 Publisher类,用户可以重写或自定义发布者。 下面例子是重写实现记录函数消费状态方法,所以只需要重写 user_custom_record_process_info_func , 你就能实现 celery的类似 on_sucess 和 on_failure ```python import time from funboost import boost, BrokerEnum, BoosterParams, AbstractConsumer, FunctionResultStatus import random import time from funboost import boost, BrokerEnum, FunctionResultStatusPersistanceConfig, BoosterParams, ConcurrentModeEnum, AbstractConsumer, FunctionResultStatus class MyConsumer(AbstractConsumer): def user_custom_record_process_info_func(self, current_function_result_status: FunctionResultStatus): print('使用指定的consumer_override_cls来自定义或重写方法') if current_function_result_status.success is True: print(f'入参 {current_function_result_status.params} 成功了,结果是: {current_function_result_status.result},模拟发个微信通知') else: print(f'入参 {current_function_result_status.params} 失败了,原因是: {current_function_result_status.exception},模拟发个邮件') self.logger.debug(current_function_result_status.get_status_dict()) # 给用户打印下current_function_result_status有哪些字段信息。 @boost(BoosterParams(queue_name='test_redis_ack_use_timeout_queue', broker_kind=BrokerEnum.REDIS, concurrent_mode=ConcurrentModeEnum.SINGLE_THREAD, log_level=10, consumer_override_cls=MyConsumer, is_show_message_get_from_broker=True)) def cost_long_time_fun(x): print(f'start {x}') time.sleep(2) if random.random()>0.5: raise ValueError('模拟函数运行出错') print(f'end {x}') return x*2 if __name__ == '__main__': for i in range(100): cost_long_time_fun.push(i) cost_long_time_fun.consume() ``` 上面例子是重写了父类方法,轻度的自定义某个方法. 也可以重量级自定义用于新增消息队列中间件种类,例如下面的代码,使用 list 模拟消息队列. ### 4.21b.2 完全实现新增中间件类型. 使用 list 列表作为 消息队列的中间件 实现, 通过指定 consumer_override_cls 和 publisher_override_cls 为用户自定义的类来实现新增消息队列种类。 这里只是用例子演示怎么使用 consumer_override_cls 和 publisher_override_cls 来开发新的消息队列种类,增加到 funboost, 不是真的推荐用户在生产大规模使用 list 结构作为消息队列中间件. ```python import threading import json import time from collections import defaultdict from funboost import boost, BrokerEnum, BoosterParams, EmptyConsumer, EmptyPublisher queue_name__list_map = defaultdict(list) list_lock = threading.Lock() ''' 使用 list 列表作为 消息队列的中间件 实现, 通过指定 consumer_override_cls 和 publisher_override_cls 为用户自定义的类来实现新增消息队列种类。 这里只是用例子演示怎么使用 consumer_override_cls 和 publisher_override_cls 来开发新的消息队列种类,增加到 funboost, 不是真的推荐用户在生产大规模使用 list 结构作为消息队列中间件. ''' class MyListConsumer(EmptyConsumer): def custom_init(self): self.list: list = queue_name__list_map[self.queue_name] def _dispatch_task(self): while True: try: with list_lock: msg = self.list.pop() self._submit_task({'body': msg}) except IndexError: time.sleep(0.1) def _confirm_consume(self, kw): """ 这里是演示,所以搞简单一点,不实现确认消费 """ pass def _requeue(self, kw): with list_lock: self.list.append(kw['body']) class MyListPublisher(EmptyPublisher): def custom_init(self): self.list: list = queue_name__list_map[self.queue_name] def _publish_impl(self, msg: str): with list_lock: self.list.append(msg) def clear(self): with list_lock: self.list.clear() def get_message_count(self): with list_lock: return len(self.list) def close(self): pass ''' 完全重新自定义增加中间件时候,broker_kind 建议指定为 BrokerEnum.EMPTY ''' @boost(BoosterParams(queue_name='test_define_list_queue', broker_kind=BrokerEnum.EMPTY, # 完全重新自定义新增中间件时候,broker_kind 请指定 BrokerEnum.EMPTY concurrent_num=10, consumer_override_cls=MyListConsumer, publisher_override_cls=MyListPublisher, is_show_message_get_from_broker=True)) def cost_long_time_fun(x): print(f'start {x}') time.sleep(20) print(f'end {x}') if __name__ == '__main__': for i in range(100): cost_long_time_fun.push(i) cost_long_time_fun.consume() ``` ### 4.21b.3 funboost的用户自定义扩展比celery更容易,更彻底 因为funboost的扩展是使用经典oop,用户可以完全100%修改定制funboost任何细节,不需要我亲自提前预判预留暴露几百个用户可能需要用到的钩子。 funboost现在很少预留用户级钩子,唯一的就是 boost 装饰器 有个 `user_custom_record_process_info_func` 钩子,给用户提供记录函数结果状态的,但是这不是必需品,用户完全可以使用4.21介绍的扩展方式,通过`consumer_override_cls`重写父类的这个`user_custom_record_process_info_func` 空方法或者直接重写父类的_run方法,这更oop,写法更一致。 celery的扩展就很垃圾了,必须依赖框架自身提前预留暴露了相关钩子或者信号机制,如果你有个奇葩独特的自定义需求,但是celery没给你暴露相关钩子或者信号机制,你只能对celery的源码进行修改或者使用猴子补丁来动态替换源码了。 你可以看看 funboost 和 celery 怎么扩展 `opentelemetry` 来实现全链路可视化追踪的。见文档 4b.7.3 章节。 **-> ### 4b.7.3 从opentelemetry功能集成, 侧面证明 funboost 对普通用户扩展性比celery强10倍。** **在文档4b.2c章节,也演示了通过指定 consumer_override_cls 来消费任意格式的消息,用户可以和4.21章节一起阅读** ## 4.21c 不想吃苦?让ai来帮你扩展funboost中间件或者定制运行逻辑 `funboost`用户级别可以完全自由定制扩展的,提供 `register_custom_broker` 和 `override_cls` 两种用户级别自由扩展定制。 如果用户想增加或者修改怎么操作broker中间件或者定制funboost逻辑,并且不想亲自吃苦看4.21章节 和 `funboost/md_for_ai/如何扩展增加新的中间件.md`,那么用户可以让 ai 来高效实现, ai写得扩展比我自己写的还好。 ### 4.21c.2 假设你要使用python内置的list作为 funboost的消息队列: 你让ai来写增加中间件或者定制逻辑的方式就是如下 promote: 你先上传 `funboost_all_docs_and_codes.md` 这个文档给ai,并使用如下提示词 ``` 你帮我实现 python 内置的 list 作为 funboost的消息队列。 注意: 1.你要阅读 `funboost_all_docs_and_codes.md`中的4.21章节,`4.21 funboost完全自由定制扩展` 2.你要阅读 `funboost_all_docs_and_codes.md` 中的 `funboost/md_for_ai/如何扩展增加新的中间件.md` 3.你要参考已有的funboost扩展代码实现 `funboost/contrib/override_publisher_consumer_cls` 和 `funboost/contrib/register_custom_broker_contrib` 文件夹中的代码。 4.你要阅读 `funboost/consumers/base_consumer.py`的AbstractConsumer基类逻辑 5.你要阅读 `funboost/publishers/base_publisher.py`的AbstractPublisher基类逻辑 ``` #### 4.21c.2.2 说个ai使用技巧秘密,网页版ai大模型生成funboost代码 吊打编程 ide中的 ai大模型和claude code 现在的ai编程ide 使用 `rag` 和 `agentic search`。 `rag` 幻觉高,推理差。 `agentic search` 有时候搜索关键字不准,而且需要连续轮番检索和ai多轮交互,速度太慢了,效率低,还要充会员。 2026年很多ai大模型已经有1000K上下文了,就是俗称的百万上下文。而 `funboost_all_docs_and_codes.md` 是900k上下文,ai刚好能应付。 **荐几个原生 百万上下文的ai:** - `https://chat.deepseek.com` deepseek 2026年升级的mhc 和engram机制, 果然名不虚传,面对100万上下文,网页版免费无限使用,而且又快又准。速度比qwen的快太多了。 - `https://aistudio.google.com/` google ai studio 中的gemini pro 很准确,缺点是2026年后开始限制使用次数了。 注意不要使用 `https://gemini.google.com/` 官网中的 gemini,这个虽然通用使用gemini pro,幻觉比 google ai studio 中高太多了。 - `https://chat.qwen.ai/` qwen 2026年后也支持100万上下文了,但是目前体验的qwen3.5plus准确度可以,但回答速度太慢,受不了。还是deepseek强。 注意不要使用 `https://www.qianwen.com` 通义官网中的qwen模型,通义官网的qwen实测幻觉率比国际版千问网页高很多。 通义官网的ai不好,具体表现在我上传了完整的funboost教程和源码markdown后,他还非要自作聪明的去自动联网搜索funboost在互联网上的内容,因为funboost互联网内容少,所以自动联网反而不好。 **在ai大模型deepseek的网页中生成 funboost 代码截图:**  ## 4.23 演示funboost框架是如何代替用户手写调用线程池的 为什么框架介绍中说有了funboost,再也无需用户手动操作线程和线程池ThradPoolExecutor以及multiprossing.Process()了。 手动使用线程池写法 ```python import time from concurrent.futures import ThreadPoolExecutor def f(x, y): print(f'{x} + {y} = {x + y}') time.sleep(10) if __name__ == '__main__': pool = ThreadPoolExecutor(5) for i in range(100): pool.submit(f, i, i * 2) ``` funboost取代手写调用线程池 ```python import time from funboost import boost, BrokerEnum, BoosterParams @boost(BoosterParams(queue_name='test1', broker_kind=BrokerEnum.MEMORY_QUEUE, concurrent_num=5)) def f(x, y): print(f'{x} + {y} = {x + y}') time.sleep(10) if __name__ == '__main__': f.consume() for i in range(100): f.push(i, i * 2) ``` ```text 这两个的效果是一样的,都是使用内存queue来保存待运行的任务,都是使用5线程并发运行f函数的。 funboost还能开启多进程,取代用户手写 Process(target=fx),所以有了funboost,用户无需手写开启线程 进程。 如果用户希望任务保存到redis中先,而不是保存在python内存queue中,那就使用funboost比调用ThreadPoolExecutor方便多了。 再比如用户希望每秒运行完成10次f函数(控制台每秒都打印10次求和结果),而不是开启10线程来运行f函数,funboost则远方便于ThreadPoolExecutor ``` ## 4.24 设置消费函数重试次数 ```python import random import time from funboost import boost, BrokerEnum, BoosterParams @boost(BoosterParams(queue_name='test_queue9', broker_kind=BrokerEnum.REDIS_ACK_ABLE, max_retry_times=5)) def add(a, b): time.sleep(2) if random.random() >0.5: raise ValueError('模拟消费函数可能出错') return a + b if __name__ == '__main__': for i in range(1,1000): add.push(i,i*2) add.consume() ``` ``` 通过设置 max_retry_times 的值,可以设置最大重试次数,函数如果出错了,立即将参数重试max_retry_times次,如果重试次数达到指定的max_retry_times,就执行确认消费了。 ``` ### 4.24.1 抛出ExceptionForRequeue类型错误,消息立即重回消息队列 ```python @boost(BoosterParams(queue_name='test_rpc_queue', is_using_rpc_mode=True, broker_kind=BrokerEnum.REDIS_ACK_ABLE, qps=100, max_retry_times=5)) def add(a, b): time.sleep(2) if random.random() >0.5: raise ExceptionForRequeue('模拟消费函数可能出错,抛出ExceptionForRequeue类型的错误可以使消息立即重回消息队列') return a + b if __name__ == '__main__': add.consume() ``` ``` 消费函数中 raise ExceptionForRequeue ,会使消息立即重回消息队列的尾部。 如果函数运行该消息的出错概率使100%,就要慎重了, 避免函数运行这个消息一直出错一直重回消息队列,无限蒙蔽死循环消费这个入参。 ``` ### 4.24.2 抛出 ExceptionForPushToDlxqueue 类型错误,消息发送到单独另外的死信队列中 ```python @boost(BoosterParams(queue_name='test_rpc_queue', is_using_rpc_mode=True, broker_kind=BrokerEnum.REDIS_ACK_ABLE, qps=100, max_retry_times=5)) def add(a, b): time.sleep(2) if random.random() >0.5: raise ExceptionForPushToDlxqueue('模拟消费函数可能出错,抛出ExceptionForPushToDlxqueue类型的错误,可以使消息发送到单独的死信队列中') return a + b if __name__ == '__main__': add.consume() ``` ``` 消费函数中 raise ExceptionForPushToDlxqueue ,可以使消息发送到单独的死信队列中,死信队列的名字是正常队列的名字 + _dlx ``` ### 4.24.3 设置is_push_to_dlx_queue_when_retry_max_times,重试到max_retry_times最大次数没成功发送到死信队列 ``` @boost 装饰器 设置 is_push_to_dlx_queue_when_retry_max_times = True, 则函数运行消息出错达到max_retry_times最大重试次数后仍没成功,确认消费,同时发送到死信队列中。 ``` ### 4.24.4 (内置辅助)将一个消息队列中的消息转移到另一个队列 ``` 可以用于死信队列转移到正常队列。 ``` ```python from funboost.contrib.queue2queue import consume_and_push_to_another_queue,multi_prcocess_queue2queue if __name__ == '__main__': # 一次转移一个队列,使用单进程 # consume_and_push_to_another_queue('test_queue77h3', BrokerEnum.RABBITMQ_AMQPSTORM, # 'test_queue77h3_dlx', BrokerEnum.RABBITMQ_AMQPSTORM, # log_level_int=logging.INFO, exit_script_when_finish=True) # 转移多个队列,并使用多进程。 multi_prcocess_queue2queue([['test_queue77h5', BrokerEnum.RABBITMQ_AMQPSTORM, 'test_queue77h4', BrokerEnum.RABBITMQ_AMQPSTORM]], log_level_int=logging.INFO, exit_script_when_finish=True, n=6) ``` ### 4.24.5 funboost 高级重试:指数退避重试 > [!TIP] > funboost 默认的重试是简单的 **“立即重试”**。在许多生产环境下,为了避免对下游系统造成压力,通常需要 **指数退避(Exponential Backoff)** 重试机制。 通过装饰器的 `advanced_retry_config` 的 `retry_mode` 设置 `sleep` 或者 `requeue` 可以设置重试模式。 > **优势说明**:funboost 重试的 `requeue` 模式完爆各种 Python 三方包的 `retry` 装饰器。因为指数退避重试的间隔很大并且会越来越大,不适合在装饰器中简单粗暴地 `sleep`,那样会导致长时间阻塞霸占线程或协程,降低系统吞吐量。所以需要使用重回队列的方式,用 APScheduler 来调度何时再次重试。 开启高级重试需要设置: - `is_using_advanced_retry=True` - `advanced_retry_config={...}` #### 4.24.5.1 `advanced_retry_config` 参数详解 | 参数名称 | 类型 | 说明 | | :--- | :--- | :--- | | `retry_mode` | `str` | **重试模式**:
也可以继承自框架中已有的 AbstractConsumer 的子类,这种适合对逻辑进行调整,或者增加打印什么的 。 test_frame/test_custom_broker/test_custom_redis_consume_latest_publish_msg_broker.py 就是继承自 AbstractConsumer 的子类。
要写成 `类.方法.push(对象,入参1,入参2,入参3=值3,入参4=值4,入参5=值5,)` 不能写成 `实例对象.方法.push(入参1,入参2,入参3=值3,入参4=值4,入参5=值5,)`- **使用类方法作为funboost任务切记发布形式:**
要写成 `类.方法.push(类,入参1,入参2,入参3=值3,入参4=值4,入参5=值5,)` 不能写成 `类.方法.push(入参1,入参2,入参3=值3,入参4=值4,入参5=值5,)`- **使用静态方法作为funboost任务**
无需任何注意,因为@classmethod 方法的第一个入参 不是self又不是cls, 所以 类.方法.push(入参1,入参2,入参3=值3,入参4=值4,入参5=值5,) 是正确的,无需注意。- **思考!:** ``` 假设你不把 self 发到消息队列里面去,任务取出来时候咋知道 self的成员变量们是什么? 例如 : def instance_method(self, y): print(self.x + y) # 这个求和用到了实例属性和方法入参求和,证明为什么push发布消息时候要传递self,因为两数字求和,不仅和入参y有关系,还和self的x成员变量有关系呢。 ``` ### 4.32.1 funboost 支持实例方法和类方法作为消费函数的原理 funboost 适配 实例方法和类方法的实现原理讲一下: 假设类如下: ```python class Myclass: m = 1 def __init__(self, x): self.obj_init_params: dict = ClsHelper.get_obj_init_params_for_funboost(copy.copy(locals())) # 这行重要,如果实例方法作为消费函数,那么必须定义obj_init_params_for_funboost保存对象的 __init__ 入参,用于还原生成对象。 # self.obj_init_params_for_funboost= {'x':x} # 上面这行相当于这行,如果__init__入参太多,一个个的写到字典麻烦,可以使用上面的方式获取__init__入参字典。 self.x = x @boost(BoosterParams(queue_name='instance_method_queue', is_show_message_get_from_broker=True, )) def instance_method(self, y): print(self.x + y) # 这个求和用到了实例属性和方法入参求和,证明为什么发布消息时候要传递self。 # @classmethod @BoosterParams(queue_name='class_method_queue', is_show_message_get_from_broker=True, ) def class_method(cls, y): print(cls.m + y) ``` ``` 对于类方法: 通过方法所属的类名和模块名,反射得到类,但不能直接把类本身作为消息的函数入参,发布到消息队列, 因为采用的是json序列化消息,所以入参不能包括在定义类型和在定义类型的对象, funboost发布消息时候使用字典代替了类方法的第一个入参 cls, funboost在消费消息时候,把消息的第一个入参cls,替换成类本身,这样就是生成真正的函数入参,然后再去调用类方法。 发布消息: Myclass.class_method.push(Myclass,2) 对于实例方法: funboost发布消息时候使用字典代替了实例方法的第一个入参 self,把 对象的实例化时候的入参字典就是对象的 obj_init_params 属性,放到这个字典里面去了。 消费时候,从消息队列获取入参后,使用 obj_init_params 这个字典作为对象实例化的入参,重新生成一个对象,然后把这个对象替换实例方法的第一个入参self,再去调用实例方法。 对象必须定义 obj_init_params 属性,保存对象 __init__ 时候的入参字典,供消费时候重新生成对象。 因为 1+2=3,是不仅和instance_method的入参y有关系,还和对象本身的x属性也有关系。 发布消息: Myclass.instance_method(Myclass(1),2) !!!特别需要注意的是实例方法作为消费函数的时候,对象必须定义 obj_init_params 属性保存初始化入参,并且 __init__ 的入参同样必须是基础类型,能被json序列化的, 不能是自定义类型和对象。 ``` ### 4.32.2 funboost 支持实例方法、类方法、静态方法、普通函数 4种类型,作为消费函数的例子 下面代码完整的演示了 实例方法、类方法、静态方法、普通函数 4种类型,作为消费函数的例子,请务必注意看代码和注释说明。 !!!请务必注意观察 实例方法和类方法的push的形式和普通函数的push第一个入参的区别 ```python3 import copy from funboost import BoosterParams, boost from funboost.constant import FunctionKind from funboost.utils.class_utils import ClsHelper class Myclass: m = 1 def __init__(self, x): # 这行重要,如果实例方法作为消费函数,那么必须定义obj_init_params_for_funboost保存对象的 __init__ 入参,用于还原生成对象。 self.obj_init_params: dict = ClsHelper.get_obj_init_params_for_funboost(copy.copy(locals())) # self.obj_init_params = {'x':x} # 上面这行相当于这行,如果__init__入参太多,一个个的写到字典麻烦,可以使用上面的方式获取__init__入参字典。 self.x = x @boost(BoosterParams(queue_name='instance_method_queue', is_show_message_get_from_broker=True, )) def instance_method(self, y): print(self.x + y) # 这个求和用到了实例属性和方法入参求和,证明为什么发布消息时候要传递self。 # @classmethod @BoosterParams(queue_name='class_method_queue', is_show_message_get_from_broker=True, ) def class_method(cls, y): print(cls.m + y) @staticmethod @BoosterParams(queue_name='static_method_queue', is_show_message_get_from_broker=True) def static_method(y): print(y) @BoosterParams(queue_name='common_fun_queue', is_show_message_get_from_broker=True) def common_f(y): print(y) if __name__ == '__main__': for i in range(6, 10): Myclass.instance_method.push(Myclass(i), i * 2) # 注意发布形式,实例方法发布消息不能写成 Myclass(i).push(i * 2) 只发布self之后的入参, self也必须传递。 Myclass.instance_method.consume() for i in range(6, 10): Myclass.class_method.push(Myclass,i * 2) # 注意发布形式,不是 Myclass.class_method.push(i * 2) , 而是应该写 Myclass.class_method.push(Myclass,i * 2),cls也要传 Myclass.class_method.consume() for i in range(10): Myclass.static_method.push(i * 2) # 不需要注意发布形式,和 普通函数的发布一样 Myclass.static_method.consume() for i in range(10): common_f.push(i * 2) common_f.consume() ``` 请注意看下面运行截图中的消息,self 在消息队列中间件中使用 json来表达了。消费运行时候重新根据obj_init_params和类名、文件名,生成Myclass类型的对象。 ``` 向instance_method_queue 队列,推送消息 耗时0.001秒 {'self': {'first_param_name': 'self', 'obj_init_params': {'x': 6}, 'cls_name': 'Myclass', 'cls_file': 'D:/codes/funboost/test_frame/test_instancemothed_funboost/test_method_consume.py'}, 'y': 12} ```  ## 4.33 @boost设置is_auto_start_consuming_message,自动启动消费。 ``` @BoosterParams(queue_name="q1", is_auto_start_consuming_message=True) def f(x): 这样写后,自动启动消费,不需要 用户手动的写 f.consume() 来启动消费。 ``` ```python import time from funboost import BoosterParams, BrokerEnum @BoosterParams(queue_name="test_instead_thread_queue", broker_kind=BrokerEnum.MEMORY_QUEUE, concurrent_num=10, is_auto_start_consuming_message=True) # is_auto_start_consuming_message 这里设置为了True def f(x): time.sleep(3) print(x) if __name__ == '__main__': for i in range(100): f.push(i) #### f.conusme() #is_auto_start_consuming_message=True后, 这一行代码不需要,不需要手动 f.consume() 来启动消费。 ``` ## 4.34 pyinstaller 打包 funboost项目为exe 的说明 见独立demo项目 https://github.com/ydf0509/funboost_pyinstaller 里面有报错解决说明,funboost打包很容易。 ## 4.35 演示 funboost 的函数入参过滤功能 ```python @BoosterParams(queue_name='queue_test2', qps=6, broker_kind=BrokerEnum.REDIS, do_task_filtering=True, # 这个是设置是否开启任务入参过滤 task_filtering_expire_seconds=3600, # 这个是可以设置任务入参过滤过期时间,例如1小时内查询了深圳天气,在1小时内再查会被过滤,因为1小时内已经查询过了,而1小时后查询的深圳天气,则不会被过滤。 concurrent_num=1) def f2(a, b): sleep_time = 1 result = a + b print(f'消费此消息 {a} + {b} 中。。。。。,此次需要消耗 {sleep_time} 秒') time.sleep(sleep_time) # 模拟做某事需要阻塞n秒种,必须用并发绕过此阻塞。 print(f'{a} + {b} 的结果是 {result}') return result @BoosterParams(queue_name='queue_test3', qps=6, broker_kind=BrokerEnum.REDIS, do_task_filtering=True, # 设置开启消息过滤 concurrent_mode=ConcurrentModeEnum.SINGLE_THREAD) def f3(a, b): sleep_time = 1 result = a + b print(f'消费此消息 {a} + {b} 中。。。。。,此次需要消耗 {sleep_time} 秒') time.sleep(sleep_time) # 模拟做某事需要阻塞n秒种,必须用并发绕过此阻塞。 print(f'{a} + {b} 的结果是 {result}') return result if __name__ == '__main__': pass # print(f2.__name__) # f2.clear() f2.consume() f3.consume() for i in range(200): f2.push(i, i * 2) # 如果不传递filter_str, 默认是 把 所有入参 a和b,排序后作为json都加入到过滤中 f3.publish(msg={'a':i,'b':i*2},task_options=TaskOptions(filter_str=str(i))) # 这个是仅仅把 a 作为过滤条件,例如函数入参 userid username sex ,通常按照userid 过滤足以, 不需要username sex也一起过滤,可以节约redis内存。 time.sleep(5) # 因为 funboost 是确认消费完成后才加入过滤。如果消息耗时很长,且并发很大,且两个相同入参的消息连续挨着,第二个还会执行,所以这里演示sleep一下。 for i in range(200): f2.push(i, i * 2) f3.publish(msg={'a':i,'b':i*2},task_options=TaskOptions(filter_str=str(i))) ctrl_c_recv() ``` ### 4.35.2 警告!!!: funboost的 rpc功能和 函数入参过滤过滤 不要同时使用 如果你设置了 do_task_filtering=True , 则说明开启了函数入参过滤功能。 如果是你设置了 is_using_rpc_mode=True , 则说明开启了 rpc功能。 当远程使用rpc功能获取命中过滤的入参,因为函数入参命中了过滤,所以不会执行函数,rpc第二次请求系统入参得不到函数结果,所以两者不能提示使用。 funboost的 do_task_filtering 只是保存了函数入参对应的过滤key,并没有保存函数入参对应的结果到redis。 如果rpc和防止相同入参重复运行两个功能要同时使用, 请使用 nb_cache + rpc , 不要要funboost自带的 do_task_filtering ## 4.35c 使用 nb_cache 作为缓存装饰器 ### 4.35c.1 为什么 要用 nb_cache 而不是 funboost 自带的 do_task_filtering funboost的 rpc和 do_task_filtering 不能同时使用,因为 do_task_filtering 是侧重防止重复运行,并没有持久化保存函数入参对应的结果。 而 nb_cache 是侧重缓存,不仅保存了去重key还保存了key对应的函数结果。 nb_cache 也是我发明的,是python中功能最强大最全面的缓存装饰器,光源码就高达3000行,远超普通人写的20行左右的函数结果缓存装饰器。 funboost 是侧重消息消费,不能为了实现复杂的缓存功能,而在funboost源码代码加几千行实现复杂的缓存,导致项目太臃肿和缓存功能耦合了。所以我开发了 nb_cache三方包。 nb_cache 可以单独使用,也可以在 funboost中使用 ### 4.35c.2 nb_cache 功能比 funboost自带的 do_task_filtering 全面太多了 nb_cache 可以解决 缓存击穿 缓存雪崩 缓存穿透 三大经典缓存问题 nb_cache 可以实现 内存 + redis 两级缓存 nb_cache 可以实现熔断和降级 nb_cache 可以实现复杂的限流,例如针对函数入参的精确限流 (funboost的qps是针对队列消息的整体控制频率,不能实现例如对每个ip 和 userid的精确限流) #### 4.35c.2.2 举个例子 nb_cache 的防止缓存击穿为什么吊打 funboost的 do_task_filtering 例如函数运行查询 123这个入参的热点数据,假设需要耗时5分钟才能运行完查询函数。 但是在5分钟内有10万个消息都是查询123这个key对应的热点数据,那么do_task_filtering因为在5分钟内一次都没运行完成查询123,所以没有命中123这个需要过滤,所以会运行10万次查询123这个key对应的热点数据。 nb_cache的cache装饰器 可以设置lock=True, 会先把123这个key加锁,然后得到锁了才能去运行函数,这样就可以防止缓存击穿了。 ### 4.35c.2 如何在 funboost 中使用 nb_cache **安装 nb_cache** `pip install nb_cache` **nb_cache地址:** [https://github.com/ydf0509/nb_cache](https://github.com/ydf0509/nb_cache) #### 4.35c.2.1 方式一,@boost 和 @cache 叠加使用 @boost叠加别的装饰器时候,需要设置 `should_check_publish_func_params = False` , 因为装饰器叠加时候,获取不到原始函数入参签名信息。 ```python from nb_cache import Cache redis_cache = Cache().setup("redis://localhost:6379/0", prefix="myapp") @boost(BoosterParams( queue_name='queue_test',concurrent_num=10, is_using_rpc_mode=True, broker_kind=BrokerEnum.REDIS_ACK_ABLE, # 装饰器叠加时候,获取不到原始函数入参签名信息,所以需要设置 should_check_publish_func_params = False should_check_publish_func_params = False , )) @redis_cache.cache( ttl=300, # 缓存300秒 key="user:{user_id}", # 可以灵活自定义函数入参生成过滤key,如果函数是复杂的多个入参时候指定key特别有用。 lock=True, # 运行入参时候,限先锁,防止缓存击穿。防止由于没命中过滤而导致同时都运行这个相同的入参 ) def get_user(user_id): pass # 查询user_id ``` #### 4.35c.2.2 方式二 cache 装饰器传给 funboost 的 `consuming_function_decorator` 将cache装饰器传给 `funboost` 的 `consuming_function_decorator 此种方式好处是 不需要设置 `should_check_publish_func_params = False`,能正常校验发布的消息是否合法。 ```python from nb_cache import Cache # dual 模式是 Redis + 内存 2级缓存模式,具备 内存性能更强悍 + redis持久化优势。非常推荐 dual模式。 # memory_size=1000: 内存最多存 1000 条; local_ttl=30: 内存层 TTL 30秒 dual_cache = Cache().setup("dual://localhost:6379/0?memory_size=1000&local_ttl=30",prefix="myapp") @boost(BoosterParams( queue_name='queue_test',concurrent_num=10, is_using_rpc_mode=True, broker_kind=BrokerEnum.REDIS_ACK_ABLE, consuming_function_decorator = dual_cache.cache( ttl=100, # 缓存100秒 key="user:{user_id}", # 可以灵活自定义函数入参生成过滤key,如果函数是复杂的多个入参时候指定key特别有用。 ) )) def get_user(user_id): pass # 查询user_id ``` ## 4.36 演示`funboost`入参可以是自定义类型(不可json序列化的类型的入参,自动使用pickle)(2025-07新增支持) 以前作者不愿意支持消费函数入参是自定义类型,2025-07 之后支持了. 就是现在消费函数的入参可以是 字符串 数字 列表 字典 以外的自定义类型, def func1(a:MyClass,b:str,c:MyPydanticModel) 现在可以. 原理: ``` 消息整体还是一个json,但是对于不可序列化的那些入参字段key对应的value, 会用pickle序列化成字符串(非bytes)替代. str(pickle.dumps(obj_x)) 当运行函数之前,会对不可json序列化的那些入参的value,使用 pickle.loads(ast.literal_eval(para_pickle_str)) 转成对象 ``` ```python """ 此demo演示funboost新增支持了pickle序列化, 当用户的消费函数入参不是基本类型,而是自定义类型时候,funboost能自动识别,并将相关字段使用pickle序列化成字符串. 当消费函数运行时,funboost能自动将 不可json序列化的那些字段的pickle字符串反序列化成对象,并赋值给消费函数入参. """ from pydantic import BaseModel from funboost import (boost, BoosterParams, BrokerEnum, ctrl_c_recv, fct) class MyClass: def __init__(self,x,y): self.x = x self.y = y def change(self,n): self.x +=n self.y +=n def __str__(self): return f'
之前文档中反复说了@boost消费函数入参只能是基本类型,那些地方的文档还没改过来, 但是现在以这个4.36章节的说明为准,2025-07月以后已经支持消费函数入参是自定义类型了. 但是非必要,用户把消费函数入参还是设计成基本类型更好,json序列化后看得更清楚,消息体积也小. 因为 pickle 序列化也不是万能的,例如threading.Lock socket对象 等等都不能pickle序列化 (用户可以问ai python中哪些类型不可pickle序列化), 只要一个对象的属性链路上,某个属性是这些类型就不能pickle序列化,这些是python pickle基本常识和经验,不能违反突破本质原理. pickle序列化不稳固,如果 a 是 Myclass 类型对象,消息发布后你又把 Myclass 类名改了,或者把类移到到了另一个模块, 或者模块改名了,那么pickle反序列化就会失败,所以应该尽量使用简单基本类型作为 funboost 消费函数的入参。 funboost是支持pickle序列化,但不是鼓励你 消费函数入参设计成传参自定义类型对象,导致自动使用pickle序列化.## 4.37 funboost 启动消费函数的方式大全(再次集中总结) 以下是 **9 种**主要的启动消费方式总结: ### 4.37.1. 基础启动 (最常用) 这是最基本的启动方式,在当前进程中启动消费(默认使用多线程并发)。 * **语法**: `task_fun.consume()` * **特点**: 非阻塞主线程(后台线程运行)。 * **适用场景**: 开发调试、容器化部署单进程应用。 ```python @boost('queue_1') def task_fun(x): pass if __name__ == '__main__': task_fun.consume() # 需要阻塞主线程,否则主线程结束子线程也会退出 ctrl_c_recv() ``` ### 4.37.2. 多进程叠加并发启动 (高性能推荐) 启动 N 个独立的进程,每个进程内部再进行多线程/协程并发。这是 `funboost` 性能炸裂的核心模式。 * **语法**: `task_fun.multi_process_consume(n)` (简写 `task_fun.mp_consume(n)`) * **特点**: 充分利用多核 CPU,突破 GIL 限制。 * **适用场景**: 生产环境、高吞吐量任务、CPU 密集型任务。 ```python if __name__ == '__main__': # 启动 4 个进程,假设默认并发是 50 线程 # 那么总并发能力 = 4 * 50 = 200 task_fun.multi_process_consume(4) ctrl_c_recv() ``` ### 4.37.3. 自动启动 (懒人模式) 在装饰器参数中设置,定义函数时即自动启动消费。 * **语法**: `@boost(..., is_auto_start_consuming_message=True)` * **特点**: 无需手写 `.consume()` 代码。 * **适用场景**: 简单的脚本、测试代码。 ```python @boost(BoosterParams(queue_name='q1', is_auto_start_consuming_message=True)) def task_fun(x): pass # 代码运行到这里时,消费已经自动在后台开始了 if __name__ == '__main__': while 1: time.sleep(10) ``` ### 4.37.4. 批量启动所有消费者(最粗暴无脑) 一次性启动项目中所有已加载(已经Imported的消费函数所在模块)。 * **语法**: `BoostersManager.consume_all_queues()` 或简写 `BoostersManager.consume_all` * **特点**: 适合一个脚本管理多个队列的场景。 * **变体**: `BoostersManager.multi_process_consume_all_queues(n)` 或简写 `BoostersManager.mp_consume_all(n)` (所有队列都开启 N 个进程)。 ```python from funboost import BoostersManager # 假设你 import 了很多个消费函数模块 import tasks_a import tasks_b if __name__ == '__main__': # 一键启动所有 BoostersManager.consume_all() ctrl_c_recv() ``` ### 4.37.5. 分组启动 (Group Start) 只启动属于特定业务组(`booster_group`)的消费者。 * **语法**: `BoostersManager.consume_group("group_name")` * **配置**: 需要在装饰器中设置 `BoosterParams(..., booster_group='my_group')`。 * **适用场景**: 一个大项目中包含多个子系统,希望按业务模块分开启动消费。 ```python # 在定义时分组 @boost(BoosterParams(queue_name='q1', booster_group='group_A')) def task1(x): pass # 在入口启动 if __name__ == '__main__': BoostersManager.consume_group('group_A') ``` ### 4.37.6. 命令行启动 (CLI) 不修改 Python 代码,直接通过 Shell 命令启动。 * **语法**: `python funboost_cli_user.py consume queue_name` * **特点**: 运维友好,适合配合 Supervisor 等工具管理。 * **变体**: * 启动单个: `python funboost_cli_user.py consume queue1` * 启动多个: `python funboost_cli_user.py consume queue1 queue2` * 多进程启动: `python funboost_cli_user.py mp_consume --queue1=2 --queue2=4` ### 4.37.7. 远程自动部署启动 (Fabric) 在本地代码中直接操作远程服务器,上传代码并启动消费。 * **语法**: `task_fun.fabric_deploy(host, port, user, password, ...)` * **特点**: 内置自动化运维,无需 Jenkins/CI 即可将函数部署到远程 Linux 服务器运行。 ```python # 一键把当前函数部署到 192.168.1.100 上并启动 2 个进程消费 task_fun.fabric_deploy('192.168.1.100', 22, 'root', 'pwd', ...,process_num=2) ``` ### 4.37.8. Celery 模式启动 (特殊) 如果 `broker_kind` 设置为 `BrokerEnum.CELERY`,则 funboost 只是外壳,底层由 Celery 驱动。 * **语法**: `CeleryHelper.realy_start_celery_worker()` * **特点**: 启动的是 Celery 的 Worker,完全复用 Celery 的生态。 ## 4.38 MemoryFunboostPool 和 FunboostPool 的使用 `funboost` 提供了 `MemoryFunboostPool` 和 `FunboostPool` 两个任务池类,它们的 `submit` 和 `map` 方法 API 完全兼容 `concurrent.futures.ThreadPoolExecutor`。您可以轻松地将项目中的线程池替换为它们,从而获得智能线程伸缩、支持异步函数、QPS 控频、分布式部署等高级能力。 ### 4.38.1 MemoryFunboostPool:内存增强型任务池 `MemoryFunboostPool` 固定使用内存队列 (`MEMORY_QUEUE`) 驱动,且固定 `max_retry_times=0`,行为与 `ThreadPoolExecutor` 最为接近。它是一个开箱即用的增强型线程池,可直接替代原生线程池。 **4.38.1.1 基础用法** ```python from funboost.core.funboost_pool import MemoryFunboostPool def add(a, b): return a + b # 实例化任务池 pool = MemoryFunboostPool(10, qps=20,) # 提交单个任务 future = pool.submit(add, 5, 3) print(future.result()) # 输出: 8 # 使用 map 批量提交 results = pool.map(add, [1, 2, 3], [4, 5, 6]) print(list(results)) # 输出: [5, 7, 9] ``` **4.38.1.2 参数说明** | 参数名 | 类型 | 默认值 | 说明 | | :--- | :--- | :--- | :--- | | `concurrent_num` | `int` | `4` | 最大并发线程数。 | | `qps` | `int` | `100` | 每秒任务处理速率限制。 | | `is_future_direct_ret_result` | `bool` | `True` | `future.result()` 直接返回业务结果还是返回 `FunctionResultStatus` 对象。 | ### 4.38.2 FunboostPool:全能与分布式任务池 `FunboostPool` 继承自 `MemoryFunboostPool`,通过 `BoosterParams` 进行配置,既可以使用内存队列作为增强型本地任务池,也可以使用外部消息队列(如 Redis)实现分布式任务调度,并可使用 `funboost` 的全部控制功能(如重试、去重、持久化等)。 **4.38.2.1 基础用法:本地内存队列** 此用法明确指定 `broker_kind=BrokerEnum.MEMORY_QUEUE`,使 `FunboostPool` 工作在本地内存模式,但可以配置更多控制参数(如重试次数)。 ```python from funboost.core.funboost_pool import FunboostPool from funboost import BoosterParams, BrokerEnum def double_value(x): return x * 2 # 配置本地内存队列任务池 params = BoosterParams( queue_name="local_pool", broker_kind=BrokerEnum.MEMORY_QUEUE, # 明确指定为内存队列 concurrent_num=10, qps=30, max_retry_times=2 # 可配置重试次数 ) pool = FunboostPool(params, ) # 提交任务 future = pool.submit(double_value, 10) print(future.result()) # 输出: 20 ``` **4.38.2.2 进阶用法:Redis 分布式队列** ```python from funboost.core.funboost_pool import FunboostPool from funboost import BoosterParams, BrokerEnum def process_data(data_id): return f"处理结果: {data_id}" # 配置 Redis 分布式任务池 params = BoosterParams( queue_name="distributed_pool", broker_kind=BrokerEnum.REDIS_ACK_ABLE, # 使用 Redis 作为消息队列 concurrent_num=20, qps=50, max_retry_times=3, # do_task_filtering=True # 过滤功能不能和rpc功能一起使用。要使用nb_cache替代funboost自身的do_task_filtering ) pool = FunboostPool(params, is_need_result=True) for i in range(10): future = pool.submit(process_data, i) print(future.result()) # 即使用分布式中间件,结果也能通过 future.result()来获取。 # 此时任务已发布到 Redis,可由任意数量的消费者进程共同处理 ``` **4.38.2.3 参数说明** | 参数名 | 类型 | 默认值 | 说明 | | :--- | :--- | :--- | :--- | | `booster_params` | `BoosterParams` | - | **核心参数**。配置任务队列、中间件类型及所有控制功能。 | | `is_need_result` | `bool` | `False` | 是否需要关心返回任务结果。 | | `is_future_direct_ret_result` | `bool` | `True` | 控制 `future.result()` 的返回类型。 | ### 4.38.3 选择指南 | 特性 | MemoryFunboostPool | FunboostPool | | :--- | :--- | :--- | | **底层驱动** | 固定内存队列 (`MEMORY_QUEUE`) | 可配置,支持内存队列或 Redis/RabbitMQ 等 | | **适用场景** | 单进程内高性能并发,直接替代原生线程池 | 单机增强型或分布式复杂任务调度 | | **配置灵活度** | 低(仅并发数、QPS) | 高(支持所有 `BoosterParams` 参数) | | **API 兼容** | 完全兼容 `ThreadPoolExecutor` | 完全兼容 `ThreadPoolExecutor` | * **`MemoryFunboostPool`**:快速替代原生线程池,获得更智能的并发控制,行为最贴近 `ThreadPoolExecutor`。 * **`FunboostPool`**:构建具备持久化、高可用、分布式和精细化控制能力的任务系统。 ## 4.100 使用funboost时候对框架的疑问和猜测,使用控制变量法 **第一性原理:** 要使用 初中学的最最基本的 “控制变量法” 思想 ,抽象精简成一个 `time.sleep() print('hello')` 的demo来验证你的想法。 funboost 的任务控制功能30多个,只要是用户能想得到的功能funboost全都有,对funboost的任何任务控制参数的猜测验证,100%一定都可以抽象精简成一个 `time.sleep() print('hello')` 的demo来验证你的想法。 只有这样你才好验证你的质疑,不然放在你的业务代码中去验证,你又难以模拟控制业务函数期望发生的情况。 ```python from funboost import boost, BoosterParams @boost(BoosterParams(queue_name='test_queue', ... # 用户修改各种boost的任务控制参数测试你怀疑的控制效果 )) def f(x): time.sleep(10) # 用户修改sleep大小测试因函数耗时造成的猜测 print(f'hello: {x}') return x ``` 举个例子,有用户怀疑并发数量不生效: 用户设置concurrent_num=3,怀疑funboost实际是10个并发运行函数,这太容易验证了,抽象精简成一个 `time.sleep(10) print('hello')` 的demo。 你只需要简单的数一下控制台每隔10秒打印多少次 'hello' 就知道是多少线程在运行函数了。 ``` 比如你怀疑funboost重试次数不生效,你说你的mysql插入数据时候网络连接报错了但是funboost没给你重试,你现在又不方便模拟mysql网络断开极小概率事件, 那你就写个 raise Exception('模拟出错') 的函数,看funboost会不会重试运行就好了。 因为funboost是执行函数,不会改变用户函数内部的代码逻辑。 有的人极端笨脑筋,不知道使用控制变量法写个精简demo验证,仅需不到10行代码而已。连初中生都知道的控制变量法做实验猜测,却到现在这样思维都忘了。 ``` ``` 有的人老是一开始学习funboost就用复杂业务函数逻辑来运行,不好调试,不方便表示自己的用法。 应该把自己的想法抽象成 1个简单的 包含 time.sleep 的函数,用简单demo才比较方便表示自己的疑惑和验证自己的猜测。 用户修改boost装饰器的参数 和 函数的sleep 大小 来 测试你想要验证你对框架功能的猜测。 例如你要测试确认消费,框架的broker_kind 为 redis_ack_able是否能做到消息确认消费不丢失消息,那你就可以发布20个任务,并启动消费, 消费函数里面time.sleep(200),然后你在第100秒时候突然把正在运行的消费脚本强行关闭,你就能看到消息被其他消费进程或机器拿运行了。 或者你只有一个脚本在运行,当你下次重新启动脚本时候这些消息也会被消费。 例如你要测试框架是不是能并发运行,那么运行下面的f函数,你设置10线程,那应该每50秒能打印求和10次,设置并发模式为single_thread那么每50秒能打印求和1次。 ``` ```python from funboost import boost, BoosterParams @boost(BoosterParams(queue_name='test_queue')) # 用户修改boost的参数测试你想要的控制效果 def f(x,y): time.sleep(50) # 用户修改sleep大小测试因函数耗时造成的猜测 print(f'{x} + {y} = {x + y}') return x +y if __name__ == '__main__': for i in range(1000): time.sleep(0.2) f.push(i,i*2) f.consume() ``` ### 4.100.b 举个例子,验证测试框架的超时杀死 function_timeout参数的作用 ``` 有的人老是问超时杀死是不是杀死进程,杀死python脚本。 问的太不用大脑了,默认的 task_fun.consume() 是单进程多线程启动的,如果是杀进程和脚本,那部署脚本相当于自杀结束了,这可能吗? 把脚本杀死了,那就永远无法再消费了,框架怎么可能这么设计为,因为一个函数入参超时而退出程序。 做出这种猜测就不应该了,而且用户自己测试验证这个想法很难吗。 例如下面的求和函数,里面写个sleep,然后设置 function_timeout=20, 框架的各个控制功能都太容易通过写一个简单的sleep 求和函数demo来测试了。 ``` 测试脚本: ```python import random import time from funboost import boost, BoosterParams @boost(BoosterParams(queue_name='test_timeout', concurrent_num=5, function_timeout=20, max_retry_times=4)) def add(x,y): t_sleep = random.randint(10, 30) print(f'计算 {x} + {y} 中。。。。,需要耗费 {t_sleep} 秒时间') time.sleep(t_sleep) print(f'执行 {x} + {y} 的结果是 {x+y} ') return x+y if __name__ == '__main__': for i in range(100): add.push(i,i*2) add.consume() ``` 超时运行的截图  从运行来看就知道了,funboost的function_timeout超时杀死功能,是针对一个正在运行的函数执行参数,是杀死运行中的函数,使函数运行中断结束, 不继续往下运行函数了,不会把自身脚本整个杀死。所以对funboost提供的功能不用猜测,只需要写demo测试就可以了。 ## 4.200 [分布式函数调度框架qq群] 现在新建一个qq群 189603256