| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
#!/usr/bin/env python3
"""
Smoke-test script for the data-lineage visualisation feature.

The script:
1. creates mock lineage data (nodes and INPUT/OUTPUT/INCLUDES edges) in Neo4j;
2. creates the matching data-product record in PostgreSQL;
3. calls the lineage-visualisation API and checks the response shape.
"""
from __future__ import annotations

import json
import logging
import sys
from datetime import datetime

# Configure the root logger once for the whole script: INFO and above,
# "timestamp - level - message".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger(__name__)
def _merge_named_node(neo4j_session, labels: str, name_en: str, properties: dict) -> int:
    """MERGE a node identified by ``name_en`` and return its internal Neo4j id.

    Args:
        neo4j_session: Neo4j session used to run the query.
        labels: colon-separated label list, e.g. ``"BusinessDomain:DataResource"``.
            Cypher cannot take labels as parameters, so this string is
            interpolated into the query text. It is validated to contain only
            identifiers and must come from constants in this script.
        name_en: value of the ``name_en`` property the node is merged on.
        properties: properties written only when the node is newly created
            (``ON CREATE SET``). An already-existing node is left unchanged.

    Returns:
        int: ``id(n)`` of the created or matched node.

    Raises:
        ValueError: if ``labels`` contains anything other than identifiers.
    """
    if not all(part.isidentifier() for part in labels.split(":")):
        raise ValueError(f"invalid Neo4j label list: {labels!r}")
    query = f"""
    MERGE (n:{labels} {{name_en: $name_en}})
    ON CREATE SET n += $props
    RETURN id(n) AS node_id
    """
    # MERGE ... RETURN always yields exactly one row, so .single() is not None.
    record = neo4j_session.run(
        query, {"name_en": name_en, "props": properties}
    ).single()
    return record["node_id"]


def _merge_relationship(neo4j_session, start_id: int, end_id: int, rel_type: str) -> None:
    """MERGE ``(start)-[:rel_type]->(end)`` between two nodes given by internal id.

    Cypher cannot take relationship types as parameters, so ``rel_type`` is
    interpolated into the query. Only the fixed types used by this script are
    accepted.

    Raises:
        ValueError: if ``rel_type`` is not one of INPUT / OUTPUT / INCLUDES.
    """
    if rel_type not in ("INPUT", "OUTPUT", "INCLUDES"):
        raise ValueError(f"unexpected relationship type: {rel_type!r}")
    neo4j_session.run(
        f"""
        MATCH (a), (b)
        WHERE id(a) = $start_id AND id(b) = $end_id
        MERGE (a)-[:{rel_type}]->(b)
        """,
        {"start_id": start_id, "end_id": end_id},
    )


