7.更新记录
7.0 很小的更新,对api使用完全无变化或者无增加新功能的不写更新记录。
7.1 新增第十种Consumer,以redis为中间件,但增加了消费确认,是RedisConsumerAckAble类。
支持运行过程中,随意关闭和启动python程序。无惧反复关闭python和 突然断电导致任务丢失几百个。
之前开100线程/协程的话,随意重启python和断电会导致极大概率丢失200个任务。
官方Threadpoolexecutor是无界队列。使用这个会导致丢失无数个任务,
因为他会迅速把redis的消息全部取出来,添加到自己的queue队列慢慢消费。
因为这个原因所以需要自定义写BoundedThreadpoolexecutor和CustomThreadpoolexecutor。
改版的CustomThreadpoolexecutor修改成了queue最大长度是max_works,自己内部存储100个,
运行中100个,突然关闭python会丢失200个任务。如果queue设置大小为0,则只会丢失100个运行中的任务。
采用的是消费者去除消息时候,用lua脚本同时pop和添加到unacked的独立zset中,函数运行成功后会从set中删除该任务。
同时有一个一直每隔5秒发送心跳到redis服务中的线程,心跳标识中有消费者的唯一标识,绝对不会重复。
如果突然关闭消费者(例如突然断电或者点击关闭python),那么该消费者的心跳将会停止了。这时其他机器的同队列消费者或者当前机器重新启动代码后,在15秒后会
检到被关闭的消费者是非活跃消费者,那么自动将该消费者的unack里面任务全部重新取出返回到待消费队列中。
RedisConsumerAckAble类比RedisConsumer会有一丝丝性能损耗,但python玩redis大部分情况还是python代码本身有性能瓶颈,
而不是造成redis服务端有性能瓶颈,一般只要用在有意义的业务上,就算python很忙把cpu占光了,也不会造成redis服务端达到极限,
python是性能很差的语言,没玩垮redis,自身就把电脑玩死了,所以大部分情况下不要在意加入确认消费后产生额外的对redis服务端的性能压力。
redis要是能直接作为mq使用,redis早就一统天下了,哪里还不断有几十种mq出来。
所以直接基于redis list的如果要做到可靠就必须改进。
7.2 新增基于以redis为消息中间件时候的页面管理和消费速度显示。
基于redisboard,但对redis的list模拟mq功能,进行页面显示优化突出消息队列消费,
加黄显示正在运行中的队列和每10秒的消费速度。每隔10秒自动刷新统计。
由于实时发布和消费,例如10秒内发布20个,消费50个,页面只能显示大小降低了30个,
这个只有专业的mq才能分别显示出来,redis list只是简单数组。
rabbitmq nsq都有官方自带速率显示。

7.3 新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架.
新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理,不要亲自这么使用。
beggar_redis_consumer.py文件的 start_consuming_message函数。
def start_consuming_message(queue_name, consume_function, threads_num):
pool = ThreadPoolExecutor(threads_num)
while True:
try:
redis_task = redis_db_frame.brpop(queue_name, timeout=60)
if redis_task:
task_str = redis_task[1].decode()
print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
pool.submit(consume_function, **json.loads(task_str))
else:
print(f'redis的 {queue_name} 队列中没有任务')
except redis.RedisError as e:
print(e)
def add(x, y):
time.sleep(5)
print(f'{x} + {y} 的结果是 {x + y}')
# 推送任务
for i in range(100):
redis_db_frame.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))
# 消费任务
start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)
看完整版代码很长很多,是由于控制功能太多,中间件类型多,并发模式多, 所以加入一个最精简版,精简版的本质实现原理和完整版相同。
7.4 新增sqlachemy 支持的数据库作为消息中间件
新增sqlachemy 支持的数据库作为消息中间件,包括sqlserver mysql postgre oracle sqlite
每个队列是一张表模拟的。

每个任务是表里面的一行记录。

