7.更新记录

7.0 很小的更新,对api使用完全无变化或者无增加新功能的不写更新记录。

7.1 新增第十种Consumer,以redis为中间件,但增加了消费确认,是RedisConsumerAckAble类。

支持运行过程中,随意关闭和启动python程序。无惧反复关闭python和 突然断电导致任务丢失几百个。

之前开100线程/协程的话,随意重启python和断电会导致极大概率丢失200个任务。

官方Threadpoolexecutor是无界队列。使用这个会导致丢失无数个任务,
因为他会迅速把redis的消息全部取出来,添加到自己的queue队列慢慢消费。
因为这个原因所以需要自定义写BoundedThreadpoolexecutor和CustomThreadpoolexecutor。       

改版的CustomThreadpoolexecutor修改成了queue最大长度是max_works,自己内部存储100个,
运行中100个,突然关闭python会丢失200个任务。如果queue设置大小为0,则只会丢失100个运行中的任务。

采用的是消费者去除消息时候,用lua脚本同时pop和添加到unacked的独立zset中,函数运行成功后会从set中删除该任务。
同时有一个一直每隔5秒发送心跳到redis服务中的线程,心跳标识中有消费者的唯一标识,绝对不会重复。
如果突然关闭消费者(例如突然断电或者点击关闭python),那么该消费者的心跳将会停止了。这时其他机器的同队列消费者或者当前机器重新启动代码后,在15秒后会
检到被关闭的消费者是非活跃消费者,那么自动将该消费者的unack里面任务全部重新取出返回到待消费队列中。

RedisConsumerAckAble类比RedisConsumer会有一丝丝性能损耗,但python玩redis大部分情况还是python代码本身有性能瓶颈,
而不是造成redis服务端有性能瓶颈,一般只要用在有意义的业务上,就算python很忙把cpu占光了,也不会造成redis服务端达到极限,
python是性能很差的语言,没玩垮redis,自身就把电脑玩死了,所以大部分情况下不要在意加入确认消费后产生额外的对redis服务端的性能压力。

redis要是能直接作为mq使用,redis早就一统天下了,哪里还不断有几十种mq出来。
所以直接基于redis list的如果要做到可靠就必须改进。

7.2 新增基于以redis为消息中间件时候的页面管理和消费速度显示。

基于redisboard,但对redis的list模拟mq功能,进行页面显示优化突出消息队列消费,
加黄显示正在运行中的队列和每10秒的消费速度。每隔10秒自动刷新统计。

由于实时发布和消费,例如10秒内发布20个,消费50个,页面只能显示大小降低了30个,
这个只有专业的mq才能分别显示出来,redis list只是简单数组。

rabbitmq nsq都有官方自带速率显示。

img_75.png

7.3 新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架.

新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理,不要亲自这么使用。

beggar_redis_consumer.py文件的 start_consuming_message函数。

def start_consuming_message(queue_name, consume_function, threads_num):
    pool = ThreadPoolExecutor(threads_num)
    while True:
        try:
            redis_task = redis_db_frame.brpop(queue_name, timeout=60)
            if redis_task:
                task_str = redis_task[1].decode()
                print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
                pool.submit(consume_function, **json.loads(task_str))
            else:
                print(f'redis的 {queue_name} 队列中没有任务')
        except redis.RedisError as e:
            print(e)


def add(x, y):
    time.sleep(5)
    print(f'{x} + {y} 的结果是 {x + y}')


# 推送任务
for i in range(100):
    redis_db_frame.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))

# 消费任务 
start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)

看完整版代码很长很多,是由于控制功能太多,中间件类型多,并发模式多, 所以加入一个最精简版,精简版的本质实现原理和完整版相同。

7.4 新增sqlachemy 支持的数据库作为消息中间件

新增sqlachemy 支持的数据库作为消息中间件,包括sqlserver mysql postgre oracle sqlite

每个队列是一张表模拟的。

img_76.png

每个任务是表里面的一行记录。

img_77.png

7.5 日志改为导入独立包nb_log,支持用户配置文件自定义日志配置。

