前段时间,阿里开源了qwen-agent,可以对长文档进行 RAG 增强的对话问题。但在对话之前,还缺了第一步——人们总是习惯先问一句“总结一下这篇文档说了什么”,然后再根据总结来具体提问。“长文总结”,其实是大模型应用要过的第一关。
在 IT 运维领域,情况也类似。我们有强大的搜索引擎技术,却总要先靠经验猜测几个关键字,就是因为没法预知海量日志说了啥。
日志聚类技术试图解决这个问题,但聚类和模式识别把变量部分抽象成占位符,也丢失了大量信息。怎么把这些变量信息尽可能保留住,成为当前需要克服的主要问题。
业界已经有一些简单的海量日志总结方案在尝试。这里给大家讲两个。
第一个,微软云 service bus 团队,他们在《Intelligent Solutions for Retroactive Anomaly Detection and Resolution with Log File Systems》论文中采用的方案是:
界面如图:
第二个,阿里云 Flink 团队,他们在《RCAgent: Cloud Root Cause Analysis by Autonomous Agents with Tool-Augmented Large Language Models》论文中,针对 Flink 日志设计了一套非常详尽的总结方案,我个人不是 Flink 专家,无法评判,贴出来供大家参考:
从二个团队的做法都可以看到,大家仅在出错时,才对错误时段的日志做总结,从”场景”层面缩小日志量、突出核心信息。但方案中如何保留变量信息、甚至扩展更多信息,就各有各的设计,比较依赖人的经验了。
今天,我参照类似的想法,并从上期介绍的《大模型时代的日志解析算法总结》里,引入 punct 分组和 Determinantal Point Process 算法,即尽量保持信息的多样性,又不过高要求大模型的上下文支持,实现快速高效的海量日志总结。
首先给大家看一眼在日志易仪表盘上的最终效果:
对应的日志易 SPL 语句如下:
* | eval msg=raw_message
| parse field=msg mode=sed "s/(\b)[a-zA-Z0-9]*//g"
| parse field=msg mode=sed "s/\s+/_/g"
| stats list(raw_message) as 'raw_message' by msg
| streamstats count() as cluster
| fields - msg
| mvexpand raw_message
| fit TFIDF analyzer="word" max_features=50 from raw_message
| dpp k=5
这段语句中大部分内容是为了照抄 punct 分组效果,实际上日志易本身自带了一些基础的机器学习能力,所以,我们可以直接在 SPL 中完成聚类、特征值提取,更直接的SPL实现方案:
* | eval msg=raw_message
| fit TFIDF analyzer="word" max_features=50 from raw_message
| fit DBSCAN eps=0.2 from raw_message_tfidf_*
| dpp k=5
所以,我们只需要额外再实现 DPP 采样和 LLM 调用即可。也就是上面语句中最后一段用到的自定义 SPL 指令:dpp。
日志易支持用户自己编写 python 程序,继承日志易特定类和实现方法,然后上传就可以使用。
程序中还可以复用日志易 fit 指令自带的 sklearn 机器学习库,不用操心安装部署问题。
#!python3
import os
import sys
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
executehome = "/opt/rizhiyi/parcels/splserver"
lib_path = os.path.join(executehome, 'bin', 'custom_commands', 'lib')
sys.path.insert(0, lib_path)
from src.cn.yottabyte.process.centralized_handler import CentralizedHandler
from src.cn.yottabyte.process.util import loggers
from src.cn.yottabyte.process.util import data_util
from src.cn.yottabyte.process.table import Table
logger = loggers.get_logger().getChild('DPPHandler')
class DPPHandler(CentralizedHandler):
def initialize(self, meta):
# 获取自定义指令运行参数,这里我设计
# k 来代表每个聚类的采样数量
# by_field 代表是输出每个聚类总结还是全局总结
self.by = None
args_dict, args_list = data_util.get_args_from_meta(meta)
if args_dict:
self.k = args_dict.get("k", "3")
self.by = args_dict.get("by_field")
return {'type': 'centralized'}
def execute(self, meta, table):
finished = meta.get("finished", False)
if finished:
# 自定义实现部分
...
else:
table = Table()
return meta, table
if __name__ == "__main__":
handler = DPPHandler()
handler.run()
handler.close()
接下来,我们直接从 LogBatcher 开源实现中,复制对应的 dpp_sample() 函数。
def dpp_sample(self, S, k):
# S: similarity matrix
# k: number of items to sample
n = S.shape[0]
# Initialize empty set Y
Y = set()
for _ in range(k):
best_i = -1
best_p = -1
for i in range(n):
if i not in Y:
# Compute determinant of submatrix
det_Yi = np.linalg.det(S[np.ix_(list(Y) + [i], list(Y) + [i])])
# Compute probability of adding i to Y
p_add = det_Yi / (1 + det_Yi)
if p_add > best_p:
best_p = p_add
best_i = i
# Add best item to Y
Y.add(best_i)
return list(Y)
注:我个人已经习惯一百行以下的代码让智谱清言实现。然后对比二者时学到一个有趣的知识点,DPP 有连续型和离散型两种不同实现,智谱默认会给数值连续型的采样方法,而 LogBatcher 里是针对文本离散型的。要注意差异。
然后就是 LLM 部分。作为实验,我直接调用公开的免费 API 实现。注意:百度千帆平台上提供的 ernie-speed 是无限制永久免费,而阿里云上提供的 qwen 系列是赠送一定量 token,且有 TPM 限速。
这部分代码就不贴了,百度和阿里官方文档中 HTTP 示例基本可以原样复制使用。如果采用 access_token 方式,建议额外存储一下,不用每次调用都重新获取。
实践发现,ernie-speed-128k 的总结效果确实烂到爆炸(复读了一遍 prompt 结尾的内容)!所以,综合考虑了效果与成本控制后,我决定:对每个聚类内的采样日志总结时,采用 ernie-speed,并发调用;而对 ernie-speed 输出的总结结果,再做二次总结时,则单独调用一次 qwen-plus。
def process_cluster(self, cluster_rows, features):
# 提取 tfidf 特征值,构建 numpy 数组
feature_values = np.array([[float(row[feature]) for feature in features] for row in cluster_rows])
# 使用 sklearn 中的方法构建相似性矩阵
S = cosine_similarity(feature_values)
# 应用 DPP 采样
sampled_indices = self.dpp_sample(S, int(self.k))
sampled_rows = [cluster_rows[i] for i in sampled_indices]
# 发送采样日志,由大模型生成摘要
content_parts = [
"你是 IT 运维和网络安全专家,请总结下面这段日志内容,输出尽量简短、非结构化、保留关键信息:"
] + [row['raw_message'] for row in sampled_rows]
content = "\n".join(content_parts)
summary = self.llm_summarize(content, 'ernie')
return summary
def execute(self, meta, table):
# 获取SPL输入的数据表中,有哪些 tfidf 特征向量字段
features = [field for field in table.fields if 'tfidf' in field]
finished = meta.get("finished", False)
if finished:
# 准备输出给SPL后续处理的数据表
table = Table()
table.fields = ['cluster', 'summary']
# cluster分组内数据,并发应用DPP采样、发给LLM总结
with ThreadPoolExecutor(max_workers=5) as executor: # 可以调整线程数
# 构建并发任务
future_to_cluster = {
executor.submit(self.process_cluster, cluster_rows, features): cluster_id
for cluster_id, cluster_rows in clusters.items()
}
# 收集结果,追加到准备好的SPL数据表里
for future in concurrent.futures.as_completed(future_to_cluster):
cluster_id = future_to_cluster[future]
try:
summary = future.result()
table.add_row({'cluster': cluster_id, 'summary': summary})
except Exception as exc:
logger.error(f'Cluster {cluster_id} generated an exception: {exc}')
# 不要求分组输出,二次总结
if not self.by:
total_table = Table()
total_table.fields = ['log_summary']
total_content_parts = [
"你是 IT 运维和网络安全专家,下面是日志聚类后的关键信息摘要,请通盘考虑,输出中文总结和分析建议:"
] + [row['summary'] for row in table.get_rows()]
# 聚类总结内容已经是多行文本了,不能简单的用换行来合并 prompt,必须用明确分割符来指明每段文本
total_content = "\n\n## 聚类摘要\n\n".join(total_content_parts)
total_summary = self.llm_summarize(total_content, 'qwen')
if total_summary is None:
logger.info("无法生成全局总结,请检查聚类总结内容。")
total_table.add_row({'log_summary': total_summary})
return meta, total_table
else:
return meta, table
注:qwen 的英文输出偏好微调一直被广泛吐槽。实验中发现哪怕 ernie 给出了中文总结,但因为包含一定比例的日志英文参数,qwen 就会输出全英文的总结!所以一定要在给 qwen 的 prompt 里明确写“输出中文”!
好了,方案就介绍到这里。我们可以看到,借助日志易 SPL 已有的能力,实现一套海量日志的 AI 总结,甚至连并发性能问题都考虑到位,只需要不到 200 行代码。有兴趣的读者,可以访问 GitHub,获取完整代码,部署在您自己的日志易环境上(不要忘记替换自己的大模型 API-KEY),也体验体验~