def create_test_data_in_neo4j(neo4j_session) -> dict:
    """Create mock lineage data in Neo4j.

    Graph created (nodes identified by ``name_en``)::

        (BusinessDomain:DataResource user_base_info_test) -[INPUT]->  (DataFlow user_data_clean_test)
        (DataFlow user_data_clean_test)                   -[OUTPUT]-> (BusinessDomain user_profile_test)
        (BusinessDomain user_profile_test)                -[INPUT]->  (DataFlow user_tag_generate_test)
        (DataFlow user_tag_generate_test)                 -[OUTPUT]-> (BusinessDomain user_tag_library_test)

    In addition, five DataMeta field nodes (``*_test``) are created. Each of
    the three BusinessDomain nodes gets an INCLUDES edge to every one of them.

    All writes use MERGE, so re-running the function reuses existing nodes
    and edges instead of duplicating them. Properties are set only on first
    creation.

    Args:
        neo4j_session: open Neo4j session exposing ``run()``.

    Returns:
        dict: internal Neo4j node ids keyed by ``"source_bd"``,
        ``"dataflow_1"``, ``"mid_bd"``, ``"dataflow_2"`` and ``"target_bd"``.
    """
    logger.info("开始在 Neo4j 中创建测试数据...")
    # One timestamp for the whole batch so all test nodes share a created_at.
    created_at = datetime.now().isoformat()
    created_ids: dict = {}

    # (result key, labels, name_en, properties, log prefix)
    node_specs = [
        (
            "source_bd",
            "BusinessDomain:DataResource",
            "user_base_info_test",
            {"name_zh": "用户基础数据(测试)", "describe": "测试用的用户原始数据表", "type": "source"},
            "创建源节点 (DataResource)",
        ),
        (
            "dataflow_1",
            "DataFlow",
            "user_data_clean_test",
            {"name_zh": "用户数据清洗(测试)", "describe": "清洗用户基础数据", "script_type": "sql", "status": "active"},
            "创建 DataFlow 1",
        ),
        (
            "mid_bd",
            "BusinessDomain",
            "user_profile_test",
            {"name_zh": "用户画像(测试)", "describe": "用户画像数据", "type": "table"},
            "创建中间 BusinessDomain",
        ),
        (
            "dataflow_2",
            "DataFlow",
            "user_tag_generate_test",
            {"name_zh": "用户标签生成(测试)", "describe": "生成用户标签", "script_type": "python", "status": "active"},
            "创建 DataFlow 2",
        ),
        (
            "target_bd",
            "BusinessDomain",
            "user_tag_library_test",
            {"name_zh": "用户标签库(测试)", "describe": "最终的用户标签数据产品", "type": "table"},
            "创建目标 BusinessDomain",
        ),
    ]
    for key, labels, name_en, props, log_prefix in node_specs:
        node_id = _merge_named_node(
            neo4j_session, labels, name_en, {**props, "created_at": created_at}
        )
        created_ids[key] = node_id
        logger.info(f"{log_prefix}: ID={node_id}")

    # DataMeta field nodes; stored name_en gets a "_test" suffix.
    meta_fields = [
        {"name_zh": "用户ID", "name_en": "user_id", "data_type": "integer"},
        {"name_zh": "姓名", "name_en": "name", "data_type": "string"},
        {"name_zh": "年龄", "name_en": "age", "data_type": "integer"},
        {"name_zh": "用户标签", "name_en": "user_tag", "data_type": "string"},
        {"name_zh": "画像分数", "name_en": "profile_score", "data_type": "float"},
    ]
    domain_keys = ("source_bd", "mid_bd", "target_bd")
    for field in meta_fields:
        meta_id = _merge_named_node(
            neo4j_session,
            "DataMeta",
            f"{field['name_en']}_test",
            {
                "name_zh": field["name_zh"],
                "data_type": field["data_type"],
                "created_at": created_at,
            },
        )
        logger.info(f"创建 DataMeta: {field['name_zh']}, ID={meta_id}")
        # Every field (not only the first three) is linked to all three
        # BusinessDomain nodes, matching the previous behaviour.
        for bd_key in domain_keys:
            _merge_relationship(neo4j_session, created_ids[bd_key], meta_id, "INCLUDES")

    logger.info("创建血缘关系...")
    lineage_edges = [
        ("source_bd", "INPUT", "dataflow_1"),
        ("dataflow_1", "OUTPUT", "mid_bd"),
        ("mid_bd", "INPUT", "dataflow_2"),
        ("dataflow_2", "OUTPUT", "target_bd"),
    ]
    for start_key, rel_type, end_key in lineage_edges:
        _merge_relationship(
            neo4j_session, created_ids[start_key], created_ids[end_key], rel_type
        )
        logger.info(f"创建关系: {start_key} -[{rel_type}]-> {end_key}")

    logger.info("Neo4j 测试数据创建完成")
    return created_ids