例如设置默认需不需要彩色,需不需要大背景彩色色块,需不需要自动拦截转化python内置的print. 在用户当前项目根目录下生成的nb_log_config.py 可以自定义优先日志配置。

7.6 优化qps控频。

将qps按范围分段,采用不同的等待或计数方式。使当qps设置很高的时候,控频更精确。

增加了分布式控频,需要依赖redis中间件。
分布式环境中的控频指的是,假如xx.py文件中有一个consumer,设置func函数的qps为10。
如果在线上部署了三个容器服务,如果不使用分布式控频,则func函数的每秒运行总次数会是30。
即使只有1台机器,如果开多进程,Process运行3个进程,或者把xx.py反复运行启动3个,
也会造成func函数每秒运行总次数是30。
分布式控频主要是解决这种问题。默认不使用分布式控频,
当设置 is_using_distributed_frequency_control为True的时候,使用分布式控频。

7.7 增加rocketmq支持。 (2020-7)

from funboost import boost, BrokerEnum


@boost('queue_test_f03', qps=2, broker_kind=BrokerEnum.ROCKETMQ)
def f(a, b):
    print(f'{a} + {b} = {a + b}')


if __name__ == '__main__':
    for i in range(100):
        f.push(i, i * 2)
    f.consume()

7.8 新增 async 并发模式 (2020-12)

框架希望用户写同步函数,不鼓励用户新写async def的函数,如果你的代码函数已经写成了async def,可以用此种并发方式。

写async def的函数很烦人,asyncio里面的概念很难学。

这两个项目用的asyncio你能写出来,能看懂说明不?

https://github.com/ydf0509/async_pool_executor

https://github.com/ydf0509/sync2asyncio

之前一直都没支持这种并发模式,异步代码不仅消费函数本身与同步代码很多不同,例如函数的定义和调用以及三方库,
不同于gevent和eventlet打个猴子补丁就可以变并发方式并且代码保持100%原样,asyncio的方式代比同步码真的是要大改特改。
而且在框架层面要支持异步也要增加和修改很多,支持异步并不是很容易。这一点连celery5.0目前都还没支持到(据官方文档说5.0要加入支持,但目前的5.0.3还没加入。)

如果消费函数已经写成了async def这种,那么可以设置 concurrent_mode=ConcurrentModeEnum.ASYNC,
框架会在一个新的线程的loop里面自动运行协程,所有协程任务会自动在一个loop里面运行,不是每次临时都生成新的loop只运行一个当前任务方式。

from funboost import boost, BrokerEnum, ConcurrentModeEnum
import asyncio


# 此段代码使用的是语言级Queue队列,不需要安装中间件,可以直接复制运行测试。
@boost('test_async_queue2', concurrent_mode=ConcurrentModeEnum.ASYNC,
           broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE, concurrent_num=500, qps=20)
async def async_f(x):
    # 测试异步阻塞并发, 此处不能写成time.sleep(1),否则无论设置多高的并发,1秒钟最多只能运行1次函数。
    # 同理asyncio 不能和 requests搭配,要和 aiohttp 搭配。
    await asyncio.sleep(1)
    print(id(asyncio.get_event_loop()))
    # 通过 id 可以看到每个并发函数使用的都是同一个loop,而不是采用了愚蠢的临时 asyncio.new_event_loop().run_until_complete(async_f(x)) 方式调度。
    print(x)


if __name__ == '__main__':
    async_f.clear()
    for i in range(100):
        async_f.push(i, )
    async_f.consume()

7.8.2 gevent/eventlet 和 asyncio 用法区别感受

比方说汽车的自动挡和手动挡,学了手动挡一定会开自动挡,只学自动挡很难开手动挡。
asyncio方式的代码比正常普通同步思维的代码写法也要难得多了,能玩asyncio的人一定会用threading gevent,
但只用过threading gevent,不去专门学习asyncio的用法,100%是玩不转的。

gevent就像自动挡汽车,自动换挡相当于自动切换阻塞。
asyncio就像手动挡,全要靠自己写 await / async def /loop / run_until_complete /run_forever/ 
run_coroutine_threadsafe /wait / wait_for /get_event_loop / new_event_loop / get_running_loop
,写法很麻烦很难。异步多了一个loop就像手动挡汽车多了一个离合器一样,十分之难懂。

