test_metadata_workflow.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 测试元数据工作流
  5. 用于验证 /api/meta/check 和 /api/meta/node/add 接口
  6. """
  7. import requests
  8. import json
  9. from datetime import datetime
  10. # 配置
  11. BASE_URL = "http://192.168.3.143:5000"
  12. TEST_METADATA = {
  13. "name_zh": "其他费用定额",
  14. "data_type": "string",
  15. "description": "医疗行业费用元数据"
  16. }
  17. def print_section(title):
  18. """打印分节标题"""
  19. print("\n" + "="*60)
  20. print(f" {title}")
  21. print("="*60 + "\n")
  22. def test_check_metadata(name_zh):
  23. """测试检查元数据接口"""
  24. print_section(f"步骤 1: 检查元数据是否存在")
  25. url = f"{BASE_URL}/api/meta/check"
  26. params = {"name_zh": name_zh}
  27. print(f"请求 URL: {url}")
  28. print(f"请求参数: {params}")
  29. try:
  30. response = requests.get(url, params=params, timeout=10)
  31. print(f"\n响应状态码: {response.status_code}")
  32. print(f"响应内容:")
  33. print(json.dumps(response.json(), indent=2, ensure_ascii=False))
  34. if response.status_code == 200:
  35. data = response.json()
  36. if data.get("code") == 200:
  37. exists = data.get("data", {}).get("exists", False)
  38. print(f"\n✅ 检查成功!元数据 '{name_zh}' {'已存在' if exists else '不存在'}")
  39. return exists
  40. else:
  41. print(f"\n❌ 检查失败: {data.get('msg', '未知错误')}")
  42. return None
  43. else:
  44. print(f"\n❌ HTTP 错误: {response.status_code}")
  45. return None
  46. except requests.Timeout:
  47. print("\n❌ 请求超时!")
  48. return None
  49. except requests.ConnectionError:
  50. print(f"\n❌ 无法连接到服务器: {BASE_URL}")
  51. print("请确保:")
  52. print(" 1. 服务器地址正确")
  53. print(" 2. 服务已启动")
  54. print(" 3. 网络连接正常")
  55. return None
  56. except Exception as e:
  57. print(f"\n❌ 错误: {str(e)}")
  58. return None
  59. def test_create_metadata(name_zh, data_type, description):
  60. """测试创建元数据接口"""
  61. print_section(f"步骤 2: 创建元数据")
  62. url = f"{BASE_URL}/api/meta/node/add"
  63. payload = {
  64. "name_zh": name_zh,
  65. "data_type": data_type,
  66. "describe": description,
  67. "source": "workflow-test",
  68. "status": True
  69. }
  70. print(f"请求 URL: {url}")
  71. print(f"请求数据:")
  72. print(json.dumps(payload, indent=2, ensure_ascii=False))
  73. try:
  74. response = requests.post(
  75. url,
  76. json=payload,
  77. headers={"Content-Type": "application/json"},
  78. timeout=10
  79. )
  80. print(f"\n响应状态码: {response.status_code}")
  81. print(f"响应内容:")
  82. print(json.dumps(response.json(), indent=2, ensure_ascii=False))
  83. if response.status_code == 200:
  84. data = response.json()
  85. if data.get("code") == 200:
  86. print(f"\n✅ 创建成功!元数据 '{name_zh}' 已创建")
  87. return True
  88. else:
  89. print(f"\n❌ 创建失败: {data.get('msg', '未知错误')}")
  90. return False
  91. else:
  92. print(f"\n❌ HTTP 错误: {response.status_code}")
  93. return False
  94. except requests.Timeout:
  95. print("\n❌ 请求超时!")
  96. return False
  97. except requests.ConnectionError:
  98. print(f"\n❌ 无法连接到服务器: {BASE_URL}")
  99. return False
  100. except Exception as e:
  101. print(f"\n❌ 错误: {str(e)}")
  102. return False
  103. def main():
  104. """主测试流程"""
  105. print("\n" + "🚀 " * 30)
  106. print(" 元数据工作流测试")
  107. print("🚀 " * 30)
  108. print(f"\n测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  109. print(f"服务器: {BASE_URL}")
  110. print(f"\n测试元数据信息:")
  111. print(f" - 中文名: {TEST_METADATA['name_zh']}")
  112. print(f" - 类型: {TEST_METADATA['data_type']}")
  113. print(f" - 描述: {TEST_METADATA['description']}")
  114. # 步骤 1: 检查元数据是否存在
  115. exists = test_check_metadata(TEST_METADATA['name_zh'])
  116. if exists is None:
  117. print("\n❌ 检查接口失败,无法继续测试")
  118. return
  119. # 步骤 2: 根据检查结果决定是否创建
  120. if exists:
  121. print_section("测试结果")
  122. print(f"⚠️ 元数据 '{TEST_METADATA['name_zh']}' 已存在,无需创建")
  123. print("\n如果需要测试创建功能,请:")
  124. print(" 1. 修改 TEST_METADATA['name_zh'] 为不存在的名称")
  125. print(" 2. 或者先删除现有的元数据")
  126. else:
  127. print(f"\n元数据 '{TEST_METADATA['name_zh']}' 不存在,继续创建...")
  128. success = test_create_metadata(
  129. TEST_METADATA['name_zh'],
  130. TEST_METADATA['data_type'],
  131. TEST_METADATA['description']
  132. )
  133. if success:
  134. # 再次检查确认创建成功
  135. print_section("步骤 3: 确认创建结果")
  136. exists_after = test_check_metadata(TEST_METADATA['name_zh'])
  137. if exists_after:
  138. print_section("✅ 测试结果")
  139. print("🎉 所有测试通过!")
  140. print(f" - 检查接口正常 ✅")
  141. print(f" - 创建接口正常 ✅")
  142. print(f" - 元数据 '{TEST_METADATA['name_zh']}' 已成功创建 ✅")
  143. else:
  144. print_section("⚠️ 测试结果")
  145. print("创建接口返回成功,但再次检查时未找到元数据")
  146. print("可能原因:数据未同步或检查接口有问题")
  147. else:
  148. print_section("❌ 测试结果")
  149. print("创建元数据失败")
  150. print("\n" + "🏁 " * 30)
  151. print(" 测试完成")
  152. print("🏁 " * 30 + "\n")
  153. if __name__ == "__main__":
  154. try:
  155. main()
  156. except KeyboardInterrupt:
  157. print("\n\n⚠️ 测试被用户中断")
  158. except Exception as e:
  159. print(f"\n\n❌ 测试过程中发生错误: {str(e)}")
  160. import traceback
  161. traceback.print_exc()