| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 测试元数据工作流
- 用于验证 /api/meta/check 和 /api/meta/node/add 接口
- """
- import requests
- import json
- from datetime import datetime
- # 配置
- BASE_URL = "http://192.168.3.143:5000"
- TEST_METADATA = {
- "name_zh": "其他费用定额",
- "data_type": "string",
- "description": "医疗行业费用元数据"
- }
- def print_section(title):
- """打印分节标题"""
- print("\n" + "="*60)
- print(f" {title}")
- print("="*60 + "\n")
- def test_check_metadata(name_zh):
- """测试检查元数据接口"""
- print_section(f"步骤 1: 检查元数据是否存在")
-
- url = f"{BASE_URL}/api/meta/check"
- params = {"name_zh": name_zh}
-
- print(f"请求 URL: {url}")
- print(f"请求参数: {params}")
-
- try:
- response = requests.get(url, params=params, timeout=10)
- print(f"\n响应状态码: {response.status_code}")
- print(f"响应内容:")
- print(json.dumps(response.json(), indent=2, ensure_ascii=False))
-
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200:
- exists = data.get("data", {}).get("exists", False)
- print(f"\n✅ 检查成功!元数据 '{name_zh}' {'已存在' if exists else '不存在'}")
- return exists
- else:
- print(f"\n❌ 检查失败: {data.get('msg', '未知错误')}")
- return None
- else:
- print(f"\n❌ HTTP 错误: {response.status_code}")
- return None
-
- except requests.Timeout:
- print("\n❌ 请求超时!")
- return None
- except requests.ConnectionError:
- print(f"\n❌ 无法连接到服务器: {BASE_URL}")
- print("请确保:")
- print(" 1. 服务器地址正确")
- print(" 2. 服务已启动")
- print(" 3. 网络连接正常")
- return None
- except Exception as e:
- print(f"\n❌ 错误: {str(e)}")
- return None
- def test_create_metadata(name_zh, data_type, description):
- """测试创建元数据接口"""
- print_section(f"步骤 2: 创建元数据")
-
- url = f"{BASE_URL}/api/meta/node/add"
- payload = {
- "name_zh": name_zh,
- "data_type": data_type,
- "describe": description,
- "source": "workflow-test",
- "status": True
- }
-
- print(f"请求 URL: {url}")
- print(f"请求数据:")
- print(json.dumps(payload, indent=2, ensure_ascii=False))
-
- try:
- response = requests.post(
- url,
- json=payload,
- headers={"Content-Type": "application/json"},
- timeout=10
- )
- print(f"\n响应状态码: {response.status_code}")
- print(f"响应内容:")
- print(json.dumps(response.json(), indent=2, ensure_ascii=False))
-
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200:
- print(f"\n✅ 创建成功!元数据 '{name_zh}' 已创建")
- return True
- else:
- print(f"\n❌ 创建失败: {data.get('msg', '未知错误')}")
- return False
- else:
- print(f"\n❌ HTTP 错误: {response.status_code}")
- return False
-
- except requests.Timeout:
- print("\n❌ 请求超时!")
- return False
- except requests.ConnectionError:
- print(f"\n❌ 无法连接到服务器: {BASE_URL}")
- return False
- except Exception as e:
- print(f"\n❌ 错误: {str(e)}")
- return False
- def main():
- """主测试流程"""
- print("\n" + "🚀 " * 30)
- print(" 元数据工作流测试")
- print("🚀 " * 30)
-
- print(f"\n测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
- print(f"服务器: {BASE_URL}")
- print(f"\n测试元数据信息:")
- print(f" - 中文名: {TEST_METADATA['name_zh']}")
- print(f" - 类型: {TEST_METADATA['data_type']}")
- print(f" - 描述: {TEST_METADATA['description']}")
-
- # 步骤 1: 检查元数据是否存在
- exists = test_check_metadata(TEST_METADATA['name_zh'])
-
- if exists is None:
- print("\n❌ 检查接口失败,无法继续测试")
- return
-
- # 步骤 2: 根据检查结果决定是否创建
- if exists:
- print_section("测试结果")
- print(f"⚠️ 元数据 '{TEST_METADATA['name_zh']}' 已存在,无需创建")
- print("\n如果需要测试创建功能,请:")
- print(" 1. 修改 TEST_METADATA['name_zh'] 为不存在的名称")
- print(" 2. 或者先删除现有的元数据")
- else:
- print(f"\n元数据 '{TEST_METADATA['name_zh']}' 不存在,继续创建...")
- success = test_create_metadata(
- TEST_METADATA['name_zh'],
- TEST_METADATA['data_type'],
- TEST_METADATA['description']
- )
-
- if success:
- # 再次检查确认创建成功
- print_section("步骤 3: 确认创建结果")
- exists_after = test_check_metadata(TEST_METADATA['name_zh'])
-
- if exists_after:
- print_section("✅ 测试结果")
- print("🎉 所有测试通过!")
- print(f" - 检查接口正常 ✅")
- print(f" - 创建接口正常 ✅")
- print(f" - 元数据 '{TEST_METADATA['name_zh']}' 已成功创建 ✅")
- else:
- print_section("⚠️ 测试结果")
- print("创建接口返回成功,但再次检查时未找到元数据")
- print("可能原因:数据未同步或检查接口有问题")
- else:
- print_section("❌ 测试结果")
- print("创建元数据失败")
-
- print("\n" + "🏁 " * 30)
- print(" 测试完成")
- print("🏁 " * 30 + "\n")
- if __name__ == "__main__":
- try:
- main()
- except KeyboardInterrupt:
- print("\n\n⚠️ 测试被用户中断")
- except Exception as e:
- print(f"\n\n❌ 测试过程中发生错误: {str(e)}")
- import traceback
- traceback.print_exc()
|