3.框架详细介绍

3.1 各种中间件选择的场景和优势

class BrokerEnum:
    RABBITMQ_AMQPSTORM = 'RABBITMQ_AMQPSTORM'  # 使用 amqpstorm 包操作rabbitmq  作为 分布式消息队列,支持消费确认.强烈推荐这个作为funboost中间件。
    RABBITMQ = RABBITMQ_AMQPSTORM

    RABBITMQ_RABBITPY = 'RABBITMQ_RABBITPY'  # 使用 rabbitpy 包操作rabbitmq  作为 分布式消息队列,支持消费确认。

    REDIS = 'REDIS'  # 使用 redis 的 list结构,brpop 作为分布式消息队列。随意重启和关闭会丢失大量消息,不支持消费确认。注重性能不在乎丢失消息可以选这个redis方案。

    MEMORY_QUEUE = 'MEMORY_QUEUE'  # 使用python queue.Queue实现的基于当前python进程的消息队列,不支持跨进程 跨脚本 跨机器共享任务,不支持持久化,适合一次性短期简单任务。
    LOCAL_PYTHON_QUEUE = MEMORY_QUEUE  # 别名,python本地queue就是基于python自带的语言的queue.Queue,消息存在python程序的内存中,不支持重启断点接续。

    RABBITMQ_PIKA = 'RABBITMQ_PIKA'  # 使用pika包操作rabbitmq  作为 分布式消息队列。

    MONGOMQ = 'MONGOMQ'  # 使用mongo的表中的行模拟的 作为分布式消息队列,支持消费确认。

    SQLITE_QUEUE = 'sqlite3'  # 使用基于sqlite3模拟消息队列,支持消费确认和持久化,但不支持跨机器共享任务,可以基于本机单机跨脚本和跨进程共享任务,好处是不需要安装中间件。
    PERSISTQUEUE = SQLITE_QUEUE  # PERSISTQUEUE的别名

    NSQ = 'NSQ'  # 基于nsq作为分布式消息队列,支持消费确认。

    KAFKA = 'KAFKA'  # 基于kafka作为分布式消息队列,如果随意重启会丢失消息,建议使用BrokerEnum.CONFLUENT_KAFKA。

    """基于confluent-kafka包,包的性能比kafka-python提升10倍。同时应对反复随意重启部署消费代码的场景,此消费者实现至少消费一次,第8种BrokerEnum.KAFKA是最多消费一次。"""
    KAFKA_CONFLUENT = 'KAFKA_CONFLUENT'
    CONFLUENT_KAFKA = KAFKA_CONFLUENT

    KAFKA_CONFLUENT_SASlPlAIN = 'KAFKA_CONFLUENT_SASlPlAIN'  # 可以设置账号密码的kafka

    REDIS_ACK_ABLE = 'REDIS_ACK_ABLE'  # 基于redis的 list + 临时unack的set队列,采用了 lua脚本操持了取任务和加到pengding为原子性,随意重启和掉线不会丢失任务。

    REDIS_PRIORITY = 'REDIS_PRIORITY'  # # 基于redis的多 list + 临时unack的set队列,blpop监听多个key,和rabbitmq的x-max-priority属性一样,支持任务优先级。看文档4.29优先级队列说明。

    SQLACHEMY = 'SQLACHEMY'  # 基于SQLACHEMY 的连接作为分布式消息队列中间件支持持久化和消费确认。支持mysql oracle sqlserver等5种数据库。

    ROCKETMQ = 'ROCKETMQ'  # 基于 rocketmq 作为分布式消息队列,这个中间件必须在linux下运行,win不支持。

    REDIS_STREAM = 'REDIS_STREAM'  # 基于redis 5.0 版本以后,使用 stream 数据结构作为分布式消息队列,支持消费确认和持久化和分组消费,是redis官方推荐的消息队列形式,比list结构更适合。

    ZEROMQ = 'ZEROMQ'  # 基于zeromq作为分布式消息队列,不需要安装中间件,可以支持跨机器但不支持持久化。

    RedisBrpopLpush = 'RedisBrpopLpush'  # 基于redis的list结构但是采用brpoplpush 双队列形式,和 redis_ack_able的实现差不多,实现上采用了原生命令就不需要lua脚本来实现取出和加入unack了。

    """
    操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),但没包括分布式函数调度框架的kafka nsq zeromq 等。
    同时 kombu 包的性能非常差,可以用原生redis的lpush和kombu的publish测试发布,使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
    由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列  qpid pyro 队列),否则强烈建议使用此框架的操作中间件方式而不是使用kombu。
    """
    KOMBU = 'KOMBU'

    """ 基于emq作为中间件的。这个和上面的中间件有很大不同,服务端不存储消息。所以不能先发布几十万个消息,然后再启动消费。mqtt优点是web前后端能交互,
    前端不能操作redis rabbitmq kafka,但很方便操作mqtt。这种使用场景是高实时的互联网接口。
    """
    MQTT = 'MQTT'

    HTTPSQS = 'HTTPSQS'  # httpsqs中间件实现的,基于http协议操作,dcoker安装此中间件简单。

    PULSAR = 'PULSAR'  # 最有潜力的下一代分布式消息系统。5年后会同时取代rabbitmq和kafka。

    UDP = 'UDP'  # 基于socket udp 实现的,需要先启动消费端再启动发布,支持分布式但不支持持久化,好处是不需要安装消息队列中间件软件。

    TCP = 'TCP'  # 基于socket tcp 实现的,需要先启动消费端再启动发布,支持分布式但不支持持久化,好处是不需要安装消息队列中间件软件。

    HTTP = 'HTTP'  # 基于http实现的,发布使用的urllib3,消费服务端使用的aiohttp.server实现的,支持分布式但不支持持久化,好处是不需要安装消息队列中间件软件。

    NATS = 'NATS'  # 高性能中间件nats,中间件服务端性能很好,。

    TXT_FILE = 'TXT_FILE'  # 磁盘txt文件作为消息队列,支持单机持久化,不支持多机分布式。不建议这个,用sqlite。

    PEEWEE = 'PEEWEE'  # peewee包操作mysql,使用表模拟消息队列

    REDIS_PUBSUB = 'REDIS_PUBSUB'  # 基于redis 发布订阅的,发布一个消息多个消费者都能收到同一条消息,但不支持持久化

    CELERY = 'CELERY'  # funboost支持celery框架来发布和消费任务,由celery框架来调度执行任务,但是写法简单远远暴击用户亲自使用celery的麻烦程度,
    # 用户永无无需关心和操作Celery对象实例,无需关心celery的task_routes和include配置,funboost来自动化设置这些celery配置。

    DRAMATIQ = 'DRAMATIQ'  # funboost使用 dramatiq 框架作为消息队列,dramatiq类似celery也是任务队列框架。用户使用funboost api来操作dramatiq核心调度。

    HUEY = 'HUEY'  # huey任务队列框架作为funboost调度核心

    RQ = 'RQ'  # rq任务队列框架作为funboost调度核心

    NAMEKO = 'NAMEKO'  # funboost支持python微服务框架nameko,用户无需掌握nameko api语法,就玩转python nameko微服务