def create_test_data_product(db_session, neo4j_ids: dict) -> int:
    """Create, or reuse, the test data-product row in PostgreSQL.

    The product is looked up by ``product_name_en = 'user_tag_library_test'``.
    If it already exists, its ``source_dataflow_id`` is re-pointed at the
    current ``dataflow_2`` node id. This keeps a product left over from an
    earlier run from referencing a Neo4j node id that no longer exists.

    Args:
        db_session: SQLAlchemy session.
        neo4j_ids: node ids returned by ``create_test_data_in_neo4j``. The
            ``"dataflow_2"`` entry is stored as the product's source DataFlow.

    Returns:
        int: id of the data product.

    Raises:
        ValueError: if ``neo4j_ids`` has no ``"dataflow_2"`` entry. Previously
            this silently inserted NULL.
        Exception: any database error is re-raised after rolling the session
            back, so the session stays usable (e.g. for cleanup).
    """
    from sqlalchemy import text

    logger.info("在 PostgreSQL 中创建测试数据产品...")

    dataflow_id = neo4j_ids.get("dataflow_2")
    if dataflow_id is None:
        raise ValueError(
            "neo4j_ids has no 'dataflow_2' entry; create the Neo4j test data first"
        )

    check_query = text("""
        SELECT id FROM data_products
        WHERE product_name_en = 'user_tag_library_test'
    """)
    sync_query = text("""
        UPDATE data_products
        SET source_dataflow_id = :dataflow_id, updated_at = NOW()
        WHERE id = :product_id
    """)
    insert_query = text("""
        INSERT INTO data_products (
            product_name, product_name_en, description,
            source_dataflow_id, source_dataflow_name,
            target_table, target_schema,
            record_count, column_count,
            status, created_by, created_at, updated_at
        ) VALUES (
            '用户标签库(测试)', 'user_tag_library_test',
            '测试血缘可视化功能的数据产品',
            :dataflow_id, '用户标签生成(测试)',
            'user_tag_library_test', 'public',
            1000, 5,
            'active', 'test_script', NOW(), NOW()
        ) RETURNING id
    """)

    try:
        existing = db_session.execute(check_query).fetchone()
        if existing:
            product_id = existing[0]
            # Keep the reused row pointing at the DataFlow id of this run.
            db_session.execute(
                sync_query, {"dataflow_id": dataflow_id, "product_id": product_id}
            )
            db_session.commit()
            logger.info(f"测试数据产品已存在,ID={product_id}")
            return product_id

        result = db_session.execute(insert_query, {"dataflow_id": dataflow_id})
        product_id = result.fetchone()[0]
        db_session.commit()
    except Exception:
        # Leave the session in a clean state instead of an aborted transaction.
        db_session.rollback()
        raise

    logger.info(f"创建测试数据产品成功,ID={product_id}")
    return product_id
def test_lineage_visualization_api(app_client, product_id: int) -> bool:
    """Call the lineage-visualisation API and sanity-check its response.

    Args:
        app_client: Flask test client.
        product_id: id of the data product to visualise.

    Returns:
        bool: True when the response body has ``code == 200`` and contains
        at least 2 nodes and at least 1 line. False otherwise, including a
        non-JSON response (e.g. an HTML 404/500 page).
    """
    logger.info(f"测试血缘可视化 API,product_id={product_id}")

    sample_data = {
        "用户ID": 12345,
        "姓名": "张三",
        "年龄": 28,
        "用户标签": "高价值用户",
        "画像分数": 0.85,
    }
    response = app_client.post(
        f"/api/dataservice/products/{product_id}/lineage-visualization",
        data=json.dumps({"sample_data": sample_data}),
        content_type="application/json",
    )
    logger.info(f"API 响应状态码: {response.status_code}")

    # silent=True returns None instead of raising when the body is not JSON,
    # so an error page is reported as a failed check rather than a crash.
    response_data = response.get_json(silent=True)
    if not isinstance(response_data, dict):
        logger.error(
            f"❌ API 返回的不是 JSON 对象: {response.get_data(as_text=True)[:500]}"
        )
        return False

    logger.info(
        f"API 响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}"
    )

    if response_data.get("code") != 200:
        logger.error(f"❌ API 返回错误: {response_data.get('message')}")
        return False

    # `or` guards against explicit nulls in the payload (e.g. "data": null).
    data = response_data.get("data") or {}
    nodes = data.get("nodes") or []
    lines = data.get("lines") or []
    depth = data.get("lineage_depth", 0)
    logger.info(f"节点数量: {len(nodes)}")
    logger.info(f"关系数量: {len(lines)}")
    logger.info(f"血缘深度: {depth}")

    # Minimal shape check: at least one edge between two nodes.
    if len(nodes) >= 2 and len(lines) >= 1:
        logger.info("✅ 血缘可视化 API 测试通过!")
        return True
    logger.warning("⚠️ 返回的节点或关系数量不足")
    return False


