""" 测试数据加工可视化 API 用于测试 /api/dataservice/products/23/lineage-visualization 接口 通过图谱和数据库表查询 warehouse_inventory_summary 和 test_product_inventory 表中的数据 """ import json import os import sys from datetime import datetime # 设置为生产环境以连接远程数据库 os.environ["FLASK_ENV"] = "production" # 添加项目路径 sys.path.insert(0, ".") from sqlalchemy import text from app import create_app, db from app.models.data_product import DataProduct def get_product_info(app, product_id: int): """获取数据产品基本信息""" with app.app_context(): product = DataProduct.query.get(product_id) if product: print(f"\n{'=' * 60}") print(f"数据产品信息 (ID: {product_id})") print(f"{'=' * 60}") print(f" 产品名称: {product.product_name}") print(f" 英文名称: {product.product_name_en}") print(f" 目标表名: {product.target_table}") print(f" 目标Schema: {product.target_schema}") print(f" 关联DataFlow ID: {product.source_dataflow_id}") print(f" 记录数: {product.record_count}") print(f" 列数: {product.column_count}") return product.to_dict() else: print(f"数据产品 ID={product_id} 不存在") return None def get_sample_data_from_table(app, schema: str, table_name: str, limit: int = 1): """从指定表获取样例数据""" with app.app_context(): try: # 检查表是否存在 check_sql = text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = :schema AND table_name = :table ) """) exists = db.session.execute( check_sql, {"schema": schema, "table": table_name} ).scalar() if not exists: print(f"表 {schema}.{table_name} 不存在") return None # 获取表的列信息 columns_sql = text(""" SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = :schema AND table_name = :table ORDER BY ordinal_position """) columns = db.session.execute( columns_sql, {"schema": schema, "table": table_name} ).fetchall() print(f"\n{'=' * 60}") print(f"表结构: {schema}.{table_name}") print(f"{'=' * 60}") for col_name, col_type in columns: print(f" {col_name}: {col_type}") # 获取样例数据 query_sql = text(f'SELECT * FROM "{schema}"."{table_name}" LIMIT :limit') result = db.session.execute(query_sql, {"limit": limit}) rows = result.fetchall() column_names = list(result.keys()) if rows: sample_data = dict(zip(column_names, rows[0])) print("\n样例数据:") print(f"{'-' * 60}") for key, value in sample_data.items(): print(f" {key}: {value}") return sample_data else: print(f"表 {schema}.{table_name} 中没有数据") return None except Exception as e: print(f"查询表 {schema}.{table_name} 失败: {str(e)}") return None def query_table_data(app, schema: str, table_name: str): """查询指定表的所有数据""" with app.app_context(): try: check_sql = text(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = :schema AND table_name = :table ) """) exists = db.session.execute( check_sql, {"schema": schema, "table": table_name} ).scalar() if not exists: print(f"表 {schema}.{table_name} 不存在") return None # 获取数据总数 count_sql = text(f'SELECT COUNT(*) FROM "{schema}"."{table_name}"') count = db.session.execute(count_sql).scalar() # 获取样例数据 query_sql = text(f'SELECT * FROM "{schema}"."{table_name}" LIMIT 5') result = db.session.execute(query_sql) rows = result.fetchall() column_names = list(result.keys()) print(f"\n{'=' * 60}") print(f"表数据: {schema}.{table_name} (总计 {count} 条记录)") print(f"{'=' * 60}") if rows: # 打印列名 print(f"列名: {column_names}") print(f"{'-' * 60}") for i, row in enumerate(rows): print(f"[{i + 1}] {dict(zip(column_names, row))}") return [dict(zip(column_names, row)) for row in rows] else: print("表中没有数据") return [] except Exception as e: print(f"查询表 {schema}.{table_name} 失败: {str(e)}") return None def test_lineage_visualization_api(app, product_id: int, sample_data: dict): """测试血缘可视化 API(直接调用服务层)""" from app.core.data_service.data_product_service import DataProductService with app.app_context(): print(f"\n{'=' * 60}") print(f"测试血缘可视化: product_id={product_id}") print(f"{'=' * 60}") print("请求体 sample_data:") print(json.dumps(sample_data, ensure_ascii=False, indent=2, default=str)) try: result = DataProductService.get_data_lineage_visualization( product_id=product_id, sample_data=sample_data, ) print("\n响应内容:") print(json.dumps(result, ensure_ascii=False, indent=2, default=str)) # 打印节点详情 if result.get("nodes"): print(f"\n{'=' * 60}") print(f"节点详情 (共 {len(result['nodes'])} 个)") print(f"{'=' * 60}") for node in result["nodes"]: print(f"\n 节点 ID: {node.get('id')}") print(f" 中文名: {node.get('name_zh')}") print(f" 英文名: {node.get('name_en')}") print(f" 类型: {node.get('node_type')}") print(f" 标签: {node.get('labels')}") print(f" 是目标节点: {node.get('is_target')}") print(f" 是源节点: {node.get('is_source')}") if node.get("matched_fields"): print(f" 匹配字段数: {len(node['matched_fields'])}") for field in node["matched_fields"]: print( f" - {field.get('name_zh')} ({field.get('name_en')}): {field.get('value')}" ) if node.get("matched_data"): print(" 匹配数据:") for data in node["matched_data"][:2]: # 只显示前2条 print(f" {data}") # 打印关系详情 if result.get("lines"): print(f"\n{'=' * 60}") print(f"关系详情 (共 {len(result['lines'])} 条)") print(f"{'=' * 60}") for line in result["lines"]: rel_type = line.get("text") or line.get("type", "UNKNOWN") print(f" {line['from']} --[{rel_type}]--> {line['to']}") return result except Exception as e: import traceback print(f"调用失败: {str(e)}") traceback.print_exc() return None def query_neo4j_business_domain(app, table_name: str): """查询 Neo4j 中与表名对应的 BusinessDomain 节点""" with app.app_context(): from app.services.neo4j_driver import neo4j_driver print(f"\n{'=' * 60}") print(f"查询 Neo4j BusinessDomain: {table_name}") print(f"{'=' * 60}") with neo4j_driver.get_session() as session: # 通过表名查找 BusinessDomain query = """ MATCH (bd:BusinessDomain) WHERE bd.name_en = $table_name OR bd.name = $table_name RETURN id(bd) as bd_id, bd.name_zh as name_zh, bd.name_en as name_en, labels(bd) as labels, bd.description as description """ result = session.run(query, {"table_name": table_name}).data() if result: for bd in result: print(f" ID: {bd['bd_id']}") print(f" 中文名: {bd['name_zh']}") print(f" 英文名: {bd['name_en']}") print(f" 标签: {bd['labels']}") print(f" 描述: {bd.get('description', 'N/A')}") print("-" * 40) return result else: print(f"未找到与 {table_name} 对应的 BusinessDomain 节点") # 尝试模糊匹配 fuzzy_query = """ MATCH (bd:BusinessDomain) WHERE bd.name_en CONTAINS $keyword OR bd.name CONTAINS $keyword OR bd.name_zh CONTAINS $keyword RETURN id(bd) as bd_id, bd.name_zh as name_zh, bd.name_en as name_en, labels(bd) as labels LIMIT 10 """ fuzzy_result = session.run( fuzzy_query, {"keyword": table_name.split("_")[0]} ).data() if fuzzy_result: print("\n可能相关的 BusinessDomain 节点:") for bd in fuzzy_result: print( f" - {bd['name_zh']} ({bd['name_en']}) [ID: {bd['bd_id']}]" ) return None def main(): print(f"\n{'#' * 60}") print("# 数据加工可视化 API 测试脚本") print(f"# 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"{'#' * 60}") # 创建应用 app = create_app() product_id = 23 # 1. 获取数据产品信息 product_info = get_product_info(app, product_id) if not product_info: print("无法获取数据产品信息,退出测试") return target_table = product_info.get("target_table") target_schema = product_info.get("target_schema", "public") # 2. 查询目标表数据 print(f"\n\n{'#' * 60}") print("# 查询目标表数据") print(f"{'#' * 60}") sample_data = get_sample_data_from_table(app, target_schema, target_table) # 3. 查询用户提到的两个表(在 dags schema 中) print(f"\n\n{'#' * 60}") print("# 查询 warehouse_inventory_summary 表 (dags schema)") print(f"{'#' * 60}") query_table_data(app, "dags", "warehouse_inventory_summary") print(f"\n\n{'#' * 60}") print("# 查询 test_product_inventory 表 (dags schema)") print(f"{'#' * 60}") query_table_data(app, "dags", "test_product_inventory") # 4. 查询 Neo4j 中的相关节点 print(f"\n\n{'#' * 60}") print("# 查询 Neo4j 图谱节点") print(f"{'#' * 60}") query_neo4j_business_domain(app, target_table) query_neo4j_business_domain(app, "warehouse_inventory_summary") query_neo4j_business_domain(app, "test_product_inventory") # 5. 测试 API if sample_data: print(f"\n\n{'#' * 60}") print("# 调用血缘可视化 API") print(f"{'#' * 60}") test_lineage_visualization_api(app, product_id, sample_data) else: print("\n没有样例数据,跳过 API 测试") # 尝试使用手动构造的测试数据 print("\n尝试使用手动构造的测试数据...") manual_sample_data = { "产品名称": "测试产品", "仓库名称": "测试仓库", "库存数量": 100, } test_lineage_visualization_api(app, product_id, manual_sample_data) if __name__ == "__main__": main()