4b.使用框架的各种代码示例(高级进阶)
4b.1 日志模板中自动显示task_id
4b.1.1 日志模板中显示task_id
在 funboost_config.py 中设置如下 (43.0版本以后的配置默认就是待task_id的模板了)
如果用户的 funboost_config.py 是旧的日志模板,升级到43.0以后版本,需要修改 NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER 为新的带task_id的日志模板,日志中才能自动显示task_id
import logging
from nb_log import nb_log_config_default
class FunboostCommonConfig(DataClassBase):
# nb_log包的第几个日志模板,内置了7个模板,可以在你当前项目根目录下的nb_log_config.py文件扩展模板。
# NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER = 11 # 7是简短的不可跳转,5是可点击跳转的,11是可显示ip 进程 线程的模板,也可以亲自设置日志模板不传递数字。
NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER = logging.Formatter(
f'%(asctime)s-({nb_log_config_default.computer_ip},{nb_log_config_default.computer_name})-[p%(process)d_t%(thread)d] - %(name)s - "%(filename)s:%(lineno)d" - %(funcName)s - %(levelname)s - %(task_id)s - %(message)s',
"%Y-%m-%d %H:%M:%S",) # 这个是带task_id的日志模板,日志可以显示task_id,方便用户串联起来排查某一个人物消息的所有日志.
待task_id的日志模板如下图
4b.1.2 用户在消费函数中想自动显示task_id,方便搜索task_id的关键字来排查某条消息的所有日志.
用户使用 logger = LogManager('namexx',logger_cls=TaskIdLogger).get_logger_and_add_handlers(......) 的方式来创建logger,
关键是用户需要设置 logger_cls=TaskIdLogger
代码如下:
import random
import time
from funboost import boost, FunctionResultStatusPersistanceConfig, BoosterParams
from funboost.core.current_task import funboost_current_task
from funboost.core.task_id_logger import TaskIdLogger
import nb_log
from funboost.funboost_config_deafult import FunboostCommonConfig
from nb_log import LogManager
LOG_FILENAME_QUEUE_FCT = 'queue_fct.log'
# 使用TaskIdLogger创建的日志配合带task_id的日志模板,每条日志会自动带上task_id,方便用户搜索日志,定位某一个任务id的所有日志。
task_id_logger = LogManager('namexx', logger_cls=TaskIdLogger).get_logger_and_add_handlers(
log_filename='queue_fct.log',
error_log_filename=nb_log.generate_error_file_name(LOG_FILENAME_QUEUE_FCT),
formatter_template=FunboostCommonConfig.NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER, )
# 如果不使用TaskIdLogger来创建logger还想使用task_id的日志模板,需要用户在打印日志时候手动传 extra={'task_id': fct.task_id}
common_logger = nb_log.get_logger('namexx2',formatter_template=FunboostCommonConfig.NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER)
@boost(BoosterParams(queue_name='queue_test_fct', qps=2, concurrent_num=5, log_filename=LOG_FILENAME_QUEUE_FCT))
def f(a, b):
fct = funboost_current_task() # 线程/协程隔离级别的上下文
# 以下的每一条日志都会自带task_id显示,方便用户串联起来排查问题。
fct.logger.warning('如果不想亲自创建logger对象,可以使用fct.logger来记录日志,fct.logger是当前队列的消费者logger对象')
task_id_logger.info(fct.function_result_status.task_id) # 获取消息的任务id
task_id_logger.debug(fct.function_result_status.run_times) # 获取消息是第几次重试运行
task_id_logger.info(fct.full_msg) # 获取消息的完全体。出了a和b的值意外,还有发布时间 task_id等。
task_id_logger.debug(fct.function_result_status.publish_time_str) # 获取消息的发布时间
task_id_logger.debug(fct.function_result_status.get_status_dict()) # 获取任务的信息,可以转成字典看。
# 如果 用户不是使用TaskIdLogger插件的logger对象,那么要在模板中显示task_id,
common_logger.debug('假设logger不是TaskIdLogger类型的,想使用带task_id的日志模板,那么需要使用extra={"task_id":fct.task_id}', extra={'task_id': fct.task_id})
time.sleep(2)
task_id_logger.debug(f'哈哈 a: {a}')
task_id_logger.debug(f'哈哈 b: {b}')
task_id_logger.info(a + b)
if random.random() > 0.99:
raise Exception(f'{a} {b} 模拟出错啦')
return a + b
if __name__ == '__main__':
# f(5, 6) # 可以直接调用
for i in range(0, 200):
f.push(i, b=i * 2)
f.consume()
运行如图:
可以看到每条日志自动就显示了task_id, 这样的好处是可以通过搜索 task_id,来排查用户的某条消息的整条链路情况.
4b.1.3 一键全局使用 TaskIdLogger 代替 logging.Logger 的方式
import logging
logging.setLoggerClass(TaskIdLogger) # 越早运行越好,这样就不需要每次都设置TaskIdLogger来实例化logger了。
4b.1.4 能在消费函数的整个链路里面的调用的任意函数获取task_id的原理
fct = funboost_current_task()
funboost_current_task() 因为是线程 /协程 级别隔离的,就是线程/协程上下文.
4b.2 支持消费函数定义入参 **kwargs ,用于消费任意json消息
有时候,消息是已存在的,而且别的部门没有使用funboost,且消息中字段达到几十上百个,用户不希望一个个字段的定义函数入参.
如果是funboost来发布不定项的入参,通过设置 should_check_publish_func_params=False,让 publisher 不再校验发布入参
代码如下:
import time
from funboost import boost, BrokerEnum, BoosterParams, funboost_current_task
@boost(boost_params=BoosterParams(queue_name="task_queue_name2c", qps=5, broker_kind=BrokerEnum.REDIS, log_level=10, should_check_publish_func_params=False)) # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
def task_fun(**kwargs):
print(kwargs) # 打印函数入参
fct = funboost_current_task()
print(fct.full_msg) # 打印消息体.
time.sleep(3) # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 5 次 这个 task_fun 函数的目的。
if __name__ == "__main__":
for i in range(10):
task_fun.push(x=i, y=i * 2, x3=6, x4=8, x5={'k1': i, 'k2': i * 2, 'k3': i * 3}) # 发布者发布任务
task_fun.publish(dict(x=i, y=i * 2, x3=6, x4=8, x5={'k1': i, 'k2': i * 2, 'k3': i * 3})) # 发布者发布任务
task_fun.publisher.send_msg(dict(m=i, n=i * 2, z=[4,5,6]))
task_fun.consume() # 消费者启动循环调度并发消费任务
运行如图:
假设 task_queue_name2c 队列是别的部门发布的,或者你希望向 task_queue_name2c 队列中发布任意消息,那么可以使用send_msg
或者通过设置 should_check_publish_func_params=False 后使用 push或者publish来发布消息.
这样task_fun 支持消息任意消息,只要消息是json就行了.