你项目根目录下自动生成的 funboost_config.py 文件中修改配置,会被自动读取到。

此文件按需修改,例如你使用redis中间件作为消息队列,可以不用管rabbitmq mongodb kafka啥的配置。
但有3个功能例外,如果你需要使用rpc模式或者分布式控频或者任务过滤功能,无论设置使用何种消息队列中间件都需要把redis连接配置好,
如果@boost装饰器设置is_using_rpc_mode为True或者 is_using_distributed_frequency_control为True或do_task_filtering=True则需要把redis连接配置好,默认是False。

3.2 框架支持的函数调度并发模式种类详细介绍

1、threading 多线程,使用自定义的可缩小、节制开启新线程的自定义线程池,不是直接用官方内置concurrent.futures.ThreadpoolExecutor
   此线程池非常智能,配合qps参数,任何场景可以无脑开500线程,真正的做到智能扩张,智能自动缩小。
   这线程池是智能线程池,由于非常好用,为这个线程池做了独立的pypi包,可以单独用于没有使用此框架的项目。

2、gevent    需要在运行起点的脚本首行打 gevent 猴子补丁。

3、eventlet  需要在运行起点的脚本首行打 eventlet 猴子补丁。

4、asyncio  async异步,主要是针对消费函数已经定义成了   async def fun(x)  这种情况,这种情况不能直接使用多线程,
   因为执行  fun(1)  后得到的并不是所想象的函数最终结果,而是得到的一个协程对象,所以针对已经定义成异步函数了的,需要使用此种并发模式。
   框架不鼓励用户定义异步函数,你就用同步的直观方式思维定义函数就行了,其余的并发调度交给框架就行了。

