""" 测试数据服务 API 接口 用于验证 data_service 功能是否正常工作 """ import json import sys from pathlib import Path # 设置控制台编码为 UTF-8(Windows) if sys.platform == "win32": import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8") import requests # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) # 生产环境 API 地址 BASE_URL = "https://company.citupro.com:18183/api/dataservice" def test_get_products(): """测试获取数据产品列表""" print("\n[测试 1] 获取数据产品列表") print("-" * 60) url = f"{BASE_URL}/products" params = {"page": 1, "page_size": 20} try: response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() print(f"状态码: {response.status_code}") print(f"响应: {json.dumps(data, ensure_ascii=False, indent=2)}") if data.get("code") == 200: products = data.get("data", {}).get("list", []) print(f"\n[成功] 成功获取 {len(products)} 个数据产品") return products else: print(f"[失败] {data.get('message')}") return [] except Exception as e: print(f"[错误] 请求失败: {e}") return [] def test_get_product_detail(product_id: int): """测试获取数据产品详情""" print(f"\n[测试 2] 获取数据产品详情 (ID: {product_id})") print("-" * 60) url = f"{BASE_URL}/products/{product_id}" try: response = requests.get(url, timeout=10) response.raise_for_status() data = response.json() print(f"状态码: {response.status_code}") print(f"响应: {json.dumps(data, ensure_ascii=False, indent=2)}") if data.get("code") == 200: product = data.get("data", {}) print(f"\n[成功] 成功获取产品: {product.get('product_name')}") return product else: print(f"[失败] {data.get('message')}") return None except Exception as e: print(f"[错误] 请求失败: {e}") return None def test_get_product_preview(product_id: int, limit: int = 10): """测试获取数据预览""" print(f"\n[测试 3] 获取数据预览 (ID: {product_id}, 限制: {limit} 条)") print("-" * 60) url = f"{BASE_URL}/products/{product_id}/preview" params = {"limit": limit} try: response = requests.get(url, params=params, timeout=30) response.raise_for_status() data = response.json() print(f"状态码: {response.status_code}") if data.get("code") == 200: preview_data = data.get("data", {}) columns = preview_data.get("columns", []) rows = preview_data.get("data", []) total_count = preview_data.get("total_count", 0) print(f"\n[成功] 成功获取数据预览") print(f" 总记录数: {total_count}") print(f" 预览条数: {len(rows)}") print(f" 列数: {len(columns)}") print(f"\n 列信息:") for col in columns[:5]: # 只显示前5列 print(f" - {col['name']} ({col['type']})") if rows: print(f"\n 前3条数据示例:") for i, row in enumerate(rows[:3], 1): row_str = ", ".join( f"{k}={v}" for k, v in list(row.items())[:3] ) print(f" {i}. {row_str}") return preview_data else: print(f"[失败] {data.get('message')}") return None except Exception as e: print(f"[错误] 请求失败: {e}") return None def test_download_excel(product_id: int): """测试下载 Excel 文件""" print(f"\n[测试 4] 下载 Excel 文件 (ID: {product_id})") print("-" * 60) url = f"{BASE_URL}/products/{product_id}/download" params = {"limit": 50} try: response = requests.get(url, params=params, timeout=60, stream=True) response.raise_for_status() if response.headers.get("content-type", "").startswith( "application/vnd.openxmlformats" ): filename = response.headers.get( "content-disposition", "" ).split("filename=")[-1].strip('"') # 保存文件 output_dir = project_root / "test_output" output_dir.mkdir(exist_ok=True) filepath = output_dir / filename with open(filepath, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"\n[成功] 成功下载 Excel 文件") print(f" 文件名: {filename}") print(f" 保存路径: {filepath}") print(f" 文件大小: {filepath.stat().st_size} 字节") return True else: print(f"[失败] 响应不是 Excel 文件") print(f" Content-Type: {response.headers.get('content-type')}") return False except Exception as e: print(f"[错误] 请求失败: {e}") return False def test_mark_as_viewed(product_id: int): """测试标记为已查看""" print(f"\n[测试 5] 标记为已查看 (ID: {product_id})") print("-" * 60) url = f"{BASE_URL}/products/{product_id}/viewed" try: response = requests.post(url, timeout=10) response.raise_for_status() data = response.json() print(f"状态码: {response.status_code}") print(f"响应: {json.dumps(data, ensure_ascii=False, indent=2)}") if data.get("code") == 200: product = data.get("data", {}) has_new_data = product.get("has_new_data", False) print(f"\n[成功] 标记成功") print(f" 是否有新数据: {has_new_data}") return True else: print(f"[失败] {data.get('message')}") return False except Exception as e: print(f"[错误] 请求失败: {e}") return False def test_refresh_stats(product_id: int): """测试刷新统计信息""" print(f"\n[测试 6] 刷新统计信息 (ID: {product_id})") print("-" * 60) url = f"{BASE_URL}/products/{product_id}/refresh" try: response = requests.post(url, timeout=30) response.raise_for_status() data = response.json() print(f"状态码: {response.status_code}") if data.get("code") == 200: product = data.get("data", {}) print(f"\n[成功] 刷新成功") print(f" 记录数: {product.get('record_count')}") print(f" 列数: {product.get('column_count')}") return True else: print(f"[失败] {data.get('message')}") return False except Exception as e: print(f"[错误] 请求失败: {e}") return False def main(): """主函数""" print("=" * 60) print("数据服务 API 接口测试") print("=" * 60) print(f"\nAPI 地址: {BASE_URL}") # 测试1: 获取产品列表 products = test_get_products() if not products: print("\n[错误] 无法获取数据产品列表,测试终止") return # 选择第一个产品进行详细测试 if products: product = products[0] product_id = product.get("id") if product_id: # 测试2: 获取产品详情 test_get_product_detail(product_id) # 测试3: 获取数据预览 test_get_product_preview(product_id, limit=10) # 测试4: 下载 Excel (可选,可能会比较慢) # test_download_excel(product_id) # 测试5: 标记为已查看 test_mark_as_viewed(product_id) # 测试6: 刷新统计信息 test_refresh_stats(product_id) print("\n" + "=" * 60) print("测试完成") print("=" * 60) if __name__ == "__main__": main()