# test_lineage_visualization.py (12 KB)
  1. """
  2. 测试数据加工可视化 API
  3. 用于测试 /api/dataservice/products/23/lineage-visualization 接口
  4. 通过图谱和数据库表查询 warehouse_inventory_summary 和 test_product_inventory 表中的数据
  5. """
  6. import json
  7. import os
  8. import sys
  9. from datetime import datetime
  10. # 设置为生产环境以连接远程数据库
  11. os.environ["FLASK_ENV"] = "production"
  12. # 添加项目路径
  13. sys.path.insert(0, ".")
  14. from sqlalchemy import text
  15. from app import create_app, db
  16. from app.models.data_product import DataProduct
  17. def get_product_info(app, product_id: int):
  18. """获取数据产品基本信息"""
  19. with app.app_context():
  20. product = DataProduct.query.get(product_id)
  21. if product:
  22. print(f"\n{'=' * 60}")
  23. print(f"数据产品信息 (ID: {product_id})")
  24. print(f"{'=' * 60}")
  25. print(f" 产品名称: {product.product_name}")
  26. print(f" 英文名称: {product.product_name_en}")
  27. print(f" 目标表名: {product.target_table}")
  28. print(f" 目标Schema: {product.target_schema}")
  29. print(f" 关联DataFlow ID: {product.source_dataflow_id}")
  30. print(f" 记录数: {product.record_count}")
  31. print(f" 列数: {product.column_count}")
  32. return product.to_dict()
  33. else:
  34. print(f"数据产品 ID={product_id} 不存在")
  35. return None
  36. def get_sample_data_from_table(app, schema: str, table_name: str, limit: int = 1):
  37. """从指定表获取样例数据"""
  38. with app.app_context():
  39. try:
  40. # 检查表是否存在
  41. check_sql = text("""
  42. SELECT EXISTS (
  43. SELECT FROM information_schema.tables
  44. WHERE table_schema = :schema
  45. AND table_name = :table
  46. )
  47. """)
  48. exists = db.session.execute(
  49. check_sql, {"schema": schema, "table": table_name}
  50. ).scalar()
  51. if not exists:
  52. print(f"表 {schema}.{table_name} 不存在")
  53. return None
  54. # 获取表的列信息
  55. columns_sql = text("""
  56. SELECT column_name, data_type
  57. FROM information_schema.columns
  58. WHERE table_schema = :schema AND table_name = :table
  59. ORDER BY ordinal_position
  60. """)
  61. columns = db.session.execute(
  62. columns_sql, {"schema": schema, "table": table_name}
  63. ).fetchall()
  64. print(f"\n{'=' * 60}")
  65. print(f"表结构: {schema}.{table_name}")
  66. print(f"{'=' * 60}")
  67. for col_name, col_type in columns:
  68. print(f" {col_name}: {col_type}")
  69. # 获取样例数据
  70. query_sql = text(f'SELECT * FROM "{schema}"."{table_name}" LIMIT :limit')
  71. result = db.session.execute(query_sql, {"limit": limit})
  72. rows = result.fetchall()
  73. column_names = list(result.keys())
  74. if rows:
  75. sample_data = dict(zip(column_names, rows[0]))
  76. print("\n样例数据:")
  77. print(f"{'-' * 60}")
  78. for key, value in sample_data.items():
  79. print(f" {key}: {value}")
  80. return sample_data
  81. else:
  82. print(f"表 {schema}.{table_name} 中没有数据")
  83. return None
  84. except Exception as e:
  85. print(f"查询表 {schema}.{table_name} 失败: {str(e)}")
  86. return None
  87. def query_table_data(app, schema: str, table_name: str):
  88. """查询指定表的所有数据"""
  89. with app.app_context():
  90. try:
  91. check_sql = text("""
  92. SELECT EXISTS (
  93. SELECT FROM information_schema.tables
  94. WHERE table_schema = :schema
  95. AND table_name = :table
  96. )
  97. """)
  98. exists = db.session.execute(
  99. check_sql, {"schema": schema, "table": table_name}
  100. ).scalar()
  101. if not exists:
  102. print(f"表 {schema}.{table_name} 不存在")
  103. return None
  104. # 获取数据总数
  105. count_sql = text(f'SELECT COUNT(*) FROM "{schema}"."{table_name}"')
  106. count = db.session.execute(count_sql).scalar()
  107. # 获取样例数据
  108. query_sql = text(f'SELECT * FROM "{schema}"."{table_name}" LIMIT 5')
  109. result = db.session.execute(query_sql)
  110. rows = result.fetchall()
  111. column_names = list(result.keys())
  112. print(f"\n{'=' * 60}")
  113. print(f"表数据: {schema}.{table_name} (总计 {count} 条记录)")
  114. print(f"{'=' * 60}")
  115. if rows:
  116. # 打印列名
  117. print(f"列名: {column_names}")
  118. print(f"{'-' * 60}")
  119. for i, row in enumerate(rows):
  120. print(f"[{i + 1}] {dict(zip(column_names, row))}")
  121. return [dict(zip(column_names, row)) for row in rows]
  122. else:
  123. print("表中没有数据")
  124. return []
  125. except Exception as e:
  126. print(f"查询表 {schema}.{table_name} 失败: {str(e)}")
  127. return None
  128. def test_lineage_visualization_api(app, product_id: int, sample_data: dict):
  129. """测试血缘可视化 API(直接调用服务层)"""
  130. from app.core.data_service.data_product_service import DataProductService
  131. with app.app_context():
  132. print(f"\n{'=' * 60}")
  133. print(f"测试血缘可视化: product_id={product_id}")
  134. print(f"{'=' * 60}")
  135. print("请求体 sample_data:")
  136. print(json.dumps(sample_data, ensure_ascii=False, indent=2, default=str))
  137. try:
  138. result = DataProductService.get_data_lineage_visualization(
  139. product_id=product_id,
  140. sample_data=sample_data,
  141. )
  142. print("\n响应内容:")
  143. print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
  144. # 打印节点详情
  145. if result.get("nodes"):
  146. print(f"\n{'=' * 60}")
  147. print(f"节点详情 (共 {len(result['nodes'])} 个)")
  148. print(f"{'=' * 60}")
  149. for node in result["nodes"]:
  150. print(f"\n 节点 ID: {node.get('id')}")
  151. print(f" 中文名: {node.get('name_zh')}")
  152. print(f" 英文名: {node.get('name_en')}")
  153. print(f" 类型: {node.get('node_type')}")
  154. print(f" 标签: {node.get('labels')}")
  155. print(f" 是目标节点: {node.get('is_target')}")
  156. print(f" 是源节点: {node.get('is_source')}")
  157. if node.get("matched_fields"):
  158. print(f" 匹配字段数: {len(node['matched_fields'])}")
  159. for field in node["matched_fields"]:
  160. print(
  161. f" - {field.get('name_zh')} ({field.get('name_en')}): {field.get('value')}"
  162. )
  163. if node.get("matched_data"):
  164. print(" 匹配数据:")
  165. for data in node["matched_data"][:2]: # 只显示前2条
  166. print(f" {data}")
  167. # 打印关系详情
  168. if result.get("lines"):
  169. print(f"\n{'=' * 60}")
  170. print(f"关系详情 (共 {len(result['lines'])} 条)")
  171. print(f"{'=' * 60}")
  172. for line in result["lines"]:
  173. rel_type = line.get("text") or line.get("type", "UNKNOWN")
  174. print(f" {line['from']} --[{rel_type}]--> {line['to']}")
  175. return result
  176. except Exception as e:
  177. import traceback
  178. print(f"调用失败: {str(e)}")
  179. traceback.print_exc()
  180. return None
  181. def query_neo4j_business_domain(app, table_name: str):
  182. """查询 Neo4j 中与表名对应的 BusinessDomain 节点"""
  183. with app.app_context():
  184. from app.services.neo4j_driver import neo4j_driver
  185. print(f"\n{'=' * 60}")
  186. print(f"查询 Neo4j BusinessDomain: {table_name}")
  187. print(f"{'=' * 60}")
  188. with neo4j_driver.get_session() as session:
  189. # 通过表名查找 BusinessDomain
  190. query = """
  191. MATCH (bd:BusinessDomain)
  192. WHERE bd.name_en = $table_name OR bd.name = $table_name
  193. RETURN id(bd) as bd_id, bd.name_zh as name_zh, bd.name_en as name_en,
  194. labels(bd) as labels, bd.description as description
  195. """
  196. result = session.run(query, {"table_name": table_name}).data()
  197. if result:
  198. for bd in result:
  199. print(f" ID: {bd['bd_id']}")
  200. print(f" 中文名: {bd['name_zh']}")
  201. print(f" 英文名: {bd['name_en']}")
  202. print(f" 标签: {bd['labels']}")
  203. print(f" 描述: {bd.get('description', 'N/A')}")
  204. print("-" * 40)
  205. return result
  206. else:
  207. print(f"未找到与 {table_name} 对应的 BusinessDomain 节点")
  208. # 尝试模糊匹配
  209. fuzzy_query = """
  210. MATCH (bd:BusinessDomain)
  211. WHERE bd.name_en CONTAINS $keyword OR bd.name CONTAINS $keyword
  212. OR bd.name_zh CONTAINS $keyword
  213. RETURN id(bd) as bd_id, bd.name_zh as name_zh, bd.name_en as name_en,
  214. labels(bd) as labels
  215. LIMIT 10
  216. """
  217. fuzzy_result = session.run(
  218. fuzzy_query, {"keyword": table_name.split("_")[0]}
  219. ).data()
  220. if fuzzy_result:
  221. print("\n可能相关的 BusinessDomain 节点:")
  222. for bd in fuzzy_result:
  223. print(
  224. f" - {bd['name_zh']} ({bd['name_en']}) [ID: {bd['bd_id']}]"
  225. )
  226. return None
  227. def main():
  228. print(f"\n{'#' * 60}")
  229. print("# 数据加工可视化 API 测试脚本")
  230. print(f"# 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  231. print(f"{'#' * 60}")
  232. # 创建应用
  233. app = create_app()
  234. product_id = 23
  235. # 1. 获取数据产品信息
  236. product_info = get_product_info(app, product_id)
  237. if not product_info:
  238. print("无法获取数据产品信息,退出测试")
  239. return
  240. target_table = product_info.get("target_table")
  241. target_schema = product_info.get("target_schema", "public")
  242. # 2. 查询目标表数据
  243. print(f"\n\n{'#' * 60}")
  244. print("# 查询目标表数据")
  245. print(f"{'#' * 60}")
  246. sample_data = get_sample_data_from_table(app, target_schema, target_table)
  247. # 3. 查询用户提到的两个表(在 dags schema 中)
  248. print(f"\n\n{'#' * 60}")
  249. print("# 查询 warehouse_inventory_summary 表 (dags schema)")
  250. print(f"{'#' * 60}")
  251. query_table_data(app, "dags", "warehouse_inventory_summary")
  252. print(f"\n\n{'#' * 60}")
  253. print("# 查询 test_product_inventory 表 (dags schema)")
  254. print(f"{'#' * 60}")
  255. query_table_data(app, "dags", "test_product_inventory")
  256. # 4. 查询 Neo4j 中的相关节点
  257. print(f"\n\n{'#' * 60}")
  258. print("# 查询 Neo4j 图谱节点")
  259. print(f"{'#' * 60}")
  260. query_neo4j_business_domain(app, target_table)
  261. query_neo4j_business_domain(app, "warehouse_inventory_summary")
  262. query_neo4j_business_domain(app, "test_product_inventory")
  263. # 5. 测试 API
  264. if sample_data:
  265. print(f"\n\n{'#' * 60}")
  266. print("# 调用血缘可视化 API")
  267. print(f"{'#' * 60}")
  268. test_lineage_visualization_api(app, product_id, sample_data)
  269. else:
  270. print("\n没有样例数据,跳过 API 测试")
  271. # 尝试使用手动构造的测试数据
  272. print("\n尝试使用手动构造的测试数据...")
  273. manual_sample_data = {
  274. "产品名称": "测试产品",
  275. "仓库名称": "测试仓库",
  276. "库存数量": 100,
  277. }
  278. test_lineage_visualization_api(app, product_id, manual_sample_data)
  279. if __name__ == "__main__":
  280. main()