5、开启多进程启动多个consumer,此模式是 多进程  + 上面4种的其中一种并发方式,充分利用多核和充分利用io,用法如下。可以实现 多进程 叠加 协程并发。
# 这种是多进程方式,一次编写能够兼容win和linux的运行。

from funboost import boost, BrokerEnum, ConcurrentModeEnum
import os

@boost('test_multi_process_queue',broker_kind=BrokerEnum.REDIS_ACK_ABLE,
           concurrent_mode=ConcurrentModeEnum.THREADING,)
def fff(x):
    print(x * 10,os.getpid())

if __name__ == '__main__':
    fff.multi_process_consume(6)  # 一次性启动6进程叠加多线程。

3.3 框架最最重要的boost装饰器参数说明

class BoosterParams(BaseJsonAbleModel):
    """
    pydatinc pycharm编程代码补全,请安装 pydantic插件, 在pycharm的  file -> settings -> Plugins -> 输入 pydantic 搜索,点击安装 pydantic 插件.

    @boost的传参必须是此类或者继承此类,如果你不想每个装饰器入参都很多,你可以写一个子类继承BoosterParams, 传参这个子类,例如下面的 BoosterParamsComplete
    """

    queue_name: str  # 队列名字,必传项,每个函数要使用不同的队列名字.

    """如果设置了qps,并且cocurrent_num是默认的50,会自动开了500并发,由于是采用的智能线程池任务少时候不会真开那么多线程而且会自动缩小线程数量。具体看ThreadPoolExecutorShrinkAble的说明
    由于有很好用的qps控制运行频率和智能扩大缩小的线程池,此框架建议不需要理会和设置并发数量只需要关心qps就行了,框架的并发是自适应并发数量,这一点很强很好用。"""
    concurrent_mode: str = ConcurrentModeEnum.THREADING  # 并发模式,支持THREADING,GEVENT,EVENTLET,ASYNC,SINGLE_THREAD并发,multi_process_consume 支持协程/线程 叠加多进程并发,性能炸裂.
    concurrent_num: int = 50  # 并发数量,并发种类由concurrent_mode决定
    specify_concurrent_pool: FunboostBaseConcurrentPool = None  # 使用指定的线程池/携程池,可以多个消费者共使用一个线程池,节约线程.不为None时候。threads_num失效
    specify_async_loop: asyncio.AbstractEventLoop = None  # 指定的async的loop循环,设置并发模式为async才能起作用。 有些包例如aiohttp,请求和httpclient的实例化不能处在两个不同的loop中,可以传过来.

    """qps:
    强悍的控制功能,指定1秒内的函数执行次数,例如可以是小数0.01代表每100秒执行一次,也可以是50代表1秒执行50次.为None则不控频。 设置qps时候,不需要指定并发数量,funboost的能够自适应智能动态调节并发池大小."""
    qps: typing.Union[float, int] = None
    """is_using_distributed_frequency_control:
    是否使用分布式空频(依赖redis统计消费者数量,然后频率平分),默认只对当前实例化的消费者空频有效。假如实例化了2个qps为10的使用同一队列名的消费者,并且都启动,则每秒运行次数会达到20。
    如果使用分布式空频则所有消费者加起来的总运行次数是10。"""
    is_using_distributed_frequency_control: bool = False

    is_send_consumer_hearbeat_to_redis: bool = False  # 是否将发布者的心跳发送到redis,有些功能的实现需要统计活跃消费者。因为有的中间件不是真mq。这个功能,需要安装redis.

    """max_retry_times:
    最大自动重试次数,当函数发生错误,立即自动重试运行n次,对一些特殊不稳定情况会有效果。
    可以在函数中主动抛出重试的异常ExceptionForRetry,框架也会立即自动重试。
    主动抛出ExceptionForRequeue异常,则当前 消息会重返中间件,
    主动抛出 ExceptionForPushToDlxqueue  异常,可以使消息发送到单独的死信队列中,死信队列的名字是 队列名字 + _dlx。"""
    max_retry_times: int = 3
    is_push_to_dlx_queue_when_retry_max_times: bool = False  # 函数达到最大重试次数仍然没成功,是否发送到死信队列,死信队列的名字是 队列名字 + _dlx。

    consumin_function_decorator: typing.Callable = None  # 函数的装饰器。因为此框架做参数自动转指点,需要获取精准的入参名称,不支持在消费函数上叠加 @ *args  **kwargs的装饰器,如果想用装饰器可以这里指定。
    function_timeout: typing.Union[int, float] = 0  # 超时秒数,函数运行超过这个时间,则自动杀死函数。为0是不限制。

    log_level: int = logging.DEBUG  # 消费者和发布者的日志级别,建议设置DEBUG级别,不然无法知道正在运行什么消息
    logger_prefix: str = ''  # 日志名字前缀,可以设置前缀
    create_logger_file: bool = True  # 发布者和消费者是否创建文件文件日志,为False则只打印控制台不写文件.
    log_filename: typing.Union[str, None] = None  # 消费者发布者的文件日志名字.如果为None,则自动使用 funboost.队列 名字作为文件日志名字.  日志文件夹是在nb_log_config.py的 LOG_PATH中决定的.
    is_show_message_get_from_broker: bool = False  # 运行时候,是否记录从消息队列获取出来的消息内容
    is_print_detail_exception: bool = True  # 消费函数出错时候,是否打印详细的报错堆栈,为False则只打印简略的报错信息不包含堆栈.

    msg_expire_senconds: typing.Union[float, int] = None  # 消息过期时间,可以设置消息是多久之前发布的就丢弃这条消息,不运行. 为None则永不丢弃

    do_task_filtering: bool = False  # 是否对函数入参进行过滤去重.
    task_filtering_expire_seconds: int = 0  # 任务过滤的失效期,为0则永久性过滤任务。例如设置过滤过期时间是1800秒 , 30分钟前发布过1 + 2 的任务,现在仍然执行,如果是30分钟以内发布过这个任务,则不执行1 + 2

    function_result_status_persistance_conf: FunctionResultStatusPersistanceConfig = FunctionResultStatusPersistanceConfig(
        is_save_result=False, is_save_status=False, expire_seconds=7 * 24 * 3600, is_use_bulk_insert=False)  # 是否保存函数的入参,运行结果和运行状态到mongodb。这一步用于后续的参数追溯,任务统计和web展示,需要安装mongo。

    user_custom_record_process_info_func: typing.Callable = None  # 提供一个用户自定义的保存消息处理记录到某个地方例如mysql数据库的函数,函数仅仅接受一个入参,入参类型是 FunctionResultStatus,用户可以打印参数

    is_using_rpc_mode: bool = False  # 是否使用rpc模式,可以在发布端获取消费端的结果回调,但消耗一定性能,使用async_result.result时候会等待阻塞住当前线程。

    is_support_remote_kill_task: bool = False  # 是否支持远程任务杀死功能,如果任务数量少,单个任务耗时长,确实需要远程发送命令来杀死正在运行的函数,才设置为true,否则不建议开启此功能。

    is_do_not_run_by_specify_time_effect: bool = False  # 是否使不运行的时间段生效
    do_not_run_by_specify_time: tuple = ('10:00:00', '22:00:00')  # 不运行的时间段,在这个时间段自动不运行函数.

    schedule_tasks_on_main_thread: bool = False  # 直接在主线程调度任务,意味着不能直接在当前主线程同时开启两个消费者。

    consuming_function: typing.Callable = None  # 消费函数,在@boost时候不用指定,因为装饰器知道下面的函数.

    broker_kind: str = BrokerEnum.PERSISTQUEUE  # 中间件选型见3.1章节 https://funboost.readthedocs.io/zh/latest/articles/c3.html

    broker_exclusive_config: dict = {}  # 加上一个不同种类中间件非通用的配置,不同中间件自身独有的配置,不是所有中间件都兼容的配置,因为框架支持30种消息队列,消息队列不仅仅是一般的先进先出queue这么简单的概念,
    # 例如kafka支持消费者组,rabbitmq也支持各种独特概念例如各种ack机制 复杂路由机制,每一种消息队列都有独特的配置参数意义,可以通过这里传递。

   

