4b.使用框架的各种代码示例(高级进阶)

4b.1 日志模板中自动显示task_id

4b.1.1 日志模板中显示task_id

在 funboost_config.py 中设置如下 (43.0版本以后的配置默认就是待task_id的模板了)

如果用户的 funboost_config.py 是旧的日志模板,升级到43.0以后版本,需要修改 NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER 为新的带task_id的日志模板,日志中才能自动显示task_id

import logging
from nb_log import nb_log_config_default
class FunboostCommonConfig(DataClassBase):
    # nb_log包的第几个日志模板,内置了7个模板,可以在你当前项目根目录下的nb_log_config.py文件扩展模板。
    # NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER = 11  # 7是简短的不可跳转,5是可点击跳转的,11是可显示ip 进程 线程的模板,也可以亲自设置日志模板不传递数字。
    NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER = logging.Formatter(
        f'%(asctime)s-({nb_log_config_default.computer_ip},{nb_log_config_default.computer_name})-[p%(process)d_t%(thread)d] - %(name)s - "%(filename)s:%(lineno)d" - %(funcName)s - %(levelname)s - %(task_id)s - %(message)s',
        "%Y-%m-%d %H:%M:%S",)   # 这个是带task_id的日志模板,日志可以显示task_id,方便用户串联起来排查某一个人物消息的所有日志.

待task_id的日志模板如下图

img_62.png

4b.1.2 用户在消费函数中想自动显示task_id,方便搜索task_id的关键字来排查某条消息的所有日志.

用户使用 logger = LogManager('namexx',logger_cls=TaskIdLogger).get_logger_and_add_handlers(......) 的方式来创建logger,

关键是用户需要设置 logger_cls=TaskIdLogger

代码连接: https://github.com/ydf0509/funboost/blob/master/test_frame/test_funboost_current_task/test_current_task.py

代码如下:


import random
import time

from funboost import boost, FunctionResultStatusPersistanceConfig, BoosterParams
from funboost.core.current_task import funboost_current_task
from funboost.core.task_id_logger import TaskIdLogger
import nb_log
from funboost.funboost_config_deafult import FunboostCommonConfig
from nb_log import LogManager

LOG_FILENAME_QUEUE_FCT = 'queue_fct.log'
# 使用TaskIdLogger创建的日志配合带task_id的日志模板,每条日志会自动带上task_id,方便用户搜索日志,定位某一个任务id的所有日志。
task_id_logger = LogManager('namexx', logger_cls=TaskIdLogger).get_logger_and_add_handlers(
    log_filename='queue_fct.log',
    error_log_filename=nb_log.generate_error_file_name(LOG_FILENAME_QUEUE_FCT),
    formatter_template=FunboostCommonConfig.NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER, )

# 如果不使用TaskIdLogger来创建logger还想使用task_id的日志模板,需要用户在打印日志时候手动传 extra={'task_id': fct.task_id}
common_logger = nb_log.get_logger('namexx2',formatter_template=FunboostCommonConfig.NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER)



@boost(BoosterParams(queue_name='queue_test_fct', qps=2, concurrent_num=5, log_filename=LOG_FILENAME_QUEUE_FCT))
def f(a, b):
    fct = funboost_current_task()  # 线程/协程隔离级别的上下文

    # 以下的每一条日志都会自带task_id显示,方便用户串联起来排查问题。
    fct.logger.warning('如果不想亲自创建logger对象,可以使用fct.logger来记录日志,fct.logger是当前队列的消费者logger对象')
    task_id_logger.info(fct.function_result_status.task_id)  # 获取消息的任务id
    task_id_logger.debug(fct.function_result_status.run_times)  # 获取消息是第几次重试运行
    task_id_logger.info(fct.full_msg)  # 获取消息的完全体。出了a和b的值意外,还有发布时间 task_id等。
    task_id_logger.debug(fct.function_result_status.publish_time_str)  # 获取消息的发布时间
    task_id_logger.debug(fct.function_result_status.get_status_dict())  # 获取任务的信息,可以转成字典看。

    # 如果 用户不是使用TaskIdLogger插件的logger对象,那么要在模板中显示task_id,
    common_logger.debug('假设logger不是TaskIdLogger类型的,想使用带task_id的日志模板,那么需要使用extra={"task_id":fct.task_id}', extra={'task_id': fct.task_id})

    time.sleep(2)
    task_id_logger.debug(f'哈哈 a: {a}')
    task_id_logger.debug(f'哈哈 b: {b}')
    task_id_logger.info(a + b)
    if random.random() > 0.99:
        raise Exception(f'{a} {b} 模拟出错啦')

    return a + b


if __name__ == '__main__':
    # f(5, 6)  # 可以直接调用

    for i in range(0, 200):
        f.push(i, b=i * 2)

    f.consume()

运行如图:

img_63.png

可以看到每条日志自动就显示了task_id, 这样的好处是可以通过搜索 task_id,来排查用户的某条消息的整条链路情况.

4b.1.3 一键全局使用 TaskIdLogger 代替 logging.Logger 的方式

import logging
logging.setLoggerClass(TaskIdLogger)  # 越早运行越好,这样就不需要每次都设置TaskIdLogger来实例化logger了。

4b.1.4 能在消费函数的整个链路里面的调用的任意函数获取task_id的原理

fct = funboost_current_task()

funboost_current_task() 因为是线程 /协程 级别隔离的,就是线程/协程上下文.

4b.2 支持消费函数定义入参 **kwargs ,用于消费任意json消息

有时候,消息是已存在的,而且别的部门没有使用funboost,且消息中字段达到几十上百个,用户不希望一个个字段的定义函数入参.

如果是funboost来发布不定项的入参,通过设置 should_check_publish_func_params=False,让 publisher 不再校验发布入参

代码如下:

import time
from funboost import boost, BrokerEnum, BoosterParams, funboost_current_task


@boost(boost_params=BoosterParams(queue_name="task_queue_name2c", qps=5, broker_kind=BrokerEnum.REDIS, log_level=10, should_check_publish_func_params=False))  # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
def task_fun(**kwargs):
    print(kwargs)  # 打印函数入参
    fct = funboost_current_task()
    print(fct.full_msg) # 打印消息体.
    time.sleep(3)  # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 5 次 这个 task_fun 函数的目的。


if __name__ == "__main__":
    for i in range(10):
        task_fun.push(x=i, y=i * 2, x3=6, x4=8, x5={'k1': i, 'k2': i * 2, 'k3': i * 3})  # 发布者发布任务
        task_fun.publish(dict(x=i, y=i * 2, x3=6, x4=8, x5={'k1': i, 'k2': i * 2, 'k3': i * 3}))  # 发布者发布任务
        task_fun.publisher.send_msg(dict(m=i, n=i * 2,  z=[4,5,6]))
    task_fun.consume()  # 消费者启动循环调度并发消费任务

运行如图: img_66.png

假设 task_queue_name2c 队列是别的部门发布的,或者你希望向 task_queue_name2c 队列中发布任意消息,那么可以使用send_msg

或者通过设置 should_check_publish_func_params=False 后使用 push或者publish来发布消息.

这样task_fun 支持消息任意消息,只要消息是json就行了.