手动挡玩的溜性能比自动挡高也更省油。asyncio玩的溜那么他的io并发执行速度和效率也会更好,cpu消耗更少。
如果你写一般的代码,那就用同步方式思维来写吧,让分布式函数调度框架来替你自动并发就可以啦。
如果追求更好的控制和性能,不在乎代码写法上的麻烦,并且asyncio技术掌握的很溜,那就用asyncio的方式吧。 

7.8.3 关于 async 并发模式,为什么框架还使用 pyredis pika pymongo,而没有使用aioredis aiomongo

异步鬓发模式里面,整个调用链路必须是一旦异步,必须处处异步,在base_consumer.py的AbstractConsumer中,
方法 _async_run_consuming_function_with_confirm_and_retry里面使用的还是操作中间件的同步库,

主要是因为框架目前支持15种中间件,一个一个的使用异步模式的库操作中间件来实现,比现在代码起码要增加80%,无异于重写一个项目了。
异步和同步真的写法语法相差很大的,不信可以比比aiomysql 和pymysql库,aiohttp和requests,如果非常简单能实现异步,
那aiohttp和aiomysql作者为什么要写几万行代码来重新实现,不在原来基础上改造个七八行来实现?



目前此库对 消息拉取和消息消费完全是属于在两个不同的线程里面,井水不犯河水,所以用同步库拉取消息对asyncio的消费函数没有任何影响,不存在同步库阻塞异步库的问题。
对于消息确认 消息重新入队 任务过滤  mongo插入,都是采用的同步库,但是使用了 run_in_executor,
把这些操作在异步链路中交给线程池来运行了,同事这个线程池不是官方内置线程池,是智能缩小扩大线程池 ThreadPoolExecutorShrinkAble。
run_in_executor 会把一个同步的操作,sumbit提交给线程池,线程池返回的是一个concurrent.futures包的Future对象,
run_in_executor包装转化了这个Future(此Future不是asyncio的,不是一个awaitable对象)成为了一个asyncio包的Future对象,asyncio的Future对象可以被await,
所以这是非常快捷的同步阻塞函数在异步链路中转同步转异步语法的最佳方式。官方也是这么推荐的。

除了框架内部的阻塞函数是run_in_executor快速转化成非阻塞事件循环的,但是主要的用户的消费函数,是使用的真async模式运行在一个loop循环中的,
也即是单线陈鬓发运行用户的异步函数。

其次框架的同步阻塞函数,都是操作中间件类型的库,异步就是 入队 确认消费 查询是否过滤,这些操作一般都会在1毫秒之内完成,不阻塞太长的事件,
即使不使用run_in_executor,直接在异步链路使用这些同步操作,也没太大问题。一旦异步必须处处异步,说的是不能调用耗时太长的同步阻塞函数,
1毫秒的无伤大雅,因为celery 1秒钟最多能调度300个 def f: print(hello) 这样的无cpu 无io的函数,此框架调度运行速度任然超过celery。

     

还有一种调度起 async def定义 的消费函数方式是继续开多线程并发,然后使用 临时loop = asyncio.new_event_loop(),loop.run_until_complete,这方式愚蠢了,
相当于只是为了运行起这个函数,但全流程丝毫没有丁点异步。

7.8.4 愚蠢的celery调用异步函数写法

下面截图这种写法为了异步而异步,非常废物滑稽的写法。
如果是celery多线程并发模式,那就是每个线程里面临时起一个loop,每个生成的loop只运行了一次协程成对象。这样完全没有利用到asyncio的优势
如果是celery多进程并发模式,那就是每个进程里面临时起一个loop,每个生成的loop只运行了一次协程成对象。这样完全没有利用到asyncio的优势