关于boost参数太多的说明:

有人会抱怨入参超多很复杂,是因为要实现一切控制方式,实现的运行控制手段非常丰富,所以参数就会多。

看这个里面的参数解释非常重要,几乎能想到的控制功能全部都有。比如有人说日志太多,不想看那么详细的提示日志
,早就通过参数提供实现了,自己抱怨参数多又以为没提供这个功能,简直是自相矛盾。

想入参参数少那就看新增的那个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理。“ 
这个例子的框架啥控制手段都没有,参数自然就很少。

乞丐版分布式函数调度框架的代码在 

funboost/beggar_version_implementation/beggar_redis_consumer.py

3.3.2 funboost 重要公有方法大全介绍

仔细看以下代码注释,函数的功能

import json
import time

from funboost import boost, BrokerEnum,PriorityConsumingControlConfig,BoosterParams


@boost(BoosterParams(queue_name='queue1', broker_kind=BrokerEnum.REDIS, qps=0.2))
def f(x, y):
    return x + y


@boost(BoosterParams(queue_name='queue2', broker_kind=BrokerEnum.REDIS, qps=7))
def f2(a, b):
    return a - b


if __name__ == '__main__':
    f.clear()  # 清空f函数对应的queue1所有消息
    for i in range(10):
        f.push(i, i * 2)  # 使用push发布消息到queue1,push的入参和正常调用函数一样
        f2.publish({'a': i, 'b': i * 2},priority_control_config=PriorityConsumingControlConfig(msg_expire_senconds=30))  # # 使用publish发布消息到queue2,publish的入参第一个参数是一个字典,把所有参数组成一个字典,还可以传入其他参数。publish更强大。

    print(f.get_message_count())  # 获取消息队列中的消息数量
    f.consume()  # 在当前进程启动多线程/协程消费
    f2.multi_process_consume(3)  # 启动3个进程,每个进程内部都启动多线程/协程消费,性能炸裂。