# Despite its name this is not a pytest test: it needs a live app client and
# product id supplied by main(). Tell pytest not to collect it.
test_lineage_visualization_api.__test__ = False
# name_en values of every node create_test_data_in_neo4j() creates. Listing
# them explicitly keeps cleanup from deleting unrelated nodes whose name
# merely ends in "_test" (e.g. a real "ab_test" table).
_TEST_NODE_NAMES_EN = (
    "user_base_info_test",
    "user_data_clean_test",
    "user_profile_test",
    "user_tag_generate_test",
    "user_tag_library_test",
    "user_id_test",
    "name_test",
    "age_test",
    "user_tag_test",
    "profile_score_test",
)


def cleanup_test_data(neo4j_session, db_session) -> None:
    """Remove the test data created by this script.

    Deletes, with their relationships, only the Neo4j nodes whose
    ``name_en`` is one of the known test names and that carry one of the
    labels this script uses. Also deletes the
    ``user_tag_library_test`` row from ``data_products``.

    Args:
        neo4j_session: Neo4j session.
        db_session: SQLAlchemy session.

    Raises:
        Exception: database errors are re-raised after rolling the
            SQLAlchemy session back.
    """
    from sqlalchemy import text

    logger.info("清理测试数据...")

    cleanup_neo4j_query = """
    MATCH (n)
    WHERE (n:BusinessDomain OR n:DataResource OR n:DataFlow OR n:DataMeta)
      AND n.name_en IN $names
    DETACH DELETE n
    """
    # consume() forces execution here so errors surface in this function,
    # and returns the summary used for the deleted-node count.
    summary = neo4j_session.run(
        cleanup_neo4j_query, {"names": list(_TEST_NODE_NAMES_EN)}
    ).consume()
    logger.info(f"Neo4j 测试数据已清理,删除节点数={summary.counters.nodes_deleted}")

    cleanup_pg_query = text("""
        DELETE FROM data_products
        WHERE product_name_en = 'user_tag_library_test'
    """)
    try:
        db_session.execute(cleanup_pg_query)
        db_session.commit()
    except Exception:
        db_session.rollback()
        raise
    logger.info("PostgreSQL 测试数据已清理")
def main() -> int:
    """Run the lineage-visualisation smoke test end to end.

    Steps:
    1. create Neo4j lineage test data;
    2. create the matching PostgreSQL data product;
    3. call the API through the Flask test client;
    4. with ``--cleanup`` on the command line, delete the test data. This
       step runs even if an earlier step raised.

    Returns:
        int: process exit code. 0 when the API check passed, 1 when it failed
        or any exception occurred.
    """
    import os

    logger.info("=" * 60)
    logger.info("开始执行数据血缘可视化功能测试")
    logger.info("=" * 60)

    # The `app` package is imported from the directory above this script's.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, project_root)
    logger.info(f"项目路径: {project_root}")

    cleanup_requested = "--cleanup" in sys.argv

    try:
        logger.info("正在导入应用...")
        from app import create_app, db
        logger.info("成功导入 create_app 和 db")
        from app.services.neo4j_driver import neo4j_driver
        logger.info("成功导入 neo4j_driver")

        logger.info("正在创建应用...")
        app = create_app()
        logger.info("应用创建成功")

        with app.app_context():
            logger.info("进入应用上下文")
            with neo4j_driver.get_session() as neo4j_session:
                try:
                    neo4j_ids = create_test_data_in_neo4j(neo4j_session)
                    product_id = create_test_data_product(db.session, neo4j_ids)
                    with app.test_client() as client:
                        test_result = test_lineage_visualization_api(client, product_id)
                finally:
                    # Honour --cleanup even when a step above raised, so a
                    # failed run does not leave test data behind.
                    if cleanup_requested:
                        # Discard any aborted/half-finished transaction a
                        # failed step may have left so the DELETE can run.
                        db.session.rollback()
                        cleanup_test_data(neo4j_session, db.session)
                    else:
                        logger.info("测试数据保留(使用 --cleanup 参数可清理)")

        if test_result:
            logger.info("=" * 60)
            logger.info("✅ 所有测试通过!")
            logger.info("=" * 60)
            return 0
        logger.error("=" * 60)
        logger.error("❌ 测试失败!")
        logger.error("=" * 60)
        return 1
    except Exception as e:
        logger.exception(f"测试执行失败: {e}")
        return 1
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|