celery真正的最终目标是直接能把@task装饰器加到 一个asynnc def的函数上,而不是现在间接的再新增写一个同步函数来调用异步函数。
到目前为止的最新版 celery 5.2.3还没有实现 直接支持 asyncio 并发模式。
用户不要抱多大希望celery能很快支持asyncio,例如celery使用kafka做中间件,官方承诺了7年,一次次的放鸽子到现在还不支持,没那么容易。

img_22.png

7.9 2021-04 新增以 redis 的 stream 数据结构 为中间件的消息队列。

这个是 redis 的 真消息队列,这次是 真mq,
stream 数据结构功能更加丰富接近 rabbitmq kafka这种真mq的消息队列协议,比 list 做消息队列更强。
需要redis的服务端5.0版本以上才能使用这个数据结构。
代码文件在 funboost/consumers/redis_stream_consumer.py

这个 REDIS_STREAM 中间件和 REDIS_ACK_ABLE 都支持消费确认,不管客户端怎么掉线关闭,都可以确保消息万无一失。
BrokerEnum.REDIS 中间件 不支持消费确认,随意重启或者断电断线会丢失一批任务。
from funboost import boost, BrokerEnum


@boost('queue_test_f01', broker_kind=BrokerEnum.REDIS_STREAM, )
def f(a, b):
    print(f'{a} + {b} = {a + b}')


if __name__ == '__main__':
    for i in range(100):
        f.push(i, b=i * 2)
    f.consume()

7.10 2021-04 新增以 redis 的 list 为数据结构,但使用 brpoplpush 命令 双队列 作为中间件的消息队列。

此 brpoplpush 双队列方式 + 消费者唯一id标识的心跳检测,可以媲美 rabbitmq 的确认消费功能。

代码演示省略,设置broker_kind=BrokerEnum.RedisBrpopLpush就行了。 
@boost('queue_test_f01', broker_kind=BrokerEnum.RedisBrpopLpush,)

7.11 2021-04 新增以 zeromq 为中间件的消息队列。

zeromq 和rabbbitmq kafka redis都不同,这个不需要安装一个服务端软件,是纯代码的。
zeromq方式是启动一个端口,所以queue_name传一个大于20000小于65535的数字,不能传字母。

消费端代码,启动消费端时候会自动启动 broker 和 server。

import time
from funboost import boost, BrokerEnum


@boost('30778', broker_kind=BrokerEnum.ZEROMQ, qps=2)
def f(x):
    time.sleep(1)
    print(x)


if __name__ == '__main__':
    f.consume()

发布端代码

from test_frame.test_broker.test_consume import f

for i in range(100):
    f.push(i) 

7.12 2021-04 新增以 操作kombu包 为中间件的消息队列

一次性新增操作10种消息队列,.但比较知名的例如rabbitmq redis sqlite3 函数调度框架已经在之前实现了。
使用方式为设置 @boost 装饰器的 broker_kind 为 BrokerEnum.KOMBU
在你项目根目录下的 funboost_config.py  文件中设置 
KOMBU_URL = 'redis://127.0.0.1:6379/7' 那么就是使用komb 操作redis。
KOMBU_URL = 'amqp://username:password@127.0.0.1:5672/',那么就是操纵rabbitmq
KOMBU_URL = 'sqla+sqlite:////dssf_sqlite.sqlite',那么就是在你的代码所在磁盘的根目录创建一个sqlite文件。四个////表示根目,三个///表示当前目录。
其余支持的中间件种类大概有10种,不是很常用,可以百度 google查询kombu或者celery的 broker_url 配置方式。

操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),
但没包括分布式函数调度框架能支持的kafka nsq zeromq 等。


但是 kombu 包的性能非常差,如何测试对比性能呢?
可以用原生redis的lpush和kombu的publish测试发布
使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列  qpid pyro 队列),
否则强烈建议使用此框架的操作中间件方式而不是使用kombu。

可以把@boost装饰器的broker_kind参数 设置为 BrokerEnum.REDIS_ACK_ABLE 和BrokerEnum.KOMBU(配置文件的KOMBU_URL配置为redis),
进行对比,REDIS_ACK_ABLE的消费速度远远超过 BrokerEnum.KOMBU,所以之前专门测试对比celery和此框架的性能,
差距很大,光一个 kombu 就拉了celery大腿很多,再加上celery的除了kombu的执行性能也很低,所以celery比此框架慢很多。
test_frame\test_celery 下面有celery的发布 消费例子,可以测试对比下速度,同样gevent 并发和redis中间件,
celery 执行 print hello 这样的最简单任务,单核单进程每秒执行次数过不了300,celery性能真的是太差了。

