# 15. ⚡ Funboost.FaaS:让函数起飞的 Serverless 引擎 - **Faas 是 Function as a Service 的缩写** - **💡 核心定义**: `funboost.faas` 是框架内置的 **云原生适配器**。 它能将你散落在项目角落里的 Python 消费函数,**零代码** 瞬间转化为可被 HTTP 调用的 **微服务接口 (Microservices)**。 - **它是连接 "后台离线任务" 与 "前台在线业务" 的星际之门。🚪✨** --- ## 15.1 🌟 架构质变:从 Worker 到 Service 在传统架构中,后台任务队列(Worker)往往是孤独的“数据孤岛”。但在 Funboost 的世界里,**每一个函数都是一个潜在的服务**。 | 对比维度 | 🐢 传统模式 (Worker) | 🚀 Funboost FaaS 模式 (Service) | | :--- | :--- | :--- | | **触发方式** | **被动消费**:只能默默等待队列里的消息。 | **主动响应**:可通过 HTTP 接口被外部系统直接唤醒。 | | **可控性** | **黑盒运行**:外界不知道它在干什么,进度如何。 | **透明可控**:外界可查询状态、获取结果、控制暂停/恢复。 | | **灵活性** | **代码耦合**:Web 端需硬编码调用逻辑,改动需重启。 | **热插拔**:基于元数据驱动,Web 端**永不重启**,自动发现新函数。 | --- ## 15.2 💎 核心特性:六大“无限宝石” ### 🔌 **开箱即用 (Out-of-the-Box)** 无需重写业务逻辑。针对 **FastAPI / Flask / Django** 三大主流框架,提供预置的 `Router`。 👉 **一行代码** `include_router`,瞬间挂载 20+ 个管理接口。 ### 🔥 **无缝热发布 (Hot Discovery)** **这是架构上的神来之笔!** * **场景**:你修改了消费函数的逻辑,或者新增了一个处理报表的函数。 * **传统**:修改 Web 代码 -> 提交 -> 审批 -> 重启 Web 服务 -> 可能会由短暂服务不可用。 * **Funboost**:新部署启动一个booster函数即可。Web 网关层 **毫不知情,却能立即调用**。 ### 🔄 **RPC 同步调用 (HTTP-RPC Bridge)** 打破异步边界。前端发起 HTTP 请求,Funboost 自动将其转为 MQ 消息,并挂起 HTTP 请求,直到消费者处理完毕返回结果。 * **效果**:`前端 -> HTTP -> Funboost -> Redis -> Worker -> (Result) -> HTTP -> 前端`。 ### 🧬 **全自动参数校验** 基于 redis元数据中的 auto_generate_info.final_func_input_params_info ,发布接口会自动校验 JSON Body 是否符合消费函数的参数定义。如果不匹配,**毫秒级拒绝**,根本不会产生脏数据进入队列。 ### ⏱️ **可编程调度中心** 不仅仅是触发任务。通过 HTTP 接口,你可以动态添加 **Crontab 定时任务**、**延时任务**。 * **价值**:你不需要开发后台管理界面,直接调用 API 即可管理成千上万个定时作业。 ### 📊 **全景透视能力** 提供 `/get_queue_run_info` 等接口,实时吐出 **QPS、积压量、活跃消费者 IP、错误堆栈**。对接 Grafana 或 Prometheus 易如反掌。 --- ## 15.3 🌌 深度发散:从“任务队列”到“无服务器计算平台” `funboost.faas` 不仅仅是一个路由生成器,它是企业级架构演进的加速器。以下是它带来的**革命性价值**: | 核心维度 | 应用场景 | 带来的革命性价值 (The Revolution) | | :--- | :--- | :--- | | **🦄 动态架构**
*(灵活性)* | **热插拔式微服务** | **Web 服务永不重启**。这是对传统开发模式的颠覆。新增或修改消费函数后,Web 网关层无需任何变更,基于 Redis 元数据**自动发现**新逻辑,实现真正的“零停机”发布。 | | **🌉 异构融合**
*(连接性)* | **通用语言网关** | **打破语言壁垒**。让 Java、Go、Node.js 甚至 Shell 脚本,通过标准的 HTTP JSON 协议,直接调用 Python 的丰富生态(如 AI 模型、Pandas 处理),无需编写任何胶水代码。 | | **⚡ 交互模式**
*(即时性)* | **HTTP-RPC 桥接** | **异步变同步**。将后台的异步任务队列,瞬间转化为前端可等待的同步 API。前端请求 `/publish?need_result=true`,Funboost 自动挂起 HTTP 请求直到 Worker 处理完成,省去了轮询数据库的开发成本。 | | **🏭 研发效能**
*(生产力)* | **零代码后端接口** | **消灭 Controller 层**。开发者只需关注业务函数本身 (`@boost`)。所有的参数校验、路由绑定、异常处理、文档生成,全部由框架自动完成。写完函数,接口即上线。 | | **🕹️ 运维管控**
*(可控性)* | **可编程调度中心** | **API 化运维**。不再依赖静态配置文件或复杂的 Supervisor 命令。通过 HTTP 接口动态添加定时任务、暂停异常队列、清空脏数据。让运维平台或 ChatOps 机器人直接接管任务调度。 | | **🔭 可观测性**
*(透明度)* | **全景数据透视** | **透明化黑盒**。实时暴露每一个队列的积压量、QPS、活跃消费者 IP 和历史成功率。为 Prometheus、Grafana 或自建监控大屏提供标准化的数据源,让后台任务不再是监控盲区。 | | **🧪 质量保障**
*(稳定性)* | **生产环境热调试** | **所见即所得**。QA 或开发人员可以直接通过 Postman 构造特定参数触发生产环境的某个函数,并直接在 Response 中看到函数返回值或报错堆栈,极大地缩短了 Bug 复现和修复的路径。 | --- ## 15.4 🛠️ 核心原理:为什么它能"以一当百"? ### ❌ 痛点:传统的"硬编码"地狱 在没有 Funboost.faas 之前,为了让 Web 端发布消息到不同的队列,开发者被迫写出如下臃肿代码,且每加一个任务都要重启 Web: ```python # 🚫 反面教材:硬编码路由 + 必须重启 @app.post("/api/do_task") def do_task(task_type: str, data: dict): if task_type == 'sms': sms_fun.push(data) elif task_type == 'email': email_fun.push(data) # ... 无限的 elif ... ``` ### ✅ Funboost 解法:基于元数据的动态代理 Funboost.faas 的灵魂在于 **`SingleQueueConusmerParamsGetter`**。它不依赖本地代码 `import`,而是直接从 Redis 中读取队列的元数据配置。 * **步骤 1**: 消费者启动时,将自己的函数签名、参数类型、队列配置写入 Redis。 * **步骤 2**: Web 端收到请求 `/funboost/publish` `{"queue_name":"sms",...}` * **步骤 3**: Web 端从 Redis 读取 `sms` 队列的配置元数据 * **步骤 4**: **动态生成** 一个 Publisher 对象,完成消息发布 **结果**:Web 服务与 Worker 逻辑 **彻底解耦**。 --- ## 15.5 📚 API 接口清单 (Swagger 开箱即用) 只要启动了服务,访问 `/docs` 即可看到以下全套接口: ### 15.5.1. 📨 消息发布与结果查询 > **核心能力**:触发任务、获取结果、查看积压。 | 方法 | 路径 | 功能深度解析 | 核心参数 | | :--- | :--- | :--- | :--- | | **POST** | `/funboost/publish` | **万能发布接口**。支持函数入参校验,可开启 RPC 模式同步等待结果。 | `queue_name`, `msg_body`, `need_result` | | **GET** | `/funboost/get_result` | **RPC 结果轮询**。根据 `task_id` 主动获取异步任务的执行结果。 | `task_id`, `timeout` | | **GET** | `/funboost/get_msg_count` | **积压监控**。实时获取指定队列当前堆积的消息数量。 | `queue_name` | ### 15.5.2. 🔍 服务发现与元数据 (FaaS 核心) > **核心能力**:自动发现队列、获取参数定义、生成前端表单。 | 方法 | 路径 | 功能深度解析 | 核心参数 | | :--- | :--- | :--- | :--- | | **GET** | `/funboost/get_all_queues` | **服务目录**。获取 `care_project_name` 项目注册的所有队列名称列表。 | 无 | | **GET** | `/funboost/get_queues_config` | **接口定义描述 (Schema)**。获取所有队列的详细配置。**重点:** 包含 `final_func_input_params_info`,前端可据此自动生成表单,跨部门调用方可据此知道需要传什么参数。 | 无 | | **GET** | `/funboost/get_one_queue_config` | **单队列配置**。获取指定队列的详细配置信息,包括函数入参定义。 | `queue_name` | ### 15.5.3. ⚙️ 队列运维与全景监控 > **核心能力**:熔断、恢复、清洗数据、上帝视角监控。 | 方法 | 路径 | 功能深度解析 | 核心参数 | | :--- | :--- | :--- | :--- | | **GET** | `/funboost/get_queue_run_info` | **单体透视**。获取指定队列的消费速率、积压量、活跃消费者 IP、历史成功率等详情。 | `queue_name` | | **GET** | `/funboost/get_all_queue_run_info` | **上帝视角**。一次性拉取**所有**队列的运行状态和统计信息(适合监控大屏)。 | 无 | | **POST** | `/funboost/pause_consume` | **紧急熔断**。暂停某个队列的消费(例如下游数据库压力过大时)。 | `queue_name` | | **POST** | `/funboost/resume_consume` | **服务恢复**。恢复某个队列的消费。 | `queue_name` | | **POST** | `/funboost/clear_queue` | **清空队列**。**⚠️ 慎用!** 清空队列中所有堆积的待消费消息。 | `queue_name` | | **DELETE** | `/funboost/deprecate_queue` | **废弃队列**。从 Redis 中移除不再使用的队列名(队列改名或下线时使用)。 | `queue_name` | ### 15.5.4. ⏰ 动态定时任务编排 > **核心能力**:动态增删改查 Crontab 或 间隔定时任务。 | 方法 | 路径 | 功能深度解析 | 核心参数 | | :--- | :--- | :--- | :--- | | **POST** | `/funboost/add_timing_job` | **创建任务**。支持 `date`(一次性), `interval`(间隔), `cron`(周期) 三种触发器。 | `queue_name`, `trigger`, `job_id`, `kwargs` | | **GET** | `/funboost/get_timing_jobs` | **任务列表**。获取指定队列或所有队列下的所有定时计划。 | `queue_name`(可选), `job_store_kind` | | **GET** | `/funboost/get_timing_job` | **任务详情**。获取指定 `job_id` 的详细调度配置。 | `job_id`, `queue_name` | | **POST** | `/funboost/pause_timing_job` | **暂停定时计划**。暂停某个定时任务,但保留配置。 | `job_id`, `queue_name` | | **POST** | `/funboost/resume_timing_job` | **恢复定时计划**。恢复运行某个被暂停的定时任务。 | `job_id`, `queue_name` | | **DELETE** | `/funboost/delete_timing_job` | **删除定时计划**。永久移除某个定时任务。 | `job_id`, `queue_name` | | **DELETE** | `/funboost/delete_all_timing_jobs` | **批量清理**。一键清空指定队列或所有队列下的所有定时任务。 | `queue_name`(可选) | ### 15.5.5. 🎛️ Scheduler 调度器控制 > **核心能力**:控制定时任务调度器的运行状态(暂停/恢复整个调度器)。 | 方法 | 路径 | 功能深度解析 | 核心参数 | | :--- | :--- | :--- | :--- | | **GET** | `/funboost/get_scheduler_status` | **状态查询**。获取调度器状态:`stopped`(已停止)、`running`(运行中)、`paused`(已暂停)。 | `queue_name`, `job_store_kind` | | **POST** | `/funboost/pause_scheduler` | **暂停调度器**。暂停整个调度器,所有定时任务停止触发。 | `queue_name`, `job_store_kind` | | **POST** | `/funboost/resume_scheduler` | **恢复调度器**。恢复调度器运行,继续执行定时任务。 | `queue_name`, `job_store_kind` | ### 15.5.6. 🏷️ 项目筛选 (care_project_name) > **核心能力**:多项目场景下,筛选只显示特定项目的队列信息。 | 方法 | 路径 | 功能深度解析 | 核心参数 | | :--- | :--- | :--- | :--- | | **GET** | `/funboost/get_care_project_name` | **获取筛选设置**。获取当前 `care_project_name` 的值,`None` 表示不限制。 | 无 | | **POST** | `/funboost/set_care_project_name` | **设置筛选项目**。设置后,所有查询接口只返回该项目的队列信息。空字符串表示不限制。 | `care_project_name` | | **GET** | `/funboost/get_all_project_names` | **项目列表**。获取 Redis 中存储的所有项目名称列表,用于前端下拉筛选。 | 无 | ### 15.5.10. 📄 Swagger接口文档 **fastapi.faas 的Swagger接口文档截图,你打开自己的fastapi web服务的 /docs 这个url就能看到了。** ![img_87.png](img_87.png) --- ## 15.6 💻 快速接入指南 ### 15.6.1 FastAPI 接入 (推荐 🌟) ```python import uvicorn from fastapi import FastAPI # 1. 导入路由 from funboost.faas import fastapi_router, CareProjectNameEnv # (可选) 设置项目名,只管理本项目相关的队列,避免干扰 CareProjectNameEnv.set('my_awesome_project') app = FastAPI() # 2. 核心一行代码:注册路由 app.include_router(fastapi_router) if __name__ == '__main__': # 启动后访问 http://127.0.0.1:8000/docs 即可看到几十个 Funboost 管理接口 uvicorn.run(app, host="0.0.0.0", port=8000) ``` ### 15.6.2 Flask 接入 ```python from flask import Flask # 1. 导入 Blueprint from funboost.faas import flask_blueprint app = Flask(__name__) # 2. 核心一行代码:注册蓝图 app.register_blueprint(flask_blueprint) if __name__ == '__main__': app.run(port=5000) ``` ### 15.6.3 Django 接入 (使用 Django-Ninja) ```python from ninja import NinjaAPI # 1. 导入 Router from funboost.faas import django_router api = NinjaAPI() # 2. 核心一行代码:添加路由 api.add_router("/funboost", django_router) # urls.py urlpatterns = [ path("api/", api.urls), ] ``` ### 15.6.4 权限控制 (Security) 担心接口裸奔?像控制普通接口一样控制它! ```python # FastAPI 示例:添加 Token 验证 from fastapi import Depends, HTTPException async def verify_token(token: str = Header(...)): if token != "secret-password": raise HTTPException(status_code=400, detail="Invalid Token") # 给所有 Funboost 接口加上权限锁 app.include_router(fastapi_router, dependencies=[Depends(verify_token)]) ``` --- ## 15.7 🎯 调用示例 ### 场景:通过 Curl 发布一个 RPC 任务并等待结果 假设你有一个计算函数 `@boost('calc_queue') def add(x, y): return x + y`。 **请求:** ```bash curl -X POST "http://127.0.0.1:8000/funboost/publish" \ -H "Content-Type: application/json" \ -d '{ "queue_name": "test_funboost_faas_queue", "msg_body": {"x": 33, "y": 44}, "need_result": true, "timeout": 10 }' ``` **响应 (Funboost 自动计算并返回):** ```json { "succ": true, "msg": "获取成功", "data": { "task_id": "test_funboost_faas_queue_result:3aaf05e6-6673-4e52-a148-bb3eca4cf1dc", "status_and_result": { "host_process": "LAPTOP-7V78BBO2 - 59156", "queue_name": "test_funboost_faas_queue", "function": "add", "msg_dict": { "x": 33, "y": 44, "extra": { "task_id": "test_funboost_faas_queue_result:3aaf05e6-6673-4e52-a148-bb3eca4cf1dc", "publish_time": 1765364867.2218, "publish_time_format": "2025-12-10 19:07:47" } }, "task_id": "test_funboost_faas_queue_result:3aaf05e6-6673-4e52-a148-bb3eca4cf1dc", "process_id": 59156, "thread_id": 48632, "total_thread": 12, "publish_time": 1765364867.2218, "publish_time_format": "2025-12-10 19:07:47", "time_start": 1765364867.275423, "time_cost": 1.006, "time_end": 1765364868.281448, "insert_time_str": "2025-12-10 19:07:48", "insert_minutes": "2025-12-10 19:07", "params": { "x": 33, "y": 44 }, "params_str": "{\"x\":33,\"y\":44}", "result": 77, "run_times": 1, "success": true, "run_status": "finish", "exception": null, "exception_type": null, "exception_msg": null, "rpc_chain_error_msg_dict": null, "rpc_result_expire_seconds": 1800, "host_name": "LAPTOP-7V78BBO2", "script_name": "start_consume.py", "script_name_long": "d:/codes/funboost/examples/example_faas/start_consume.py" } }, "error_data": null, "code": 200, "error": null, "traceback": null, "trace_id": null } ``` **简单、暴力、有效。这就是 `funboost.faas`。** ## ⚠️💡 15.8 注意事项 - 1. 如果要用faas功能,@boost一定需要设置 `is_send_consumer_heartbeat_to_redis=True`,否则无法获从redis获取相关queue_name的配置元数据和运行信息 - 2. 别忘了,**需要启动funboost消费**,否则只发布消息,没有后台消费执行消息。启动消费可以和web一起启动,也可以单独的脚本部署启动消费。(因为funboost.faas 是基于funboost的redis 注册的元数据驱动,不需要import依赖具体的函数。) ## 🧪 15.9 funboost.faas 演示例子 🌐 [example_faas例子](https://github.com/ydf0509/funboost/tree/master/test_frame/examples/example_faas) - 启动funboost faas web: ```bash python example_fastapi_faas.py ``` - 启动消费: ```bash python start_consume.py ``` - 访问web: ```bash http://127.0.0.1:8000/docs ``` - 测试请求 funboost faas 接口: ```bash python example_req_fastapi.py ``` ### 15.9.2 如何测试测试函数自动发现? - 1. 你可以再写个脚本新增一个@boost函数,或者修改原来脚本新增加一个@boost函数 - 2. 你部署启动新增加的booster消费函数 - 3. web无需重启,你直接就能通过http接口 `/funboost/publish` 立即调用新增的函数了。 - 4. 如果你乱造请求,例如乱造 不存在的`queue_name` 或者 乱写`msg_body` 的函数入参 ,接口会迅速反馈请求参数不合法,而不是让你请求有错还不知情。 - 5. Web 服务和消费函数可分开写在不同的git项目仓库中。 ## ⏰ 15.10 funboost.faas 定时任务管理 —— 架构级解耦的调度中心(完爆传统 APScheduler 管理) - **凡是 Web 管理端需要 import 业务函数的调度系统,都不可能成为真正的调度平台** - **Funboost.FaaS** 内置了一个强大的 **动态定时任务管理中心**。它不仅仅是一个简单的 API 包装,而是对传统定时任务管理模式的一次 **降维打击**。 - **你只需要一个web界面,就可以管理任意其他几百个funboost项目的定时任务的增删改查**。本质原因是 funboost.faas管理定时任务 ,不需要依赖import 具体的定时任务函数; 所以吊打传统的 flask-apscheudler 这种插件, flask-apscheudler还需要用户手写定时任务接口和管理界面。 ### 15.10.1 🥊 核心差异:为什么完爆传统 APScheduler 管理? 在传统的 Python 定时任务开发中(例如使用 `Flask-APScheduler` 或 `Django-APScheduler`),存在一个致命的 **强耦合** 问题: > **传统痛点:Web 管理端必须引用具体的函数代码** 如果你想在 Web 界面上管理任务,你的 Web 项目代码中必须能 `import` 到那个任务函数。因为传统 APScheduler 底层依赖 `pickle` 序列化函数对象,或者需要通过 `module.path:function_name` 字符串在本地寻找函数。 **这意味着:** 1. **无法物理分离**:你的 Web 管理后台必须和业务消费代码在同一个 Git 项目中,或者必须安装所有的业务依赖包。 2. **牵一发而动全身**:如果你修改了消费函数的代码,Web 管理端可能也需要重新部署,甚至因为 pickle 版本不一致导致报错。 3. **不支持跨语言/跨项目**:你无法用一个统一的 Web 管理台去管理 10 个不同 Git 仓库的定时任务。 --- ### 15.10.2 🚀 Funboost 的解决方案:彻底解耦 **Funboost** 采用了完全不同的哲学:**基于“队列名”和“JSON数据”的调度,而非基于“函数代码”的调度。** 通过 Funboost.FaaS 接口添加定时任务时,**Web 管理端不需要知道那个函数长什么样,甚至不需要知道那个函数存在不存在。** * **Web 端(管理面)**: * 只负责往 Redis 的 JobStore 里写入一条记录:“*每隔 5 秒,往 `queue_A` 队列里推一条 JSON 消息 `{"x": 1}`*”。 * **它完全不需要 import 任何业务代码**。 * 它可以是一个独立部署的通用服务,甚至可以是 Go 或 Java 写的前端页面。 * funboost添加定时计划,会立刻校验入参是否合法,如果不合法,会立刻返回错误,不继续添加错误的定时任务。 * **Worker 端(执行面)**: * 启动时会自动扫描 Redis。 * 发现触发时间到了,就执行“推消息”动作。 * 消费者收到消息,执行真正的 Python 函数。 **🏆 结果:Web 管理界面 和 定时任务执行函数,可以处于两个完全隔离的 Git 项目,部署在完全不同的服务器上,互不依赖。** --- ### 15.10.3 🛠️ 接口能力概览 所有接口支持 `Redis` 持久化存储(推荐)。这意味着即使 Web 服务挂了,或者消费者重启了,定时任务依然存在于 Redis 中不丢失。 | 功能 | HTTP 方法 | 接口路径 | 描述 | | :--- | :--- | :--- | :--- | | **添加任务** | `POST` | `/add_timing_job` | **核心**:基于队列名添加。支持 Interval/Cron/Date 触发器 | | **查询任务** | `GET` | `/get_timing_jobs` | 查看所有任务,Web端不需要业务代码也能展示 | | **任务详情** | `GET` | `/get_timing_job` | 获取单个任务的详细参数 | | **暂停任务** | `POST` | `/pause_timing_job` | 暂停调度(不发消息了) | | **恢复任务** | `POST` | `/resume_timing_job` | 恢复调度 | | **删除任务** | `DELETE` | `/delete_timing_job` | 永久删除 | | **调度器控制** | `POST` | `/pause_scheduler` | 暂停整个调度器 | ### 15.10.4 💻 调用示例:完全黑盒化的管理 假设在 **项目 A (Git Repo A)** 中,有一台机器运行着消费函数: ```python # Project A: consumer.py @boost(BoosterParams(queue_name='send_email_queue', broker_kind=BrokerEnum.REDIS,is_send_heartbeat_to_redis=True)) def send_email(email_address, content): print(f"Sending email to {email_address}: {content}") if __name__ == '__main__': # 启动消费,同时启动内置的 Scheduler 监听 Redis ApsJobAdder(send_email, job_store_kind='redis', is_auto_start=True) send_email.consume() ctrl_c_recv() ``` 现在,我们在 **项目 B (Git Repo B)** 或者 **Postman** 中,完全不知道 `consumer.py` 代码的情况下,远程控制它: #### 1. 动态添加任务 (无需代码引用) 我们只需要知道队列名 `send_email_queue`,即可添加任务。 ```python import requests # 这是一个通用的 FaaS 管理端地址 url = "http://faas-admin-server:8000/funboost/add_timing_job" payload = { "queue_name": "send_email_queue", # 关键:只通过字符串解耦 "job_id": "daily_report_job", "trigger": "cron", # 每天上午 9:00 "hour": "9", "minute": "0", "kwargs": { # 纯 JSON 数据,不是 Python 对象 "email_address": "boss@example.com", "content": "Daily Report" }, "job_store_kind": "redis", "replace_existing": True } resp = requests.post(url, json=payload) print("任务添加成功,Project A 的 Worker 将会自动感知并执行") ``` #### 2. 管理任务 ```python # 暂停任务 requests.post("http://faas-admin-server:8000/funboost/pause_timing_job", params={"job_id": "daily_report_job", "queue_name": "send_email_queue"}) # 删除任务 requests.delete("http://faas-admin-server:8000/funboost/delete_timing_job", params={"job_id": "daily_report_job", "queue_name": "send_email_queue"}) ``` ### 15.10.5 ⚙️ 技术原理:为什么能做到? 1. **中间件化**: Funboost 利用 Redis 作为中间媒介(JobStore)。Web 端只负责往 Redis 写配置数据(JSON),不涉及任何 Python 对象的序列化/反序列化(Pickle)。 2. **执行逻辑的转换**: 传统 APScheduler:`Trigger -> Execute Python Function(args)` Funboost APScheduler:`Trigger ->Execute push_fun_params_to_broker(Push msg to Queue) -> Consumer executes Function` 因为 Web 端不需要执行函数,只需要“推消息”,所以 Web 端不需要函数代码。 3. **分布式锁(防止重复)**: 在 Project A 的 Worker 中,Funboost 增强了 Redis JobStore,在取出任务时加了分布式锁。即使 Project A 部署了 10 个节点,Web 端添加的一个定时任务,同一时间也只会被其中一个节点触发推送,**完美支持分布式部署**。 ### 15.10.6 总结 * **传统 APScheduler**:Web 管理后台和业务代码是 **强耦合** 的,必须在一个项目中,难以维护大型微服务架构。 * **Funboost FaaS**:Web 管理后台和业务代码是 **零耦合** 的。你可以搭建一个统一的“Funboost 管理中心”,管理全公司 100 个不同项目的定时任务,而无需拉取这 100 个项目的代码。这是架构设计上的巨大优势。 ## 15.11 funboost.faas 为什么比传统Web框架写接口更爽? - funboost.faas (**降维打击**):它彻底改变了后端开发的范式。 - 它消灭了 `Controller` 层、`Router` 层、`Serializer` 层。 - 它消灭了 “**重启服务**”这个动作。 - 这是对传统 MVC 开发模式`(Django/Flask)`的一次降维打击。一旦用习惯了 FaaS 这种**写完函数即接口**的爽快模式,就很难回得去写那些繁琐的样板代码了。 ### 15.11.1 传统 Django/Flask 的尴尬:"脱了裤子放屁" Django 的视图函数一般不直接写复杂逻辑,因为**视图函数不能作为普通函数被复用调用**。所以你被迫要: ```python # 视图函数 - 只是个"搬运工",不能直接复用 @api_view(['POST']) def calculate_score_view(request): user_id = request.data['user_id'] weights = request.data['weights'] result = calculate_score(user_id, weights) # 被迫多一层调用 return Response({'result': result}) # 真正的业务逻辑 - 另外封装 def calculate_score(user_id, weights): # 复杂逻辑... return score ``` **问题**: - 视图只是个"接收参数 → 调用函数 → 返回结果"的搬运工 - 每个功能都要写两遍:一份业务函数 + 一份视图适配器 - 还要配路由、写序列化器、写参数校验... --- ### 15.11.2 funboost.faas 的设计哲学:函数即接口 ```python # 这就是业务函数,同时也是 HTTP 接口,也能被其他代码直接调用 @boost(BoosterParams(queue_name="calculate_score", is_send_heartbeat_to_redis=True)) def calculate_score(user_id: int, weights: dict): # 复杂逻辑... return score # 直接当普通函数调用 result = calculate_score(123, {"a": 0.5}) # 通过队列异步调用 calculate_score.push(123, {"a": 0.5}) # 通过 HTTP 接口调用 # POST /funboost/publish {"queue_name": "calculate_score", "msg_body": {...}} ``` **一个函数,三种调用方式**,没有"脱了裤子放屁"的中间层! --- ### 15.11.3 代码量对比 | 功能点 | Django 需要写 | funboost 需要写 | |-------|-------------|----------------| | 业务函数 | ✅ 1份 | ✅ 1份 | | 视图/路由 | ❌ 额外1份 | 0(自动) | | 序列化器 | ❌ 额外1份 | 0(自动) | | 参数校验 | ❌ 额外写 | 0(根据函数签名自动) | | 接口文档 | ❌ 额外写 | 0(自动生成) | --- ### 15.11.4 上新功能流程对比 | 对比维度 | 传统 Django/Flask | funboost.faas | |---------|------------------|---------------| | **上新功能流程** | 写视图函数 → 配路由 → 写序列化 → 写参数校验 → 重启服务 | 写 `@boost` 函数 → 部署上线 → **自动可调用** | | **接口文档** | 需要手写或用 Swagger 注解 | 自动从函数签名生成 | | **参数校验** | 手动写校验逻辑或用 Pydantic | 自动根据 `final_func_input_params_info` 校验 | | **Web服务重启** | **每次都要重启** | **永不重启**(热加载) | | **跨项目复用** | 需要打包成库或微服务 | 只要共享 Redis,任意项目都能调用 | --- ### 15.11.5 最爽的几个点 #### 15.11.5.1 真正的"写完即上线" ```python # 只写这个,部署上线后,HTTP接口马上就能调用 @boost(BoosterParams(queue_name="new_feature", is_send_heartbeat_to_redis=True # faas功能,一定要设置 is_send_heartbeat_to_redis=True,目的是将booster元信息注册到redis中,否则根本发布不了消息 )) def calculate_score(user_id: int, weights: dict): return score ``` #### 15.11.5.2 Web 网关 = 万能入口 **一个 `app.include_router(fastapi_router)` 搞定所有接口**,不用再纠结: - 这个接口用 GET 还是 POST? - URL 路径怎么设计? - 参数放 query 还是 body? #### 15.11.5.3 天然支持异步和 RPC 传统视图函数要实现"提交任务 → 轮询结果"需要额外设计,funboost 直接内置: ```python # need_result=True 一行搞定 RPC {"queue_name": "xxx", "msg_body": {...}, "need_result": true} ``` #### 15.11.5.4 跨团队协作超方便 其他团队只需要知道 `queue_name` (入参格式能通过faas的接口传递queue_name获取),就能直接调用你的功能,不用关心: - 你用什么语言实现的 - 你的服务部署在哪里 - 你的服务有没有挂掉(消息队列会等你恢复) --- ### 15.11.6 什么场景传统方式更合适? | 场景 | 推荐方式 | 核心理由 | |-----|---------|---------| | 需要精细控制 HTTP 状态码/headers | 传统视图函数 | 框架原生能力,控制力更强 | | 需要实时流式响应(SSE/WebSocket) | 传统视图函数 | 需要长连接或特定协议支持 | | 需要复杂的中间件链条 | 传统视图函数 | 依赖特定 Web 框架生态 | | CPU 密集型异步任务 | funboost.faas ✅ | 不阻塞 Web 服务主线程 | | IO 密集型异步任务 | funboost.faas ✅ | 原生支持异步高并发 | | 跨服务编排调用 | funboost.faas ✅ | 队列解耦,天然分布式 | | 快速迭代上新功能 | funboost.faas ✅ | 写完即发,无需重启 | --- ### 15.11.7 本质区别 > **Django/Flask 以"请求-响应"为中心,funboost 以"函数"为中心。** > > 函数天然可复用,所以不需要适配层! 这就是"函数即服务"(FaaS) 的魅力——**专注业务逻辑本身,基础设施全自动化**。