metadata_only_generator.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. """
  2. 元数据生成器 - 仅生成metadata.txt和db_query_decision_prompt.txt
  3. 不生成Question-SQL对,只提取主题并生成元数据文件
  4. """
  5. import argparse
  6. import asyncio
  7. import sys
  8. import os
  9. from pathlib import Path
  10. from typing import List, Dict, Any
  11. from datetime import datetime
  12. from data_pipeline.analyzers import MDFileAnalyzer, ThemeExtractor
  13. from data_pipeline.validators import FileCountValidator
  14. from data_pipeline.utils.logger import setup_logging
  15. from core.vanna_llm_factory import create_vanna_instance
  16. class MetadataOnlyGenerator:
  17. """仅生成元数据文件的生成器"""
  18. def __init__(self,
  19. output_dir: str,
  20. table_list_file: str,
  21. business_context: str,
  22. db_name: str = None):
  23. """
  24. 初始化元数据生成器
  25. Args:
  26. output_dir: 输出目录(包含DDL和MD文件)
  27. table_list_file: 表清单文件路径
  28. business_context: 业务上下文
  29. db_name: 数据库名称
  30. """
  31. self.output_dir = Path(output_dir)
  32. self.table_list_file = table_list_file
  33. self.business_context = business_context
  34. self.db_name = db_name or "db"
  35. # 初始化组件
  36. self.validator = FileCountValidator()
  37. self.md_analyzer = MDFileAnalyzer(output_dir)
  38. self.vn = None
  39. self.theme_extractor = None
  40. print(f"🎯 元数据生成器初始化完成")
  41. print(f"📁 输出目录: {output_dir}")
  42. print(f"🏢 业务背景: {business_context}")
  43. print(f"💾 数据库: {self.db_name}")
  44. async def generate_metadata_only(self) -> Dict[str, Any]:
  45. """
  46. 仅生成元数据文件
  47. Returns:
  48. 生成结果报告
  49. """
  50. try:
  51. print("🚀 开始生成元数据文件...")
  52. # 1. 验证文件数量
  53. print("📋 验证文件数量...")
  54. validation_result = self.validator.validate(self.table_list_file, str(self.output_dir))
  55. if not validation_result.is_valid:
  56. print(f"❌ 文件验证失败: {validation_result.error}")
  57. if validation_result.missing_ddl:
  58. print(f"缺失DDL文件: {validation_result.missing_ddl}")
  59. if validation_result.missing_md:
  60. print(f"缺失MD文件: {validation_result.missing_md}")
  61. raise ValueError(f"文件验证失败: {validation_result.error}")
  62. print(f"✅ 文件验证通过: {validation_result.table_count}个表")
  63. # 2. 读取所有MD文件内容
  64. print("📖 读取MD文件...")
  65. md_contents = await self.md_analyzer.read_all_md_files()
  66. # 3. 初始化LLM相关组件
  67. self._initialize_llm_components()
  68. # 4. 提取分析主题
  69. print("🎯 提取分析主题...")
  70. themes = await self.theme_extractor.extract_themes(md_contents)
  71. print(f"✅ 成功提取 {len(themes)} 个分析主题")
  72. for i, theme in enumerate(themes):
  73. topic_name = theme.get('topic_name', theme.get('name', ''))
  74. description = theme.get('description', '')
  75. print(f" {i+1}. {topic_name}: {description}")
  76. # 5. 生成metadata.txt文件
  77. print("📝 生成metadata.txt...")
  78. metadata_file = await self._generate_metadata_file(themes)
  79. # 6. 生成metadata_detail.md文件
  80. print("📝 生成metadata_detail.md...")
  81. metadata_md_file = await self._generate_metadata_md_file(themes)
  82. # 7. 生成db_query_decision_prompt.txt文件
  83. print("📝 生成db_query_decision_prompt.txt...")
  84. decision_prompt_file = await self._generate_decision_prompt_file(themes, md_contents)
  85. # 8. 生成报告
  86. report = {
  87. 'success': True,
  88. 'total_themes': len(themes),
  89. 'metadata_file': str(metadata_file) if metadata_file else None,
  90. 'metadata_md_file': str(metadata_md_file) if metadata_md_file else None,
  91. 'decision_prompt_file': str(decision_prompt_file) if decision_prompt_file else None,
  92. 'themes': themes
  93. }
  94. self._print_summary(report)
  95. return report
  96. except Exception as e:
  97. print(f"❌ 元数据生成失败: {e}")
  98. raise
  99. def _initialize_llm_components(self):
  100. """初始化LLM相关组件"""
  101. if not self.vn:
  102. print("🤖 初始化LLM组件...")
  103. self.vn = create_vanna_instance()
  104. self.theme_extractor = ThemeExtractor(self.vn, self.business_context)
  105. async def _generate_metadata_file(self, themes: List[Dict]):
  106. """生成metadata.txt文件,包含INSERT语句"""
  107. metadata_file = self.output_dir / "metadata.txt"
  108. try:
  109. with open(metadata_file, 'w', encoding='utf-8') as f:
  110. f.write("-- Schema Tools生成的主题元数据\n")
  111. f.write(f"-- 业务背景: {self.business_context}\n")
  112. f.write(f"-- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
  113. f.write(f"-- 数据库: {self.db_name}\n\n")
  114. f.write("-- 创建表(如果不存在)\n")
  115. f.write("CREATE TABLE IF NOT EXISTS metadata (\n")
  116. f.write(" id SERIAL PRIMARY KEY, -- 主键\n")
  117. f.write(" topic_name VARCHAR(100) NOT NULL, -- 业务主题名称\n")
  118. f.write(" description TEXT, -- 业务主体说明\n")
  119. f.write(" related_tables TEXT[],\t\t\t -- 相关表名\n")
  120. f.write(" biz_entities TEXT[], -- 主要业务实体名称\n")
  121. f.write(" biz_metrics TEXT[], -- 主要业务指标名称\n")
  122. f.write(" created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- 插入时间\n")
  123. f.write(");\n\n")
  124. f.write("-- 插入主题数据\n")
  125. for theme in themes:
  126. # 获取字段值,使用新格式
  127. topic_name = theme.get('topic_name', theme.get('name', ''))
  128. description = theme.get('description', '')
  129. # 处理related_tables
  130. related_tables = theme.get('related_tables', [])
  131. if isinstance(related_tables, list):
  132. tables_str = ','.join(related_tables)
  133. else:
  134. tables_str = ''
  135. # 处理biz_entities
  136. biz_entities = theme.get('biz_entities', [])
  137. if isinstance(biz_entities, list):
  138. entities_str = ','.join(biz_entities)
  139. else:
  140. entities_str = ''
  141. # 处理biz_metrics
  142. biz_metrics = theme.get('biz_metrics', [])
  143. if isinstance(biz_metrics, list):
  144. metrics_str = ','.join(biz_metrics)
  145. else:
  146. metrics_str = ''
  147. # 生成INSERT语句
  148. f.write("INSERT INTO metadata(topic_name, description, related_tables, biz_entities, biz_metrics) VALUES\n")
  149. f.write("(\n")
  150. f.write(f" '{self._escape_sql_string(topic_name)}',\n")
  151. f.write(f" '{self._escape_sql_string(description)}',\n")
  152. f.write(f" '{tables_str}',\n")
  153. f.write(f" '{entities_str}',\n")
  154. f.write(f" '{metrics_str}'\n")
  155. f.write(");\n\n")
  156. print(f"✅ metadata.txt文件已生成: {metadata_file}")
  157. return metadata_file
  158. except Exception as e:
  159. print(f"❌ 生成metadata.txt文件失败: {e}")
  160. return None
  161. async def _generate_metadata_md_file(self, themes: List[Dict]):
  162. """生成metadata_detail.md文件"""
  163. metadata_md_file = self.output_dir / "metadata_detail.md"
  164. try:
  165. # 从themes中收集示例数据
  166. sample_tables = set()
  167. sample_entities = set()
  168. sample_metrics = set()
  169. for theme in themes:
  170. related_tables = theme.get('related_tables', [])
  171. if isinstance(related_tables, list):
  172. sample_tables.update(related_tables[:2]) # 取前2个表作为示例
  173. biz_entities = theme.get('biz_entities', [])
  174. if isinstance(biz_entities, list):
  175. sample_entities.update(biz_entities[:3]) # 取前3个实体作为示例
  176. biz_metrics = theme.get('biz_metrics', [])
  177. if isinstance(biz_metrics, list):
  178. sample_metrics.update(biz_metrics[:3]) # 取前3个指标作为示例
  179. # 转换为字符串格式,避免硬编码特定行业内容
  180. tables_example = ', '.join(list(sample_tables)[:2]) if sample_tables else '数据表1, 数据表2'
  181. entities_example = ', '.join(list(sample_entities)[:3]) if sample_entities else '业务实体1, 业务实体2, 业务实体3'
  182. metrics_example = ', '.join(list(sample_metrics)[:3]) if sample_metrics else '业务指标1, 业务指标2, 业务指标3'
  183. with open(metadata_md_file, 'w', encoding='utf-8') as f:
  184. f.write("## metadata(存储分析主题元数据)\n\n")
  185. f.write("`metadata` 主要描述了当前数据库包含了哪些数据内容,哪些分析主题,哪些指标等等。\n\n")
  186. f.write("字段列表:\n\n")
  187. f.write("- `id` (serial) - 主键ID [主键, 非空]\n")
  188. f.write("- `topic_name` (varchar(100)) - 业务主题名称 [非空]\n")
  189. f.write("- `description` (text) - 业务主题说明\n")
  190. f.write(f"- `related_tables` (text[]) - 涉及的数据表 [示例: {tables_example}]\n")
  191. f.write(f"- `biz_entities` (text[]) - 主要业务实体名称 [示例: {entities_example}]\n")
  192. f.write(f"- `biz_metrics` (text[]) - 主要业务指标名称 [示例: {metrics_example}]\n")
  193. f.write("- `created_at` (timestamp) - 插入时间 [默认值: `CURRENT_TIMESTAMP`]\n\n")
  194. f.write("字段补充说明:\n\n")
  195. f.write("- `id` 为主键,自增;\n")
  196. f.write("- `related_tables` 用于建立主题与具体明细表的依赖关系;\n")
  197. f.write("- `biz_entities` 表示主题关注的核心对象,例如服务区、车辆、公司;\n")
  198. f.write("- `biz_metrics` 表示该主题关注的业务分析指标,例如营收对比、趋势变化、占比结构等。\n")
  199. print(f"✅ metadata_detail.md文件已生成: {metadata_md_file}")
  200. return metadata_md_file
  201. except Exception as e:
  202. print(f"❌ 生成metadata_detail.md文件失败: {e}")
  203. return None
  204. async def _generate_decision_prompt_file(self, themes: List[Dict], md_contents: str):
  205. """生成db_query_decision_prompt.txt文件"""
  206. decision_prompt_file = self.output_dir / "db_query_decision_prompt.txt"
  207. try:
  208. # 使用LLM动态生成决策提示内容
  209. decision_content = await self._generate_decision_prompt_with_llm(themes, md_contents)
  210. # 写入文件
  211. with open(decision_prompt_file, 'w', encoding='utf-8') as f:
  212. f.write(decision_content)
  213. print(f"✅ db_query_decision_prompt.txt文件已生成: {decision_prompt_file}")
  214. return decision_prompt_file
  215. except Exception as e:
  216. print(f"❌ 生成db_query_decision_prompt.txt文件失败: {e}")
  217. # 如果LLM调用失败,使用回退方案
  218. try:
  219. fallback_content = await self._generate_fallback_decision_content(themes)
  220. with open(decision_prompt_file, 'w', encoding='utf-8') as f:
  221. f.write(fallback_content)
  222. print(f"⚠️ 使用回退方案生成了 {decision_prompt_file}")
  223. return decision_prompt_file
  224. except Exception as fallback_error:
  225. print(f"❌ 回退方案也失败: {fallback_error}")
  226. return None
  227. async def _generate_decision_prompt_with_llm(self, themes: List[Dict], md_contents: str) -> str:
  228. """使用LLM动态生成db_query_decision_prompt.txt的完整内容(基于纯表结构分析)"""
  229. try:
  230. # 构建LLM提示词 - 让LLM完全独立分析表结构
  231. prompt = f"""你是一位资深的数据分析师,请直接分析以下数据库的表结构,独立判断业务范围和数据范围。
  232. 业务背景:{self.business_context}
  233. 数据库表结构详细信息:
  234. {md_contents}
  235. 分析任务:
  236. 请你直接从表结构、字段名称、字段类型、示例数据中推断业务逻辑,不要参考任何预设的业务主题。
  237. 分析要求:
  238. 1. **业务范围**:根据表名和核心业务字段,用一句话概括这个数据库支撑的业务领域
  239. 2. **数据范围**:根据具体的数据字段(如金额、数量、类型等),用一句话概括涉及的数据类型和范围
  240. 3. **核心业务实体**:从非技术字段中识别主要的业务对象(如表中的维度字段)
  241. 4. **关键业务指标**:从数值型字段和枚举字段中识别可以进行分析的指标
  242. 技术字段过滤规则(请忽略以下字段):
  243. - 主键字段:id、主键ID等
  244. - 时间戳字段:create_ts、update_ts、delete_ts、created_at、updated_at等
  245. - 版本字段:version、版本号等
  246. - 操作人字段:created_by、updated_by、deleted_by等
  247. 请直接生成以下格式的业务分析报告(请严格按照格式,不要添加额外内容):
  248. === 数据库业务范围 ===
  249. 当前数据库存储的是[业务描述]的相关数据,主要涉及[数据范围],包含以下业务数据:
  250. 核心业务实体:
  251. - 实体类型1:详细描述(基于实际字段和业务场景),主要字段:字段1、字段2、字段3
  252. - 实体类型2:详细描述,主要字段:字段1、字段2、字段3
  253. 关键业务指标:
  254. - 指标类型1:详细描述(基于实际数值字段和分析需求)
  255. - 指标类型2:详细描述
  256. 要求:
  257. 1. 请完全基于表结构进行独立分析,从字段的业务含义出发,准确反映数据库的实际业务范围
  258. 2. 严格按照上述格式输出,不要添加分析依据、总结或其他额外内容
  259. 3. 输出内容到"指标类型2:详细描述"结束即可"""
  260. # 调用LLM生成内容
  261. response = await asyncio.to_thread(
  262. self.vn.chat_with_llm,
  263. question=prompt,
  264. system_prompt="你是一个专业的数据分析师,擅长从业务角度总结数据库的业务范围和核心实体。请基于实际的表结构和字段信息生成准确的业务描述。"
  265. )
  266. return response.strip()
  267. except Exception as e:
  268. print(f"❌ LLM生成决策提示内容失败: {e}")
  269. # 回退方案:生成基础内容
  270. return await self._generate_fallback_decision_content(themes)
  271. async def _generate_fallback_decision_content(self, themes: List[Dict]) -> str:
  272. """生成回退的决策提示内容(尝试用简化LLM调用)"""
  273. content = f"=== 数据库业务范围 ===\n"
  274. # 尝试用简化的LLM调用获取数据范围
  275. try:
  276. # 构建简化的提示词
  277. entities_sample = []
  278. metrics_sample = []
  279. for theme in themes[:3]: # 只取前3个主题作为示例
  280. biz_entities = theme.get('biz_entities', [])
  281. if isinstance(biz_entities, list):
  282. entities_sample.extend(biz_entities[:2])
  283. biz_metrics = theme.get('biz_metrics', [])
  284. if isinstance(biz_metrics, list):
  285. metrics_sample.extend(biz_metrics[:2])
  286. # 简化的提示词
  287. simple_prompt = f"""基于以下信息,用一句话概括{self.business_context}涉及的数据范围:
  288. 业务实体示例:{', '.join(entities_sample[:5])}
  289. 业务指标示例:{', '.join(metrics_sample[:5])}
  290. 请只回答数据范围,格式如:某某数据、某某信息、某某统计等"""
  291. data_range = await asyncio.to_thread(
  292. self.vn.chat_with_llm,
  293. question=simple_prompt,
  294. system_prompt="请用简洁的语言概括数据范围。"
  295. )
  296. data_range = data_range.strip()
  297. # 如果LLM返回内容合理,则使用
  298. if data_range and len(data_range) < 100:
  299. content += f"当前数据库存储的是{self.business_context}的相关数据,主要涉及{data_range},包含以下业务数据:\n"
  300. else:
  301. raise Exception("LLM返回内容不合理")
  302. except Exception as e:
  303. print(f"⚠️ 简化LLM调用也失败,使用完全兜底方案: {e}")
  304. # 真正的最后兜底
  305. content += f"当前数据库存储的是{self.business_context}的相关数据,主要涉及相关业务数据,包含以下业务数据:\n"
  306. content += "核心业务实体:\n"
  307. # 收集所有实体
  308. all_entities = set()
  309. for theme in themes:
  310. biz_entities = theme.get('biz_entities', [])
  311. if isinstance(biz_entities, list):
  312. all_entities.update(biz_entities)
  313. for entity in list(all_entities)[:8]:
  314. content += f"- {entity}:{entity}相关的业务信息\n"
  315. content += "关键业务指标:\n"
  316. # 收集所有指标
  317. all_metrics = set()
  318. for theme in themes:
  319. biz_metrics = theme.get('biz_metrics', [])
  320. if isinstance(biz_metrics, list):
  321. all_metrics.update(biz_metrics)
  322. for metric in list(all_metrics)[:8]:
  323. content += f"- {metric}:{metric}相关的分析指标\n"
  324. return content
  325. def _escape_sql_string(self, value: str) -> str:
  326. """转义SQL字符串中的特殊字符"""
  327. if not value:
  328. return ""
  329. # 转义单引号
  330. return value.replace("'", "''")
  331. def _print_summary(self, report: Dict):
  332. """打印总结信息"""
  333. print("=" * 60)
  334. print("📊 元数据生成总结")
  335. print(f" ✅ 分析主题数: {report['total_themes']}")
  336. print(f" 📄 metadata.txt: {'✅ 已生成' if report['metadata_file'] else '❌ 生成失败'}")
  337. print(f" 📄 metadata_detail.md: {'✅ 已生成' if report['metadata_md_file'] else '❌ 生成失败'}")
  338. print(f" 📄 db_query_decision_prompt.txt: {'✅ 已生成' if report['decision_prompt_file'] else '❌ 生成失败'}")
  339. print("=" * 60)
  340. def setup_argument_parser():
  341. """设置命令行参数解析器"""
  342. parser = argparse.ArgumentParser(
  343. description='元数据生成器 - 仅生成metadata.txt和db_query_decision_prompt.txt',
  344. formatter_class=argparse.RawDescriptionHelpFormatter,
  345. epilog="""
  346. 示例用法:
  347. # 基本使用
  348. python -m data_pipeline.metadata_only_generator --output-dir ./data_pipeline/training_data --table-list ./data_pipeline/tables.txt --business-context "高速公路服务区管理系统"
  349. # 指定数据库名称
  350. python -m data_pipeline.metadata_only_generator --output-dir ./data_pipeline/training_data --table-list ./data_pipeline/tables.txt --business-context "电商系统" --db-name ecommerce_db
  351. # 启用详细日志
  352. python -m data_pipeline.metadata_only_generator --output-dir ./data_pipeline/training_data --table-list ./data_pipeline/tables.txt --business-context "管理系统" --verbose
  353. """
  354. )
  355. # 必需参数
  356. parser.add_argument(
  357. '--output-dir',
  358. required=True,
  359. help='包含DDL和MD文件的输出目录'
  360. )
  361. parser.add_argument(
  362. '--table-list',
  363. required=True,
  364. help='表清单文件路径(用于验证文件数量)'
  365. )
  366. parser.add_argument(
  367. '--business-context',
  368. required=True,
  369. help='业务上下文描述'
  370. )
  371. # 可选参数
  372. parser.add_argument(
  373. '--db-name',
  374. help='数据库名称(用于输出文件命名)'
  375. )
  376. parser.add_argument(
  377. '--verbose', '-v',
  378. action='store_true',
  379. help='启用详细日志输出'
  380. )
  381. parser.add_argument(
  382. '--log-file',
  383. help='日志文件路径'
  384. )
  385. return parser
  386. async def main():
  387. """主入口函数"""
  388. parser = setup_argument_parser()
  389. args = parser.parse_args()
  390. # 设置日志
  391. setup_logging(
  392. verbose=args.verbose,
  393. log_file=args.log_file
  394. )
  395. # 验证参数
  396. output_path = Path(args.output_dir)
  397. if not output_path.exists():
  398. print(f"错误: 输出目录不存在: {args.output_dir}")
  399. sys.exit(1)
  400. if not os.path.exists(args.table_list):
  401. print(f"错误: 表清单文件不存在: {args.table_list}")
  402. sys.exit(1)
  403. try:
  404. # 创建生成器
  405. generator = MetadataOnlyGenerator(
  406. output_dir=args.output_dir,
  407. table_list_file=args.table_list,
  408. business_context=args.business_context,
  409. db_name=args.db_name
  410. )
  411. # 执行生成
  412. report = await generator.generate_metadata_only()
  413. # 输出结果
  414. if report['success']:
  415. print("\n🎉 元数据文件生成成功!")
  416. exit_code = 0
  417. else:
  418. print("\n❌ 元数据文件生成失败")
  419. exit_code = 1
  420. sys.exit(exit_code)
  421. except KeyboardInterrupt:
  422. print("\n\n⏹️ 用户中断,程序退出")
  423. sys.exit(130)
  424. except Exception as e:
  425. print(f"\n❌ 程序执行失败: {e}")
  426. if args.verbose:
  427. import traceback
  428. traceback.print_exc()
  429. sys.exit(1)
  430. if __name__ == "__main__":
  431. asyncio.run(main())