消费

import time
from funboost import boost, BrokerEnum


@boost('test_kombu2', broker_kind=BrokerEnum.KOMBU, qps=5, )
def f(x):
    time.sleep(60)
    print(x)


if __name__ == '__main__':
    f.consume()

发布

from test_frame.test_broker.test_consume import f

for i in range(10000):
    f.push(i)

你项目根目录下的 funboost_config.py

KOMBU_URL = 'redis://127.0.0.1:6379/7'
# KOMBU_URL = f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}'
# KOMBU_URL = 'sqla+sqlite:////celery_sqlite3.sqlite'  # 4个//// 代表磁盘根目录下生成一个文件。推荐绝对路径。3个///是相对路径。

7.14 2021-04 新增以mqtt emq 作为消息中间件

例子,设置 broker_kind=BrokerEnum.MQTT

from funboost import boost, BrokerEnum


@boost('mqtt_topic_test', broker_kind=BrokerEnum.MQTT)
def f(x, y):
    print(f''' {x} + {y} = {x + y}''')
    return x + y


for i in range(100):
    f.push(i, i * 2)

f.consume()
这个默认做成服务端不存储消息,mqtt中间件适合前后端实时交互的。可以直接绕开后端flask django 接口,不用写接口,
前端直接发任务到mqtt,后端订阅,后端完成后,发送结果到唯一任务的topic

当然也可以 前端订阅topic,前端发任务到python flask接口,flask接口中发布任务到rabbitmq redis等,
后台消费完成把函数结果发布到mqtt,mqtt推送给前端
此框架的消费做成了mqtt的共享订阅,例如启动多个重复的消费者脚本,不会所有消费脚本都去重复处理一个消息

7.15 2021-04 新增以 httpsqs 作为消息中间件

@boost('httpsqs_queue_test',broker_kind=BrokerEnum.HTTPSQS)

7.16 2021-04 新增支持下一代分布式消息系统 pulsar 。

@boost('httpsqs_queue_test',broker_kind=BrokerEnum.PULSAR)

在开源的业界已经有这么多消息队列中间件了,pulsar作为一个新势力到底有什么优点呢?
pulsar自从出身就不断的再和其他的消息队列(kafka,rocketmq等等)做比较,但是Pulsar的设计思想和大多数的消息队列中间件都不同
,具备了高吞吐,低延迟,计算存储分离,多租户,异地复制等功能,所以pulsar也被誉为下一代消息队列中间件

pulsar 的消费者数量可以不受topic 分区数量的限制,比kafka和rabbitmq 强。5年后会替代kafka rabbitmq。
 
from funboost import boost,BrokerEnum

@boost('test_pulsar_topic2',broker_kind=BrokerEnum.PULSAR,qps=1,broker_exclusive_config={'subscription_name':'funboost_g1'})
def add(x,y):
    print(x+y)

if __name__ == '__main__':
    add.push(1,2)
    add.push(3,4)
    add.consume()

7.17 2021-04 新增延时运行任务,介绍见4.8

7.18 2021-09 新增 轻松远程服务器部署运行函数

框架叫分布式函数调度框架,可以在多台机器运行,因为消息队列任务是共享的。

我用的时候生产环境是使用 阿里云 codepipeline k8s部署的多个容器。还算方便。
在测试环境一般就是单机多进程运行的,用supervisor部署很方便。

所以之前没有涉及到多态机器的轻松自动部署。
如果要实现轻松的部署多台物理机,不借助除了python以外的其他手段的话,只能每台机器登录上然后下载代码,启动运行命令,机器多了还是有点烦的。
现在最新加入了 Python代码级的函数任务部署,不需要借助其他手段,python代码自动上传代码到远程服务器,并自动启动函数消费任务。
目前的自动化在远程机器启动函数消费,连celery都没有做到。