7.5 日志改为导入独立包nb_log,支持用户配置文件自定义日志配置。
例如设置默认需不需要彩色,需不需要大背景彩色色块,需不需要自动拦截转化python内置的print.
在用户当前项目根目录下生成的nb_log_config.py 可以自定义优先日志配置。
7.6 优化qps控频。
将qps按范围分段,采用不同的等待或计数方式。使当qps设置很高的时候,控频更精确。
增加了分布式控频,需要依赖redis中间件。
分布式环境中的控频指的是,假如xx.py文件中有一个consumer,设置func函数的qps为10。
如果在线上部署了三个容器服务,如果不使用分布式控频,则func函数的每秒运行总次数会是30。
即使只有1台机器,如果开多进程,Process运行3个进程,或者把xx.py反复运行启动3个,
也会造成func函数每秒运行总次数是30。
分布式控频主要是解决这种问题。默认不使用分布式控频,
当设置 is_using_distributed_frequency_control为True的时候,使用分布式控频。
7.7 增加rocketmq支持。 (2020-7)
from funboost import boost, BrokerEnum, BoosterParams
@boost(BoosterParams(queue_name='queue_test_f03', qps=2, broker_kind=BrokerEnum.ROCKETMQ))
def f(a, b):
print(f'{a} + {b} = {a + b}')
if __name__ == '__main__':
for i in range(100):
f.push(i, i * 2)
f.consume()
7.8 新增 async 并发模式 (2020-12)
框架希望用户写同步函数,不鼓励用户新写async def的函数,如果你的代码函数已经写成了async def,可以用此种并发方式。
写async def的函数很烦人,asyncio里面的概念很难学。
这两个项目用的asyncio你能写出来,能看懂说明不?
https://github.com/ydf0509/async_pool_executor
https://github.com/ydf0509/sync2asyncio
之前一直都没支持这种并发模式,异步代码不仅消费函数本身与同步代码很多不同,例如函数的定义和调用以及三方库,
不同于gevent和eventlet打个猴子补丁就可以变并发方式并且代码保持100%原样,asyncio的方式代比同步码真的是要大改特改。
而且在框架层面要支持异步也要增加和修改很多,支持异步并不是很容易。这一点连celery5.0目前都还没支持到(据官方文档说5.0要加入支持,但目前的5.0.3还没加入。)
如果消费函数已经写成了async def这种,那么可以设置 concurrent_mode=ConcurrentModeEnum.ASYNC,
框架会在一个新的线程的loop里面自动运行协程,所有协程任务会自动在一个loop里面运行,不是每次临时都生成新的loop只运行一个当前任务方式。
from funboost import boost, BrokerEnum, ConcurrentModeEnum, BoosterParams
import asyncio
# 此段代码使用的是语言级Queue队列,不需要安装中间件,可以直接复制运行测试。
@boost(BoosterParams(queue_name='test_async_queue2', concurrent_mode=ConcurrentModeEnum.ASYNC,
broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE, concurrent_num=500, qps=20))
async def async_f(x):
# 测试异步阻塞并发, 此处不能写成time.sleep(1),否则无论设置多高的并发,1秒钟最多只能运行1次函数。
# 同理asyncio 不能和 requests搭配,要和 aiohttp 搭配。
await asyncio.sleep(1)
print(id(asyncio.get_event_loop()))
# 通过 id 可以看到每个并发函数使用的都是同一个loop,而不是采用了愚蠢的临时 asyncio.new_event_loop().run_until_complete(async_f(x)) 方式调度。
print(x)
if __name__ == '__main__':
async_f.clear()
for i in range(100):
async_f.push(i, )
async_f.consume()
7.8.2 gevent/eventlet 和 asyncio 用法区别感受
比方说汽车的自动挡和手动挡,学了手动挡一定会开自动挡,只学自动挡很难开手动挡。
asyncio方式的代码比正常普通同步思维的代码写法也要难得多了,能玩asyncio的人一定会用threading gevent,
但只用过threading gevent,不去专门学习asyncio的用法,100%是玩不转的。
gevent就像自动挡汽车,自动换挡相当于自动切换阻塞。
asyncio就像手动挡,全要靠自己写 await / async def /loop / run_until_complete /run_forever/
run_coroutine_threadsafe /wait / wait_for /get_event_loop / new_event_loop / get_running_loop
,写法很麻烦很难。异步多了一个loop就像手动挡汽车多了一个离合器一样,十分之难懂。
手动挡玩的溜性能比自动挡高也更省油。asyncio玩的溜那么他的io并发执行速度和效率也会更好,cpu消耗更少。
如果你写一般的代码,那就用同步方式思维来写吧,让分布式函数调度框架来替你自动并发就可以啦。
如果追求更好的控制和性能,不在乎代码写法上的麻烦,并且asyncio技术掌握的很溜,那就用asyncio的方式吧。
7.8.3 关于 async 并发模式,为什么框架还使用 pyredis pika pymongo,而没有使用aioredis aiomongo
异步鬓发模式里面,整个调用链路必须是一旦异步,必须处处异步,在base_consumer.py的AbstractConsumer中,
方法 _async_run_consuming_function_with_confirm_and_retry里面使用的还是操作中间件的同步库,
主要是因为框架目前支持15种中间件,一个一个的使用异步模式的库操作中间件来实现,比现在代码起码要增加80%,无异于重写一个项目了。
异步和同步真的写法语法相差很大的,不信可以比比aiomysql 和pymysql库,aiohttp和requests,如果非常简单能实现异步,
那aiohttp和aiomysql作者为什么要写几万行代码来重新实现,不在原来基础上改造个七八行来实现?
目前此库对 消息拉取和消息消费完全是属于在两个不同的线程里面,井水不犯河水,所以用同步库拉取消息对asyncio的消费函数没有任何影响,不存在同步库阻塞异步库的问题。
对于消息确认 消息重新入队 任务过滤 mongo插入,都是采用的同步库,但是使用了 run_in_executor,
把这些操作在异步链路中交给线程池来运行了,同事这个线程池不是官方内置线程池,是智能缩小扩大线程池 ThreadPoolExecutorShrinkAble。
run_in_executor 会把一个同步的操作,sumbit提交给线程池,线程池返回的是一个concurrent.futures包的Future对象,
run_in_executor包装转化了这个Future(此Future不是asyncio的,不是一个awaitable对象)成为了一个asyncio包的Future对象,asyncio的Future对象可以被await,
所以这是非常快捷的同步阻塞函数在异步链路中转同步转异步语法的最佳方式。官方也是这么推荐的。
除了框架内部的阻塞函数是run_in_executor快速转化成非阻塞事件循环的,但是主要的用户的消费函数,是使用的真async模式运行在一个loop循环中的,
也即是单线陈鬓发运行用户的异步函数。
其次框架的同步阻塞函数,都是操作中间件类型的库,异步就是 入队 确认消费 查询是否过滤,这些操作一般都会在1毫秒之内完成,不阻塞太长的事件,
即使不使用run_in_executor,直接在异步链路使用这些同步操作,也没太大问题。一旦异步必须处处异步,说的是不能调用耗时太长的同步阻塞函数,
1毫秒的无伤大雅,因为celery 1秒钟最多能调度300个 def f: print(hello) 这样的无cpu 无io的函数,此框架调度运行速度任然超过celery。
还有一种调度起 async def定义 的消费函数方式是继续开多线程并发,然后使用 临时loop = asyncio.new_event_loop(),loop.run_until_complete,这方式愚蠢了,
相当于只是为了运行起这个函数,但全流程丝毫没有丁点异步。
7.8.4 愚蠢的celery调用异步函数写法
下面截图这种写法为了异步而异步,非常废物滑稽的写法。
如果是celery多线程并发模式,那就是每个线程里面临时起一个loop,每个生成的loop只运行了一次协程成对象。这样完全没有利用到asyncio的优势
如果是celery多进程并发模式,那就是每个进程里面临时起一个loop,每个生成的loop只运行了一次协程成对象。这样完全没有利用到asyncio的优势
celery真正的最终目标是直接能把@task装饰器加到 一个asynnc def的函数上,而不是现在间接的再新增写一个同步函数来调用异步函数。
到目前为止的最新版 celery 5.2.3还没有实现 直接支持 asyncio 并发模式。
用户不要抱多大希望celery能很快支持asyncio,例如celery使用kafka做中间件,官方承诺了7年,一次次的放鸽子到现在还不支持,没那么容易。