重要方法就是 @boost装饰器的入参,被@boost装饰的消费函数自动有funboost框架的功能
其中最常见的是:
push,推送消息到消息队列
consume, 在当前进程启动多线程/协程消费
multi_process_consume(n) ,启动多个进程,每个进程内部叠加多线程/协程,性能更强.

冷门方法

除以上方法外,还有其他的不常用的方法,在第四章有介绍,
在pycharm中可以代码补全有哪些方法,自己按照方法名字就能猜出是什么意思了。也可以点进去boost装饰器里面去,里面有每个方法的注释说明。
例如 f.pause_consume() 可以从python解释器的外部远程,让已经启动queue1的消费函数停止消费,f.continue_consume() 继续消费。

3.3.3 boost装饰器 的 concurrent_num 和 qps 之间的关系。

 concurrent_num:并发数量。
    qps qps是有个很有趣的参数,能精确控制函数每秒运行多少次。
    concurrent_num和qps存在着一定的关系。
    
    例如对于下面这个函数
    
    def func(x):
           time.sleep(2)
           print(x)

    1)如果设置 concurrent_num = 1000(或100万)  qps = 10
    那么一秒钟会执行10次func函数。如果不指定qps的值,则不进行控频,消费框架会平均每秒钟会执行50次函数func。

    如果设置concurrent_num = 1000  qps = 5   
    那么一秒钟会执行5次func函数。所以可以看到,当你不知道要开多少并发合适的时候,可以粗暴开1000个线程,但要设置一个qps。
   
    那为什么次框架,可以让你粗暴的设置1000设置100万线程呢,并不是做了数字截取,判断线程设置大于多少就自动调小了,此消费框架并没有这样去实现。
    而是次框架使用的非concurrent.tutures.ThreadpoolExecutor,是使用的自定义的  ThreadPoolExecutorShrinkAble 线程池,
    此线程池其中之一的功能就是节制开更多的线程,因为对于上面的休眠2秒的func函数,如果设置concurrent_num = 1000000  qps = 5,
    正常来说开10个线程足够实现每秒执行5次了,此框架在调节线程新增线程时候进行了更多的判断,所以原生线程池不可以设置100万大小,
    而ThreadPoolExecutorShrinkAble可以设置为100万大小。

    此外ThreadPoolExecutorShrinkAble 实现了线程池自动缩小的功能,这也是原生concurrent.tutures.ThreadpoolExecutor没有的功能。
    自动缩小是什么意思呢,比如一段时间任务非常密集1秒钟来了几百个任务,所以当时开启了很多线程来应付,但一段时间后每分钟只来了个把任务,
    此时 ThreadPoolExecutorShrinkAble 能够自动缩小线程池,
    ThreadPoolExecutorShrinkAble实现了java ThreadpoolExecutor的KeepAliveTime参数的功能,
    原生concurrent.tutures.ThreadpoolExecutor线程池即使以后永久不来新任务,之前开的线程数量一致保持这。

    关于 ThreadPoolExecutorShrinkAble 的厉害之处,可以参考 https://github.com/ydf0509/threadpool_executor_shrink_able
    
    最终关于 concurrent_num 大小设置为多少,看自己需求,上面说的100万是举个例子,
    实际这个参数还被用作为线程池的任务队列的有界队列的大小,所以一定要设置为1000以下,否则如果设置为100万,
    从消息中间件预取出的消息过多,造成python内存大、单个消费者掏空消息队列中间件造成别的新启动的消费者无任务可消费、
    对于不支持消费确认类型的中间件的随意重启会丢失大量正在运行的任务 等不利影响。

    2)上面的func函数,设置 concurrent_num = 1  qps = 100,那会如何呢?
       由于你设置的并发是1,对于一个需要2秒运行完成的函数,显然平均每2秒才能执行1次,就是框架真正的只能达到0.5个qps。
       所以 concurrent_num 和 qps,既有关系,也不是绝对的关系。
    
    在对一个随机消耗时间的函数进行并发控制时候,如果函数的运行时间是0.5到20秒任意时间不确定的徘徊,你可以设置 concurrent_num = 100,
    如果合作方要求了只能1秒钟能让你使用多少次,例如需要精确控频10次,可以设置qps =10,concurrent_num随便搞个 一两百 两三百就行了,
    因为是智能的克制的调节线程池大小的,所以不会真的达到concurrent_num的值。

    3)qps是个小数可以小于1,如果要设置10秒执行一次函数,则设置qps=0.1

    这主要是介绍了 concurrent_num 和qps的关系和设置值,qps是优先,但受到concurrent_num的约束。