不依赖阿里云codepipeline 和任何运维发布管理工具,只需要在python代码层面就能实现多机器远程部署。

7.19 2021-09 新增 socket udp/tcp/http 消息队列,不需要安装消息中间件软件。

消费脚本,test_udp_consumer.py

必须先启动这个consume脚本消费然后才能发布,因为消费脚本里面启动了socket服务端,socket客户端才能连接。

from funboost import boost,BrokerEnum

# ip可以是远程机器的ip,不是一定必须是127.0.0.1,这代码只是举个例子,因为tcp支持跨机器.
@boost('127.0.0.1:5689',broker_kind=BrokerEnum.UDP)  #使用BrokerEnum.TCP就是tcp,BrokerEnum.HTTP就是http,  
def f(x):
    print(x)


if __name__ == '__main__':
    f.consume()
    f.push('hello')

发布脚本,test_udp_publisher.py

启动这个脚本之前必须先启动上面的消费脚本启动sockert服务端。

import time
from test_udp_consumer import f

for i in range(100000):
    time.sleep(2)
    f.push(i)

7.20 2021-09 新增 支持 nats 高性能消息队列

用法一如既往,只需要修改broker_kind的枚举,并在 funboost_config.py 配置好 NATS_URL 的值就完了。

@boost('test_queue66c', broker_kind=BrokerEnum.NATS)
def f(x, y):
    pass

7.21 2022-01 新增 @boost装饰器全局默认配置

boost装饰器没有亲自指定参数时候的全局默认值 ,用法见 4.15 章节 说明

7.22 2022-02 新增暂停消费功能

框架支持暂停消费功能和继续消费功能,用法见文档4.18

7.23 2022-04 消费者/boost装饰器 新增 broker_exclusive_config 参数

加上一个不同种类中间件非通用的配置,不同中间件自身独有的配置,不是所有中间件都兼容的配置,因为框架支持30种消息队列,消息队列不仅仅是一般的先进先出queue这么简单的概念,

可以看 4.20 章节

7.24 2022-04 新增用户 自定义记录消费状态结果函数钩子

可以通过设置 user_custom_record_process_info_func 的值为你的自定义函数,来记录消费状态及结果,用户可以自由发挥保存消费结果状态到任意地方

可以看 4.19章节

7.25 2022-04 新增用户灵活自由自定义扩展中间件和生产消费逻辑的功能

register_custom_broker 这个是增加的一种很强大的功能,用户可以自定义发布者和消费者,注册到框架中。boost装饰器就能自动使用你的消费者类和发布者类了。

可以看 4.21 章节

7.26 2022-07 新增内置以redis作为apscheduler存储的定时器,动态增删改查定时任务配置。

可以看4.4b章节的演示代码例子

7.27 2023-02 新增适配python 3.6-3.11所有版本

适配python 3.6-3.11所有版本

适配python 3.10 3.11 的asyncio并发模式,因为官方老是在新版本的asyncio模块,把asyncio的api入参改来改去的,现在适配了。

7.28 2023-02 新增 asyncio 语法生态下rpc获取执行结果

因为 async_result= fun.push() ,默认返回的是 AsyncResult 类型对象,里面的方法都是同步语法。
async_result.result 是一个耗时的函数, 解释一下result, 是property装饰的所以不用 async_result.result()
有的人直接在async def 的异步函数里面 print (async_result.result),如果消费函数消费需要耗时5秒,
那么意味rpc获取结果至少需要5秒才能返回,你这样写代码会发生灭顶之灾,asyncio生态流程里面一旦异步需要处处异步。
所以新增了 AioAsyncResult 类,和用户本来的asyncio编程生态更好的搭配。

7.29 2023-03 新增死信队列

抛出 ExceptionForPushToDlxqueue 类型错误,消息发送到单独另外的死信队列中,查看文档 4.24.c 和 4.24.d 4.24.e 章节。

7.30 2023-03 新增支持ctrl + c 退出程序