7.9 2021-04 新增以 redis 的 stream 数据结构 为中间件的消息队列。
这个是 redis 的 真消息队列,这次是 真mq,
stream 数据结构功能更加丰富接近 rabbitmq kafka这种真mq的消息队列协议,比 list 做消息队列更强。
需要redis的服务端5.0版本以上才能使用这个数据结构。
代码文件在 funboost/consumers/redis_stream_consumer.py
这个 REDIS_STREAM 中间件和 REDIS_ACK_ABLE 都支持消费确认,不管客户端怎么掉线关闭,都可以确保消息万无一失。
BrokerEnum.REDIS 中间件 不支持消费确认,随意重启或者断电断线会丢失一批任务。
from funboost import boost, BrokerEnum, BoosterParams
@boost(BoosterParams(queue_name='queue_test_f01', broker_kind=BrokerEnum.REDIS_STREAM))
def f(a, b):
print(f'{a} + {b} = {a + b}')
if __name__ == '__main__':
for i in range(100):
f.push(i, b=i * 2)
f.consume()
7.10 2021-04 新增以 redis 的 list 为数据结构,但使用 brpoplpush 命令 双队列 作为中间件的消息队列。
此 brpoplpush 双队列方式 + 消费者唯一id标识的心跳检测,可以媲美 rabbitmq 的确认消费功能。
代码演示省略,设置broker_kind=BrokerEnum.RedisBrpopLpush就行了。
@boost(BoosterParams(queue_name='queue_test_f01', broker_kind=BrokerEnum.RedisBrpopLpush))
7.11 2021-04 新增以 zeromq 为中间件的消息队列。
zeromq 和rabbbitmq kafka redis都不同,这个不需要安装一个服务端软件,是纯代码的。
zeromq方式是启动一个端口,所以queue_name传一个大于20000小于65535的数字,不能传字母。
消费端代码,启动消费端时候会自动启动 broker 和 server。
import time
from funboost import boost, BrokerEnum, BoosterParams
@boost(BoosterParams(queue_name='30778', broker_kind=BrokerEnum.ZEROMQ, qps=2))
def f(x):
time.sleep(1)
print(x)
if __name__ == '__main__':
f.consume()
发布端代码
from test_frame.test_broker.test_consume import f
for i in range(100):
f.push(i)
7.12 2021-04 新增以 操作kombu包 为中间件的消息队列
一次性新增操作10种消息队列,.但比较知名的例如rabbitmq redis sqlite3 函数调度框架已经在之前实现了。
使用方式为设置 @boost 装饰器的 broker_kind 为 BrokerEnum.KOMBU
在你项目根目录下的 funboost_config.py 文件中设置
KOMBU_URL = 'redis://127.0.0.1:6379/7' 那么就是使用komb 操作redis。
KOMBU_URL = 'amqp://username:password@127.0.0.1:5672/',那么就是操纵rabbitmq
KOMBU_URL = 'sqla+sqlite:////dssf_sqlite.sqlite',那么就是在你的代码所在磁盘的根目录创建一个sqlite文件。四个////表示根目,三个///表示当前目录。
其余支持的中间件种类大概有10种,不是很常用,可以百度 google查询kombu或者celery的 broker_url 配置方式。
操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),
但没包括分布式函数调度框架能支持的kafka nsq zeromq 等。
但是 kombu 包的性能非常差,如何测试对比性能呢?
可以用原生redis的lpush和kombu的publish测试发布
使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列 qpid pyro 队列),
否则强烈建议使用此框架的操作中间件方式而不是使用kombu。
可以把@boost装饰器的broker_kind参数 设置为 BrokerEnum.REDIS_ACK_ABLE 和BrokerEnum.KOMBU(配置文件的KOMBU_URL配置为redis),
进行对比,REDIS_ACK_ABLE的消费速度远远超过 BrokerEnum.KOMBU,所以之前专门测试对比celery和此框架的性能,
差距很大,光一个 kombu 就拉了celery大腿很多,再加上celery的除了kombu的执行性能也很低,所以celery比此框架慢很多。
test_frame\test_celery 下面有celery的发布 消费例子,可以测试对比下速度,同样gevent 并发和redis中间件,
celery 执行 print hello 这样的最简单任务,单核单进程每秒执行次数过不了300,celery性能真的是太差了。
消费
import time
from funboost import boost, BrokerEnum, BoosterParams
@boost(BoosterParams(queue_name='test_kombu2', broker_kind=BrokerEnum.KOMBU, qps=5))
def f(x):
time.sleep(60)
print(x)
if __name__ == '__main__':
f.consume()
发布
from test_frame.test_broker.test_consume import f
for i in range(10000):
f.push(i)
你项目根目录下的 funboost_config.py
KOMBU_URL = 'redis://127.0.0.1:6379/7'
# KOMBU_URL = f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}'
# KOMBU_URL = 'sqla+sqlite:////celery_sqlite3.sqlite' # 4个//// 代表磁盘根目录下生成一个文件。推荐绝对路径。3个///是相对路径。
7.14 2021-04 新增以mqtt emq 作为消息中间件
例子,设置 broker_kind=BrokerEnum.MQTT
from funboost import boost, BrokerEnum, BoosterParams
@boost(BoosterParams(queue_name='mqtt_topic_test', broker_kind=BrokerEnum.MQTT))
def f(x, y):
print(f''' {x} + {y} = {x + y}''')
return x + y
for i in range(100):
f.push(i, i * 2)
f.consume()
这个默认做成服务端不存储消息,mqtt中间件适合前后端实时交互的。可以直接绕开后端flask django 接口,不用写接口,
前端直接发任务到mqtt,后端订阅,后端完成后,发送结果到唯一任务的topic
当然也可以 前端订阅topic,前端发任务到python flask接口,flask接口中发布任务到rabbitmq redis等,
后台消费完成把函数结果发布到mqtt,mqtt推送给前端
此框架的消费做成了mqtt的共享订阅,例如启动多个重复的消费者脚本,不会所有消费脚本都去重复处理一个消息
7.15 2021-04 新增以 httpsqs 作为消息中间件
@boost(BoosterParams(queue_name='httpsqs_queue_test', broker_kind=BrokerEnum.HTTPSQS))
7.16 2021-04 新增支持下一代分布式消息系统 pulsar 。
@boost(BoosterParams(queue_name='httpsqs_queue_test', broker_kind=BrokerEnum.PULSAR))
在开源的业界已经有这么多消息队列中间件了,pulsar作为一个新势力到底有什么优点呢?
pulsar自从出身就不断的再和其他的消息队列(kafka,rocketmq等等)做比较,但是Pulsar的设计思想和大多数的消息队列中间件都不同
,具备了高吞吐,低延迟,计算存储分离,多租户,异地复制等功能,所以pulsar也被誉为下一代消息队列中间件
pulsar 的消费者数量可以不受topic 分区数量的限制,比kafka和rabbitmq 强。5年后会替代kafka rabbitmq。
from funboost import boost, BrokerEnum, BoosterParams
@boost(BoosterParams(queue_name='test_pulsar_topic2', broker_kind=BrokerEnum.PULSAR, qps=1, broker_exclusive_config={'subscription_name':'funboost_g1'}))
def add(x,y):
print(x+y)
if __name__ == '__main__':
add.push(1,2)
add.push(3,4)
add.consume()
7.17 2021-04 新增延时运行任务,介绍见4.8
7.18 2021-09 新增 轻松远程服务器部署运行函数
框架叫分布式函数调度框架,可以在多台机器运行,因为消息队列任务是共享的。
我用的时候生产环境是使用 阿里云 codepipeline k8s部署的多个容器。还算方便。
在测试环境一般就是单机多进程运行的,用supervisor部署很方便。
所以之前没有涉及到多态机器的轻松自动部署。
如果要实现轻松的部署多台物理机,不借助除了python以外的其他手段的话,只能每台机器登录上然后下载代码,启动运行命令,机器多了还是有点烦的。
现在最新加入了 Python代码级的函数任务部署,不需要借助其他手段,python代码自动上传代码到远程服务器,并自动启动函数消费任务。
目前的自动化在远程机器启动函数消费,连celery都没有做到。
不依赖阿里云codepipeline 和任何运维发布管理工具,只需要在python代码层面就能实现多机器远程部署。
7.19 2021-09 新增 socket udp/tcp/http 消息队列,不需要安装消息中间件软件。
好处是不需要安装消息队列服务,就可以跨机器通信,详见4.35章节
7.20 2021-09 新增 支持 nats 高性能消息队列
用法一如既往,只需要修改broker_kind的枚举,并在 funboost_config.py 配置好 NATS_URL 的值就完了。
@boost(BoosterParams(queue_name='test_queue66c', broker_kind=BrokerEnum.NATS))
def f(x, y):
pass
7.21 2022-01 新增 @boost装饰器全局默认配置
boost装饰器没有亲自指定参数时候的全局默认值 ,用法见 4.15 章节 说明
7.22 2022-02 新增暂停消费功能
框架支持暂停消费功能和继续消费功能,用法见文档4.18
7.23 2022-04 消费者/boost装饰器 新增 broker_exclusive_config 参数
加上一个不同种类中间件非通用的配置,不同中间件自身独有的配置,不是所有中间件都兼容的配置,因为框架支持30种消息队列,消息队列不仅仅是一般的先进先出queue这么简单的概念,
可以看 4.20 章节
7.24 2022-04 新增用户 自定义记录消费状态结果函数钩子
可以通过设置 user_custom_record_process_info_func 的值为你的自定义函数,来记录消费状态及结果,用户可以自由发挥保存消费结果状态到任意地方
可以看 4.19章节
7.25 2022-04 新增用户灵活自由自定义扩展中间件和生产消费逻辑的功能
register_custom_broker 这个是增加的一种很强大的功能,用户可以自定义发布者和消费者,注册到框架中。boost装饰器就能自动使用你的消费者类和发布者类了。
可以看 4.21 章节
7.26 2022-07 新增内置以redis作为apscheduler存储的定时器,动态增删改查定时任务配置。
可以看4.4b章节的演示代码例子
7.27 2023-02 新增适配python 3.6-3.11所有版本
适配python 3.6-3.11所有版本
适配python 3.10 3.11 的asyncio并发模式,因为官方老是在新版本的asyncio模块,把asyncio的api入参改来改去的,现在适配了。
7.28 2023-02 新增 asyncio 语法生态下rpc获取执行结果
因为 async_result= fun.push() ,默认返回的是 AsyncResult 类型对象,里面的方法都是同步语法。
async_result.result 是一个耗时的函数, 解释一下result, 是property装饰的所以不用 async_result.result()
有的人直接在async def 的异步函数里面 print (async_result.result),如果消费函数消费需要耗时5秒,
那么意味rpc获取结果至少需要5秒才能返回,你这样写代码会发生灭顶之灾,asyncio生态流程里面一旦异步需要处处异步。
所以新增了 AioAsyncResult 类,和用户本来的asyncio编程生态更好的搭配。
7.29 2023-03 新增死信队列
抛出 ExceptionForPushToDlxqueue 类型错误,消息发送到单独另外的死信队列中,查看文档 4.24.c 和 4.24.d 4.24.e 章节。
7.30 2023-03 新增支持ctrl + c 退出程序
因为程序是多个子线程while 1的,ctrl+c无法结束程序。 现在框架增内部增加了下面的代码了,可以支持ctrl+c结束程序了。
def _interrupt_signal_handler(signal, frame):
print('你按了 Ctrl+C 。 You pressed Ctrl+C! 结束程序!')
# sys.exit(0)
# noinspection PyUnresolvedReferences
os._exit(0) # os._exit才能更强力的迅速终止python,sys.exit只能退出主线程。
signal.signal(signal.SIGINT, _interrupt_signal_handler)
看4.6.5章节的演示代码例子
7.31 2023-04 新增支持 celery 作为 broker。
完全由celery框架来调度函数,发布函数和发布消息都是由celery框架来完成,但是用户无需学习celery语法和celery烦人的配置方式和烦人的celery目录结构,
用户对celery.Celery对象实例完全无需感知,用户不需要学习celery命令行启动消费和定时,funboost帮你自动搞定这些,用户无需接触celery命令行。
有的人担心funboost调度执行不稳定,有的人不对比瞎质疑funboost性能没有celery强,那么可以使用celery作为funboost中间件。
funboost只充当发布和启动消费的一层api,内部由celery驱动。
funboost的好处是兼容30种消息队列或者叫ptython包,一统使用这些三方包的行为。用户切换中间件成本很低,无需知道每种中间件的语法差异。
就像sqlachemy能操作5种数据库,用户不需要知道mysql和sqlserver语法差异一样。
使用celery作为中间件,用户需要在 funboost_config.py 配置
CELERY_BROKER_URL(必须) 和 CELERY_RESULT_BACKEND (可以为None)
from funboost import boost, BrokerEnum, BoosterParams
@boost(BoosterParams(queue_name=queue_1, broker_kind=BrokerEnum.CELERY))
python例子见 11.1章节
7.32 2023-04 新增支持 python 微服务框架 nameko 作为 broker。
nameko 是 外国人用的多的最知名python微服务框架,使用eventlet并发
funboost支持nameko作为执行调度和rpc实现,funboost只是提供统一的api交互。
from funboost import boost, BrokerEnum, ConcurrentModeEnum, BoosterParams
@boost(BoosterParams(queue_name='test_nameko_queue', broker_kind=BrokerEnum.NAMEKO, concurrent_mode=ConcurrentModeEnum.EVENTLET))
python例子见 11.2 章节
7.33 2023-05 优化了apscheduler定式框架的动态删除添加定时任务
FsdfBackgroundScheduler 继承重写了BackgroundScheduler的 _main_loop 方法。
class FunboostBackgroundScheduler(BackgroundScheduler):
def _main_loop(self):
"""原来的_main_loop 删除所有任务后wait_seconds 会变成None,无限等待。
或者下一个需要运行的任务的wait_seconds是3600秒后,此时新加了一个动态任务需要3600秒后,
现在最多只需要1秒就能扫描到动态新增的定时任务了。
"""
MAX_WAIT_SECONDS_FOR_NEX_PROCESS_JOBS = 1
wait_seconds = None
while self.state == STATE_RUNNING:
if wait_seconds is None:
wait_seconds = MAX_WAIT_SECONDS_FOR_NEX_PROCESS_JOBS
time.sleep(min(wait_seconds,MAX_WAIT_SECONDS_FOR_NEX_PROCESS_JOBS)) # 这个要取最小值,不然例如定时间隔0.1秒运行,不取最小值,不会每隔0.1秒运行。
wait_seconds = self._process_jobs()
7.34 2023-05 重新实现了boost装饰器
对于一个呗@boost装饰的函数,到底应该怎么称呼它?
之前叫消费函数,消费函数是指的是boost装饰后的还是 原始函数本身,称呼不太明确。
之前的boost装饰器是使用函数来实现的,没有类型,现在的boost装饰器使用类来实现,一个函数被 boost装饰后,类型是 Booster,现在有类型了。
之前的boost装饰器是一个函数,在被装饰的函数 fun 本身附加consumer和publisher对象,以及各种方法。
返回的还是函数本身,但是附加了各种方法,方便用户 fun.push() fun.consume() fun.get_message_count() 等等。
为了代码在pycahrm下补全犀利,还加了类型注释,boost的返回值指向一个为了补全犀利而写的 IdeAutoCompleteHelper类。
修改后的boost就是Booster类,现在boost返回的是 Booster类型的对象,补全效果很好。
对funboost的功能pycharm自动补全和函数本身的入参pycharm自动补全都很好。去掉了为了补全而写的 IdeAutoCompleteHelper。
现在一个函数被boost装饰后,他的类型就变成Booster了,可以称此函数为一个booster了。
重构之后的boost装饰器实现代码:
https://github.com/ydf0509/funboost/blob/master/funboost/core/booster.py
重构之前的boost装饰器实现代码:
https://github.com/ydf0509/funboost/blob/e299606a7271e24cae8dea7b9cbbcc400a4f4b0b/funboost/__init__old.py
7.35 2023-06 新增支持优先级队列
见文档4.29
7.36 2023-07 新增支持 funboost远程杀死任务
见文档4.30
7.37 2023-07 新增所有命名空间的日志和print都另外写入到一个总的文件中。
之前的funboost日志,每个命名空间的日志写入到不同的文件,每个队列名的消费者和发布者都有独立的日志命名空间,写入到不同的文件中,是为了用户方便排查。
例如你查func2的报错和运行记录,只需要到那个func2消费者.log的文件中去排查,这个文件日志不会包含别的消费函数的运行记录。
但有的人希望是项目中的所有日志 + print 写入到一个相同的文件中,方便排查上下文,这个几乎相当于 nohup 部署然后重定向标准输出到一个文件中了。
现在python代码级别对print和sys.stdout sys.stderr打了猴子补丁,支持所有print和logger打印另外写入到一个单独的文件中了。这是nb_log的新功能。
见nb_log文档 https://nb-log-doc.readthedocs.io/zh_CN/latest/articles/c10.html
10.1章节和1.1章节里面介绍了,怎么修改是否另外所有日志和print再单独写入到一个总的日志文件中。
7.38 2023-10 多线程并发模式,增加了支持async def的函数
对于async def 的函数,不需要boost装饰器指定 concurrent_mode=ConcurrentModeEnum.ASYNC ,每个线程内部会临时 loop= asyncio.new_event_loop(),
然后loop.run_unyil_cpmplete来运行async def的函数。
这种情况下 每个协程是运行在不同的loop中,这是假asyncio变成。
如果你想每个协程运行在一个loop里面,那就需要 boost装饰器指定 concurrent_mode=ConcurrentModeEnum.ASYNC,这是真asyncio编程。
7.39 2024-01 @booost装饰器入参变成pydantic Model类型, BoostParams类或子类
@boost(queue_test_f01', qps=0.2,broker_kind=BrokerEnum.REDIS_ACK_ABLE,)
建议把所有传参变为放在BoosterParams类或子类里面:
@boost(BoosterParams(queue_name='queue_test_f01', qps=0.2,broker_kind=BrokerEnum.REDIS_ACK_ABLE,))
7.40 2024-03 函数运行状态页面增加消息运行中状态
之前是只有消息运行完成后才会显示这条消息,运行中的消息不会显示,现在新增 running 状态的消息。

7.41 2024-03 新增 funboost_current_task 上下文
之前无法在用户的消费函数内部去获取消息的完全体,只能知道函数的入参,无法知道消息的发布时间 taskid等.
上下文就是类似flask的request对象,线程中任意地方可以获取,线程/协程隔离.
用户在任意消费函数中使用fct就能获取当前的任务消息了。
这个功能使得用户在用户函数中就能知道消息的完全体、 当前是哪台机器 、哪个进程、 第几次重试运行函数
消息的发布时间 消息的task_id 等等。
原来用户在消费函数中是无法获取这些信息的。
见文档4.31
详见文档4.31
7.42 2024-04 新增支持消费函数定义入参 **kwargs,用于消费任意json消息
见文档 4b.2 章节介绍.
7.43 2024-05 新增另外一种方式来 自定义增加或重写 消费者 发布者
见文档 4.21b 章节介绍
boost装饰器 传参 consumer_override_cls 和 publisher_override_cls 来自定义或重写消费者 发布者。
7.44 2024-06 重磅升级!funboost 支持实例方法、类方法、静态方法、普通函数 4种类型,作为消费函数的例子
funboost 在 2024年6月新增支持了实例方法、类方法作为消费函数 ,写法见文档4.32章节
7.45 2024-08 @boost入参新增 is_auto_start_consuming_message,定义后立即自动启动消费。
@BoosterParams(queue_name="q1", is_auto_start_consuming_message=True)
def f(x):
这样写后,自动启动消费,不需要 用户手动的写 f.consume() 来启动消费。
代码例子见4.33章节
7.46 2024-08 修复使用redis心跳来辅助确认消费的redis中间件模式,重复消费的bug
用户的 broker_kind 如果是这四种 [BrokerEnum.REDIS_ACK_ABLE, BrokerEnum.REDIS_STREAM, BrokerEnum.REDIS_PRIORITY, BrokerEnum.RedisBrpopLpush]
用户需要升级到 46.2, 如果不想升级就需要手动指定 @boost(BoostParams(is_send_consumer_heartbeat_to_redis=True))
用户如果使用的是 BrokerEnum.REIDS 和 BrokerEnum.REIDS_ACK_USING_TIMEOUT 不受影响,因为不使用redis心跳来辅助确认消费。
7.47 2025-01 加强了 funboost web manager 功能
具体看文档13章节。
7.48 2025-07 消费函数的入参类型可以是自定义类型对象(不可json序列化的类型)
以前作者不愿意支持消费函数入参是自定义类型,2025-07 之后支持了,不愿意支持的原因可以看文档第6章.
就是现在消费函数的入参可以是 字符串 数字 列表 字典 以外的自定义类型,
def func1(a:MyClass,b:str,c:MyPydanticModel) 现在可以.
7.49 2025-08 新增Booster对象pickle序列化, 间接支持了 aps_obj.add_job 方式添加定时任务
新增了支持了 Booster 对象 pickle 序列化和反序列化.
新增支持
aps_obj.add_job来添加定时任务,而不是非要使用ApsJobAdder.add_push_job,具体看文档 4.4.4 章节
Booster 对象能 pickle 序列化的核心原理如下:
class Booster:
def __getstate__(self):
state = {}
state['queue_name'] = self.boost_params.queue_name
return state
def __setstate__(self, state):
"""非常高级的骚操作,支持booster对象pickle序列化和反序列化,设计非常巧妙,堪称神来之笔
这样当使用redis作为apscheduler的 jobstores时候,aps_obj.add_job(booster.push,...) 可以正常工作,
使不报错 booster对象无法pickle序列化.
这个反序列化,没有执着于对 socket threding.Lock 怎么反序列化,而是偷换概念,绕过难题,基于标识的代理反序列化
"""
_booster = BoostersManager.get_or_create_booster_by_queue_name(state['queue_name'])
self.__dict__.update(_booster.__dict__)
7.50 2025-08 新增 grpc 作为funboost 的 broker
见文档 11.7 使用 grpc 作为funboost的broker ,这个中间件可以通过sync_call方法同步阻塞获取rpc结果,不依赖redis实现rpc
7.51 2025-08 新增支持 mysql_cdc 作为 broker
cdc 就是 Change Data Capture,是大数据热门技术。
这是个特殊的broker,基于mysql的binlog cdc ,无需用户主动发布消息,自动捕获 mysql 变化的数据转换成消息作为消费者入参。
见文档 11.8 使用 mysql_cdc 作为 funboost 的broker
第一性原理: funboost使用了 pymysqlreplication 包来实现mysql_cdc功能
7.52 2025-10 @boost装饰器,新增设置 booster_group 消费分组
例如一组函数写 BoosterParams(booster_group='group1') ,那么 BoostersManager.consume_group('group1') 会启动这组函数消费。
主要是取代用户手动写 f1.consume() f2.consume() 这样需要多次亲自手写启动相关消费函数。
也避免了 BoostersManager.consume_all() 会启动不相关消费函数.
python代码例子,见文档 4.2d.3 章节.
7.53 2026-01 新增 funboost.faas,快速实现FaaS(非常好用重要)
给各种流行的python web框架的app,自动注册一组funboost路由,支持的web包括fastapi django flask
助力用户快速实现 http rpc、从http管理定时任务,队列管理、自动注册和发现 、微服务 、FaaS平台、funboost web管理后台接口 等用途场景。
详细见文档 15 章节。
7.54 2026-01 修改了一些拼写错误的名字,(对应funboost 版本 53.0之后)。
本次更新主要修正了部分方法的命名规范及拼写错误。
为了方便用户快速查阅,我将更新日志整理为一份清晰的对照表,并按模块进行了分类。
1. 内部保护方法变更
影响范围:仅影响按照教程 4.21 章节进行继承扩展的高级用户。
类名 |
旧方法名 (Old) |
新方法名 (New) |
|---|---|---|
|
|
|
|
|
|
2. 消息发送与配置类变更
影响范围:使用了priority_control_config的用户,一般用户没用到。
变更位置 |
旧名称 |
新名称 |
说明 |
|---|---|---|---|
|
|
|
减少歧义,扩大配置涵盖范围 |
配置类定义 |
|
|
更加通用化的命名 |
3. BoosterParams 参数拼写修正
影响范围:公有 API 修正。虽然涉及范围广,但字段较冷门。
错误拼写 (旧) |
正确拼写 (新) |
修改类型 |
|---|---|---|
|
|
拼写纠错 |
|
|
拼写纠错 |
|
|
拼写纠错 |
4. 💡 迁移建议
全局替换:大部分变更只需在 IDE 中进行全局搜索并替换即可完成升级。
兼容性:绝大部分普通用户(仅调用常规
consume或push的用户)无感知。
5. funboost 没有celery的配置变化改动大
celery3到celery4的公有配置名字变化才叫翻天覆地,funboost一直以来,很少改变,这次改变一批。
7.55 2026-01 函数结果持久化支持设置表名
FunctionResultStatusPersistanceConfig 增加 table_name 参数,用于设置表名。 之前默认保存的mongo表名同queue_name一致,现在可以自定义。 这就意味着你可以把同一个项目的多个队列的函数结果都放到一个mongo表中。
7.56 2026-01 新增 opentelemetry 全链路任务追踪,这是funboost的一个生产级别的重要战略级功能
使用国际w3c的规范协议,基于opentelemetry实现全链路任务追踪。
使用方式,就是可以直接使用OtelBoosterParams
或者你在你的BoosterParams中指定consumer_override_cls和publisher_override_cls为OtelConsumerMixin和OtelPublisherMixin。
7.56 2026-01 新增 Prometheus 指标监控,对接你们自己的运维系统
funboost_web_manager 已经可以显示消费失败 成功 消息数量曲线了,现在新增了 集成 Prometheus 指标监控,方便对接你们自己的grafana等运维系统。
7.58 2026-01 funboost新增声明式任务编排 workfolw
funboost现在也能支持和celery类似的 fun.s(1,2) 和 chain chord group 复杂的工作流编排了。
详见文档 4b.8 章节
7.59 2026-01 增强redis稳定性
连接redis时候,默认设置
{'health_check_interval' :30,
'socket_keepalive' :True,}
7.60 2026-01 funboost 的确认消费,unack消息重回队列去掉 scan
funboost 去掉了ack机制的unack消息重回队列的通过 redis.scan 命令扫描unack队列大全,更好应对超多数量keys的的db(但是还是建议消息队列使用单独的db,减少查看redis数据库信息的其他keys干扰)。
在旧版本中(BrokerEnum.REDIS_ACK_ABLE 等确认消费模式),当需要查找掉线消费者的遗留任务(unack 队列)
或者清空队列时,框架需要找到所有类似的unack队列名字
self.redis_db_frame.scan(0, f'{self._queue_name}__unack_id_*', count=self.SCAN_COUNT)
7.61 2026-01 funboost极限性能优化,发布和消费性能提升120%
运行def fun(): pass的函数,
funboost 之前教程中大部分文案写的是:
funboost发布性能是celery的22倍,消费性能是celery的46倍。
经过极限优化后,现在2026年后,funboost发布性能是celery的50倍,消费性能是celery的100倍。
7.62 2026-01 funboost消费新增支持微批消费
微批的核心是:生产者单个单个地提交任务,但是消费者自动将多个任务聚合起来,一次性消费。
这是一个消费侧优化能力,很多消息队列框架都不提供原生支持(需要用户自己实现累积条数+超时强制触发消费的逻辑),funboost 通过 MicroBatchConsumerMixin 将这个模式抽象成了可复用的组件。
即使你不用消息队列,你用 funboost + MEMORY_QUEUE 充当一个微批消费的工具,也是非常方便的。
7.63 2026-03 funboost 增加高级重试:指数退避重试
详见 4.24.5 章节。
指数退避重试,能更灵活指定重试间隔,符合业界主流高级重试策略。
通过装饰器的 advanced_retry_config 的 retry_mode 设置 sleep 或者 requeue 可以设置重试模式。
优势说明:funboost 重试的
requeue模式完爆各种 Python 三方包的retry装饰器。因为指数退避重试的间隔很大并且会越来越大,不适合在装饰器中简单粗暴地sleep,那样会导致长时间阻塞霸占线程或协程,降低系统吞吐量。所以需要使用重回队列的方式,用 APScheduler 来调度何时再次重试。
删除了 BoosterParams 的 retry_interval 入参。如果你需要延时重试,请放在 advanced_retry_config字典的 retry_base_interval 进行配置。
7.63 2026-03 funboost 增加熔断降级
见文档 4b.14 funboost 支持熔断降级,智能自动熔断、探测、恢复 (高级功能)
熔断支持跳转到 使用降级函数 和 暂停拉取消息 消费两种
7.64 2026-03 funboost 增加 失败告警,使用AlertNotifierConsumerMixin
基于内存计数,连续失败次数或者失败比例 触发告警 ,告警渠道支持 企业微信 钉钉 飞书。
见6.30.1章节
7.65 2026-03 funboost 增加 失败告警,使用MongoAlertMonitor
基于查询 分布式汇总保存到mongo的数据,实现触发告警,告警渠道支持 企业微信 钉钉 飞书。
见6.30.4 章节
7.66 2026-03 funboost_web_manager 统一改称 funweb,增加3个系统功能
funboost_web_manager → funweb(新老导入和运行方式均兼容)
新增功能
功能 |
说明 |
详见章节 |
|---|---|---|
📜 脚本部署管理 |
一键管理任何语言的脚本(进程守护 + 自动发布 + 日志聚合) |
|
💻 服务器资源监控 |
实时监控 CPU、内存、磁盘使用率,支持历史走势 |
|
📖 通用日志查看器 |
脱离 SSH 终端的 Web 日志排查方案,支持实时 tail -f 体验、日志内容搜索 |
7.77 2026-05 增加 funboost_pool 两个类, 完美复刻 concurrent.futures.Executor API
在 funboost/core/funboost_pool.py 中新增了两个类:
MemoryFunboostPool :内存队列实现的并发池
FunboostPool :可选所有broker,能灵活配置所有BoosterParams入参。
用法见 教程 4.38 章节
这两个类完美复刻了 concurrent.futures.Executor 的 API 入参和返回值,提供了以下功能:
任务提交:支持
MemoryFunboostPool.submit()和FunboostPool.submit()方法,均返回concurrent.futures.Future对象批量任务:支持
MemoryFunboostPool.map()和FunboostPool.map()方法进行批量任务提交
兼容性:可直接替换老项目中的 concurrent.futures.ThreadPoolExecutor,无需修改调用代码。
7.78 2026-05 优化增强 FlexibleThreadPool 和 AsyncPoolExecutor
增强内容:提升并发池的通用适用性
FlexibleThreadPool 优化
新增支持
map()方法submit()方法增加返回concurrent.futures.Future对象
AsyncPoolExecutor 优化
新增支持
map()方法submit()方法增加返回concurrent.futures.Future对象新增
aio_submit()方法,返回asyncio.Future对象
7.79 2026-05 优化funboost pydantic model 在ide 下的自动补全和提示
专门新增了pyi文件, funboost/core/func_params_model.pyi ,用于在ide 下自动补全和提示。
因为pydantic model定义没有写 init 方法,所以model实例化时候,ide不能自动补全提示,需要用户付出一些高级技巧才能使ide自动补全提示。
现在优化成了不依赖pycharm pydantic插件,不依赖vscode 的高级的lsp和设置settings.json 来配置。
例如 BoosterParams 这个类是funboost最核心的,现在能无门槛ide自动补全提示了。funboost很注重简单性和用户体验。
7.80 2026-05 funweb增加高进配置功能
funweb页面中也可以配置告警,原理是复用已有的上报到redis中的数据,用户在页面上针对队列名字,可以配置 积压超标、qps骤降、消费者掉线、失败率飙升、平均耗时高 5种告警指标。
详见6.30.5章节教程。