| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- """
- 测试数据加工可视化 API
- 用于测试 /api/dataservice/products/23/lineage-visualization 接口
- 通过图谱和数据库表查询 warehouse_inventory_summary 和 test_product_inventory 表中的数据
- """
import json
import os
import sys
from datetime import datetime
# Run as the production environment so the app connects to the remote database.
# This must happen before `app` is imported below; presumably create_app()
# reads FLASK_ENV to pick its config — TODO confirm.
os.environ["FLASK_ENV"] = "production"
# Put the current working directory first on the import path so the project's
# `app` package resolves; assumes the script is launched from the project root.
sys.path.insert(0, ".")
# The imports below intentionally follow the environment setup above.
from sqlalchemy import text  # noqa: E402
from app import create_app, db  # noqa: E402
from app.models.data_product import DataProduct  # noqa: E402
def get_product_info(app, product_id: int):
    """Look up one data product and print a summary of its metadata.

    Args:
        app: Flask application; an app context is pushed for the lookup.
        product_id: Primary key of the ``DataProduct`` row to load.

    Returns:
        ``product.to_dict()`` when the product exists, otherwise ``None``.
    """
    with app.app_context():
        product = DataProduct.query.get(product_id)
        if not product:
            print(f"数据产品 ID={product_id} 不存在")
            return None

        banner = "=" * 60
        print(f"\n{banner}")
        print(f"数据产品信息 (ID: {product_id})")
        print(banner)
        # (label, attribute) pairs; attributes are read one at a time, right
        # before each line is printed.
        field_specs = (
            ("产品名称", "product_name"),
            ("英文名称", "product_name_en"),
            ("目标表名", "target_table"),
            ("目标Schema", "target_schema"),
            ("关联DataFlow ID", "source_dataflow_id"),
            ("记录数", "record_count"),
            ("列数", "column_count"),
        )
        for label, attr_name in field_specs:
            print(f" {label}: {getattr(product, attr_name)}")
        return product.to_dict()
def quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled (the SQL-standard escape PostgreSQL
    uses), so a name containing ``"`` cannot terminate the identifier early
    and inject SQL into a statement that interpolates it.
    """
    return '"' + name.replace('"', '""') + '"'


def get_sample_data_from_table(app, schema: str, table_name: str, limit: int = 1):
    """Print a table's column layout and its first sample row.

    Args:
        app: Flask application; an app context is pushed for the queries.
        schema: Schema that contains the table.
        table_name: Table to sample. A falsy value (e.g. a product without a
            configured target table) is reported and skipped.
        limit: Number of rows fetched by the sample query. Only the first row
            is printed and returned.

    Returns:
        A ``{column_name: value}`` dict for the first row, or ``None`` when the
        table name is empty, the table does not exist, it has no rows, or any
        query fails (the error is printed; this is a best-effort helper).
    """
    if not table_name:
        print(f"表名为空, 跳过表查询 (schema: {schema})")
        return None

    with app.app_context():
        try:
            # Check existence first so a missing table gets a clear message
            # instead of a database error.
            check_sql = text("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = :schema
                    AND table_name = :table
                )
            """)
            exists = db.session.execute(
                check_sql, {"schema": schema, "table": table_name}
            ).scalar()
            if not exists:
                print(f"表 {schema}.{table_name} 不存在")
                return None

            # Column names and types, in table definition order.
            columns_sql = text("""
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = :schema AND table_name = :table
                ORDER BY ordinal_position
            """)
            columns = db.session.execute(
                columns_sql, {"schema": schema, "table": table_name}
            ).fetchall()
            print(f"\n{'=' * 60}")
            print(f"表结构: {schema}.{table_name}")
            print(f"{'=' * 60}")
            for col_name, col_type in columns:
                print(f" {col_name}: {col_type}")

            # Identifiers cannot be bound parameters, so they are quoted and
            # escaped explicitly; the row limit is still bound.
            qualified = f"{quote_identifier(schema)}.{quote_identifier(table_name)}"
            query_sql = text(f"SELECT * FROM {qualified} LIMIT :limit")
            result = db.session.execute(query_sql, {"limit": limit})
            rows = result.fetchall()
            column_names = list(result.keys())
            if not rows:
                print(f"表 {schema}.{table_name} 中没有数据")
                return None

            sample_data = dict(zip(column_names, rows[0]))
            print("\n样例数据:")
            print(f"{'-' * 60}")
            for key, value in sample_data.items():
                print(f" {key}: {value}")
            return sample_data
        except Exception as e:
            print(f"查询表 {schema}.{table_name} 失败: {str(e)}")
            return None
