#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 测试元数据工作流 用于验证 /api/meta/check 和 /api/meta/node/add 接口 """ import requests import json from datetime import datetime # 配置 BASE_URL = "http://192.168.3.143:5000" TEST_METADATA = { "name_zh": "其他费用定额", "data_type": "string", "description": "医疗行业费用元数据" } def print_section(title): """打印分节标题""" print("\n" + "="*60) print(f" {title}") print("="*60 + "\n") def test_check_metadata(name_zh): """测试检查元数据接口""" print_section(f"步骤 1: 检查元数据是否存在") url = f"{BASE_URL}/api/meta/check" params = {"name_zh": name_zh} print(f"请求 URL: {url}") print(f"请求参数: {params}") try: response = requests.get(url, params=params, timeout=10) print(f"\n响应状态码: {response.status_code}") print(f"响应内容:") print(json.dumps(response.json(), indent=2, ensure_ascii=False)) if response.status_code == 200: data = response.json() if data.get("code") == 200: exists = data.get("data", {}).get("exists", False) print(f"\n✅ 检查成功!元数据 '{name_zh}' {'已存在' if exists else '不存在'}") return exists else: print(f"\n❌ 检查失败: {data.get('msg', '未知错误')}") return None else: print(f"\n❌ HTTP 错误: {response.status_code}") return None except requests.Timeout: print("\n❌ 请求超时!") return None except requests.ConnectionError: print(f"\n❌ 无法连接到服务器: {BASE_URL}") print("请确保:") print(" 1. 服务器地址正确") print(" 2. 服务已启动") print(" 3. 网络连接正常") return None except Exception as e: print(f"\n❌ 错误: {str(e)}") return None def test_create_metadata(name_zh, data_type, description): """测试创建元数据接口""" print_section(f"步骤 2: 创建元数据") url = f"{BASE_URL}/api/meta/node/add" payload = { "name_zh": name_zh, "data_type": data_type, "describe": description, "source": "workflow-test", "status": True } print(f"请求 URL: {url}") print(f"请求数据:") print(json.dumps(payload, indent=2, ensure_ascii=False)) try: response = requests.post( url, json=payload, headers={"Content-Type": "application/json"}, timeout=10 ) print(f"\n响应状态码: {response.status_code}") print(f"响应内容:") print(json.dumps(response.json(), indent=2, ensure_ascii=False)) if response.status_code == 200: data = response.json() if data.get("code") == 200: print(f"\n✅ 创建成功!元数据 '{name_zh}' 已创建") return True else: print(f"\n❌ 创建失败: {data.get('msg', '未知错误')}") return False else: print(f"\n❌ HTTP 错误: {response.status_code}") return False except requests.Timeout: print("\n❌ 请求超时!") return False except requests.ConnectionError: print(f"\n❌ 无法连接到服务器: {BASE_URL}") return False except Exception as e: print(f"\n❌ 错误: {str(e)}") return False def main(): """主测试流程""" print("\n" + "🚀 " * 30) print(" 元数据工作流测试") print("🚀 " * 30) print(f"\n测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"服务器: {BASE_URL}") print(f"\n测试元数据信息:") print(f" - 中文名: {TEST_METADATA['name_zh']}") print(f" - 类型: {TEST_METADATA['data_type']}") print(f" - 描述: {TEST_METADATA['description']}") # 步骤 1: 检查元数据是否存在 exists = test_check_metadata(TEST_METADATA['name_zh']) if exists is None: print("\n❌ 检查接口失败,无法继续测试") return # 步骤 2: 根据检查结果决定是否创建 if exists: print_section("测试结果") print(f"⚠️ 元数据 '{TEST_METADATA['name_zh']}' 已存在,无需创建") print("\n如果需要测试创建功能,请:") print(" 1. 修改 TEST_METADATA['name_zh'] 为不存在的名称") print(" 2. 或者先删除现有的元数据") else: print(f"\n元数据 '{TEST_METADATA['name_zh']}' 不存在,继续创建...") success = test_create_metadata( TEST_METADATA['name_zh'], TEST_METADATA['data_type'], TEST_METADATA['description'] ) if success: # 再次检查确认创建成功 print_section("步骤 3: 确认创建结果") exists_after = test_check_metadata(TEST_METADATA['name_zh']) if exists_after: print_section("✅ 测试结果") print("🎉 所有测试通过!") print(f" - 检查接口正常 ✅") print(f" - 创建接口正常 ✅") print(f" - 元数据 '{TEST_METADATA['name_zh']}' 已成功创建 ✅") else: print_section("⚠️ 测试结果") print("创建接口返回成功,但再次检查时未找到元数据") print("可能原因:数据未同步或检查接口有问题") else: print_section("❌ 测试结果") print("创建元数据失败") print("\n" + "🏁 " * 30) print(" 测试完成") print("🏁 " * 30 + "\n") if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\n⚠️ 测试被用户中断") except Exception as e: print(f"\n\n❌ 测试过程中发生错误: {str(e)}") import traceback traceback.print_exc()