3.框架详细介绍
3.1 各种中间件选择的场景和优势
class BrokerEnum:
"""
在funboost中万物皆可为消息队列broker,funboost内置了所有 知名的正经经典消息队列作为broker,
也支持了基于 内存 各种数据库 文件系统 tcp/udp/http这些socket 模拟作为broker.
funboost也内置支持了各种python三方包和消费框架作为broker,例如 sqlachemy kombu celery rq dramtiq huey nameko 等等
用户也可以按照文档4.21章节,轻松扩展任何物质概念作为funboost的broker.
"""
# funboost框架能轻松兼容消息队列各种工作模式, 拉模式/推模式/轮询模式,单条获取 批量获取
"""
funboost 的 consumer的 _dispatch_task 非常灵活,用户实现把从消息队列取出的消息通过_submit_task方法
丢到并发池,他不是强制用户重写实现怎么取一条消息,例如强制你实现一个 _get_one_message的法,
那就不灵活和限制扩展任意东西作为broker了,而是用户完全自己来写灵活代码。
所以无论获取消息是 拉模式 还是推模式 还是轮询模式,是单条获取 还是多条批量获取,
不管你的新中间件和rabbitmq api用法差别有多么巨大,都能轻松扩展任意东西作为funboost的中间件。
所以你能看到funboost源码中能轻松实现任物质概念作为funboost的broker。
"""
EMPTY = 'EMPTY' # 空的实现,需要搭配 boost入参的 consumer_override_cls 和 publisher_override_cls使用,或者被继承。
RABBITMQ_AMQPSTORM = 'RABBITMQ_AMQPSTORM' # 使用 amqpstorm 包操作rabbitmq 作为 分布式消息队列,支持消费确认.强烈推荐这个作为funboost中间件。
RABBITMQ = RABBITMQ_AMQPSTORM
# 2025-10 内置新增, 支持rabbitmq 所有路由模式,包括 fanout,direct,topic,headers. 使用概念更复杂
# 用法见 test_frame/test_broker_rabbitmq/test_rabbitmq_complex_routing 中的demo代码.
RABBITMQ_COMPLEX_ROUTING = 'RABBITMQ_COMPLEX_ROUTING'
RABBITMQ_RABBITPY = 'RABBITMQ_RABBITPY' # 使用 rabbitpy 包操作rabbitmq 作为 分布式消息队列,支持消费确认,不建议使用
RABBITMQ_AMQP = 'RABBITMQ_AMQP' # 使用 amqp 包操作 rabbitmq,Celery/Kombu 底层客户端,性能比 pika 更好
"""
以下是各种redis数据结构和各种方式来实现作为消息队列的,redis简直被作者玩出花来了.
因为redis本身是缓存数据库,不是消息队列,redis没有实现经典AMQP协议,所以redis是模拟消息队列不是真消息队列.
例如要实现消费确认,随意重启但消息万无一失,你搞个简单的 redis.blpop 弹出删除消息,那就压根不行.重启就丢失了,但消息可能还没开始运行或者正在运行中.
redis做ack挑战难点不是怎么实现确认消费本身,而是何时应该把关闭或宕机进程的消费者的待确认消费的孤儿消息重回队列.
在 Redis 上实现 ACK 的真正难点,根本不在于“确认”这个动作本身,而在于建立一套可靠的、能够准确判断“何时可以安全地及时地进行任务恢复”的分布式故障检测机制。
所以你以为只要使用 brpoplpush 或者 REDIS_STREAM 就能自动轻易解决ack问题,那就太天真了,因为redis服务端不能像rabbitmq服务端那样天生自带自动重回宕机消费者的消息机制,需要你在redis客户端来维护实现这套机制.
"""
REDIS = 'REDIS' # 使用 redis 的 list结构,brpop 作为分布式消息队列。随意重启和关闭会丢失大量消息,不支持消费确认。注重性能不在乎丢失消息可以选这个redis方案。
REDIS_ACK_ABLE = 'REDIS_ACK_ABLE' # 基于redis的 list + 临时unack的set队列,采用了 lua脚本操持了取任务和加到pengding为原子性,,基于进程心跳消失判断消息是否为掉线进程的,随意重启和掉线不会丢失任务。
REIDS_ACK_USING_TIMEOUT = 'reids_ack_using_timeout' # 基于redis的 list + 临时unack的set队列,使用超时多少秒没确认消费就自动重回队列,请注意 ack_timeout的设置值和函数耗时大小,否则会发生反复重回队列的后果,boost可以设置ack超时,broker_exclusive_config={'ack_timeout': 1800}.缺点是无法区分执行太慢还是真宕机
REDIS_PRIORITY = 'REDIS_PRIORITY' # # 基于redis的多 list + 临时unack的set队列,blpop监听多个key,和rabbitmq的x-max-priority属性一样,支持任务优先级。看文档4.29优先级队列说明。
REDIS_STREAM = 'REDIS_STREAM' # 基于redis 5.0 版本以后,使用 stream 数据结构作为分布式消息队列,支持消费确认和持久化和分组消费,是redis官方推荐的消息队列形式,比list结构更适合。
REDIS_BRPOP_LPUSH = 'RedisBrpopLpush' # 基于redis的list结构但是采用 brpoplpush 双队列形式,和 redis_ack_able的实现差不多,实现上采用了原生命令就不需要lua脚本来实现取出和加入unack了。
REDIS_PUBSUB = 'REDIS_PUBSUB' # 基于redis 发布订阅的,发布一个消息多个消费者都能收到同一条消息,但不支持持久化
"""
MEMORY_QUEUE: (funboost中最最最核心的broker,没有之一)
python内存队列,虽然不支持跨进程 跨脚本 跨机器共享任务,不支持持久化,
但是 MEMORY_QUEUE 作为broker 是funboost最最最重要的broker,绝非玩具和只适合简单场景使用,其在funboost中的用途广泛性远超那些正经服务端mq。
MEMORY_QUEUE 在 funboost 中的重要性是 sss级,重要性远超 redis kafka rabbbitmq等作为broker.
主要原因有:
1.queue.Queue超高的性能,没有socket io
2.queue.Queue作为消息队列时候,单独做了判断,不进行序列化和反序列化,
好处是可以将任何不可json序列化,不可pickle序列化的类型作为函数入参发送到内存Queue中,兼容性灵活性吊打其他broker。
3.不是所有人 所有场景都需要分布式 持久化,很多时候都需要使用到内存queue,内存queue经常作为背压、解耦、限流、回调等场景,
例如用户经常使用的ThreadpoolExecutor,里面就有一个无界队列的 _work_queue属性,内存queue的使用无处不在。
4.使用内存queue,你可以把@boost装饰器当做 tomorrow包或者线程池/asyncio协程池来使用。但是此时funboost的@boost装饰器,
吊打并发池和tomorrow装饰器,因为@boost装饰器不仅提供一键多种并发方式,还提供了qps 重试 超时杀死 函数入参缓存过滤等30多种功能,
你使用内存queue,可以把@boost当做一个超级装饰器,一个@boost涵盖了所有常见常用的装饰器的功能,一个@boost装饰器抵得上10个常规装饰器叠加使用。
5.celery为什么不推荐把memory作为broker?因为celery worker通常在控制台用命令行单独启动,和普通的python脚本中发布任务压根是跨进程跨python解释器了,无法跨程序共享内存队列任务。
而funboost启动消费就是普通的python程序,业务脚本发送消息和启动消费就是处在一个进程中,所以可以共享一个内存queue。
由于2个框架启动消费方式的区别,memory queue在 celery中是六等公民,但在 funboost 中是超一等公民。
"""
MEMORY_QUEUE = 'MEMORY_QUEUE' # 使用python queue.Queue实现的基于当前python进程的消息队列,不支持跨进程 跨脚本 跨机器共享任务,不支持持久化,适合一次性短期简单任务。
LOCAL_PYTHON_QUEUE = MEMORY_QUEUE # 别名,python本地queue就是基于python自带的语言的queue.Queue,消息存在python程序的内存中,不支持重启断点接续。
# 高性能内存队列,使用 collections.deque 代替 queue.Queue,去除不必要的 task_done/join 开销
# 性能比 MEMORY_QUEUE 提升 2-5 倍,支持批量拉取消息(通过 broker_exclusive_config={'pull_msg_batch_size': 1000})
FASTEST_MEM_QUEUE = 'FASTEST_MEM_QUEUE'
RABBITMQ_PIKA = 'RABBITMQ_PIKA' # 使用pika包操作rabbitmq 作为 分布式消息队列。,不建议使用
MONGOMQ = 'MONGOMQ' # 使用mongo的表中的行模拟的 作为分布式消息队列,支持消费确认。
SQLITE_QUEUE = 'sqlite3' # 使用基于sqlite3模拟消息队列,支持消费确认和持久化,但不支持跨机器共享任务,可以基于本机单机跨脚本和跨进程共享任务,好处是不需要安装中间件。
PERSISTQUEUE = SQLITE_QUEUE # PERSISTQUEUE的别名
NSQ = 'NSQ' # 基于nsq作为分布式消息队列,支持消费确认。
KAFKA = 'KAFKA' # 基于kafka作为分布式消息队列,如果随意重启会丢失消息,建议使用BrokerEnum.CONFLUENT_KAFKA。
"""基于confluent-kafka包,包的性能比kafka-python提升10倍。同时应对反复随意重启部署消费代码的场景,此消费者实现至少消费一次,第8种BrokerEnum.KAFKA是最多消费一次。"""
KAFKA_CONFLUENT = 'KAFKA_CONFLUENT'
CONFLUENT_KAFKA = KAFKA_CONFLUENT
KAFKA_CONFLUENT_SASlPlAIN = 'KAFKA_CONFLUENT_SASlPlAIN' # 可以设置账号密码的kafka
SQLACHEMY = 'SQLACHEMY' # 基于SQLACHEMY 的连接作为分布式消息队列中间件支持持久化和消费确认。支持mysql oracle sqlserver等5种数据库。
ROCKETMQ = 'ROCKETMQ' # 基于 rocketmq 作为分布式消息队列,这个中间件必须在linux下运行,win不支持。
ZEROMQ = 'ZEROMQ' # 基于zeromq作为分布式消息队列,不需要安装中间件,可以支持跨机器但不支持持久化。
"""
kombu 和 celery 都是 funboost中的神级别broker_kind。
使得funboost以逸待劳,支持kombu的所有现有和未来的消息队列。
通过直接支持 kombu,funboost 相当于一瞬间就继承了 `kombu` 支持的所有现有和未来的消息队列能力。无论 kombu 社区未来增加了对哪种新的云消息服务(如 Google
Pub/Sub、Azure Service Bus)或小众 MQ 的支持,funboost 无需修改自身代码,就能自动获得这种能力。这
是一种“以逸待劳”的策略,极大地扩展了 funboost 的适用范围。
kombu 包可以作为funboost的broker,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),但没包括分布式函数调度框架的kafka nsq zeromq 等。
同时 kombu 包的性能非常差,可以用原生redis的lpush和kombu的publish测试发布,使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列 qpid pyro 队列),否则强烈建议使用此框架的操作中间件方式而不是使用kombu。
"""
KOMBU = 'KOMBU'
""" 基于emq作为中间件的。这个和上面的中间件有很大不同,服务端不存储消息。所以不能先发布几十万个消息,然后再启动消费。mqtt优点是web前后端能交互,
前端不能操作redis rabbitmq kafka,但很方便操作mqtt。这种使用场景是高实时的互联网接口。
"""
MQTT = 'MQTT'
HTTPSQS = 'HTTPSQS' # httpsqs中间件实现的,基于http协议操作,dcoker安装此中间件简单。
PULSAR = 'PULSAR' # 最有潜力的下一代分布式消息系统。5年后会同时取代rabbitmq和kafka。
UDP = 'UDP' # 基于socket udp 实现的,需要先启动消费端再启动发布,支持分布式但不支持持久化,好处是不需要安装消息队列中间件软件。
TCP = 'TCP' # 基于socket tcp 实现的,需要先启动消费端再启动发布,支持分布式但不支持持久化,好处是不需要安装消息队列中间件软件。
HTTP = 'HTTP' # 基于http实现的,发布使用的urllib3,消费服务端使用的aiohttp.server实现的,支持分布式但不支持持久化,好处是不需要安装消息队列中间件软件。
GRPC = 'GRPC' # 使用知名grpc作为broker,可以使用 sync_call 方法同步获取grpc的结果, 简单程度暴击用户手写原生的 grpc客户端 服务端
NATS = 'NATS' # 高性能中间件nats,中间件服务端性能很好,。
TXT_FILE = 'TXT_FILE' # 磁盘txt文件作为消息队列,支持单机持久化,不支持多机分布式。不建议这个,用sqlite。
PEEWEE = 'PEEWEE' # peewee包操作mysql,使用表模拟消息队列
CELERY = 'CELERY' # funboost支持celery框架来发布和消费任务,由celery框架来调度执行任务,但是写法简单远远暴击用户亲自使用celery的麻烦程度,
# 用户永无无需关心和操作Celery对象实例,无需关心celery的task_routes和includes配置,funboost来自动化设置这些celery配置。
# funboost将Celery本身纳入了自己的Broker体系。能“吞下”另一个大型框架,简直太妙了。本身就证明了funboost架构的包容性和精妙性和复杂性。
DRAMATIQ = 'DRAMATIQ' # funboost使用 dramatiq 框架作为消息队列,dramatiq类似celery也是任务队列框架。用户使用funboost api来操作dramatiq核心调度。
HUEY = 'HUEY' # huey任务队列框架作为funboost调度核心
RQ = 'RQ' # rq任务队列框架作为funboost调度核心
NAMEKO = 'NAMEKO' # funboost支持python微服务框架nameko,用户无需掌握nameko api语法,就玩转python nameko微服务
"""
MYSQL_CDC 是 funboost 中 神奇 的 与众不同的 broker 中间件
mysql binlog cdc 自动作为消息,用户无需手动发布消息,只需要写处理binlog内容的逻辑,
一行代码就能轻量级实现 mysql2mysql mysql2kafka mysql2rabbitmq 等等.
这个是与其他中间件不同,不需要手工发布消息, 任何对数据库的 insert update delete 会自动作为 funboost 的消息.
几乎是轻量级平替 canal flinkcdc 的作用.
以此类推, 日志文件也能扩展作为broker,只要另外一个程序写入了文件日志,就能触发funboost消费,
然后自己在函数逻辑把消息发到kafka,(虽然是已经有大名鼎鼎elk,这只是举个场景例子,说明funboost broker的灵活性)
日志文件、文件系统变更(inotify)、甚至是硬件传感器的信号,按照4.21章节文档,都可以被封装成一个 funboost 的 Broker。
充分说明 funboost 有能力化身为 通用的、事件驱动的函数调度平台,而非仅仅是celery这种传统的消息驱动.
"""
"""
funboost 有能力消费canal发到kafka的binlog消息,也能不依赖canal,自己捕获cdc数据
"""
MYSQL_CDC = 'MYSQL_CDC'
SQS = 'SQS' # aws sqs ,虽然 funboost 支持 kombu ,kombu支持sqs,所以 funboost间接支持了sqs,但原生实现逻辑更清晰,比kombu性能更强
"""
原生 PostgreSQL 中间件,充分利用 PostgreSQL 独有特性:
1. FOR UPDATE SKIP LOCKED - 高并发无锁竞争,多消费者不阻塞
2. LISTEN/NOTIFY - 原生发布订阅机制,实时推送无需轮询
3. 支持任务优先级
相比 SQLACHEMY 通用实现性能更好,实时性更强
"""
POSTGRES = 'POSTGRES'
WATCHDOG = 'WATCHDOG' # 使用python watchdog 库监控文件夹文件变更事件,自动触发消费
WEBSOCKET = 'WEBSOCKET' # 使用websocket作为broker,支持实时双向通信
你项目根目录下自动生成的 funboost_config.py 文件中修改配置,会被自动读取到。
此文件按需修改,例如你使用redis中间件作为消息队列,可以不用管rabbitmq mongodb kafka啥的配置。
但有3个功能例外,如果你需要使用rpc模式或者分布式控频或者任务过滤功能,无论设置使用何种消息队列中间件都需要把redis连接配置好,
如果@boost装饰器设置is_using_rpc_mode为True或者 is_using_distributed_frequency_control为True或do_task_filtering=True则需要把redis连接配置好,默认是False。
3.2 框架支持的函数调度并发模式种类详细介绍
1、threading 多线程,使用自定义的可缩小、节制开启新线程的自定义线程池,不是直接用官方内置concurrent.futures.ThreadpoolExecutor
此线程池非常智能,配合qps参数,任何场景可以无脑开500线程,真正的做到智能扩张,智能自动缩小。
这线程池是智能线程池,由于非常好用,为这个线程池做了独立的pypi包,可以单独用于没有使用此框架的项目。
2、gevent 需要在运行起点的脚本首行打 gevent 猴子补丁。
3、eventlet 需要在运行起点的脚本首行打 eventlet 猴子补丁。
4、asyncio async异步,主要是针对消费函数已经定义成了 async def fun(x) 这种情况,这种情况不能直接使用多线程,
因为执行 fun(1) 后得到的并不是所想象的函数最终结果,而是得到的一个协程对象,所以针对已经定义成异步函数了的,需要使用此种并发模式。
框架不鼓励用户定义异步函数,你就用同步的直观方式思维定义函数就行了,其余的并发调度交给框架就行了。
5、开启多进程启动多个consumer,此模式是 多进程 + 上面4种的其中一种并发方式,充分利用多核和充分利用io,用法如下。可以实现 多进程 叠加 协程并发。
# 这种是多进程方式,一次编写能够兼容win和linux的运行。
from funboost import boost, BrokerEnum, ConcurrentModeEnum
import os
@boost('test_multi_process_queue',broker_kind=BrokerEnum.REDIS_ACK_ABLE,
concurrent_mode=ConcurrentModeEnum.THREADING,)
def fff(x):
print(x * 10,os.getpid())
if __name__ == '__main__':
fff.multi_process_consume(6) # 一次性启动6进程叠加多线程。
3.3 框架最最重要的boost装饰器的BoosterParams参数入参大全
class BoosterParams(BaseJsonAbleModel):
"""
掌握funboost 的精华就是知道 BoosterParams 的入参有哪些,如果知道有哪些入参字段,就掌握了funboost的 90% 用法。
pydatinc pycharm编程代码补全,请安装 pydantic插件, 在pycharm的 file -> settings -> Plugins -> 输入 pydantic 搜索,点击安装 pydantic 插件.
@boost的传参必须是此类或者继承此类,如果你不想每个装饰器入参都很多,你可以写一个子类继承BoosterParams, 传参这个子类,例如下面的 BoosterParamsComplete
"""
queue_name: str # 队列名字,必传项,每个函数要使用不同的队列名字.
broker_kind: str = BrokerEnum.SQLITE_QUEUE # 中间件选型见3.1章节 https://funboost.readthedocs.io/zh-cn/latest/articles/c3.html
""" project_name是项目名,属于管理层面的标签, 默认为None, 给booster设置所属项目名, 用于对于在redis保存的funboost信息中,根据项目名字查看相关队列。
# 如果不设置很难从redis保存的funboost信息中,区分哪些队列名属于哪个项目。 主要是给web接口查看用。
# 一个项目的队列名字有哪些,是保存在redis的set中,key为 f'funboost.project_name:{project_name}'
# 通常配合 CareProjectNameEnv.set($project_name) 使用 ,它可以让你在监控和管理时“只看自己的一亩三分地“,避免被其他人的队列刷屏干扰。"""
project_name: typing.Optional[str] = None
"""如果设置了qps,并且cocurrent_num是默认的50,会自动开了500并发,由于是采用的智能线程池任务少时候不会真开那么多线程而且会自动缩小线程数量。
具体看ThreadPoolExecutorShrinkAble的说明
由于有很好用的qps控制运行频率和智能扩大缩小的线程池,此框架建议不需要理会和设置并发数量只需要关心qps就行了,框架的并发是自适应并发数量,这一点很强很好用。"""
concurrent_mode: str = ConcurrentModeEnum.THREADING # 并发模式,支持THREADING,GEVENT,EVENTLET,ASYNC,SINGLE_THREAD并发,multi_process_consume 支持协程/线程 叠加多进程并发,性能炸裂.
concurrent_num: int = 50 # 并发数量,并发种类由concurrent_mode决定
specify_concurrent_pool: typing.Optional[FunboostBaseConcurrentPool] = None # 使用指定的线程池/携程池,可以多个消费者共使用一个线程池,节约线程.不为None时候。threads_num失效
specify_async_loop: typing.Optional[asyncio.AbstractEventLoop] = None # 指定的async的loop循环,设置并发模式为async才能起作用。 有些包例如aiohttp,发送请求和httpclient的实例化不能处在两个不同的loop中,可以传过来.
is_auto_start_specify_async_loop_in_child_thread: bool = True # 是否自动在funboost asyncio并发池的子线程中自动启动指定的async的loop循环,设置并发模式为async才能起作用。如果是False,用户自己在自己的代码中去手动启动自己的loop.run_forever()
"""qps:
强悍的控制功能,指定1秒内的函数执行次数,例如可以是小数0.01代表每100秒执行一次,也可以是50代表1秒执行50次.为None则不控频。
设置qps时候,不需要指定并发数量,funboost的能够自适应智能动态调节并发池大小."""
qps: typing.Union[float, int, None] = None
"""is_using_distributed_frequency_control:
是否使用分布式空频(依赖redis统计消费者数量,然后频率平分),默认只对当前实例化的消费者空频有效。
假如实例化了2个qps为10的使用同一队列名的消费者,并且都启动,则每秒运行次数会达到20。
如果使用分布式空频则所有消费者加起来的总运行次数是10。"""
is_using_distributed_frequency_control: bool = False
is_send_consumer_heartbeat_to_redis: bool = False # 是否将发布者的心跳发送到redis,有些功能的实现需要统计活跃消费者。因为有的中间件不是真mq。这个功能,需要安装redis.
# --------------- 重试配置 开始
"""max_retry_times:
最大自动重试次数,当函数发生错误,立即自动重试运行n次,对一些特殊不稳定情况会有效果。
可以在函数中主动抛出重试的异常ExceptionForRetry,框架也会立即自动重试。
主动抛出ExceptionForRequeue异常,则当前 消息会重返中间件,
主动抛出 ExceptionForPushToDlxqueue 异常,可以使消息发送到单独的死信队列中,死信队列的名字是 队列名字 + _dlx。"""
max_retry_times: int = 3
"""
is_using_advanced_retry:
是否使用高级重试,高级重试支持指数退避重试。is_using_advanced_retry=True 时候,is_using_advanced_retry参数才能生效。
重试间隔有2种模式,requeue 模式 和 sleep 模式。
requeue模式: 将消息发回队列带延迟重试,立即释放线程,适合重试间隔大,防止长时间sleep占用线程导致降低了系统吞吐量。
sleep模式: 在当前线程/协程 sleep 重试,适合重试间隔小,并且消息数量少执行不频繁。此模式的sleep时候会占用工作线程/协程
如果retry_base_interval= 1.0,retry_multiplier=2.0,retry_max_interval=60.0,则重试间隔为:
1s,2s,4s,8s,16s,32s,60s,60s,60s...
"""
is_using_advanced_retry: bool = False
advanced_retry_config :dict = {
'retry_mode': 'sleep', # 可以是 'sleep' 或 'requeue',如果重试间隔大并且指数退避倍数大,那么应该使用requeue模式。
'retry_base_interval': 1.0, # 基础重试间隔(秒) 1s,2s,4s,8s,16s,30s,30s,30s...
'retry_multiplier': 2.0, # 指数退避倍数 ,如果你想固定重试间隔,则设置为1.0
'retry_max_interval': 60.0, # 最大重试间隔上限(秒)
'retry_jitter': False, # 是否添加随机抖动
}
is_push_to_dlx_queue_when_retry_max_times: bool = False # 函数达到最大重试次数仍然没成功,是否发送到死信队列,死信队列的名字是 队列名字 + _dlx。
# --------------- 重试配置 结束
consuming_function_decorator: typing.Optional[typing.Callable[..., typing.Any]] = None # 函数的装饰器。因为此框架做参数自动转指点,需要获取精准的入参名称,不支持在消费函数上叠加 @ *args **kwargs的装饰器,如果想用装饰器可以这里指定。
"""
function_timeout:
超时秒数,函数运行超过这个时间,则自动杀死函数。为0是不限制。
用户应该尽量使用各种三方包例如 aiohttp pymysql 自己的 socket timeout 设置来控制超时,而不是无脑使用funboost的function_timeout参数。
谨慎使用,非必要别去设置超时时间,设置后性能会降低(因为需要把用户函数包装到另一个线单独的程中去运行),而且突然强制超时杀死运行中函数,可能会造成死锁.
(例如用户函数在获得线程锁后突然杀死函数,别的线程再也无法获得锁了)
"""
function_timeout: typing.Union[int, float,None] = None
"""
is_support_remote_kill_task:
是否支持远程任务杀死功能,如果任务数量少,单个任务耗时长,确实需要远程发送命令来杀死正在运行的函数,才设置为true,否则不建议开启此功能。
(是把函数放在单独的线程中实现的,随时准备线程被远程命令杀死,所以性能会降低)
"""
is_support_remote_kill_task: bool = False
"""
log_level:
logger_name 对应的 日志级别
消费者和发布者的日志级别,建议设置DEBUG级别,不然无法知道正在运行什么消息.
这个是funboost每个队列的单独命名空间的日志级别,丝毫不会影响改变用户其他日志以及root命名空间的日志级别,所以DEBUG级别就好,
用户不要压根不懂什么是python logger 的name,还去手痒调高级别.
不懂python日志命名空间的小白去看nb_log文档,或者直接问 ai大模型 python logger name的作用是什么.
"""
log_level: int = logging.DEBUG # 不需要改这个级别,请看上面原因
logger_prefix: str = '' # 日志名字前缀,可以设置前缀
create_logger_file: bool = True # 发布者和消费者是否创建文件文件日志,为False则只打印控制台不写文件.
logger_name: typing.Union[str, None] = '' # 队列消费者发布者的日志命名空间.
log_filename: typing.Union[str, None] = None # 消费者发布者的文件日志名字.如果为None,则自动使用 funboost.队列 名字作为文件日志名字. 日志文件夹是在nb_log_config.py的 LOG_PATH中决定的.
is_show_message_get_from_broker: bool = False # 运行时候,是否记录从消息队列获取出来的消息内容
is_print_detail_exception: bool = True # 消费函数出错时候,是否打印详细的报错堆栈,为False则只打印简略的报错信息不包含堆栈.
publish_msg_log_use_full_msg: bool = False # 发布到消息队列的消息内容的日志,是否显示消息的完整体,还是只显示函数入参。
msg_expire_seconds: typing.Union[float, int,None] = None # 消息过期时间,可以设置消息是多久之前发布的就丢弃这条消息,不运行. 为None则永不丢弃
do_task_filtering: bool = False # 是否对函数入参进行过滤去重.
task_filtering_expire_seconds: int = 0 # 任务过滤的失效期,为0则永久性过滤任务。例如设置过滤过期时间是1800秒 , 30分钟前发布过1 + 2 的任务,现在仍然执行,如果是30分钟以内执行过这个任务,则不执行1 + 2
function_result_status_persistance_conf: FunctionResultStatusPersistanceConfig = FunctionResultStatusPersistanceConfig(
is_save_result=False, is_save_status=False, expire_seconds=7 * 24 * 3600, is_use_bulk_insert=False) # 是否保存函数的入参,运行结果和运行状态到mongodb。这一步用于后续的参数追溯,任务统计和web展示,需要安装mongo。
user_custom_record_process_info_func: typing.Optional[typing.Callable[..., typing.Any]] = None # 提供一个用户自定义的保存消息处理记录到某个地方例如mysql数据库的函数,函数仅仅接受一个入参,入参类型是 FunctionResultStatus,用户可以打印参数
is_using_rpc_mode: bool = False # 是否使用rpc模式,可以在发布端获取消费端的结果回调,但消耗一定性能,使用async_result.result时候会等待阻塞住当前线程。
rpc_result_expire_seconds: int = 1800 # redis保存rpc结果的过期时间.
rpc_timeout:int = 1800 # rpc模式下,等待rpc结果返回的超时时间
delay_task_apscheduler_jobstores_kind :Literal[ 'redis', 'memory'] = 'redis' # 延时任务的aspcheduler对象使用哪种jobstores ,可以为 redis memory 两种作为jobstore
"""
allow_run_time_cron:
只允许在规定的crontab表达式时间内运行。
例如 '* 23,0-2 * * *' 表示只在23点到2点运行。
allow_run_time_cron='* 9-17 * * 1-5', 表示只在周一到周五的9点到17:59:59运行。
为None则不限制运行时间。
语法是知名 croniter 包的语法,不是funboost创造的特殊语法,用户自己去google或者ai学习语法。
"""
allow_run_time_cron: typing.Optional[str] = None
schedule_tasks_on_main_thread: bool = False # 直接在主线程调度任务,意味着不能直接在当前主线程同时开启两个消费者。
is_auto_start_consuming_message: bool = False # 是否在定义后就自动启动消费,无需用户手动写 .consume() 来启动消息消费。
# booster_group :消费分组名字, BoostersManager.consume_group 时候根据 booster_group 启动多个消费函数,减少需要写 f1.consume() f2.consume() ...这种。
# 不像BoostersManager.consume_all() 会启动所有不相关消费函数,也不像 f1.consume() f2.consume() 这样需要逐个启动消费函数。
# 可以根据业务逻辑创建不同的分组,实现灵活的消费启动策略。
# 用法见文档 4.2d.3 章节. 使用 BoostersManager ,通过 consume_group 启动一组消费函数
booster_group:typing.Union[str, None] = None
consuming_function: typing.Optional[typing.Callable[..., typing.Any]] = None # 消费函数,在@boost时候不用指定,因为装饰器知道下面的函数.
consuming_function_raw: typing.Optional[typing.Callable[..., typing.Any]] = None # 不需要传递,自动生成
consuming_function_name: str = '' # 不需要传递,自动生成
"""
# 加上一个不同种类中间件非通用的配置,不同中间件自身独有的配置,不是所有中间件都兼容的配置,因为框架支持30种消息队列,消息队列不仅仅是一般的先进先出queue这么简单的概念,
# 例如kafka支持消费者组,rabbitmq也支持各种独特概念例如各种ack机制 复杂路由机制,有的中间件原生能支持消息优先级有的中间件不支持,
# 每一种消息队列都有独特的配置参数意义,可以通过这里传递。
# 每种中间件能传递的键值对可以看 funboost/core/broker_kind__exclusive_config_default.py 的 BROKER_EXCLUSIVE_CONFIG_DEFAULT 属性。
"""
broker_exclusive_config: dict = {}
should_check_publish_func_params: bool = True # 消息发布时候是否校验消息发布内容,比如有的人发布消息,函数只接受a,b两个入参,他去传2个入参,或者传参不存在的参数名字; 如果消费函数加了装饰器 ,你非要写*args,**kwargs,那就需要关掉发布消息时候的函数入参检查
manual_func_input_params :dict= {'is_manual_func_input_params': False,'must_arg_name_list':[],'optional_arg_name_list':[]} # 也可以手动指定函数入参字段,默认是根据消费函数def定义的入参来生成这个。
consumer_override_cls: typing.Optional[typing.Type] = None # 使用 consumer_override_cls 和 publisher_override_cls 来自定义重写或新增消费者 发布者,见文档4.21b介绍,
publisher_override_cls: typing.Optional[typing.Type] = None
# func_params_is_pydantic_model: bool = False # funboost 兼容支持 函数娼还是 pydantic model类型,funboost在发布之前和取出来时候自己转化。
consuming_function_kind: typing.Optional[str] = None # 自动生成的信息,不需要用户主动传参,如果自动判断失误就传递。是判断消费函数是函数还是实例方法还是类方法。如果传递了,就不自动获取函数类型。
""" consuming_function_kind 可以为以下类型,
class FunctionKind:
CLASS_METHOD = 'CLASS_METHOD'
INSTANCE_METHOD = 'INSTANCE_METHOD'
STATIC_METHOD = 'STATIC_METHOD'
COMMON_FUNCTION = 'COMMON_FUNCTION'
"""
"""
user_options:
用户额外自定义的配置,高级用户或者奇葩需求可以用得到,用户可以自由发挥,存放任何设置.
user_options 提供了一个统一的、用户自定义的命名空间,让用户可以为自己的“奇葩需求”或“高级定制”传递配置,而无需等待框架开发者添加官方支持。
funboost 是自由框架不是奴役框架,不仅消费函数逻辑自由,目录层级结构自由,自定义奇葩扩展也要追求自由,用户不用改funboost BoosterParams 源码来加装饰器参数
使用场景见文档 4b.6 章节.
"""
user_options: dict = {} # 用户自定义的配置,高级用户或者奇葩需求可以用得到,用户可以自由发挥,存放任何设置,例如配合 consumer_override_cls中读取 或 register_custom_broker 使用
auto_generate_info: dict = {} # 自动生成的信息,不需要用户主动传参.例如包含 final_func_input_params_info 和 where_to_instantiate 等。
"""# is_fake_booster:是否是伪造的booster,
# 用于faas模式下,因为跨项目的faas管理只拿到了redis的一些基本元数据,没有booster的函数逻辑,
# 例如ApsJobAdder管理定时任务,需要booster,但没有真实的函数逻辑,
# 你可以看 SingleQueueConusmerParamsGetter.gen_booster_for_faas 的用法,目前主要是控制不要执行 BoostersManager.regist_booster
# 普通用户完全不用改这个参数。
"""
is_fake_booster: bool = False
# 普通用户不用管不用改,用于隔离boosters注册。例如faas的是虚假的跨服务跨项目的booster,没有具体函数逻辑,不可污染真正的注册。
# 如果你是想分组启动部分booster,那你应该用的是 booster_group 参数。
booster_registry_name: str = StrConst.BOOSTER_REGISTRY_NAME_DEFAULT
关于boost参数太多的说明:
有人会抱怨入参超多很复杂,是因为要实现一切控制方式,实现的运行控制手段非常丰富,所以参数就会多。
看这个里面的参数解释非常重要,几乎能想到的控制功能全部都有。比如有人说日志太多,不想看那么详细的提示日志
,早就通过参数提供实现了,自己抱怨参数多又以为没提供这个功能,简直是自相矛盾。
想入参参数少那就看新增的那个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理。“
这个例子的框架啥控制手段都没有,参数自然就很少。
乞丐版分布式函数调度框架的代码在
funboost/beggar_version_implementation/beggar_redis_consumer.py
3.3.1 funboost_config.py 配置文件内容
funboost_config.py 配置文件主要是配置各种消息队列的连接信息账号 密码 地址等,任务控制功能则是在 BoostParams 中传参.
所以 funboost_config.py 除了第一次配置你需要使用到的中间件类型的ip 密码正确即可,例如你只使用redis做中间件,完全无视kafka rabbitmq 等配置,后续基本很少改动,因为任务控制参数全部是在 BoostParams 中传参.
funboost_config.py 就是 从 funboost/funboost_config_deafult.py 自动复制拷贝的,你只需要在你的 funboost_config.py 里面修改配置,funboost框架就能自动使用你的配置,因为 funboost_config.py 会覆盖 funboost/funboost_config_deafult.py的配置.
funboost_config.py 是第一次运行任何任意funboost项目代码自动生成到项目根目录的,也可以放在磁盘任何位置,只要文件夹是在pythonpath之一就好了,
因为是自动 import funboost_config ,只需要能被 import 到就好了.
可以查看 6.18.3 自问自答,"怎么指定配置文件读取 funboost_config.py 和nb_log_config.py的文件夹位置"
funboost_config.py 默认内容如下:
# -*- coding: utf-8 -*-
import logging
from pathlib import Path
from funboost.utils.simple_data_class import DataClassBase
from nb_log import nb_log_config_default
'''
funboost_config.py 文件是第一次运行框架自动生成到你的项目根目录的,不需要用由户手动创建。
此文件里面可以写任意python代码。例如 中间件 帐号 密码自己完全可以从apola配置中心获取或者从环境变量获取。
'''
'''
你项目根目录下自动生成的 funboost_config.py 文件中修改配置,会被自动读取到。
用户不要动修改框架的源码 funboost/funboost_config_deafult.py 中的代码,此模块的变量会自动被 funboost_config.py 覆盖。
funboost/funboost_config_deafult.py配置覆盖逻辑可看funboost/set_frame_config.py中的代码.
框架使用文档是 https://funboost.readthedocs.io/zh_CN/latest/
'''
class BrokerConnConfig(DataClassBase):
"""
中间件连接配置
此文件按需修改,例如你使用redis中间件作为消息队列,可以不用管rabbitmq mongodb kafka啥的配置。
但有3个功能例外,如果你需要使用rpc模式或者分布式控频或者任务过滤功能,无论设置使用何种消息队列中间件都需要把redis连接配置好,
如果@boost装饰器设置is_using_rpc_mode为True或者 is_using_distributed_frequency_control为True或do_task_filtering=True则需要把redis连接配置好,默认是False不强迫用户安装redis。
"""
MONGO_CONNECT_URL = f'mongodb://127.0.0.1:27017' # 如果有密码连接 'mongodb://myUserAdmin:XXXXX@192.168.199.202:27016/' authSource 指定鉴权db,MONGO_CONNECT_URL = 'mongodb://root:123456@192.168.64.151:27017?authSource=admin'
RABBITMQ_USER = 'rabbitmq_user'
RABBITMQ_PASS = 'rabbitmq_pass'
RABBITMQ_HOST = '127.0.0.1'
RABBITMQ_PORT = 5672
RABBITMQ_VIRTUAL_HOST = '' # my_host # 这个是rabbitmq的虚拟子host用户自己创建的,如果你想直接用rabbitmq的根host而不是使用虚拟子host,这里写 空字符串 即可。
RABBITMQ_URL = f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}'
REDIS_HOST = '127.0.0.1'
REDIS_USERNAME = ''
REDIS_PASSWORD = ''
REDIS_PORT = 6379
REDIS_DB = 7 # redis消息队列所在db,请不要在这个db放太多其他键值对,以及方便你自己可视化查看你的redis db,使用单独的db。
REDIS_DB_FILTER_AND_RPC_RESULT = 8 # 如果函数做任务参数过滤 或者使用rpc获取结果,使用这个db,因为这个db的键值对多,和redis消息队列db分开
REDIS_SSL = False # 是否使用ssl加密,默认是False
REDIS_URL = f'{"rediss" if REDIS_SSL else "redis"}://{REDIS_USERNAME}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
NSQD_TCP_ADDRESSES = ['127.0.0.1:4150']
NSQD_HTTP_CLIENT_HOST = '127.0.0.1'
NSQD_HTTP_CLIENT_PORT = 4151
KAFKA_BOOTSTRAP_SERVERS = ['127.0.0.1:9092']
KFFKA_SASL_CONFIG = {
"bootstrap_servers": KAFKA_BOOTSTRAP_SERVERS,
"sasl_plain_username": "",
"sasl_plain_password": "",
"sasl_mechanism": "SCRAM-SHA-256",
"security_protocol": "SASL_PLAINTEXT",
}
SQLACHEMY_ENGINE_URL = 'sqlite:////sqlachemy_queues/queues.db'
# 如果broker_kind 使用 peewee 中间件模式会使用mysql配置
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = '123456'
MYSQL_DATABASE = 'testdb6'
# persist_quque中间件时候采用本机sqlite的方式,数据库文件生成的位置,如果linux账号在根目录没权限建文件夹,可以换文件夹。
SQLLITE_QUEUES_PATH = '/sqllite_queues'
TXT_FILE_PATH = Path(__file__).parent / 'txt_queues' # 不建议使用这个txt模拟消息队列中间件,本地持久化优先选择 PERSIST_QUQUE 中间件。
ROCKETMQ_NAMESRV_ADDR = '192.168.199.202:9876'
MQTT_HOST = '127.0.0.1'
MQTT_TCP_PORT = 1883
HTTPSQS_HOST = '127.0.0.1'
HTTPSQS_PORT = 1218
HTTPSQS_AUTH = '123456'
NATS_URL = 'nats://192.168.6.134:4222'
KOMBU_URL = 'redis://127.0.0.1:6379/9' # 这个就是celery依赖包kombu使用的消息队列格式,所以funboost支持一切celery支持的消息队列种类。
# KOMBU_URL = 'sqla+sqlite:////dssf_kombu_sqlite.sqlite' # 4个//// 代表磁盘根目录下生成一个文件。推荐绝对路径。3个///是相对路径。
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/12' # 使用celery作为中间件。funboost新增支持celery框架来运行函数,url内容就是celery的broker形式.
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/13' # celery结果存放,可以为None
DRAMATIQ_URL = RABBITMQ_URL
PULSAR_URL = 'pulsar://192.168.70.128:6650'
class FunboostCommonConfig(DataClassBase):
# nb_log包的第几个日志模板,内置了7个模板,可以在你当前项目根目录下的nb_log_config.py文件扩展模板。
# NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER = 11 # 7是简短的不可跳转,5是可点击跳转的,11是可显示ip 进程 线程的模板,也可以亲自设置日志模板不传递数字。
NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER = logging.Formatter(
f'%(asctime)s-({nb_log_config_default.computer_ip},{nb_log_config_default.computer_name})-[p%(process)d_t%(thread)d] - %(name)s - "%(filename)s:%(lineno)d" - %(funcName)s - %(levelname)s - %(task_id)s - %(message)s',
"%Y-%m-%d %H:%M:%S",) # 这个是带task_id的日志模板,日志可以显示task_id,方便用户串联起来排查某一个任务消息的所有日志.
TIMEZONE = 'Asia/Shanghai' # 时区
# 以下配置是修改funboost的一些命名空间和启动时候的日志级别,新手不熟练就别去屏蔽日志了
SHOW_HOW_FUNBOOST_CONFIG_SETTINGS = True # 如果你单纯想屏蔽 "分布式函数调度框架会自动导入funboost_config模块当第一次运行脚本时候,函数调度框架会在你的python当前项目的根目录下 ...... " 这句话,
FUNBOOST_PROMPT_LOG_LEVEL = logging.DEBUG # funboost启动时候的相关提示语,用户可以设置这个命名空间的日志级别来调整
KEEPALIVETIMETHREAD_LOG_LEVEL = logging.DEBUG # funboost的作者发明的可缩小自适应线程池,用户对可变线程池的线程创建和销毁线程完全无兴趣,可以提高日志级别.
3.3.2 funboost 重要公有方法大全介绍
仔细看以下代码注释,函数的功能
import json
import time
from funboost import boost, BrokerEnum,TaskOptions,BoosterParams
@boost(BoosterParams(queue_name='queue1', broker_kind=BrokerEnum.REDIS, qps=0.2))
def f(x, y):
return x + y
@boost(BoosterParams(queue_name='queue2', broker_kind=BrokerEnum.REDIS, qps=7))
def f2(a, b):
return a - b
if __name__ == '__main__':
f.clear() # 清空f函数对应的queue1所有消息
for i in range(10):
f.push(i, i * 2) # 使用push发布消息到queue1,push的入参和正常调用函数一样
f2.publish({'a': i, 'b': i * 2},task_options=TaskOptions(msg_expire_seconds=30)) # # 使用publish发布消息到queue2,publish的入参第一个参数是一个字典,把所有参数组成一个字典,还可以传入其他参数。publish更强大。
print(f.get_message_count()) # 获取消息队列中的消息数量
f.consume() # 在当前进程启动多线程/协程消费
f2.multi_process_consume(3) # 启动3个进程,每个进程内部都启动多线程/协程消费,性能炸裂。
重要方法就是 @boost装饰器的入参,被@boost装饰的消费函数自动有funboost框架的功能
其中最常见的是:
push,推送消息到消息队列
consume, 在当前进程启动多线程/协程消费
multi_process_consume(n) ,启动多个进程,每个进程内部叠加多线程/协程,性能更强.
冷门方法
除以上方法外,还有其他的不常用的方法,在第四章有介绍,
在pycharm中可以代码补全有哪些方法,自己按照方法名字就能猜出是什么意思了。也可以点进去boost装饰器里面去,里面有每个方法的注释说明。
例如 f.pause_consume() 可以从python解释器的外部远程,让已经启动queue1的消费函数停止消费,f.continue_consume() 继续消费。
3.3.3 boost装饰器 的 concurrent_num 和 qps 之间的关系。
concurrent_num:并发数量。
qps qps是有个很有趣的参数,能精确控制函数每秒运行多少次。
concurrent_num和qps存在着一定的关系。
例如对于下面这个函数
def func(x):
time.sleep(2)
print(x)
1)如果设置 concurrent_num = 1000(或100万) qps = 10
那么一秒钟会执行10次func函数。如果不指定qps的值,则不进行控频,消费框架会平均每秒钟会执行50次函数func。
如果设置concurrent_num = 1000 qps = 5
那么一秒钟会执行5次func函数。所以可以看到,当你不知道要开多少并发合适的时候,可以粗暴开1000个线程,但要设置一个qps。
那为什么次框架,可以让你粗暴的设置1000设置100万线程呢,并不是做了数字截取,判断线程设置大于多少就自动调小了,此消费框架并没有这样去实现。
而是次框架使用的非concurrent.tutures.ThreadpoolExecutor,是使用的自定义的 ThreadPoolExecutorShrinkAble 线程池,
此线程池其中之一的功能就是节制开更多的线程,因为对于上面的休眠2秒的func函数,如果设置concurrent_num = 1000000 qps = 5,
正常来说开10个线程足够实现每秒执行5次了,此框架在调节线程新增线程时候进行了更多的判断,所以原生线程池不可以设置100万大小,
而ThreadPoolExecutorShrinkAble可以设置为100万大小。
此外ThreadPoolExecutorShrinkAble 实现了线程池自动缩小的功能,这也是原生concurrent.tutures.ThreadpoolExecutor没有的功能。
自动缩小是什么意思呢,比如一段时间任务非常密集1秒钟来了几百个任务,所以当时开启了很多线程来应付,但一段时间后每分钟只来了个把任务,
此时 ThreadPoolExecutorShrinkAble 能够自动缩小线程池,
ThreadPoolExecutorShrinkAble实现了java ThreadpoolExecutor的KeepAliveTime参数的功能,
原生concurrent.tutures.ThreadpoolExecutor线程池即使以后永久不来新任务,之前开的线程数量一致保持这。
关于 ThreadPoolExecutorShrinkAble 的厉害之处,可以参考 https://github.com/ydf0509/threadpool_executor_shrink_able
最终关于 concurrent_num 大小设置为多少,看自己需求,上面说的100万是举个例子,
实际这个参数还被用作为线程池的任务队列的有界队列的大小,所以一定要设置为1000以下,否则如果设置为100万,
从消息中间件预取出的消息过多,造成python内存大、单个消费者掏空消息队列中间件造成别的新启动的消费者无任务可消费、
对于不支持消费确认类型的中间件的随意重启会丢失大量正在运行的任务 等不利影响。
2)上面的func函数,设置 concurrent_num = 1 qps = 100,那会如何呢?
由于你设置的并发是1,对于一个需要2秒运行完成的函数,显然平均每2秒才能执行1次,就是框架真正的只能达到0.5个qps。
所以 concurrent_num 和 qps,既有关系,也不是绝对的关系。
在对一个随机消耗时间的函数进行并发控制时候,如果函数的运行时间是0.5到20秒任意时间不确定的徘徊,你可以设置 concurrent_num = 100,
如果合作方要求了只能1秒钟能让你使用多少次,例如需要精确控频10次,可以设置qps =10,concurrent_num随便搞个 一两百 两三百就行了,
因为是智能的克制的调节线程池大小的,所以不会真的达到concurrent_num的值。
3)qps是个小数可以小于1,如果要设置10秒执行一次函数,则设置qps=0.1
这主要是介绍了 concurrent_num 和qps的关系和设置值,qps是优先,但受到concurrent_num的约束。
3.4 框架的乞丐精简版实现方式
由于框架的功能十分多,如果没学习36种设计模式,就很难看懂源码,现在演示精简实现原理
此精简例子十分之简单明了,就是死循环从中间件取任务然后丢到线程池里面执行。
此代码在 funboost/beggar_version_implementation/beggar_redis_consumer.py
这样简单明了,演示了基本原理,但是这个缺少消费确认(随意重启代码会造成大量任务丢失) qps恒定等20种功能。
def start_consuming_message(queue_name, consume_function, threads_num=50):
pool = ThreadPoolExecutor(threads_num)
while True:
try:
redis_task = redis.brpop(queue_name, timeout=60)
if redis_task:
task_str = redis_task[1].decode()
print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
pool.submit(consume_function, **json.loads(task_str))
else:
print(f'redis的 {queue_name} 队列中没有任务')
except redis.RedisError as e:
print(e)
if __name__ == '__main__':
import time
def add(x, y):
time.sleep(5)
print(f'{x} + {y} 的结果是 {x + y}')
# 推送任务
for i in range(100):
print(i)
redis.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))
start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)
3.5 框架的任务消费确认
此框架可以确保客户端任何时候 随意断电 粗暴重启代码 随意关机,任务万无一失。 3.4演示的精简版框架,实现redis的list的push和pop来模拟消息队列,很明显不靠谱,kill 9 重启代码或者重启电脑很容易会丢失大量任务。 分布式一致性消息传递、事件处理等场景中十分重要,分为3种情况: At most Onece:最多一次,如果算子处理事件失败,算子将不再尝试该事件。 At Least Onece:至少一次,如果算子处理事件失败,算子会再次尝试该处理事件,直到有一次成功。 Exactly-Once:严格地,有且仅处理一次,通常有两种方法实现。 3.4实现的是最多一次,框架在多种中间件使用消费确认实现了万无一失 ,达到了Exactly-Once。 Exactly-Once是最好的也是实现难度最复杂的;At most Onece通常是最差的方式,也是最简单的实现方式。 框架在使用rabbitmq,内置默认了确认消费。 框架在使用redis作为中间件时候,有很多种实现方式,REDIS 是最不靠谱的会丢失消息。 REDIS_ACK_ABLE 、 REDIS_STREAM、 RedisBrpopLpush BrokerKind 这三种都是实现了确认消费。
3.6 框架的设计规范原则
因为使用了oop编程和良好的设计模式,所以 funboost 很容易新增任意消息队列类型以及任何消费框架 作为 funboost的 broker_kind。
目前没遇到集成不到funboost的消息队列类型和消费框架。
源码实现思路基本90%遵守了oop的6个设计原则,很容易扩展中间件。
1、单一职责原则——SRP
2、开闭原则——OCP
3、里式替换原则——LSP
4、依赖倒置原则——DIP
5、接口隔离原则——ISP
6、迪米特原则——LOD
最主要是大量使用了模板模式、工厂模式、策略模式、鸭子类。
可以仿照源码中实现中间件的例子,只需要继承发布者、消费者基类后实现几个抽象方法即可添加新的中间件。