def query_table_data(app, schema: str, table_name: str, limit: int = 5):
    """Print a table's total row count and its first *limit* rows.

    Args:
        app: Flask application; an app context is pushed for the queries.
        schema: Schema that contains the table.
        table_name: Table to inspect. A falsy value is reported and skipped.
        limit: Maximum number of rows fetched and printed (default 5).

    Returns:
        A list of ``{column_name: value}`` dicts for the fetched rows (empty
        when the table has no rows), or ``None`` when the table name is empty,
        the table does not exist, or any query fails (the error is printed;
        this is a best-effort helper).
    """
    if not table_name:
        print(f"表名为空, 跳过表查询 (schema: {schema})")
        return None

    with app.app_context():
        try:
            check_sql = text("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = :schema
                    AND table_name = :table
                )
            """)
            exists = db.session.execute(
                check_sql, {"schema": schema, "table": table_name}
            ).scalar()
            if not exists:
                print(f"表 {schema}.{table_name} 不存在")
                return None

            # Identifiers cannot be bound parameters: quote each part and
            # double any embedded quote so odd names cannot break out.
            qualified = ".".join(
                '"' + part.replace('"', '""') + '"' for part in (schema, table_name)
            )

            count = db.session.execute(
                text(f"SELECT COUNT(*) FROM {qualified}")
            ).scalar()

            result = db.session.execute(
                text(f"SELECT * FROM {qualified} LIMIT :limit"), {"limit": limit}
            )
            rows = result.fetchall()
            column_names = list(result.keys())
            # Convert each row to a dict once; reused for printing and return.
            records = [dict(zip(column_names, row)) for row in rows]

            print(f"\n{'=' * 60}")
            print(f"表数据: {schema}.{table_name} (总计 {count} 条记录)")
            print(f"{'=' * 60}")
            if records:
                print(f"列名: {column_names}")
                print(f"{'-' * 60}")
                for index, record in enumerate(records, start=1):
                    print(f"[{index}] {record}")
            else:
                print("表中没有数据")
            return records
        except Exception as e:
            print(f"查询表 {schema}.{table_name} 失败: {str(e)}")
            return None
def test_lineage_visualization_api(app, product_id: int, sample_data: dict):
    """Call the lineage-visualization service directly and print its response.

    HTTP is bypassed: ``DataProductService.get_data_lineage_visualization`` is
    invoked inside an app context. The raw response is pretty-printed, then
    per-node and per-relationship details are printed.

    Args:
        app: Flask application used to push an app context.
        product_id: ID of the data product whose lineage is visualized.
        sample_data: Sample row forwarded to the service as ``sample_data``.

    Returns:
        The service result, or ``None`` if the call or printing it raised
        (the traceback is printed in that case).
    """
    from app.core.data_service.data_product_service import DataProductService

    with app.app_context():
        print(f"\n{'=' * 60}")
        print(f"测试血缘可视化: product_id={product_id}")
        print(f"{'=' * 60}")
        print("请求体 sample_data:")
        # default=str so dates/decimals from DB rows do not break json.dumps.
        print(json.dumps(sample_data, ensure_ascii=False, indent=2, default=str))
        try:
            result = DataProductService.get_data_lineage_visualization(
                product_id=product_id,
                sample_data=sample_data,
            )
            print("\n响应内容:")
            print(json.dumps(result, ensure_ascii=False, indent=2, default=str))

            # Node details
            if result.get("nodes"):
                print(f"\n{'=' * 60}")
                print(f"节点详情 (共 {len(result['nodes'])} 个)")
                print(f"{'=' * 60}")
                for node in result["nodes"]:
                    print(f"\n 节点 ID: {node.get('id')}")
                    print(f" 中文名: {node.get('name_zh')}")
                    print(f" 英文名: {node.get('name_en')}")
                    print(f" 类型: {node.get('node_type')}")
                    print(f" 标签: {node.get('labels')}")
                    print(f" 是目标节点: {node.get('is_target')}")
                    print(f" 是源节点: {node.get('is_source')}")
                    if node.get("matched_fields"):
                        print(f" 匹配字段数: {len(node['matched_fields'])}")
                        for field in node["matched_fields"]:
                            print(
                                f" - {field.get('name_zh')} ({field.get('name_en')}): {field.get('value')}"
                            )
                    if node.get("matched_data"):
                        print(" 匹配数据:")
                        for data in node["matched_data"][:2]:  # show at most 2 rows
                            print(f" {data}")

            # Relationship details
            if result.get("lines"):
                print(f"\n{'=' * 60}")
                print(f"关系详情 (共 {len(result['lines'])} 条)")
                print(f"{'=' * 60}")
                for line in result["lines"]:
                    # Prefer the display text, then the type; also fall back
                    # when "type" is present but empty/None.
                    rel_type = line.get("text") or line.get("type") or "UNKNOWN"
                    # .get() so one malformed edge does not turn a successful
                    # call into a reported failure.
                    print(f" {line.get('from')} --[{rel_type}]--> {line.get('to')}")
            return result
        except Exception as e:
            import traceback

            print(f"调用失败: {str(e)}")
            traceback.print_exc()
            return None


# Manual helper, not a pytest test: its parameters are not fixtures, so stop
# pytest from collecting it by name if this file sits on a test path.
test_lineage_visualization_api.__test__ = False
def query_neo4j_business_domain(app, table_name: str):
    """Find Neo4j ``BusinessDomain`` nodes that correspond to a table name.

    First tries an exact match on ``name_en`` or ``name``. If nothing matches,
    it prints up to 10 candidates whose ``name_en``/``name``/``name_zh``
    contains the first non-empty ``_``-separated segment of the table name.

    Args:
        app: Flask application; an app context is pushed for the lookup.
        table_name: Table name to look up. A falsy value is reported and
            skipped, because an empty keyword would fuzzy-match every node.

    Returns:
        The list of exact-match records (dicts with ``bd_id``, ``name_zh``,
        ``name_en``, ``labels``, ``description``), or ``None`` when there is
        no exact match or the table name is empty.
    """
    if not table_name:
        print("表名为空, 跳过 Neo4j BusinessDomain 查询")
        return None

    with app.app_context():
        from app.services.neo4j_driver import neo4j_driver

        print(f"\n{'=' * 60}")
        print(f"查询 Neo4j BusinessDomain: {table_name}")
        print(f"{'=' * 60}")
        with neo4j_driver.get_session() as session:
            # Exact match on the table name.
            query = """
                MATCH (bd:BusinessDomain)
                WHERE bd.name_en = $table_name OR bd.name = $table_name
                RETURN id(bd) as bd_id, bd.name_zh as name_zh, bd.name_en as name_en,
                       labels(bd) as labels, bd.description as description
            """
            result = session.run(query, {"table_name": table_name}).data()
            if result:
                for bd in result:
                    print(f" ID: {bd['bd_id']}")
                    print(f" 中文名: {bd['name_zh']}")
                    print(f" 英文名: {bd['name_en']}")
                    print(f" 标签: {bd['labels']}")
                    # The key is always present (it is in RETURN) but may be
                    # null, so a .get() default would never apply.
                    print(f" 描述: {bd.get('description') or 'N/A'}")
                    print("-" * 40)
                return result

            print(f"未找到与 {table_name} 对应的 BusinessDomain 节点")
            # Fuzzy fallback on the first non-empty segment; a leading "_"
            # would otherwise yield "" and CONTAINS "" matches every node.
            keyword = next(
                (part for part in table_name.split("_") if part), table_name
            )
            fuzzy_query = """
                MATCH (bd:BusinessDomain)
                WHERE bd.name_en CONTAINS $keyword OR bd.name CONTAINS $keyword
                   OR bd.name_zh CONTAINS $keyword
                RETURN id(bd) as bd_id, bd.name_zh as name_zh, bd.name_en as name_en,
                       labels(bd) as labels
                LIMIT 10
            """
            fuzzy_result = session.run(fuzzy_query, {"keyword": keyword}).data()
            if fuzzy_result:
                print("\n可能相关的 BusinessDomain 节点:")
                for bd in fuzzy_result:
                    print(
                        f" - {bd['name_zh']} ({bd['name_en']}) [ID: {bd['bd_id']}]"
                    )
            return None
def main(product_id: int = 23):
    """Run the lineage-visualization smoke test end to end.

    Steps:
        1. load the data product's metadata;
        2. sample a row from its target table;
        3. dump two tables in the ``dags`` schema;
        4. look up matching Neo4j ``BusinessDomain`` nodes;
        5. call the lineage service with the sampled row, or with a
           hand-built fallback row when no sample is available.

    Args:
        product_id: Data product to test. Defaults to 23, the product this
            script was originally written for.
    """

    def print_section(title: str) -> None:
        # Banner printed between the steps below.
        print(f"\n\n{'#' * 60}")
        print(f"# {title}")
        print(f"{'#' * 60}")

    print(f"\n{'#' * 60}")
    print("# 数据加工可视化 API 测试脚本")
    print(f"# 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'#' * 60}")

    app = create_app()

    # 1. Data product metadata.
    product_info = get_product_info(app, product_id)
    if not product_info:
        print("无法获取数据产品信息,退出测试")
        return

    target_table = product_info.get("target_table")
    # `.get(key, default)` keeps an explicit None from to_dict(); fall back
    # to "public" for any missing/empty schema instead.
    target_schema = product_info.get("target_schema") or "public"

    # 2. Sample row from the product's target table.
    print_section("查询目标表数据")
    sample_data = None
    if target_table:
        sample_data = get_sample_data_from_table(app, target_schema, target_table)
    else:
        print("数据产品未配置目标表, 跳过目标表查询")

    # 3. The two related tables in the dags schema.
    for table in ("warehouse_inventory_summary", "test_product_inventory"):
        print_section(f"查询 {table} 表 (dags schema)")
        query_table_data(app, "dags", table)

    # 4. Related graph nodes (skip the target table if it is not configured).
    print_section("查询 Neo4j 图谱节点")
    for table in (target_table, "warehouse_inventory_summary", "test_product_inventory"):
        if table:
            query_neo4j_business_domain(app, table)

    # 5. Call the lineage service.
    if sample_data:
        print_section("调用血缘可视化 API")
        test_lineage_visualization_api(app, product_id, sample_data)
    else:
        print("\n没有样例数据,跳过 API 测试")
        # Fall back to a hand-built row so the service call is still exercised.
        print("\n尝试使用手动构造的测试数据...")
        manual_sample_data = {
            "产品名称": "测试产品",
            "仓库名称": "测试仓库",
            "库存数量": 100,
        }
        test_lineage_visualization_api(app, product_id, manual_sample_data)
# Script entry point: run the smoke test only when executed directly, not on import.
if __name__ == "__main__":
    main()
|