#!/usr/bin/env python3
"""
Test script for the data lineage visualization feature.

This script:
1. Creates mock lineage data in Neo4j
2. Creates the corresponding data product record in PostgreSQL
3. Exercises the lineage visualization API
"""

from __future__ import annotations

import json
import logging
import sys
from datetime import datetime

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


def create_test_data_in_neo4j(neo4j_session) -> dict:
    """
    Create test lineage data in Neo4j.

    Graph structure created:
        (DataResource: user base data)  -[INPUT]->  (DataFlow: user data cleaning)
        (DataFlow: user data cleaning)  -[OUTPUT]-> (BusinessDomain: user profile)
        (BusinessDomain: user profile)  -[INPUT]->  (DataFlow: user tag generation)
        (DataFlow: user tag generation) -[OUTPUT]-> (BusinessDomain: user tag library)

    Returns:
        dict: IDs of the created nodes
    """
    logger.info("Creating test data in Neo4j...")

    created_ids = {}

    # 1. Create the source node (carries both BusinessDomain and DataResource labels)
    create_source_query = """
    MERGE (n:BusinessDomain:DataResource {name_en: 'user_base_info_test'})
    ON CREATE SET
        n.name_zh = '用户基础数据(测试)',
        n.describe = '测试用的用户原始数据表',
        n.type = 'source',
        n.created_at = $created_at
    RETURN id(n) as node_id
    """
    result = neo4j_session.run(
        create_source_query, {"created_at": datetime.now().isoformat()}
    ).single()
    created_ids["source_bd"] = result["node_id"]
    logger.info(f"Created source node (DataResource): ID={result['node_id']}")

    # 2. Create the first DataFlow node
    create_df1_query = """
    MERGE (n:DataFlow {name_en: 'user_data_clean_test'})
    ON CREATE SET
        n.name_zh = '用户数据清洗(测试)',
        n.describe = '清洗用户基础数据',
        n.script_type = 'sql',
        n.status = 'active',
        n.created_at = $created_at
    RETURN id(n) as node_id
    """
    result = neo4j_session.run(
        create_df1_query, {"created_at": datetime.now().isoformat()}
    ).single()
    created_ids["dataflow_1"] = result["node_id"]
    logger.info(f"Created DataFlow 1: ID={result['node_id']}")

    # 3. Create the intermediate BusinessDomain node
    create_mid_bd_query = """
    MERGE (n:BusinessDomain {name_en: 'user_profile_test'})
    ON CREATE SET
        n.name_zh = '用户画像(测试)',
        n.describe = '用户画像数据',
        n.type = 'table',
        n.created_at = $created_at
    RETURN id(n) as node_id
    """
    result = neo4j_session.run(
        create_mid_bd_query, {"created_at": datetime.now().isoformat()}
    ).single()
    created_ids["mid_bd"] = result["node_id"]
    logger.info(f"Created intermediate BusinessDomain: ID={result['node_id']}")

    # 4. Create the second DataFlow node
    create_df2_query = """
    MERGE (n:DataFlow {name_en: 'user_tag_generate_test'})
    ON CREATE SET
        n.name_zh = '用户标签生成(测试)',
        n.describe = '生成用户标签',
        n.script_type = 'python',
        n.status = 'active',
        n.created_at = $created_at
    RETURN id(n) as node_id
    """
    result = neo4j_session.run(
        create_df2_query, {"created_at": datetime.now().isoformat()}
    ).single()
    created_ids["dataflow_2"] = result["node_id"]
    logger.info(f"Created DataFlow 2: ID={result['node_id']}")

    # 5. Create the target BusinessDomain node
    create_target_bd_query = """
    MERGE (n:BusinessDomain {name_en: 'user_tag_library_test'})
    ON CREATE SET
        n.name_zh = '用户标签库(测试)',
        n.describe = '最终的用户标签数据产品',
        n.type = 'table',
        n.created_at = $created_at
    RETURN id(n) as node_id
    """
    result = neo4j_session.run(
        create_target_bd_query, {"created_at": datetime.now().isoformat()}
    ).single()
    created_ids["target_bd"] = result["node_id"]
    logger.info(f"Created target BusinessDomain: ID={result['node_id']}")

    # 6. Create DataMeta nodes and attach them to each BusinessDomain
    meta_fields = [
        {"name_zh": "用户ID", "name_en": "user_id", "data_type": "integer"},
        {"name_zh": "姓名", "name_en": "name", "data_type": "string"},
        {"name_zh": "年龄", "name_en": "age", "data_type": "integer"},
        {"name_zh": "用户标签", "name_en": "user_tag", "data_type": "string"},
        {"name_zh": "画像分数", "name_en": "profile_score", "data_type": "float"},
    ]

    for field in meta_fields:
        create_meta_query = """
        MERGE (m:DataMeta {name_en: $name_en + '_test'})
        ON CREATE SET
            m.name_zh = $name_zh,
            m.data_type = $data_type,
            m.created_at = $created_at
        RETURN id(m) as meta_id
        """
        result = neo4j_session.run(
            create_meta_query,
            {
                "name_zh": field["name_zh"],
                "name_en": field["name_en"],
                "data_type": field["data_type"],
                "created_at": datetime.now().isoformat(),
            },
        ).single()
        meta_id = result["meta_id"]
        logger.info(f"Created DataMeta: {field['name_zh']}, ID={meta_id}")

        # Attach the field to all three BusinessDomain nodes
        for bd_key in ["source_bd", "mid_bd", "target_bd"]:
            create_includes_query = """
            MATCH (bd), (m:DataMeta)
            WHERE id(bd) = $bd_id AND id(m) = $meta_id
            MERGE (bd)-[:INCLUDES]->(m)
            """
            neo4j_session.run(
                create_includes_query,
                {"bd_id": created_ids[bd_key], "meta_id": meta_id},
            )

    # 7. Create the INPUT/OUTPUT relationships
    logger.info("Creating lineage relationships...")

    # source_bd -[INPUT]-> dataflow_1
    neo4j_session.run(
        """
        MATCH (source), (df:DataFlow)
        WHERE id(source) = $source_id AND id(df) = $df_id
        MERGE (source)-[:INPUT]->(df)
        """,
        {"source_id": created_ids["source_bd"], "df_id": created_ids["dataflow_1"]},
    )
    logger.info("Created relationship: source_bd -[INPUT]-> dataflow_1")

    # dataflow_1 -[OUTPUT]-> mid_bd
    neo4j_session.run(
        """
        MATCH (df:DataFlow), (target)
        WHERE id(df) = $df_id AND id(target) = $target_id
        MERGE (df)-[:OUTPUT]->(target)
        """,
        {"df_id": created_ids["dataflow_1"], "target_id": created_ids["mid_bd"]},
    )
    logger.info("Created relationship: dataflow_1 -[OUTPUT]-> mid_bd")

    # mid_bd -[INPUT]-> dataflow_2
    neo4j_session.run(
        """
        MATCH (source), (df:DataFlow)
        WHERE id(source) = $source_id AND id(df) = $df_id
        MERGE (source)-[:INPUT]->(df)
        """,
        {"source_id": created_ids["mid_bd"], "df_id": created_ids["dataflow_2"]},
    )
    logger.info("Created relationship: mid_bd -[INPUT]-> dataflow_2")

    # dataflow_2 -[OUTPUT]-> target_bd
    neo4j_session.run(
        """
        MATCH (df:DataFlow), (target)
        WHERE id(df) = $df_id AND id(target) = $target_id
        MERGE (df)-[:OUTPUT]->(target)
        """,
        {"df_id": created_ids["dataflow_2"], "target_id": created_ids["target_bd"]},
    )
    logger.info("Created relationship: dataflow_2 -[OUTPUT]-> target_bd")

    logger.info("Neo4j test data created")
    return created_ids
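

# Optional sanity check, added here as an illustrative sketch (it is not wired
# into main()): it follows the INPUT/OUTPUT chain between the test source and
# target nodes created above, so the graph can be inspected before calling the
# API. The node names and relationship types are the ones used in
# create_test_data_in_neo4j(); nothing else is assumed about the schema.
def verify_lineage_chain_in_neo4j(neo4j_session) -> int:
    """Return the hop count of the longest INPUT/OUTPUT path from the test
    source node to the test target node (0 if no path exists)."""
    query = """
    MATCH p = (s {name_en: 'user_base_info_test'})-[:INPUT|OUTPUT*]->(t {name_en: 'user_tag_library_test'})
    RETURN length(p) AS hops
    ORDER BY hops DESC
    LIMIT 1
    """
    record = neo4j_session.run(query).single()
    return record["hops"] if record else 0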


def create_test_data_product(db_session, neo4j_ids: dict) -> int:
    """
    Create a test data product in PostgreSQL.

    Args:
        db_session: SQLAlchemy session
        neo4j_ids: IDs of the nodes created in Neo4j

    Returns:
        int: ID of the created data product
    """
    from sqlalchemy import text

    logger.info("Creating test data product in PostgreSQL...")

    # Reuse the record if it already exists
    check_query = text("""
        SELECT id FROM data_products
        WHERE product_name_en = 'user_tag_library_test'
    """)
    result = db_session.execute(check_query).fetchone()

    if result:
        product_id = result[0]
        logger.info(f"Test data product already exists, ID={product_id}")
        return product_id

    # Create the data product
    insert_query = text("""
        INSERT INTO data_products (
            product_name, product_name_en, description,
            source_dataflow_id, source_dataflow_name,
            target_table, target_schema,
            record_count, column_count,
            status, created_by, created_at, updated_at
        ) VALUES (
            '用户标签库(测试)', 'user_tag_library_test', '测试血缘可视化功能的数据产品',
            :dataflow_id, '用户标签生成(测试)',
            'user_tag_library_test', 'public',
            1000, 5,
            'active', 'test_script', NOW(), NOW()
        )
        RETURNING id
    """)
    result = db_session.execute(
        insert_query,
        {"dataflow_id": neo4j_ids.get("dataflow_2")},
    )
    product_id = result.fetchone()[0]
    db_session.commit()

    logger.info(f"Test data product created, ID={product_id}")
    return product_id


def test_lineage_visualization_api(app_client, product_id: int) -> bool:
    """
    Test the lineage visualization API.

    Args:
        app_client: Flask test client
        product_id: data product ID

    Returns:
        bool: whether the test passed
    """
    logger.info(f"Testing lineage visualization API, product_id={product_id}")

    sample_data = {
        "用户ID": 12345,
        "姓名": "张三",
        "年龄": 28,
        "用户标签": "高价值用户",
        "画像分数": 0.85,
    }

    response = app_client.post(
        f"/api/dataservice/products/{product_id}/lineage-visualization",
        data=json.dumps({"sample_data": sample_data}),
        content_type="application/json",
    )

    response_data = json.loads(response.data)
    logger.info(f"API response status code: {response.status_code}")
    logger.info(
        f"API response body: {json.dumps(response_data, ensure_ascii=False, indent=2)}"
    )

    # Validate the response
    if response_data.get("code") == 200:
        data = response_data.get("data", {})
        nodes = data.get("nodes", [])
        lines = data.get("lines", [])
        depth = data.get("lineage_depth", 0)

        logger.info(f"Node count: {len(nodes)}")
        logger.info(f"Edge count: {len(lines)}")
        logger.info(f"Lineage depth: {depth}")

        # Check the basic structure
        if len(nodes) >= 2 and len(lines) >= 1:
            logger.info("✅ Lineage visualization API test passed!")
            return True
        else:
            logger.warning("⚠️ Too few nodes or edges in the response")
            return False
    else:
        logger.error(f"❌ API returned an error: {response_data.get('message')}")
        return False
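
# Illustrative shape of a successful response, inferred only from the fields the
# checks above read (the actual payload format is defined by the API
# implementation, so treat this as a sketch rather than a contract):
#
#   {
#     "code": 200,
#     "data": {
#       "nodes": [...],            # lineage graph nodes (>= 2 expected here)
#       "lines": [...],            # edges between nodes (>= 1 expected here)
#       "lineage_depth": <int>     # depth of the traversed lineage chain
#     }
#   }
#
# On failure, a non-200 "code" and a "message" field are expected instead.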


def cleanup_test_data(neo4j_session, db_session) -> None:
    """
    Remove the test data.

    Args:
        neo4j_session: Neo4j session
        db_session: SQLAlchemy session
    """
    logger.info("Cleaning up test data...")

    # Remove the Neo4j test data
    cleanup_neo4j_query = """
    MATCH (n)
    WHERE n.name_en ENDS WITH '_test'
    DETACH DELETE n
    """
    neo4j_session.run(cleanup_neo4j_query)
    logger.info("Neo4j test data removed")

    # Remove the PostgreSQL test data
    from sqlalchemy import text

    cleanup_pg_query = text("""
        DELETE FROM data_products
        WHERE product_name_en = 'user_tag_library_test'
    """)
    db_session.execute(cleanup_pg_query)
    db_session.commit()
    logger.info("PostgreSQL test data removed")


def main() -> int:
    """Entry point."""
    logger.info("=" * 60)
    logger.info("Starting the data lineage visualization test")
    logger.info("=" * 60)

    # Make the project root importable
    import os

    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, project_root)
    logger.info(f"Project root: {project_root}")

    try:
        # Import the application
        logger.info("Importing the application...")
        from app import create_app, db

        logger.info("Imported create_app and db")

        from app.services.neo4j_driver import neo4j_driver

        logger.info("Imported neo4j_driver")

        logger.info("Creating the application...")
        app = create_app()
        logger.info("Application created")

        with app.app_context():
            logger.info("Entered the application context")

            # Open a Neo4j session
            with neo4j_driver.get_session() as neo4j_session:
                # 1. Create the Neo4j test data
                neo4j_ids = create_test_data_in_neo4j(neo4j_session)

                # 2. Create the PostgreSQL test data
                product_id = create_test_data_product(db.session, neo4j_ids)

                # 3. Exercise the API
                with app.test_client() as client:
                    test_result = test_lineage_visualization_api(client, product_id)

                # 4. Clean up the test data only when --cleanup was passed
                if "--cleanup" in sys.argv:
                    cleanup_test_data(neo4j_session, db.session)
                else:
                    logger.info("Test data kept (pass --cleanup to remove it)")

        if test_result:
            logger.info("=" * 60)
            logger.info("✅ All tests passed!")
            logger.info("=" * 60)
            return 0
        else:
            logger.error("=" * 60)
            logger.error("❌ Test failed!")
            logger.error("=" * 60)
            return 1

    except Exception as e:
        logger.exception(f"Test run failed: {str(e)}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
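
# How to run (illustrative; the exact invocation depends on where this script
# lives in the repository — main() assumes it sits one directory below the
# project root so that `from app import create_app` resolves):
#
#   python test_lineage_visualization.py            # create test data and run the API test
#   python test_lineage_visualization.py --cleanup  # additionally remove the test data afterwards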