# 6.常见问题回答 第6章节是人工编辑,直接统一回复常见问题,编辑时候那时候还没ai大模型。 **2025.09.18 新增说明**: 这种简单问题 ,按文档第14章节,直接把 `funboost_all_docs_and_codes.md` 上传到 `google ai studio` 或者 `腾讯 ima` 知识库,然后提问,准确率很高,ai答案和我的想法基本一模一样. ## 6.0 最最最重要的第一个问题: "funboost这个框架怎么样呀,值得我学习使用吗?" **第一性原理:** 学习和使用一个不值得学习和用途狭窄的框架,就是在浪费时间.
说明:2024.06月以后新增支持了实例方法和类方法作为消费函数,但是这里面的说明仍然值得一看, 看你这里才知道支持实例方法和类方法作为消费函数有多么复杂和实现原理, 使用实例方法和类方法作为消费函数看4.32章节的文档为什么强调是函数调度框架不是类调度框架,不是方法调度框架?你代码里面使用了类,是不是和此框架水火不容了? 问的是consuming_function的值能不能是一个类或者一个实例方法。 ```text 答:一切对类的调用最后都是体现在对方法的调用。这个问题莫名其妙。 celery rq huery 框架都是针对函数。 调度函数而不是类是因为: 1)类实例化时候构造方法要传参,类的公有方法也要传参,这样就不确定要把中间件里面的参数哪些传给构造方法哪些传给普通方法了。 见5.8 2) 这种分布式一般要求是幂等的,传啥参数有固定的结果,函数是无依赖状态的。类是封装的带有状态,方法依赖了对象的实例属性。 3) 比如例子的add方法是一个是实例方法,看起来好像传个y的值就可以,实际是add要接受两个入参,一个是self,一个是y。如果把self推到消息队列,那就不好玩了。 对象的序列化浪费磁盘空间,浪费网速传输大体积消息,浪费cpu 序列化和反序列化。所以此框架的入参已近说明了, 仅仅支持能够被json序列化的东西,像普通的自定义类型的对象就不能被json序列化了。 celery也是这样的,演示的例子也是用函数(也可以是静态方法),而不是类或者实例方法, 这不是刻意要和celery一样,原因已经说了,自己好好体会好好想想原因吧。 框架如何调用你代码里面的类。 假设你的代码是: class A(): def __init__(x): self.x = x def add(self,y): return self.x + y 那么你不能 a =A(1) ; a.add.push(2),因为self也是入参之一,不能只发布y,要吧a对象(self)也发布进来。 add(2)的结果是不确定的,他是受到a对象的x属性的影响的,如果x的属性是100,那么a.add(2)的结果是102. 如果框架对实例方法,自动发布对象本身作为第一个入参到中间件,那么就需要采用pickle序列化,picke序列化对象, 消耗的cpu很大,占用的消息体积也很大,而且相当一大部分的对象压根无法支持pickle序列化。 无法支持序列化的对象我举个例子, import pickle import threading import redis class CannotPickleObject: def __init__(self): self._lock = threading.Lock() class CannotPickleObject2: def __init__(self): self._redis = redis.Redis() print(pickle.dumps(CannotPickleObject())) # 报错,因为lock对象无法pickle print(pickle.dumps(CannotPickleObject2())) # 报错,因为redis客户端对象也有一个属性是lock对象。 以上这两个对象如果你想序列化,那就是天方夜谭,不可能绝对不可能。 真实场景下,一个类的对象包含了很多属性,而属性指向另一个对象,另一个对象的属性指向下一个对象, 只要其中某一个属性的对象不可pickle序列化,那么此对象就无法pickle序列化。 pickle序列化并不是全能的,所以经常才出现python在win下的多进程启动报错, 因为windows开多进程需要序列化入参,但复杂的入参,例如不是简单的数字 字母,而是一个自定义对象, 万一这个对象无法序列化,那么win上启动多进程就会直接报错。 所以如果为了调度上面的class A的add方法,你需要再写一个函数 def your_task(x,y): return A(x).add(y) 然后把这个your_task函数传给框架就可以了。所以此框架和你在项目里面写类不是冲突的, 本人是100%推崇oop编程,非常鲜明的反对极端面向过程编程写代码,但是此框架鼓励你写函数而不是类+实例方法。 框架能支持@staticmethod装饰的静态方法,不支持实例方法,因为静态方法的第一个入参不是self。 如果对以上为什么不支持实例方法解释还是无法搞明白,主要是说明没静下心来仔细想想, 如果是你设计框架,你会怎么让框架支持实例方法? statckflow上提问,celery为什么不支持实例方法加@task https://stackoverflow.com/questions/39490052/how-to-make-any-method-from-view-model-as-celery-task celery的作者的回答是: You can create tasks out of methods. The bad thing about this is that the object itself gets passed around (because the state of the object in worker has to be same as the state of the caller) in order for it to be called, so you lose some flexibility. So your object has to be pickled every time, which is why I am against this solution. Of course this concerns only class methods, s tatic methods have no such problem. Another solution, which I like, is to create separate tasks.py or class based tasks and call the methods from within them. This way, you will have FULL control over Analytics object within your worker. 这段英文的意思和我上面解释的完全一样。所以主要是你没仔细思考想想为什么不支持实例方法。 ``` ## 6.6 是怎么调度一个函数的。 ``` 答:基本原理如下 def add(a,b): print(a + b) 从消息中间件里面取出参数{"a":1,"b":2} 然后使用 add(**{"a":1,"b":2}),就是这样运行函数的。 ``` ## 6.7 框架适用哪些场景? ``` 答:分布式 、并发、 控频、断点接续运行、定时、指定时间不运行、 消费确认、重试指定次数、重新入队、超时杀死、计算消费次数速度、预估消费时间、 函数运行日志记录、任务过滤、任务过期丢弃等数十种功能。 只需要其中的某一种功能就可以使用这。即使不用分布式,也可以使用python内置queue对象。 这就是给函数添加几十项控制的超级装饰器。是快速写代码的生产力保障。 适合一切耗时的函数,不管是cpu密集型 还是io密集型。 不适合的场景主要是: 比如你的函数非常简单,仅仅只需要1微妙 几十纳秒就能完成运行,比如做两数之和,print一下hello,这种就不是分需要使用这种框架了, 如果没有解耦的需求,直接调用这样的简单函数她不香吗,还加个消息队列在中间,那是多此一举。 ``` ## 6.8 怎么引入使用这个框架?门槛高不高? ``` 答:先写自己的函数(类)来实现业务逻辑需求,不需要思考怎么导入框架。 写好函数后把 函数和队列名字绑定传给消费框架就可以了。一行代码就能启动分布式消费。 在你的函数上面加@boost装饰器,执行 your_function.conusme() 就能自动消费。 所以即使你不想用这个框架了,你写的your_function函数代码并没有作废。 不需要去掉 @boost装饰器,函数也能正常直接运行. 所以不管是引入这个框架 、废弃使用这个框架、 换成celery框架,你项目的99%行 的业务代码都还是有用的,并没有成为废物。 别的框架如flask换django,scrapy换spider,代码形式就成了废物。 ``` ## 6.9 怎么写框架? ``` 答: 需要学习真oop和36种设计模式。唯有oop编程思想和设计模式,才能持续设计开发出新的好用的包甚至框架。 如果有不信这句话的,你觉得可以使用纯函数编程,使用0个类来实现这样的框架。 如果完全不理会设计模式,实现threding gevent evenlet 3种并发模式,加上10种中间件类型,实现分布式消费流程, 需要反复复制粘贴扣字30次。代码绝对比你这个多。例如基于nsq消息队列实现任务队列框架,加空格只用了80行。 如果完全反对oop,需要多复制好几千行来实现。 例如没听说设计模式的人,在写完rabbitmq版本后写redis版本,肯定十有八九是在rabbitmq版本写完后,把整个所有文件夹, 全盘复制粘贴,然后在里面扣字母修改,把有关rabbitmq操作的全部扣字眼修改成redis。如果某个逻辑需要修改, 要在两个地方都修改,更别说这是10几种中间件,改一次逻辑需要修改10几次。 我接手维护得老项目很多,这种写法的编程思维的是特别常见的,主要是从来没听说设计模式4个字造成的, 在我没主动学习设计模式之前,我也肯定会是这么写代码的。 只要按照36种设计模式里面的oop4步转化公式思维写代码三个月,光就代码层面而言,写代码的速度、流畅度、可维护性 不会比三年经验的老程序员差,顶多是老程序员的数据库 中间件种类掌握的多一点而已,这个有机会接触只要花时间就能追赶上, 但是编程思维层次,如果没觉悟到,可不是那么容易转变的,包括有些科班大学学过java的也没这种意识, 非科班的只要牢牢抓住把设计模式 oop思维放在第一重要位置,写出来的代码就会比科班好, 不能光学 if else 字典 列表 的基本语法,以前我看python pdf资料时候,资料里面经常会有两章以上讲到类, 我非常头疼,一看到这里的章节,就直接跳过结束学习了,现在我也许只会特意去看这些章节, 然后看资料里面有没有把最本质的特点讲述好,从而让用户知道为什么要使用oop,而不是讲下类的语法,这样导致用户还是不会去使用的。 你来写完包括完成10种中间件和3种并发模式,并且预留消息中间件的扩展。 然后我们来和此框架 比较 实现框架难度上、 实现框架的代码行数上、 用户调用的难度上 这些方面。 ``` ## 6.10 框架能做什么 ``` 答:你在你的函数里面写什么,框架就是自动高可靠分布式并发做什么,没有规定你的函数只能写什么逻辑。 框架在你的函数上加了自动使用消息队列、分布式、自动多进程+多线程(协程)超高并发、qps控频、自动重试。 只是增加了稳定性、扩展性、并发,但做什么任务是你的函数里面的代码目的决定的。 只要是你代码涉及到了使用并发,涉及到了手动调用线程或线程池或asyncio,那么就可以使用此框架, 使你的代码本身里面就不需要亲自操作任何线程 协程 asyncio了。 不需要使用此框架的场景是函数不需要消耗cpu也不需要消耗io,例如print("hello"),如果1微秒就能完成的任务不需要使用此框架。 ``` ## 6.11 日志的颜色不好看或者觉得太绚丽刺瞎眼,想要调整。 ``` 一 、关于日志颜色是使用的 \033实现的,控制台日志颜色不光是颜色代码决定的,最主要还是和ide的自身配色主题有关系, 同一个颜色代码,在pycahrm的十几个控制台颜色主题中,表现的都不一样。 所以代码一运行时候就已经能提示用户怎么设置优化控制台颜色了,文这个问题说明完全没看控制台的提示。 """ 1)使用pycharm时候,建议重新自定义设置pycharm的console里面的主题颜色。 设置方式为 打开pycharm的 file -> settings -> Editor -> Color Scheme -> Console Colors 选择monokai, 并重新修改自定义6个颜色,设置Blue为1585FF,Cyan为06B8B8,Green 为 05A53F,Magenta为 ff1cd5,red为FF0207,yellow为FFB009。 2)使用xshell或finashell工具连接linux也可以自定义主题颜色,默认使用shell连接工具的颜色也可以。 颜色效果如连接 https://imgse.com/i/pkFSfc8 在当前项目根目录的 nb_log_config.py 中可以修改当get_logger方法不传参时后的默认日志行为。 """ 二、关于日志太绚丽,你觉得不需要背景色块,在当前项目根目录的 nb_log_config.py 中可以设置 DISPLAY_BACKGROUD_COLOR_IN_CONSOLE = False # 在控制台是否显示彩色块状的日志。为False则不使用大块的背景颜色。 ``` ## 6.12 是不是抄袭模仿 celery **第一性原理:** `funboost` 是从 `while 1:redis.blpop()` 这个简单直接的模型开始迭代形成的。 从6.12.4章节可以看到, 完全可以抛开道德不谈, 仅从技术可行性上,抄袭celery就是自寻死路. ``` 答:有20种优势,例如celery不支持asyncio、celery的控频严重不精确,光抄袭解决不了。比celery有20项提升,具体看2.4章节 我到现在也只能通过实际运行来达到了解推车celery的目的,并不能直接默读代码就搞懂。 celery的层层继承,特别是层层组合,又没多少类型提示,说能精通里面每一行源码的人,多数是高估自己自信过头了。 celery的代码太魔幻,不运行想默读就看懂是不可能的,不信的人可以把自己关在小黑屋不吃不喝把celery源码背诵3个月, 然后3个月后 试试默写能不能写出来实现里面的兼容 多种中间件 + 多种并发模式 + 几十种控制方式的框架。 这是从一个乞丐版精简框架衍生的,加上36种设计模式付诸实践。 此框架运行print hello函数, 性能强过celery 20倍以上(测试每秒消费次数,具体看我的性能对比项目)。 此框架支持的中间件比celery多 此框架引用方式和celery完全不一样,完全不依赖任何特定的项目结构,celery门槛很高。 ``` ``` 此框架和celery没有关系,没有受到celery启发,也不可能找出与celery连续3行一模一样的代码。 这个是从原来项目代码里面大量重复while 1:redis.blpop() 发散扩展的。 这个和celery唯一有相同点是,都是生产者 消费者 + 消息队列中间件的模式,这种生产消费的编程思想或者叫想法不是celery的专利。 包括我们现在java框架实时处理数据的,其实也就是生产者 消费者加kfaka中间件封装的,难道java人员开发框架时候也是需要模仿一下python celery源码或者思想吗。 任何人都有资格开发封装生产者消费者模式的框架,生产者 消费者模式不是celery专利。生产消费模式很容易想到,不是什么高深的架构思想,不需要受到celery的启发才能开发。 ``` ### 6.12.2 生产者-broker-消费者 不是 celery 的专利 **生产者-broker-消费者 模式是计算机科学中一个非常基础和经典的设计模式,它的历史远比 Celery 悠久得多** 如果认为"只要是使用生产者-消费者模式编程,那么就是抄袭celery",那所有编程语言中的生产者消费者编程方式,都抄袭celery了,说这话简直是不长脑子。 ``` 此框架和celery没有关系,没有收到celery启发,也不可能找出与celery连续3行一模一样的代码。 这个是从原来项目代码里面大量重复while 1:redis.blpop() 发散扩展的。 这个和celery唯一有相同点是,都是生产者 消费者 + 消息队列中间件的模式,这种生产消费的编程思想或者叫想法不是celery的专利。 包括我们现在java框架实时处理数据的,其实也就是生产者 消费者加kfaka中间件封装的,难道java人员也是需要模仿python celery源码吗。 任何人都有资格开发封装生产者消费者模式的框架,生产者 消费者模式不是celery专利。生产消费模式很容易想到,不是什么高深的架构思想,不需要受到celery的启发才能开发。 ``` **连任何线程池都是 生产者-broker-消费者 编程思想,线程池也抄袭celery了吗** 几乎所有线程池都是下面这样来实现的: ``` 1.Broker (中间件/任务通道): 线程池 threadpool 有个 work_queue 属性,work_queue 是个内存队列, work_queue 就是 broker 2.Producer (生产者): threadpool有个submit方法,submit方法原理就是把函数和函数入参put丢到这个 threadpool.work_queue里面,submit就是生产者发送消息 3.Consumer (消费者): 线程池里面开启了n个线程, 每个线程里面逻辑是 while True:fun,params = work_queu.get(block=True) ,然后fun(params) 执行函数。 这n个线程就是n个消费者。 ``` 任何线程池都是这么实现的,那线程池使用 “生产者-broker-消费者“” 编程思想,抄袭了celery吗? rq dramtiq huery 都抄袭了celery吗? java中设计线程池的人听都没听说过celery。所以说funboost抄袭celery,简直是不长脑子 ### 6.12.3 gemini-2.5pro ai大模型来分析, funboost 是不是抄袭粘贴了 celery 源码 用户可以在 `google ai studio` 免费无限白嫖 gemini-2.5pro 大模型,拥有`1000k`上下文窗口, gemini-2.5pro 足够阅读`funboost`教程 + `funboost` 源码. [google ai studio网址: https://aistudio.google.com/app/prompts/new_chat](https://aistudio.google.com/app/prompts/new_chat) 有的人看都不看就开始质疑抄袭复制粘贴,又喜欢质疑但又懒惰不愿意自己看代码,直接把所有源码,丢给有无敌上下文窗口的 `gemini-2.5pro` ai大模型来分析就好了. **人工提问:**
你是一名精通celery框架用法和celery框架源码的资深开发者, 你现在仔细分析文档中的 funboost python源代码,然后对比你掌握的celery项目的源码, 你觉得 funboost是从while true:msg=redis.blpop(queue_name),这个第一性原理的模型开始迭代形成的? 还是用户所质疑的认为funboost是抄袭复制粘贴的celery的源码来形成的? 你需要使用以下方式来进行分析,然后列出多个明显且具体的理由。 1.需要详细分析文档中的所有funboost源代码 2.不要仅仅相信作者写的教程文字的一面之词 3.对比celery框架的源代码 4.需要结合假设如果是抄袭模仿celery源码来实现,难度有多高 请注意要使用中文来回答我。以下是`gemini` ai大模型的回答
有的 `pythoner`总是问 `vscode` `pycharrm 的 debug` 模式为什么导入不了 `funboost_config.py` , 或者抱怨不想配置文件放在项目根目录, 有的人写了三四年python代码,还没听说过 `pythonpath` , 太low了 简直是恨铁不成钢, 只要懂了 `pythonpath` 第一性原理,知道python是按什么依据和顺序导入一个模块,就不会有这种低级疑问. 这个 `pythonpath` 知识点根本和 `funboost` 自身毫无关系,还需要这里来大费周章科普``` 有的人写了三四年的python代码,连PYTHONPATH作用和概念都没听说过,真的很悲剧 如果是下面的 pythonpathdemo是一个python项目根目录,pycharm以项目方式打开这个文件夹。 你会发现run5.py在pycahrm可以运行,在cmd中无法运行,因为无法找到d1包,笨瓜会硬编码操作sys.path.insert非常的愚蠢, 这种笨瓜主要是写代码一意孤行,导致不学习PYTHONPATH。 ```  完整讲解pythonpath重要性文章在: [https://github.com/ydf0509/pythonpathdemo](https://github.com/ydf0509/pythonpathdemo) 一句命令行解决设置 pythonpath 和运行 python脚本 一定要设置的是临时终端会话级pythonpath,不要设置到配置文件写死永久固定环境变量 ``` 例如你的项目根目录是 /home/xiaomin/myproj/ 你的运行起点脚本在 /home/xiaomin/myproj/dir2/dir3/run_consume6.py 一句话就是 linux: export PYTHONPATH=/home/xiaomin/myproj/; python3 /home/xiaomin/myproj/dir2/dir3/run_consume6.py win的cmd:假设代码在d盘先切换到盘符d盘 d:/ 。 然后 set PYTHONPATH=/codes2022/myproj/ & python3 /codes2022/myproj/dir2/dir3/run_consume6.py win10和11的powershell: 假设代码在d盘先切换到盘符d盘 d:/。然后 $env:PYTHONPATH="/codes2022/myproj/"; python3 /codes2022/myproj/dir2/dir3/run_consume6.py 压根不要敲击两次命令行好不。 如果你已经cd切换到项目根目录myproj了,那就 export PYTHONPATH=./;python3 dir2/dir3/run_consume6.py ``` ### 6.18.2 为什么celery scrapy django不需要用户设置pythonpath? ``` 因为这些框架都是固定死用户的项目目录解构,项目运行起点有固定的唯一脚本,而且该脚本在项目跟目录的第一个直接层级。 例如django scrapy ,他的命令行启动,你必须先cd 到项目的根目录再运行命令行。 而此框架是为了兼容用户的cmd命令当前文件夹在任意文件夹下,就可以运行你项目下的任意多层级深层级下的脚本。 用户设置了pythonpath后,可以cd到任意文件夹下,再运行 python /dir1/dir2/xxx.py 任然能正确的import。 如果用户能够保证他要python启动运行的脚本始终是放在了项目的第一层级目录下面,当然可以不用设置 PYTHONPATH了。 ``` ### 6.18.3 怎么指定配置文件读取 funboost_config.py 和nb_log_config.py的文件夹位置 ``` 默认就是放在项目根目录,然后设置 export PYTHONPATH=你的项目根目录, (linux win+cmd win+powershell 设置临时会话的环境变量语法不一样,6.18已经介绍了) 如果要指定读取的配置文件的存放位置为别的文件夹,也很容易,归根结底还是要精通PYTHONPATH的作用。 例如你的项目根目录是 /home/codes/proj ,你不想使用项目根目录下的配置文件,想读取别的文件夹的配置文件作为funboost的中间件配置。 假设你的文件夹是 /home/xiaomin/conf_dir ,里面有 funboost_config.py ,如果你的/home/codes/proj 项目想使用 /home/xiaomin/conf_dir/funboost_config.py 作为配置文件, 那就 export PYTHONPATH=/home/xiaomin/conf_dir:/home/codes/proj 也就是意思添加两个文件夹路径到 PYTHONPATH 因为funboost是尝试导入import funboost_config.py 只要能import 到就能读取到,所以只要你把文件夹添加到PYTHONPATH环境变量就可以了(可以print sys.path来查看这个数组) 归根结底是要懂PYTHONPATH,有的人老是不懂PYTHONPATH作用,不愿意认真看 https://github.com/ydf0509/pythonpathdemo ,非常杯具。 ``` ### 6.18.4 怎么根据不同环境使用不同的funboost_config配置文件? ``` 框架获取配置的方式就是直接import funboost_config,然后将里面的值覆盖框架的 funboost_config_deafult.py 值。 为什么能import 到 funboost_config,是因为要求export PYTHONPATH=你的项目根目录,然后第一次运行时候自动生成配置文件到项目根目录了。 假设你的项目根目录是 /data/app/myproject/ 方案一:利用python导入机制,自动import 有PYTHONPATH的文件夹下的配置文件。 例如你在 /data/config_prod/ 放置 funboost_config.py ,然后shell临时命令行 export PYTHONPATH=/data/config_prod/:/data/app/myproject/,再python xx.py。 (这里export 多个值用:隔开,linux设置环境变量为多个值的基本常识无需多说。) 这样就能自动优先使用/data/config_prod/里面的funboost_config.py作为配置文件了,因为import自动会优先从这里。 然后在测试环境 /data/config_test/ 放置 funboost_config.py,然后shell临时命令行 export PYTHONPATH=/data/config_test/:/data/app/myproject/,再python xx.py。 这样测试环境就能自动使用 /data/config_test/ 里面的funboost_config.py作为配置文件了,因为import自动会优先从这里。 方案二: 直接在funboost_config.py种写if else,if os.get("env")=="test" REDIS_HOST=xx ,if os.get("env")=="prod" REDIS_HOST=xx , 因为配置文件本身就是python文件,所以非常灵活,这不是.ini或者 .yaml文件只能写静态的死字符串和数字, python作为配置文件优势本来就很大,里面可以写if else,也可以调用各种函数,只要你的模块下包含那些变量就行了。 ``` ### 6.18.5 多个ptyhon项目怎么使用同一个funboost_config.py 作为配置文件 ``` 还是因为不懂 PYTHONPATH 造成的,需要我无数次举例说明。 这样太low了,这么多python人员到现在还不知道 PYTHONPATH作用, python导入一个模块是怎么去查找的。 6.18 开头就说了 pythonpathdemo 项目连接,有的人不懂PYTHONPATH又不看这个博客。死猪不怕开水烫,永远不学习 PYTHONPATH 的强大作用。 ``` ``` 假设你想每隔项目都使用 /data/conf/funboost_config.py 这一个相同的 funboost_config 作为配置文件, 你有两个python项目在 /data/codes/proj1 和 /data/codes/proj2, 你运行proj1的项目脚本前,只需要 export PYTHONPATH=/data/conf/:/data/codes/proj1 ,然后运行proj1项中的脚本 python dir1/dir2/xx.py 你运行proj2的项目脚本前,只需要 export PYTHONPATH=/data/conf/:/data/codes/proj2 ,然后运行proj2项中的脚本 python dir3/dir4/yy.py 因为你设置了/data/conf/ 为 pythonpath后,那么funboost在 import funboost_config 时候就能自动 import 到 /data/conf/下的 funboost_config.py 模块了。 funboost控制台都打印了 读取的是什么文件作为配置文件了。 归根结底问这个问题的人是完全不懂 PYTHONPATH. ``` ## 6.19 定时任务或延时任务报错 RuntimeError: cannot schedule new futures after interpreter shutdown ``` 如下图所示,运行定时任务或延时任务在高版本python报错,可以在代码末尾加个while 1:time.sleep(10) 因为程序默认是 schedule_tasks_on_main_thread=False,为了方便连续启动多个消费者消费, 没有在主线程调度运行,自己在代码结尾加个不让主线程结束的代码就行了。 在消费启动的那个代码末尾加两行 while 1: time.sleep(10) ```  ## 6.21 支不支持redis cluster集群模式的redis作为消息队列? funboost可以redis cluster集群。 框架支持kombu,支持 celery作为broker_kind。 阿里云分片集群,用户完全不用管是单机reids还是分片集群,代码连接方式一样。 ## 6.22 怎么使用tcp socket 作为消息队列 ``` 见 4.35章节,或者在文档的搜索框输入 tcp 或者 socket 就能搜到了。 或者按照文档第14章节使用ai来帮你学习。 ``` ## 6.23 安装包时候自动安装的三方依赖包太多? ``` 1.安装第三方包是自动的,又不需要手动一个个指令安装,安装多少三方包都没关系。 2.所有三方包加起来还不到30M,对硬盘体积无影响。 3.只要指定阿里云pip源安装,就能很快安装完,30秒以内就安装完了,又不是需要天天安装。 如果如你的不一致报错终端,pip 命令加上 --use-feature=2020-resolver pip install funboost -i https://mirrors.aliyun.com/pypi/simple/ 4.三方包与自己环境不一致问题? 用户完全可以自由选择任何三方包版本。例如你的 sqlalchemy pymongo等等与框架需要的版本不一致,你完全可以自由选择任何版本。 我开发时候实现了很多种中间件,没有时间长期对每一种中间件三方包的每个发布版本都做兼容测试,所以我固定死了。 用户完全可以选择自己的三方包版本,大胆点,等报错了再说,不出错怎么进步,不要怕代码报错,请大胆点升级你想用的版本。 如果是你是用你自己项目里面的requirements.txt方式自动安装三方包,我建议你在文件中第一行写上 funboost,之后再写其它包 这样就能使用你喜欢的版本覆盖funboost框架依赖的版本了。 等用的时候报错了再说。一般不会不兼容报错的请大胆点。 5.为什么要一次性安装完,而不是让用户自己用什么再安装什么? 是为了方便用户切换尝试各种中间件和各种功能时候,不需要自己再亲自一个个安装第三方包,那样手动一个个安装三方包简直是烦死了。 2024 5月份,精简了依赖包,部分包改为选装, pip install funboost[all] 才安装全部中间件. ``` ### 6.23.b 作者为什么不开发pip 选装方式?例如实现选装 pip install funboost[rabbitmq] ``` 这个你是怎么知道funboost作者没有使用选装方式的? 你是怎么知道作者没有掌握 pip 中括号选装依赖包 技术方式的? 用户可以看看setup.py里面的 extras_require里面,有没有开发选装方式? pip funboost[all] 才是安装所有依赖. 作者去掉依赖很容易,已经实现了, funboost/factories/broker_kind__publsiher_consumer_type_map.py 中的 regist_to_funboost 就是动态导入生产者消费者,很容易去掉各种三方包依赖, 但是很容易安装的三方包,我是不会去做成选装的,没有那个必要,自己设置pip 国内源,30秒就能安装完成funboost了,不需要去纠结这个依赖包多少的问题. ``` ## 6.24 funboost框架从消息队列获取多少条消息?有没有负载均衡? ``` funboost 每个消费者进程会从消息队列获取 并发个数 n + 10 条消息,每个消费者实现有差异,一般不会超过并发数2倍。 所以不会造成发布10万条消息后,再a b机器启动2个消费,b机器一直无法消费,全部a机器消费,不会出现这种情况。 如果你只发布6条消息,先在a机器启动消费,下一秒启动b机器,那很有可能b机器无法获取到消息。只要消息数量够多,不会出现忙的忙死,闲的闲死。 例如框架的默认并发方式使线程池,内置了一个有10大小的界队列queue,同时还有n个并发线程正在运行消息,所以每个消费者会获取很多消息在python内存中。 但不会出现一个消费者进程获取了1000条以上的消息,导致站着某坑不拉屎,别的消费进程没办法消费的情况。 如果你是重型任务,希望不预取,每台机器只获取一条消息运行,可以设置并发模式为 SINGLE_THREAD 模式, boost装饰器设置 concurrent_mode=ConcurrentModeEnum.SINGLE_THREAD,这样在a b 两台机器都没有内存缓冲队列,只会一次获取一条消息执行, 有的broker_kind实现时候为了运行快,框架使用了批量拉取消息, 需要设置批量拉取的数量为1。 ``` 消费文件 test_frame\test_redis_ack_able\test_load_balancing_consume.py ```python import logging import time from funboost import boost, BrokerEnum,BoosterParams,ctrl_c_recv,ConcurrentModeEnum @boost(BoosterParams(queue_name='test_load_balancing', broker_kind=BrokerEnum.REDIS_ACK_ABLE, log_level=logging.INFO, # 切记是设置concurrent_mode并发模式为 SINGLE_THREAD ,而不是使用线程池模式然后把 concurrent_num 设置为1,这两个在多进程拉取消息的绝对均衡方面有区别 # 因为即使线程池并发为1,线程池还有内部的work_queue会拉取缓冲一部分消息,导致出现你认为的多机器或多进程拉取消息不均衡 concurrent_mode=ConcurrentModeEnum.SINGLE_THREAD, # pull_msg_batch_size 这行很关键,REDIS_ACK_ABLE 因为默认是拉取100个消息, # 对于重型任务,如果你需要每台机器都严格只运行一个消息,就需要设置批量拉取1个消息,不要一台机器就把消息队列掏空了。 broker_exclusive_config={'pull_msg_batch_size': 1}, )) def test_load_balancing(x): print(x) time.sleep(1) if __name__ == '__main__': test_load_balancing.consume() ctrl_c_recv() ``` 发送消息文件 test_frame\test_redis_ack_able\test_load_balancing_consume.py ```python from test_frame.test_redis_ack_able.test_load_balancing_consume import test_load_balancing if __name__ == '__main__': for i in range(80): test_load_balancing.push(i) ``` 启动2次消费文件,就能看到2个控制台,每个控制台每次只获取1条消息并运行,如果你不设置 pull_msg_batch_size,那么默认是批量拉取100个,而你总共才发布80个消息 所以你只能看到一个控制台消费,你误以为没有负载均衡。 如果你不设置 pull_msg_batch_size,那么就可以发布5000个消息来测试消费负载均衡,两个控制台就都会运行,因为默认批量拉取100个也不会一下子把5000消息都取到内存。 ## 6.25 funboost消费启动后,按ctrl + c 无法结束代码? ```python from funboost import ctrl_c_recv if __name__ == '__main__': # 启动消费 consume_func.consume() ctrl_c_recv(confirmation_count=1) # 在代码最末未加个 ctrl_c_recv() # 想结束代码就连续按 confirmation_count 次 ctrl +c 就好了。 为什么是 confirmation_count次,是防止你误操作了ctrl + c ``` ## 6.25b `ctrl_c_recv` 到底要不要加?—— 直接看效果 win和linux表现不一样: linux 代码末尾不加ctrl_c_recv(),按ctrl +c 可以结束程序。win 不加ctrl_c_recv(),按ctrl +c 无法结束程序。 **先给出正确的结论:** 你即使程序最末尾不加 ctrl_c_recv(),funboost消费程序也会永久持续运行,控制台也会不断打印日志和 `print` 输出。 严禁认为 ctrl_c_recv() 的作用是“优雅退出”或“防止程序结束”。实际上,不加它程序也会一直运行 真实作用:在 Windows 下如果不加这行代码,按 Ctrl+C 无法停止程序。加上后即可正常。 很多开发者(包括一些AI)容易对 `ctrl_c_recv` 的作用产生两种误解: - **误解一**:以为不加 `ctrl_c_recv`,程序就会瞬间结束,根本不会消费消息。 - **误解二**:以为不加 `ctrl_c_recv`,脚本会自动变成 `nohup` 那样的后台任务,关掉终端窗口还能继续跑。 **❌ 这些都是错误的!** ### 6.25b.0 ctrl_c_rev的源码很简单,很容易看懂。 ctrl_c_recv 的源码非常简单,它不是“优雅退出”的实现——没有等待任务完成、没有清理资源、没有发送任何信号给工作线程或消息队列,funboost不需要优雅退出,是依靠mq本身的确认消费来防止丢消息。 ```python def ctrl_c_recv(confirmation_count=1): """ 程序最末尾加 ctrl_c_recv() 主要是为了主线程持续在运行,方便你在 windows系统下敲击键盘 ctrl + c 可以停止程序而已。 你即使程序最末尾不加 ctrl_c_recv(),funboost消费程序也会永久持续运行,控制台也会不断打印日志和 `print` 输出。 加与不加的详细区别,可以看教程6.25b章节 `## 6.25b `ctrl_c_recv` 到底要不要加?—— 直接看效果` 你也可以不用ctrl_c_recv(), 直接在你的启动脚本文件的最末尾加上: while 1: time.sleep(100) 也能达到主线程在持续运行的目的,从而在windows系统下敲击键盘 ctrl + c 可以停止程序。 """ for i in range(confirmation_count): while 1: try: time.sleep(2) except (KeyboardInterrupt,) as e: # time.sleep(2) print(f'{type(e)} 你按了ctrl c ,程序退出, 第 {i + 1} 次', flush=True) # time.sleep(2) break os._exit(44) ``` 下面我们直接列出**加**与**不加**的真实效果,不涉及底层原理,只讲你实际能看到的现象。 --- ### 6.25b.1 ✅ 情况A:代码末尾加了 `ctrl_c_recv()` ```python from funboost import ctrl_c_recv task.consume() # 启动消费 ctrl_c_recv() # 末尾加上这一行 ``` - **启动后**:程序会持续一直运行,控制台会**持续打印日志和 `print` 输出**。 - **如果想停止**:你只需在当前会话终端窗口,在键盘上**按几次 `Ctrl + C`**(默认是1次),程序就会立刻结束。 - **关掉终端窗口**:如果直接关闭终端窗口,程序也会随之结束(这是所有前台程序的默认行为)。 --- ### 6.25b.2 ❌ 情况B:代码末尾**没有**加 `ctrl_c_recv()` ```python task.consume() # 启动消费 # 后面没有 ctrl_c_recv ``` - **启动后**:程序**同样会一直运行**,控制台也会**不断打印日志和 `print` 输出**。**代码绝对不会瞬间结束!** - **如果想停止**:你在当前会话终端窗口,疯狂按 `Ctrl + C` **毫无反应**,程序完全不理你。 - **怎么办**:只能通过以下方式结束程序: - **直接关闭整个终端窗口**(窗口关闭会杀死所有前台进程)。 - 或者在另一个终端里用 `kill -9` 命令杀死进程ID。 - **关掉终端窗口**:如果直接关闭终端窗口,程序同样会结束,**不会像 `nohup` 那样在后台继续运行**。 --- ### 6.25b.3 效果对比表(一目了然) | 你的操作 | 加 `ctrl_c_recv` 的效果 | 不加 `ctrl_c_recv` 的效果 | 加与不加是否有区别? | | :--- | :--- | :--- | :--- | | **启动脚本后** | 程序持续运行,控制台一直打印日志 | **程序同样持续运行,控制台一直打印日志** | ❌ 无区别 | | **按 `Ctrl + C`** | **程序立即退出** | **毫无反应,程序继续运行** | ✅ 有区别 | | **直接关掉终端窗口** | 程序随之结束 | 程序随之结束 | ❌ 无区别 | | **是否像 `nohup` 一样后台运行** | ❌ 否 | ❌ 否 | ❌ 无区别 | --- ### 6.25b.4 最终结论 - **不加 `ctrl_c_recv()` 的最大坏处**:你无法用 `Ctrl + C` **方便地**停止程序,只能粗暴地关窗口或杀进程。 - **既然程序无论如何都会一直运行,为什么不加上 `ctrl_c_recv()` 给自己留一个方便的“停止开关”呢?** **建议:在所有需要交互式停止的 Funboost 脚本末尾,都加上这一行。** ## 6.25b.5 顺便说一下,文档中其他地方提到的 `ctrl_c_recv` 和 apscheduler 的定时器之间的关系 很多开发者(包括AI)会疑惑:文档其他地方说用定时任务时必须加 `ctrl_c_recv()` 或 `while 1: time.sleep(10)`,否则会报错 `RuntimeError: cannot schedule new futures after interpreter shutdown`,但这里又说可以不加,到底哪个是对的? 其实答案很简单:**取决于你用的 Funboost 版本**。 ### 6.25b.5.1 APScheduler 的两种原生定时器 众所周知,APScheduler 有两种常用定时器: - **`BlockingScheduler`**:阻塞型。调用 `start()` 后,代码会卡住不动,后续代码根本没有机会运行。 - **`BackgroundScheduler`**:非阻塞型。调用 `start()` 后,代码会继续往下执行,定时任务在**守护线程**中运行。 ### 6.25b.5.2 早期版本(2025年之前)的问题 Funboost 早期为了能让用户连续写启动多个函数消费 `fun_xx.consume()` 和连续启动多个定时器 `scheduler_xx.start()` 而不阻塞,选择了继承 `BackgroundScheduler` 作为定时器基类。 原生的 `BackgroundScheduler` 有一个特点:它的工作线程是**守护线程**(`_daemon=True`)。 当你的脚本执行完所有代码,主线程结束后,守护线程会被 Python 强行终止。此时如果定时器正在尝试向线程池提交新任务,就会触发: ```python RuntimeError: cannot schedule new futures after interpreter shutdown ``` 所以早期文档里才会反复强调:**如果使用了定时任务,必须在脚本末尾加上 `ctrl_c_recv()` 或 `while 1: time.sleep(10)`,让主线程一直活着,避免报错。** ### 6.25b.5.3 2025年之后的改进 现在 `FunboostBackgroundScheduler` 已经重写了 `_daemon` 属性,**显式将其设为 `False`**。这意味着: - 定时器工作线程变成了**非守护线程**。 - 即使主线程执行完毕,这些非守护线程也会让进程继续保持运行,不会触发上述报错。 **所以从 2025 年以后的 Funboost 版本开始,即使你用了定时任务,也完全不需要加 `ctrl_c_recv()` 或 `while 1: time.sleep(10)`,程序就能正常运行且不会报错。** ### 6.25b.5.4 三种定时器的行为对比 | 定时器类型 | 是否阻塞 | 线程类型 | 不加 `ctrl_c_recv` 会怎样? | | :--- | :--- | :--- | :--- | | **原生 `BlockingScheduler`** | 阻塞 | - | `start()` 后卡住,后续代码根本跑不到,没有这个问题 | | **原生 `BackgroundScheduler`** | 非阻塞 | **守护线程** | 主线程结束后,守护线程被强杀,会报 `RuntimeError` | | **`FunboostBackgroundScheduler`** | 非阻塞 | **非守护线程** | 主线程结束后,非守护线程继续运行,**不会报错** | ### 6.25b.5.5 总结 - 如果你用的是 **2025年之后的 Funboost**,使用定时任务时**可以不加** `ctrl_c_recv()`,程序会一直运行,定时器也能正常工作。 - 如果你用的是**旧版本**,或者不确定版本,**建议还是加上** `ctrl_c_recv()` 或 `while 1: time.sleep(10)` 保平安。 - 当然,即使在新版本中,加上 `ctrl_c_recv()` 也**没有副作用**,还能让你方便地用 `Ctrl + C` 停止程序,所以加上也没什么坏处。 ## 6.26 ASYNC 并发模式,异步函数操作数据库/aiohttp连接池报错 `attached to a different loop`(是用户不看文档和boost的入参说明造成) - **第一性原理**:对于跨线程玩loop 协程,如果用户经常这样编程的一定遇到过这种经典的坑,遇到了才能使自己的asyncio loop知识更上一层楼; 如果一个pythoner,一直只在主线程玩一个loop,是很难真正掌握asyncio的知识的,尤其是不懂loop。 - 使用指定的 `specify_async_loop` 即可解决. 这个放在自问自答 6.26 章节有源码注释演示,太多人不看文档,不看boost装饰器的入参解释说明.不看 `specify_async_loop` 入参解释造成的. - asyncio的经典报错 `attached to a different loop` 和 `context manager should be used inside a task` 和`attached to a different loop` 报错原因: 这个错误通常发生在 对象(如 Future、Task、协程)在一个事件循环中创建,却在另一个事件循环中被使用。 - `RuntimeError: Timeout context manager should be used inside a task` 报错原因: 异步上下文管理器(如timeout)不在Task中运行 - 很多人在async def消费函数中去操作http连接池 发请求,操作数据库连接池查询数据,在funboost中报错. 根本原因是用户不传递指定 `specify_async_loop`, 如果不传递,funboost是独立线程中启动了一个新的loop, 用户的连接池绑定的是主线程的loop,如果使用子线程的新loop去使用这个连接池查询数据库,那就会导致报错. **只在主线程中操作`asyncio`协程的pythoner,永远无法理解这个知识点,需要用户多练习在子线程去操作数据库连接池或者http连接池,才能踩坑积累经验.** 用户可以结合看4b.3章节 **用户需要始终知道 ASYNC 并发模式 第一性原理,才知道怎么根本解决问题**
切记切记,funboost 的 ASYNC 并发模式的核心底层原理, funboost,当用户使用 ASYNC 并发模式时候,是自动使用 AsyncPoolExecutor 来执行用户函数, AsyncPoolExecutor 源码在 funboost/concurrent_pool/async_pool_executor.py AsyncPoolExecutor 原理是启动了一个线程,这个线程会使用传递的specify_async_loop,如果不传递就会新创建1个loop, 这个loop会运行指定的 concurrent_num 个cron 协程,去运行用户的消费函数逻辑 也就是说AsyncPoolExecutor线程只有一个,loop只有一个,真正的一个loop并发运行cron协程 用 loop2 去运行 loop1 创建的task,不报错才怪- **即使是funboost 的 thereding 并发模式也可以直接运行async def 函数,funboost对async的支持完爆celery**
funboost 的 thereding 并发模式也可以直接@boost装饰加到async def 函数,
因为funboost是特制的神级别线程池,能自动运行async def 函数.
但这个模式下简单粗暴,会开启 concurrent_num 个线程,每个线程有自己的loop,去运行用户的async函数,
相当于是每个loop只并发运行一个cron协程,不是真asyncio级别并发
相当于是有无数线程,每个线程一个loop,去运行用户的async函数
funboost的 thereding 并发模式运行async,就是celery 任务中强行使用 asyncio.new_event_loop().run_until_complete(async函数)
但是funboost是特制的神级别线程池 FlexibleThreadPool,这个线程池自动可以运行async def 函数,
无需用户为了运行async def 函数,去手写一个脱了裤子放屁的 同步def 函数,
也就是用户无需这种脱了裤子放屁的写法:
@boost(BoosterParams(queue_name='test_async_queue2', concurrent_mode=ConcurrentModeEnum.THREADING))
def 同步fun(x,y)
asyncio.new_event_loop().run_until_complete(async异步函数(x,y)) #同步里面调用异步函数
用户可以直接使用THREADING 并发模式加到 async def async异步函数:
@boost(BoosterParams(queue_name='test_async_queue2', concurrent_mode=ConcurrentModeEnum.THREADING))
async def async异步函数(x,y)
pass
celery的运行协程才需要脱落裤子放屁再加个同步函数里面run_until_complete调用异步函数,才能@app.task
### 6.26.1 演示aiohttp连接池,解决报错 `RuntimeError: context manager should be used inside a task`
演示aiohttp连接池,解决报错代码如下,用户需要看里面的每个代码注释.
```python
"""
此脚本主要是演示,在funboost自动的单独的子线程的loop中,如何使用asyncio的异步socket池,来发送异步请求。
核心是要把主线程的loop传递到子线程中,让子线程和主线程使用同一个loop来运行异步函数,
在funboost 中 通过 specify_async_loop=主线程loop 来传递.
跨线程的不同loop,使用在主线程的loop生成的异步连接池 来发请求是不行的.
不管是任何三方包的socket池,例如 aiomysql aioredis aiohttp httpx 创建的http或者数据库连接池,
都不能在子线程的loop的异步函数中直接去使用这个连接池中的连接发请求.
有些异步三方包的连接池能直接在子线程的loop去使用连接发请求或查询数据库而不报错,是因为惰性生成的.
"""
"""
用户一定要搞清楚 线程和loop的绑定关系
一定要知道为什么不同的loop,不能操作同一个异步连接池发请求或者查询数据库.
用户一定要多写子线程的loop调用连接池发请求测试demo,这个和funboost本身无关,
用户的asyncio知识体系太差,用户只会在主线程使用loop,导致对loop和线程绑定关系不懂,对不同loop操作一个连接池不懂.
在子线程运行异步函数的loop,比在主线程的loop运行难得多,坑也更多.用户需要多写demo例子测试练习,多问ai大模型.
"""
# 例如子线程的loop去使用主线程loop绑定的http连接池发请求,会报错如下. 数据库连接池同理也会报错.
"""
Traceback (most recent call last):
File "D:\codes\funboost\funboost\consumers\base_consumer.py", line 929, in _async_run_consuming_function_with_confirm_and_retry
rs = await corotinue_obj
File "D:\codes\funboost\test_frame\test_async_consumer\test_child_thread_loop_use_asyncio_sokcet_pool.py", line 64, in async_f2
async with ss.request('get', url=url) as resp: # 如果是这样请求,boost装饰器必须指定specify_async_loop,
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiohttp\client.py", line 1425, in __aenter__
self._resp: _RetType = await self._coro
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiohttp\client.py", line 607, in _request
with timer:
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiohttp\helpers.py", line 636, in __enter__
raise RuntimeError("Timeout context manager should be used inside a task")
RuntimeError: Timeout context manager should be used inside a task
"""
from funboost import boost, BrokerEnum,ConcurrentModeEnum,ctrl_c_recv
import asyncio
import aiohttp
import time
url = 'http://mini.eastday.com/assets/v1/js/search_word.js'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # 这是重点,设置主线程默认的loop 为你创建的loop
# 这是重点,ss和主线程的loop绑定了. # 新版本aiohttp必须放在协程函数中去实例化,不能全局变量实例化,参考下面的 aiomysql
ss = aiohttp.ClientSession(loop=loop)
@boost(BoosterParams(queue_name='test_async_queue1', concurrent_mode=ConcurrentModeEnum.ASYNC, broker_kind=BrokerEnum.REDIS,
log_level=10,concurrent_num=3,
specify_async_loop=loop, # specify_async_loop传参是核心灵魂代码,不传这个还要使用主loop绑定的连接池就会报错.
is_auto_start_specify_async_loop_in_child_thread=False,
))
async def async_f1(x):
"""
这个函数是自动被funboost在一个单独的子线程中的loop运行的,loop会并发运行很多协程来执行async_f1的逻辑
用户最最需要明白的是,在用funboost的 ConcurrentModeEnum.ASYNC时候,你不是在主线程中操作的异步函数,而是子线程的loop中调用的.
假设如果是在主线程中去运行的,你怎么可能连续丝滑启动多个函数消费 f1.consume() f2.consume() f3.consume() ? 用脑子想想就知道不是主线程去调用异步函数的.
"""
# 如果是async with ss.request('get', url=url)使用主线程loop的连接池发请求,boost装饰器必须指定 specify_async_loop,
# 如果你不使用ss连接池,而是 async with aiohttp.request('GET', 'https://httpbin.org/get') as resp: 那就不需要指定 specify_async_loop
async with ss.request('get', url=url) as resp:
text = await resp.text()
print('async_f1', x, resp.url, text[:10])
await asyncio.sleep(5)
return x
@boost(BoosterParams(queue_name='test_async_queue2', concurrent_mode=ConcurrentModeEnum.ASYNC, broker_kind=BrokerEnum.REDIS,
log_level=10,concurrent_num=3,
# specify_async_loop=loop, # specify_async_loop传参是核心灵魂代码,连接池不传这个还要使用主loop绑定的连接池就会报错.
is_auto_start_specify_async_loop_in_child_thread=False,
))
async def async_f2(x):
# 如果是async with ss.request('get', url=url)使用主线程loop的连接池发请求,boost装饰器必须指定 specify_async_loop,
# 如果你不使用ss连接池,而是 async with aiohttp.request('GET', 'https://httpbin.org/get') as resp: 那就不需要指定 specify_async_loop
async with aiohttp.request('get', url=url) as resp:
text = await resp.text()
print('async_f2', x, resp.url, text[:10])
await asyncio.sleep(5)
return x
async def do_req(i):
async with ss.request('get', url=url) as resp: # 如果是这样请求,boost装饰器必须指定specify_async_loop,
text = await resp.text()
print(f'主线程的loop运行的{i}:',text[:10])
await asyncio.sleep(3)
if __name__ == '__main__':
async_f1.clear()
async_f2.clear()
for i in range(10):
async_f1.push(i)
async_f2.push(i*10)
async_f1.consume()
async_f2.consume()
# time.sleep(5) 加这个是测试主线程的loop和子线程loop谁先启动,造成的影响,如果子线程的specify_async_loop先启动,主线程下面的 loop.run_forever() 会报错已启动 RuntimeError: This event loop is already running
main_tasks = [loop.create_task(do_req(i)) for i in range(20)]
loop.run_forever() # 如果你除了要在funboost运行异步函数,也要在自己脚本调用,那么装饰器配置 is_auto_start_specify_async_loop_in_child_thread=False,,自己手动 启动loop.run_forever()
ctrl_c_recv()
```
如果消费async def的消费函数中:
如果是async with ss.request('get', url=url)使用主线程loop的连接池发请求,boost装饰器必须指定 specify_async_loop,
如果你不使用ss连接池,而是 async with aiohttp.request('GET', 'https://httpbin.org/get') as resp: 那就不需要指定 specify_async_loop
用户一定要好好体会下原因,为什么会这样,才能更懂asyncio
### 6.26.2 演示 aiomysql 连接池在funboost使用,解决 `attached to a different loop`
用户需要认真看代码里面的文字注释,为什么出现 `attached to a different loop` 经典报错
代码例子:
```python
"""
此脚本演示, funboost的子线程的loop中怎么操作aiomysql 连接池.
演示了2种方式使用aiomysql连接池
方式一:
非常推荐方式一
async_aiomysql_f1 使用主线程的连接池,.
核心灵魂代码是 async_aiomysql_f1 的装饰器需要传参specify_async_loop=主线程loop
如果 async_aiomysql_f1 不指定specify_async_loop,就会出现经典报错 attached to a different loop ,
子线程的loop去操作主线程loop的连接池,这是大错特错的.
有的人压根不懂主线程loop和子线程loop,还非要装逼使用asyncio编程生态,
不精通asyncio编程生态的人应该老老实实使用同步多线程编程生态,简单多了.
因为funboost的线程池 FlexibleThreadPool能自动扩缩,
能自动缩容是吊打内置线程池 concurrent.futures.threadpoolexecutor的神级别操作,
FlexibleThreadPool 性能比官方内置线程池提高了250%
-----------------------------------------------------------------------
方式2:
不推荐方式2,浪费连接池宝贵资源。
async_aiomysql_f2_use_thread_local_aio_mysql_pool 使用 thread local 级别的全局变量连接池
这样子线程的loop避免了操作主线程loop的aiomysql连接池, 不会出现张冠李戴 attached to a different loop
"""
import threading
from funboost import boost, BrokerEnum, ConcurrentModeEnum, ctrl_c_recv, BoosterParams
import asyncio
import time
import aiomysql
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # 这是重点
DB_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': '123456',
'db': 'testdb',
'charset': 'utf8mb4',
'autocommit': True
}
g_aiomysql_pool : aiomysql.Pool
async def create_pool():
pool = await aiomysql.create_pool(**DB_CONFIG, minsize=1, maxsize=10,) # 这些是重点.
global g_aiomysql_pool
g_aiomysql_pool = pool
return pool
# 如果 async_aiomysql_f1 不指定specify_async_loop,就会出现经典报错 attached to a different loop ,子线程的loop去操作主线程loop的连接池
r"""
Traceback (most recent call last):
File "D:\codes\funboost\test_frame\test_async_consumer\test_child_thread_loop_aiomysql_pool.py", line 46, in async_aiomysql_f1
await cur.execute("SELECT now()")
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiomysql\cursors.py", line 239, in execute
await self._query(query)
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiomysql\cursors.py", line 457, in _query
await conn.query(q)
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiomysql\connection.py", line 469, in query
await self._read_query_result(unbuffered=unbuffered)
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiomysql\connection.py", line 683, in _read_query_result
await result.read()
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiomysql\connection.py", line 1164, in read
first_packet = await self.connection._read_packet()
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiomysql\connection.py", line 609, in _read_packet
packet_header = await self._read_bytes(4)
File "D:\ProgramData\Miniconda3\envs\py39b\lib\site-packages\aiomysql\connection.py", line 657, in _read_bytes
data = await self._reader.readexactly(num_bytes)
File "D:\ProgramData\Miniconda3\envs\py39b\lib\asyncio\streams.py", line 723, in readexactly
await self._wait_for_data('readexactly')
File "D:\ProgramData\Miniconda3\envs\py39b\lib\asyncio\streams.py", line 517, in _wait_for_data
await self._waiter
RuntimeError: Task