3.4 框架的乞丐精简版实现方式

由于框架的功能十分多,如果没学习36种设计模式,就很难看懂源码,现在演示精简实现原理

此精简例子十分之简单明了,就是死循环从中间件取任务然后丢到线程池里面执行。

此代码在 funboost/beggar_version_implementation/beggar_redis_consumer.py

这样简单明了,演示了基本原理,但是这个缺少消费确认(随意重启代码会造成大量任务丢失) qps恒定等20种功能。

def start_consuming_message(queue_name, consume_function, threads_num=50):
    pool = ThreadPoolExecutor(threads_num)
    while True:
        try:
            redis_task = redis.brpop(queue_name, timeout=60)
            if redis_task:
                task_str = redis_task[1].decode()
                print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
                pool.submit(consume_function, **json.loads(task_str))
            else:
                print(f'redis的 {queue_name} 队列中没有任务')
        except redis.RedisError as e:
            print(e)


if __name__ == '__main__':
    import time


    def add(x, y):
        time.sleep(5)
        print(f'{x} + {y} 的结果是 {x + y}')

    # 推送任务
    for i in range(100):
        print(i)
        redis.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))


    start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)

3.5 框架的任务消费确认

此框架可以确保客户端任何时候 随意断电 粗暴重启代码 随意关机,任务万无一失。

3.4演示的精简版框架,实现redis的list的push和pop来模拟消息队列,很明显不靠谱,kill 9 重启代码或者重启电脑很容易会丢失大量任务。

分布式一致性消息传递、事件处理等场景中十分重要,分为3种情况:
At most Onece:最多一次,如果算子处理事件失败,算子将不再尝试该事件。
At Least Onece:至少一次,如果算子处理事件失败,算子会再次尝试该处理事件,直到有一次成功。
Exactly-Once:严格地,有且仅处理一次,通常有两种方法实现。

3.4实现的是最多一次,框架在多种中间件使用消费确认实现了万无一失 ,达到了Exactly-Once。
Exactly-Once是最好的也是实现难度最复杂的;At most Onece通常是最差的方式,也是最简单的实现方式。

框架在使用rabbitmq,内置默认了确认消费。

框架在使用redis作为中间件时候,有很多种实现方式,REDIS 是最不靠谱的会丢失消息。
REDIS_ACK_ABLE 、 REDIS_STREAM、 RedisBrpopLpush BrokerKind 这三种都是实现了确认消费。

3.6 框架的设计规范原则

源码实现思路基本90%遵守了oop的6个设计原则,很容易扩展中间件。
1、单一职责原则——SRP
2、开闭原则——OCP
3、里式替换原则——LSP
4、依赖倒置原则——DIP
5、接口隔离原则——ISP
6、迪米特原则——LOD

最主要是大量使用了模板模式、工厂模式、策略模式、鸭子类。
可以仿照源码中实现中间件的例子,只需要继承发布者、消费者基类后实现几个抽象方法即可添加新的中间件。