test_data_lineage_visualization.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
#!/usr/bin/env python3
"""
数据血缘可视化功能测试脚本
此脚本用于:
1. 在 Neo4j 中创建模拟的血缘关系数据
2. 在 PostgreSQL 中创建对应的数据产品记录
3. 测试血缘可视化 API 功能
用法: python test_data_lineage_visualization.py [--cleanup]
"""
  9. from __future__ import annotations
  10. import json
  11. import logging
  12. import sys
  13. from datetime import datetime
  14. # 设置日志
  15. logging.basicConfig(
  16. level=logging.INFO,
  17. format="%(asctime)s - %(levelname)s - %(message)s",
  18. )
  19. logger = logging.getLogger(__name__)
  20. def create_test_data_in_neo4j(neo4j_session) -> dict:
  21. """
  22. 在 Neo4j 中创建测试血缘数据
  23. 创建的图结构:
  24. (DataResource:用户基础数据) -[INPUT]-> (DataFlow:用户数据清洗)
  25. (DataFlow:用户数据清洗) -[OUTPUT]-> (BusinessDomain:用户画像)
  26. (BusinessDomain:用户画像) -[INPUT]-> (DataFlow:用户标签生成)
  27. (DataFlow:用户标签生成) -[OUTPUT]-> (BusinessDomain:用户标签库)
  28. Returns:
  29. dict: 包含创建的节点 ID
  30. """
  31. logger.info("开始在 Neo4j 中创建测试数据...")
  32. created_ids = {}
  33. # 1. 创建源头节点(同时具有 BusinessDomain 和 DataResource 标签)
  34. create_source_query = """
  35. MERGE (n:BusinessDomain:DataResource {name_en: 'user_base_info_test'})
  36. ON CREATE SET
  37. n.name_zh = '用户基础数据(测试)',
  38. n.describe = '测试用的用户原始数据表',
  39. n.type = 'source',
  40. n.created_at = $created_at
  41. RETURN id(n) as node_id
  42. """
  43. result = neo4j_session.run(
  44. create_source_query, {"created_at": datetime.now().isoformat()}
  45. ).single()
  46. created_ids["source_bd"] = result["node_id"]
  47. logger.info(f"创建源节点 (DataResource): ID={result['node_id']}")
  48. # 2. 创建第一个 DataFlow 节点
  49. create_df1_query = """
  50. MERGE (n:DataFlow {name_en: 'user_data_clean_test'})
  51. ON CREATE SET
  52. n.name_zh = '用户数据清洗(测试)',
  53. n.describe = '清洗用户基础数据',
  54. n.script_type = 'sql',
  55. n.status = 'active',
  56. n.created_at = $created_at
  57. RETURN id(n) as node_id
  58. """
  59. result = neo4j_session.run(
  60. create_df1_query, {"created_at": datetime.now().isoformat()}
  61. ).single()
  62. created_ids["dataflow_1"] = result["node_id"]
  63. logger.info(f"创建 DataFlow 1: ID={result['node_id']}")
  64. # 3. 创建中间 BusinessDomain 节点
  65. create_mid_bd_query = """
  66. MERGE (n:BusinessDomain {name_en: 'user_profile_test'})
  67. ON CREATE SET
  68. n.name_zh = '用户画像(测试)',
  69. n.describe = '用户画像数据',
  70. n.type = 'table',
  71. n.created_at = $created_at
  72. RETURN id(n) as node_id
  73. """
  74. result = neo4j_session.run(
  75. create_mid_bd_query, {"created_at": datetime.now().isoformat()}
  76. ).single()
  77. created_ids["mid_bd"] = result["node_id"]
  78. logger.info(f"创建中间 BusinessDomain: ID={result['node_id']}")
  79. # 4. 创建第二个 DataFlow 节点
  80. create_df2_query = """
  81. MERGE (n:DataFlow {name_en: 'user_tag_generate_test'})
  82. ON CREATE SET
  83. n.name_zh = '用户标签生成(测试)',
  84. n.describe = '生成用户标签',
  85. n.script_type = 'python',
  86. n.status = 'active',
  87. n.created_at = $created_at
  88. RETURN id(n) as node_id
  89. """
  90. result = neo4j_session.run(
  91. create_df2_query, {"created_at": datetime.now().isoformat()}
  92. ).single()
  93. created_ids["dataflow_2"] = result["node_id"]
  94. logger.info(f"创建 DataFlow 2: ID={result['node_id']}")
  95. # 5. 创建目标 BusinessDomain 节点
  96. create_target_bd_query = """
  97. MERGE (n:BusinessDomain {name_en: 'user_tag_library_test'})
  98. ON CREATE SET
  99. n.name_zh = '用户标签库(测试)',
  100. n.describe = '最终的用户标签数据产品',
  101. n.type = 'table',
  102. n.created_at = $created_at
  103. RETURN id(n) as node_id
  104. """
  105. result = neo4j_session.run(
  106. create_target_bd_query, {"created_at": datetime.now().isoformat()}
  107. ).single()
  108. created_ids["target_bd"] = result["node_id"]
  109. logger.info(f"创建目标 BusinessDomain: ID={result['node_id']}")
  110. # 6. 创建 DataMeta 节点并关联到各个 BusinessDomain
  111. meta_fields = [
  112. {"name_zh": "用户ID", "name_en": "user_id", "data_type": "integer"},
  113. {"name_zh": "姓名", "name_en": "name", "data_type": "string"},
  114. {"name_zh": "年龄", "name_en": "age", "data_type": "integer"},
  115. {"name_zh": "用户标签", "name_en": "user_tag", "data_type": "string"},
  116. {"name_zh": "画像分数", "name_en": "profile_score", "data_type": "float"},
  117. ]
  118. for field in meta_fields:
  119. create_meta_query = """
  120. MERGE (m:DataMeta {name_en: $name_en + '_test'})
  121. ON CREATE SET
  122. m.name_zh = $name_zh,
  123. m.data_type = $data_type,
  124. m.created_at = $created_at
  125. RETURN id(m) as meta_id
  126. """
  127. result = neo4j_session.run(
  128. create_meta_query,
  129. {
  130. "name_zh": field["name_zh"],
  131. "name_en": field["name_en"],
  132. "data_type": field["data_type"],
  133. "created_at": datetime.now().isoformat(),
  134. },
  135. ).single()
  136. meta_id = result["meta_id"]
  137. logger.info(f"创建 DataMeta: {field['name_zh']}, ID={meta_id}")
  138. # 将前三个字段关联到所有 BusinessDomain
  139. for bd_key in ["source_bd", "mid_bd", "target_bd"]:
  140. create_includes_query = """
  141. MATCH (bd), (m:DataMeta)
  142. WHERE id(bd) = $bd_id AND id(m) = $meta_id
  143. MERGE (bd)-[:INCLUDES]->(m)
  144. """
  145. neo4j_session.run(
  146. create_includes_query,
  147. {"bd_id": created_ids[bd_key], "meta_id": meta_id},
  148. )
  149. # 7. 创建 INPUT/OUTPUT 关系
  150. logger.info("创建血缘关系...")
  151. # source_bd -[INPUT]-> dataflow_1
  152. neo4j_session.run(
  153. """
  154. MATCH (source), (df:DataFlow)
  155. WHERE id(source) = $source_id AND id(df) = $df_id
  156. MERGE (source)-[:INPUT]->(df)
  157. """,
  158. {"source_id": created_ids["source_bd"], "df_id": created_ids["dataflow_1"]},
  159. )
  160. logger.info("创建关系: source_bd -[INPUT]-> dataflow_1")
  161. # dataflow_1 -[OUTPUT]-> mid_bd
  162. neo4j_session.run(
  163. """
  164. MATCH (df:DataFlow), (target)
  165. WHERE id(df) = $df_id AND id(target) = $target_id
  166. MERGE (df)-[:OUTPUT]->(target)
  167. """,
  168. {"df_id": created_ids["dataflow_1"], "target_id": created_ids["mid_bd"]},
  169. )
  170. logger.info("创建关系: dataflow_1 -[OUTPUT]-> mid_bd")
  171. # mid_bd -[INPUT]-> dataflow_2
  172. neo4j_session.run(
  173. """
  174. MATCH (source), (df:DataFlow)
  175. WHERE id(source) = $source_id AND id(df) = $df_id
  176. MERGE (source)-[:INPUT]->(df)
  177. """,
  178. {"source_id": created_ids["mid_bd"], "df_id": created_ids["dataflow_2"]},
  179. )
  180. logger.info("创建关系: mid_bd -[INPUT]-> dataflow_2")
  181. # dataflow_2 -[OUTPUT]-> target_bd
  182. neo4j_session.run(
  183. """
  184. MATCH (df:DataFlow), (target)
  185. WHERE id(df) = $df_id AND id(target) = $target_id
  186. MERGE (df)-[:OUTPUT]->(target)
  187. """,
  188. {"df_id": created_ids["dataflow_2"], "target_id": created_ids["target_bd"]},
  189. )
  190. logger.info("创建关系: dataflow_2 -[OUTPUT]-> target_bd")
  191. logger.info("Neo4j 测试数据创建完成")
  192. return created_ids
  193. def create_test_data_product(db_session, neo4j_ids: dict) -> int:
  194. """
  195. 在 PostgreSQL 中创建测试数据产品
  196. Args:
  197. db_session: SQLAlchemy 会话
  198. neo4j_ids: Neo4j 中创建的节点 ID
  199. Returns:
  200. int: 创建的数据产品 ID
  201. """
  202. from sqlalchemy import text
  203. logger.info("在 PostgreSQL 中创建测试数据产品...")
  204. # 检查是否已存在
  205. check_query = text("""
  206. SELECT id FROM data_products
  207. WHERE product_name_en = 'user_tag_library_test'
  208. """)
  209. result = db_session.execute(check_query).fetchone()
  210. if result:
  211. product_id = result[0]
  212. logger.info(f"测试数据产品已存在,ID={product_id}")
  213. return product_id
  214. # 创建数据产品
  215. insert_query = text("""
  216. INSERT INTO data_products (
  217. product_name, product_name_en, description,
  218. source_dataflow_id, source_dataflow_name,
  219. target_table, target_schema,
  220. record_count, column_count,
  221. status, created_by, created_at, updated_at
  222. ) VALUES (
  223. '用户标签库(测试)', 'user_tag_library_test',
  224. '测试血缘可视化功能的数据产品',
  225. :dataflow_id, '用户标签生成(测试)',
  226. 'user_tag_library_test', 'public',
  227. 1000, 5,
  228. 'active', 'test_script', NOW(), NOW()
  229. ) RETURNING id
  230. """)
  231. result = db_session.execute(
  232. insert_query,
  233. {"dataflow_id": neo4j_ids.get("dataflow_2")},
  234. )
  235. product_id = result.fetchone()[0]
  236. db_session.commit()
  237. logger.info(f"创建测试数据产品成功,ID={product_id}")
  238. return product_id
  239. def test_lineage_visualization_api(app_client, product_id: int) -> bool:
  240. """
  241. 测试血缘可视化 API
  242. Args:
  243. app_client: Flask 测试客户端
  244. product_id: 数据产品 ID
  245. Returns:
  246. bool: 测试是否成功
  247. """
  248. logger.info(f"测试血缘可视化 API,product_id={product_id}")
  249. sample_data = {
  250. "用户ID": 12345,
  251. "姓名": "张三",
  252. "年龄": 28,
  253. "用户标签": "高价值用户",
  254. "画像分数": 0.85,
  255. }
  256. response = app_client.post(
  257. f"/api/dataservice/products/{product_id}/lineage-visualization",
  258. data=json.dumps({"sample_data": sample_data}),
  259. content_type="application/json",
  260. )
  261. response_data = json.loads(response.data)
  262. logger.info(f"API 响应状态码: {response.status_code}")
  263. logger.info(
  264. f"API 响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}"
  265. )
  266. # 验证响应
  267. if response_data.get("code") == 200:
  268. data = response_data.get("data", {})
  269. nodes = data.get("nodes", [])
  270. lines = data.get("lines", [])
  271. depth = data.get("lineage_depth", 0)
  272. logger.info(f"节点数量: {len(nodes)}")
  273. logger.info(f"关系数量: {len(lines)}")
  274. logger.info(f"血缘深度: {depth}")
  275. # 验证基本结构
  276. if len(nodes) >= 2 and len(lines) >= 1:
  277. logger.info("✅ 血缘可视化 API 测试通过!")
  278. return True
  279. else:
  280. logger.warning("⚠️ 返回的节点或关系数量不足")
  281. return False
  282. else:
  283. logger.error(f"❌ API 返回错误: {response_data.get('message')}")
  284. return False
  285. def cleanup_test_data(neo4j_session, db_session) -> None:
  286. """
  287. 清理测试数据
  288. Args:
  289. neo4j_session: Neo4j 会话
  290. db_session: SQLAlchemy 会话
  291. """
  292. logger.info("清理测试数据...")
  293. # 清理 Neo4j 测试数据
  294. cleanup_neo4j_query = """
  295. MATCH (n)
  296. WHERE n.name_en ENDS WITH '_test'
  297. DETACH DELETE n
  298. """
  299. neo4j_session.run(cleanup_neo4j_query)
  300. logger.info("Neo4j 测试数据已清理")
  301. # 清理 PostgreSQL 测试数据
  302. from sqlalchemy import text
  303. cleanup_pg_query = text("""
  304. DELETE FROM data_products
  305. WHERE product_name_en = 'user_tag_library_test'
  306. """)
  307. db_session.execute(cleanup_pg_query)
  308. db_session.commit()
  309. logger.info("PostgreSQL 测试数据已清理")
  310. def main() -> int:
  311. """主函数"""
  312. logger.info("=" * 60)
  313. logger.info("开始执行数据血缘可视化功能测试")
  314. logger.info("=" * 60)
  315. # 添加项目路径
  316. import os
  317. project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  318. sys.path.insert(0, project_root)
  319. logger.info(f"项目路径: {project_root}")
  320. try:
  321. # 导入应用
  322. logger.info("正在导入应用...")
  323. from app import create_app, db
  324. logger.info("成功导入 create_app 和 db")
  325. from app.services.neo4j_driver import neo4j_driver
  326. logger.info("成功导入 neo4j_driver")
  327. logger.info("正在创建应用...")
  328. app = create_app()
  329. logger.info("应用创建成功")
  330. with app.app_context():
  331. logger.info("进入应用上下文")
  332. # 获取 Neo4j 会话
  333. with neo4j_driver.get_session() as neo4j_session:
  334. # 1. 创建 Neo4j 测试数据
  335. neo4j_ids = create_test_data_in_neo4j(neo4j_session)
  336. # 2. 创建 PostgreSQL 测试数据
  337. product_id = create_test_data_product(db.session, neo4j_ids)
  338. # 3. 测试 API
  339. with app.test_client() as client:
  340. test_result = test_lineage_visualization_api(client, product_id)
  341. # 4. 询问是否清理测试数据
  342. if "--cleanup" in sys.argv:
  343. cleanup_test_data(neo4j_session, db.session)
  344. else:
  345. logger.info("测试数据保留(使用 --cleanup 参数可清理)")
  346. if test_result:
  347. logger.info("=" * 60)
  348. logger.info("✅ 所有测试通过!")
  349. logger.info("=" * 60)
  350. return 0
  351. else:
  352. logger.error("=" * 60)
  353. logger.error("❌ 测试失败!")
  354. logger.error("=" * 60)
  355. return 1
  356. except Exception as e:
  357. logger.exception(f"测试执行失败: {str(e)}")
  358. return 1
  359. if __name__ == "__main__":
  360. sys.exit(main())