因为程序是多个子线程while 1的,ctrl+c无法结束程序。 现在框架增内部增加了下面的代码了,可以支持ctrl+c结束程序了。

def _interrupt_signal_handler(signal, frame):
    print('你按了 Ctrl+C  。 You pressed Ctrl+C!  结束程序!')
    # sys.exit(0)
    # noinspection PyUnresolvedReferences
    os._exit(0)  # os._exit才能更强力的迅速终止python,sys.exit只能退出主线程。


signal.signal(signal.SIGINT, _interrupt_signal_handler)

看4.6.5章节的演示代码例子

7.31 2023-04 新增支持 celery 作为 broker。

完全由celery框架来调度函数,发布函数和发布消息都是由celery框架来完成,但是用户无需学习celery语法和celery烦人的配置方式和烦人的celery目录结构,
用户对celery.Celery对象实例完全无需感知,用户不需要学习celery命令行启动消费和定时,funboost帮你自动搞定这些,用户无需接触celery命令行。

有的人担心funboost调度执行不稳定,有的人不对比瞎质疑funboost性能没有celery强,那么可以使用celery作为funboost中间件。
funboost只充当发布和启动消费的一层api,内部由celery驱动。 

funboost的好处是兼容30种消息队列或者叫ptython包,一统使用这些三方包的行为。用户切换中间件成本很低,无需知道每种中间件的语法差异。
就像sqlachemy能操作5种数据库,用户不需要知道mysql和sqlserver语法差异一样。
使用celery作为中间件,用户需要在 funboost_config.py  配置
CELERY_BROKER_URL(必须) 和 CELERY_RESULT_BACKEND (可以为None)
from funboost import boost,BrokerEnum
@boost(queue_1, broker_kind=BrokerEnum.CELERY)

python例子见 11.1章节

7.32 2023-04 新增支持 python 微服务框架 nameko 作为 broker。

nameko 是 外国人用的多的最知名python微服务框架,使用eventlet并发

funboost支持nameko作为执行调度和rpc实现,funboost只是提供统一的api交互。
from funboost import boost,BrokerEnum,ConcurrentModeEnum
@boost('test_nameko_queue', broker_kind=BrokerEnum.NAMEKO, concurrent_mode=ConcurrentModeEnum.EVENTLET)

python例子见 11.2 章节

7.33 2023-05 优化了apscheduler定式框架的动态删除添加定时任务

FsdfBackgroundScheduler 继承重写了BackgroundScheduler的 _main_loop 方法。

class FunboostBackgroundScheduler(BackgroundScheduler):
    def _main_loop(self):
        """原来的_main_loop 删除所有任务后wait_seconds 会变成None,无限等待。
        或者下一个需要运行的任务的wait_seconds是3600秒后,此时新加了一个动态任务需要3600秒后,
        现在最多只需要1秒就能扫描到动态新增的定时任务了。
        """
        MAX_WAIT_SECONDS_FOR_NEX_PROCESS_JOBS = 1
        wait_seconds = None
        while self.state == STATE_RUNNING:
            if wait_seconds is None:
                wait_seconds = MAX_WAIT_SECONDS_FOR_NEX_PROCESS_JOBS
            time.sleep(min(wait_seconds,MAX_WAIT_SECONDS_FOR_NEX_PROCESS_JOBS))  # 这个要取最小值,不然例如定时间隔0.1秒运行,不取最小值,不会每隔0.1秒运行。
            wait_seconds = self._process_jobs()

7.34 2023-05 重新实现了boost装饰器

对于一个呗@boost装饰的函数,到底应该怎么称呼它?

之前叫消费函数,消费函数是指的是boost装饰后的还是 原始函数本身,称呼不太明确。

之前的boost装饰器是使用函数来实现的,没有类型,现在的boost装饰器使用类来实现,一个函数被 boost装饰后,类型是 Booster,现在有类型了。

之前的boost装饰器是一个函数,在被装饰的函数 fun 本身附加consumer和publisher对象,以及各种方法。
返回的还是函数本身,但是附加了各种方法,方便用户 fun.push()  fun.consume() fun.get_message_count() 等等。
为了代码在pycahrm下补全犀利,还加了类型注释,boost的返回值指向一个为了补全犀利而写的 IdeAutoCompleteHelper类。

