| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- """
- 测试数据指标删除 API 接口
- """
- import requests
- import json
- # 配置
- BASE_URL = "http://localhost:5500"
- API_ENDPOINT = f"{BASE_URL}/api/data_metric/delete"
- def test_delete_metric(metric_id):
- """
- 测试删除指定ID的数据指标
-
- Args:
- metric_id: 指标节点ID
- """
- print(f"\n{'='*80}")
- print(f"测试删除数据指标: ID={metric_id}")
- print(f"{'='*80}")
-
- # 准备请求数据
- payload = {"id": metric_id}
- headers = {"Content-Type": "application/json"}
-
- try:
- # 发送删除请求
- print(f"\n发送请求: POST {API_ENDPOINT}")
- print(f"请求数据: {json.dumps(payload, ensure_ascii=False)}")
-
- response = requests.post(API_ENDPOINT, json=payload, headers=headers)
-
- print(f"\n响应状态码: {response.status_code}")
- print(f"响应内容:")
-
- # 解析响应
- result = response.json()
- print(json.dumps(result, ensure_ascii=False, indent=2))
-
- # 判断结果
- if result.get("code") == 200:
- print(f"\n✅ 删除成功!")
- print(f" 消息: {result['data']['message']}")
- return True
- else:
- print(f"\n❌ 删除失败!")
- print(f" 错误: {result.get('msg', '未知错误')}")
- return False
-
- except requests.exceptions.ConnectionError:
- print(f"\n❌ 连接失败: 无法连接到服务器 {BASE_URL}")
- print(f" 请确保 Flask 应用正在运行")
- return False
- except Exception as e:
- print(f"\n❌ 请求异常: {str(e)}")
- return False
- def test_invalid_cases():
- """测试各种异常情况"""
- print(f"\n{'='*80}")
- print("测试异常情况")
- print(f"{'='*80}")
-
- test_cases = [
- {
- "name": "缺少ID参数",
- "payload": {},
- "expected": "指标ID不能为空"
- },
- {
- "name": "无效的ID类型(字符串)",
- "payload": {"id": "invalid"},
- "expected": "指标ID必须为整数"
- },
- {
- "name": "不存在的节点ID",
- "payload": {"id": 999999},
- "expected": "数据指标节点不存在"
- }
- ]
-
- for i, test_case in enumerate(test_cases, 1):
- print(f"\n--- 测试 {i}: {test_case['name']} ---")
- try:
- response = requests.post(
- API_ENDPOINT,
- json=test_case['payload'],
- headers={"Content-Type": "application/json"}
- )
- result = response.json()
-
- print(f"请求数据: {json.dumps(test_case['payload'], ensure_ascii=False)}")
- print(f"响应: {json.dumps(result, ensure_ascii=False, indent=2)}")
-
- # 检查是否包含预期错误信息
- msg = str(result.get('msg', ''))
- if test_case['expected'] in msg:
- print(f"✅ 测试通过 - 返回了预期的错误信息")
- else:
- print(f"⚠️ 测试未完全匹配 - 预期包含: {test_case['expected']}")
-
- except Exception as e:
- print(f"❌ 测试异常: {str(e)}")
- def test_full_workflow():
- """测试完整的工作流:查询 -> 删除 -> 验证"""
- print(f"\n{'='*80}")
- print("完整工作流测试")
- print(f"{'='*80}")
-
- # 注意:这里需要先有一个真实的指标ID
- # 实际使用时,应该先创建一个测试指标,然后删除它
-
- print("\n提示: 完整工作流测试需要以下步骤:")
- print("1. 创建一个测试数据指标")
- print("2. 记录返回的指标ID")
- print("3. 使用该ID调用删除接口")
- print("4. 验证指标已被删除")
- print("\n当前示例仅演示删除步骤,请根据实际情况修改测试ID")
- def main():
- """主测试函数"""
- print(f"\n{'#'*80}")
- print("# 数据指标删除 API 测试")
- print(f"# 服务器: {BASE_URL}")
- print(f"# 接口: {API_ENDPOINT}")
- print(f"{'#'*80}")
-
- # 测试1: 尝试删除一个可能存在的节点
- # 注意:请根据实际情况修改这个ID
- test_metric_id = 1378
-
- print("\n⚠️ 注意: 请确保测试ID是可以被删除的测试数据!")
- print(f"⚠️ 当前测试ID: {test_metric_id}")
-
- user_input = input("\n是否继续测试删除操作? (y/n): ")
- if user_input.lower() == 'y':
- test_delete_metric(test_metric_id)
- else:
- print("已跳过删除测试")
-
- # 测试2: 测试异常情况
- test_invalid_cases()
-
- # 测试3: 完整工作流说明
- test_full_workflow()
-
- print(f"\n{'='*80}")
- print("测试完成!")
- print(f"{'='*80}\n")
- if __name__ == "__main__":
- try:
- main()
- except KeyboardInterrupt:
- print("\n\n测试被用户中断")
- except Exception as e:
- print(f"\n\n测试失败: {str(e)}")
- import traceback
- traceback.print_exc()
|