修改后的boost就是Booster类,现在boost返回的是 Booster类型的对象,补全效果很好。
对funboost的功能pycharm自动补全和函数本身的入参pycharm自动补全都很好。去掉了为了补全而写的 IdeAutoCompleteHelper。

现在一个函数被boost装饰后,他的类型就变成Booster了,可以称此函数为一个booster了。

重构之后的boost装饰器实现代码: https://github.com/ydf0509/funboost/blob/master/funboost/core/booster.py

重构之前的boost装饰器实现代码: https://github.com/ydf0509/funboost/blob/e299606a7271e24cae8dea7b9cbbcc400a4f4b0b/funboost/__init__old.py

7.35 2023-06 新增支持优先级队列

funboost支持任务优先级队列

见文档4.29

7.36 2023-07 新增支持 funboost远程杀死任务

funboost远程杀死任务

见文档4.30

7.37 2023-07 新增所有命名空间的日志和print都另外写入到一个总的文件中。

之前的funboost日志,每个命名空间的日志写入到不同的文件,每个队列名的消费者和发布者都有独立的日志命名空间,写入到不同的文件中,是为了用户方便排查。

例如你查func2的报错和运行记录,只需要到那个func2消费者.log的文件中去排查,这个文件日志不会包含别的消费函数的运行记录。

但有的人希望是项目中的所有日志 + print 写入到一个相同的文件中,方便排查上下文,这个几乎相当于 nohup 部署然后重定向标准输出到一个文件中了。
现在python代码级别对print和sys.stdout sys.stderr打了猴子补丁,支持所有print和logger打印另外写入到一个单独的文件中了。这是nb_log的新功能。

见nb_log文档 https://nb-log-doc.readthedocs.io/zh_CN/latest/articles/c10.html

10.1章节和1.1章节里面介绍了,怎么修改是否另外所有日志和print再单独写入到一个总的日志文件中。

7.38 2023-10 多线程并发模式,增加了支持async def的函数

对于async def 的函数,不需要boost装饰器指定 concurrent_mode=ConcurrentModeEnum.ASYNC ,每个线程内部会临时 loop= asyncio.new_event_loop(), 然后loop.run_unyil_cpmplete来运行async def的函数。

这种情况下 每个协程是运行在不同的loop中,这是假asyncio变成。

如果你想每个协程运行在一个loop里面,那就需要 boost装饰器指定 concurrent_mode=ConcurrentModeEnum.ASYNC,这是真asyncio编程。

7.39 2024-01 @booost装饰器入参变成pydantic Model类型, BoostParams类或子类

@boost(queue_test_f01', qps=0.2,broker_kind=BrokerEnum.REDIS_ACK_ABLE,)

建议把所有传参变为放在BoosterParams类或子类里面:

@boost(BoosterParams(queue_name='queue_test_f01', qps=0.2,broker_kind=BrokerEnum.REDIS_ACK_ABLE,))

7.40 2024-03 函数运行状态页面增加消息运行中状态

之前是只有消息运行完成后才会显示这条消息,运行中的消息不会显示,现在新增 running 状态的消息。

函数状态3.png

7.41 2024-03 新增 funboost_current_task 上下文

之前无法在用户的消费函数内部去获取消息的完全体,只能知道函数的入参,无法知道消息的发布时间 taskid等.

上下文就是类似flask的request对象,线程中任意地方可以获取,线程/协程隔离.

用户在任意消费函数中 
fct = funboost_current_task()
就能获取当前的任务消息了。

这个功能使得用户在用户函数中就能知道消息的完全体、 当前是哪台机器 、哪个进程、 第几次重试运行函数
消息的发布时间  消息的task_id 等等。

原来用户在消费函数中是无法获取这些信息的。

见文档4.31

详见文档4.31

7.42 2024-04 新增支持消费函数定义入参 **kwargs,用于消费任意json消息

见文档 4b.2 章节介绍.

https://visitor-badge.glitch.me/badge?page_id=distributed_framework