"""
Prepare test data for the data-service feature.

The script creates three test tables, fills them with deterministic
generated rows, and registers each table in ``data_products``.
"""

import os
import re
import sys
import traceback
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import parse_qsl, unquote, urlsplit

# Force UTF-8 console output on Windows so the Chinese status messages and
# the check-mark symbols below can be printed.
if sys.platform == "win32":
    import io

    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")

# Put the project root on sys.path so the ``app`` package import below works
# when this file is executed directly as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

import psycopg2
from psycopg2 import sql

from app.config.config import ProductionConfig

# (table name, CREATE TABLE statement, table comment) for every test table.
TEST_TABLES = (
    (
        "test_sales_data",
        """
        CREATE TABLE test_sales_data (
            id SERIAL PRIMARY KEY,
            order_id VARCHAR(50) NOT NULL,
            customer_name VARCHAR(100),
            product_name VARCHAR(200),
            quantity INTEGER,
            unit_price DECIMAL(10, 2),
            total_amount DECIMAL(10, 2),
            order_date DATE,
            region VARCHAR(50),
            status VARCHAR(20),
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        "测试销售数据表",
    ),
    (
        "test_user_statistics",
        """
        CREATE TABLE test_user_statistics (
            id SERIAL PRIMARY KEY,
            user_id VARCHAR(50) NOT NULL,
            username VARCHAR(100),
            email VARCHAR(200),
            registration_date DATE,
            last_login_date DATE,
            total_orders INTEGER DEFAULT 0,
            total_amount DECIMAL(10, 2) DEFAULT 0,
            user_level VARCHAR(20),
            is_active BOOLEAN DEFAULT TRUE,
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        "测试用户统计表",
    ),
    (
        "test_product_inventory",
        """
        CREATE TABLE test_product_inventory (
            id SERIAL PRIMARY KEY,
            product_code VARCHAR(50) UNIQUE NOT NULL,
            product_name VARCHAR(200),
            category VARCHAR(100),
            current_stock INTEGER,
            min_stock INTEGER,
            max_stock INTEGER,
            unit_price DECIMAL(10, 2),
            supplier VARCHAR(200),
            last_restock_date DATE,
            status VARCHAR(20),
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        "测试产品库存表",
    ),
)

INSERT_SALES_SQL = """
    INSERT INTO test_sales_data
        (order_id, customer_name, product_name, quantity, unit_price,
         total_amount, order_date, region, status)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

INSERT_USERS_SQL = """
    INSERT INTO test_user_statistics
        (user_id, username, email, registration_date, last_login_date,
         total_orders, total_amount, user_level, is_active)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

INSERT_INVENTORY_SQL = """
    INSERT INTO test_product_inventory
        (product_code, product_name, category, current_stock, min_stock,
         max_stock, unit_price, supplier, last_restock_date, status)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

# Data products registered for the test tables above.
DATA_PRODUCTS = (
    {
        "product_name": "销售数据分析",
        "product_name_en": "test_sales_data",
        "target_table": "test_sales_data",
        "target_schema": "public",
        "description": "销售订单数据分析,包含订单详情、客户信息、产品信息等",
        "source_dataflow_id": 1001,
        "source_dataflow_name": "销售数据加工流程",
    },
    {
        "product_name": "用户行为统计",
        "product_name_en": "test_user_statistics",
        "target_table": "test_user_statistics",
        "target_schema": "public",
        "description": "用户注册、登录、订单统计等行为数据分析",
        "source_dataflow_id": 1002,
        "source_dataflow_name": "用户数据加工流程",
    },
    {
        "product_name": "产品库存管理",
        "product_name_en": "test_product_inventory",
        "target_table": "test_product_inventory",
        "target_schema": "public",
        "description": "产品库存信息,包括库存数量、价格、供应商等信息",
        "source_dataflow_id": 1003,
        "source_dataflow_name": "库存数据加工流程",
    },
)


def _redact_uri(db_uri):
    """Return *db_uri* with the password part replaced by ``***``."""
    return re.sub(r"(://[^:/@]*):.*@", r"\1:***@", db_uri)


def _parse_db_uri(db_uri):
    """Translate a PostgreSQL URI into ``psycopg2.connect`` keyword arguments.

    Accepts ``postgresql://`` / ``postgres://`` URIs, optionally with a driver
    suffix such as ``postgresql+psycopg2://``. Credentials and the database
    name are percent-decoded, and query-string options (e.g. ``sslmode``) are
    forwarded as extra connection keywords.

    Raises:
        ValueError: if the URI is not a string, has an unsupported scheme, an
            invalid port, or lacks a host or database name. The password is
            never included in the message.
    """
    if not isinstance(db_uri, str) or not db_uri:
        raise ValueError(f"无效的数据库URI格式: {db_uri!r}")

    try:
        parts = urlsplit(db_uri)
        port = parts.port  # raises ValueError on a non-numeric port
    except ValueError as exc:
        raise ValueError(f"无效的数据库URI格式: {_redact_uri(db_uri)}") from exc

    base_scheme = parts.scheme.split("+", 1)[0]
    database = unquote(parts.path.lstrip("/"))
    if (
        base_scheme not in ("postgresql", "postgres")
        or not parts.hostname
        or not database
    ):
        raise ValueError(f"无效的数据库URI格式: {_redact_uri(db_uri)}")

    # Query options go in first so the explicit components always win.
    params = dict(parse_qsl(parts.query))
    params.update(
        host=parts.hostname,
        port=port or 5432,
        database=database,
        user=unquote(parts.username) if parts.username is not None else None,
        password=unquote(parts.password or ""),
    )
    return params


def get_db_connection():
    """Open a psycopg2 connection described by ``ProductionConfig``.

    Reads ``SQLALCHEMY_DATABASE_URI`` from a fresh ``ProductionConfig``
    instance and connects with the host, port, database and credentials it
    contains (port defaults to 5432, password to an empty string).

    Returns:
        A new psycopg2 connection.

    Raises:
        ValueError: if the configured URI cannot be parsed.
    """
    config = ProductionConfig()
    return psycopg2.connect(**_parse_db_uri(config.SQLALCHEMY_DATABASE_URI))


def create_test_tables(conn):
    """Drop and re-create every table listed in ``TEST_TABLES``.

    Each table is handled independently: the drop is committed, then the
    create and comment statements are committed together. A database error
    for one table is reported and rolled back without stopping the others.

    Args:
        conn: an open psycopg2 connection.

    Returns:
        True if at least one test table exists in the ``public`` schema
        afterwards, False if none does.
    """
    print("\n[1/3] 创建测试数据表...")

    with conn.cursor() as cursor:
        for table_name, create_sql, comment in TEST_TABLES:
            try:
                cursor.execute(
                    sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(
                        sql.Identifier(table_name)
                    )
                )
                conn.commit()
                cursor.execute(create_sql)
                cursor.execute(
                    sql.SQL("COMMENT ON TABLE {} IS {}").format(
                        sql.Identifier(table_name), sql.Literal(comment)
                    )
                )
                conn.commit()
                print(f" ✓ {table_name} 表创建成功")
            except psycopg2.Error as e:
                print(f" ✗ {table_name} 表创建失败: {e}")
                # Reset the aborted transaction so the next table can proceed.
                conn.rollback()

        # Confirm against the catalog rather than trusting the loop above.
        cursor.execute(
            """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
              AND table_name IN %s
            ORDER BY table_name
            """,
            (tuple(name for name, _, _ in TEST_TABLES),),
        )
        created_tables = [row[0] for row in cursor.fetchall()]

    if not created_tables:
        print(" 已创建表: 无")
        print(" [错误] 没有成功创建任何表,请检查错误信息")
        return False

    print("\n[成功] 测试数据表创建完成")
    print(f" 已创建表: {', '.join(created_tables)}")
    return True


def _build_sales_rows(count=250):
    """Generate *count* deterministic rows for ``test_sales_data``."""
    regions = ["华东", "华南", "华北", "西南", "西北"]
    statuses = ["已完成", "处理中", "已取消"]
    products = ["笔记本电脑", "台式机", "显示器", "键盘", "鼠标", "耳机", "音响", "摄像头"]

    rows = []
    for i in range(count):
        # Days stay within 1..28 so the date is valid in every month.
        order_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        quantity = (i % 10) + 1
        unit_price = round(100.0 + (i % 5000), 2)
        rows.append((
            f"ORD{10000 + i}",
            f"客户{chr(65 + (i % 26))}{i}",
            products[i % len(products)],
            quantity,
            unit_price,
            quantity * unit_price,
            order_date,
            regions[i % len(regions)],
            statuses[i % len(statuses)],
        ))
    return rows


def _build_user_rows(count=200):
    """Generate *count* deterministic rows for ``test_user_statistics``."""
    levels = ["普通", "银卡", "金卡", "钻石"]

    rows = []
    for i in range(count):
        reg_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        # Offset forward from the registration date so the last login can
        # never fall before the user registered.
        login_date = reg_date + timedelta(days=i % 10)
        rows.append((
            f"USER{1000 + i}",
            f"user{i}",
            f"user{i}@example.com",
            reg_date,
            login_date,
            (i % 50) + 1,
            round(1000.0 + (i % 50000), 2),
            levels[i % len(levels)],
            (i % 10) != 0,  # every tenth user is inactive
        ))
    return rows


def _build_inventory_rows(count=150):
    """Generate *count* deterministic rows for ``test_product_inventory``."""
    categories = ["电子产品", "办公用品", "家具", "服装", "食品"]
    suppliers = ["供应商A", "供应商B", "供应商C", "供应商D"]
    min_stock = 50
    max_stock = 1000

    rows = []
    for i in range(count):
        current_stock = (i % 500) + 10
        rows.append((
            f"PROD{10000 + i}",
            f"产品{i}",
            categories[i % len(categories)],
            current_stock,
            min_stock,
            max_stock,
            round(50.0 + (i % 500), 2),
            suppliers[i % len(suppliers)],
            datetime(2024, 1, (i % 28) + 1),
            "正常" if current_stock > min_stock else "缺货",
        ))
    return rows


def insert_test_data(conn):
    """Insert generated rows into the three test tables and commit.

    Inserts 250 sales rows, 200 user rows and 150 inventory rows in a single
    transaction.

    Args:
        conn: an open psycopg2 connection; the test tables must exist.
    """
    print("\n[2/3] 插入测试数据...")

    sales_rows = _build_sales_rows()
    user_rows = _build_user_rows()
    inventory_rows = _build_inventory_rows()

    with conn.cursor() as cursor:
        cursor.executemany(INSERT_SALES_SQL, sales_rows)
        cursor.executemany(INSERT_USERS_SQL, user_rows)
        cursor.executemany(INSERT_INVENTORY_SQL, inventory_rows)

    conn.commit()

    print("[成功] 测试数据插入完成:")
    print(f" - test_sales_data: {len(sales_rows)} 条")
    print(f" - test_user_statistics: {len(user_rows)} 条")
    print(f" - test_product_inventory: {len(inventory_rows)} 条")


def register_data_products(conn):
    """Create or update one ``data_products`` row per test table.

    For every entry in ``DATA_PRODUCTS`` whose target table exists, the
    current row and column counts are measured and the matching
    ``data_products`` row (looked up by schema and table) is updated, or a
    new row is inserted. All changes are committed together at the end.

    Args:
        conn: an open psycopg2 connection.
    """
    print("\n[3/3] 注册数据产品...")

    with conn.cursor() as cursor:
        for product in DATA_PRODUCTS:
            schema = product["target_schema"]
            table = product["target_table"]

            # Skip products whose target table is missing.
            cursor.execute(
                """
                SELECT EXISTS (
                    SELECT 1
                    FROM information_schema.tables
                    WHERE table_schema = %s AND table_name = %s
                )
                """,
                (schema, table),
            )
            if not cursor.fetchone()[0]:
                print(f" [跳过] 表 {table} 不存在,跳过注册")
                continue

            # Identifiers cannot be bound as parameters; compose them safely
            # and qualify with the schema so the count matches the table the
            # existence check found.
            cursor.execute(
                sql.SQL("SELECT COUNT(*) FROM {}.{}").format(
                    sql.Identifier(schema), sql.Identifier(table)
                )
            )
            record_count = cursor.fetchone()[0]

            cursor.execute(
                """
                SELECT COUNT(*)
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                """,
                (schema, table),
            )
            column_count = cursor.fetchone()[0]

            cursor.execute(
                """
                SELECT id FROM data_products
                WHERE target_schema = %s AND target_table = %s
                """,
                (schema, table),
            )
            existing = cursor.fetchone()

            if existing:
                cursor.execute(
                    """
                    UPDATE data_products SET
                        product_name = %s,
                        product_name_en = %s,
                        description = %s,
                        source_dataflow_id = %s,
                        source_dataflow_name = %s,
                        record_count = %s,
                        column_count = %s,
                        last_updated_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = %s
                    """,
                    (
                        product["product_name"],
                        product["product_name_en"],
                        product["description"],
                        product["source_dataflow_id"],
                        product["source_dataflow_name"],
                        record_count,
                        column_count,
                        existing[0],
                    ),
                )
                print(f" [更新] {product['product_name']} (ID: {existing[0]})")
            else:
                cursor.execute(
                    """
                    INSERT INTO data_products
                        (product_name, product_name_en, description,
                         source_dataflow_id, source_dataflow_name,
                         target_table, target_schema, record_count,
                         column_count, last_updated_at, created_by, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s,
                            CURRENT_TIMESTAMP, 'test_script', 'active')
                    RETURNING id
                    """,
                    (
                        product["product_name"],
                        product["product_name_en"],
                        product["description"],
                        product["source_dataflow_id"],
                        product["source_dataflow_name"],
                        table,
                        schema,
                        record_count,
                        column_count,
                    ),
                )
                product_id = cursor.fetchone()[0]
                print(
                    f" [创建] {product['product_name']} "
                    f"(ID: {product_id}, 记录数: {record_count})"
                )

    conn.commit()
    print("[成功] 数据产品注册完成")


def main():
    """Run the three preparation steps against the configured database.

    Asks for confirmation when ``FLASK_ENV`` is set to anything other than
    ``production``. Any failure is printed with a traceback and the process
    exits with status 1.
    """
    print("=" * 60)
    print("准备数据服务功能测试数据")
    print("=" * 60)

    env = os.environ.get("FLASK_ENV", "production")
    print(f"\n当前环境: {env}")

    if env != "production":
        response = input("\n警告: 当前不是生产环境,是否继续?(yes/no): ")
        if response.strip().lower() != "yes":
            print("已取消操作")
            return

    try:
        conn = get_db_connection()
        conn.autocommit = False

        try:
            # Stop here instead of failing later on a missing table.
            if not create_test_tables(conn):
                raise RuntimeError("测试数据表创建失败")
            insert_test_data(conn)
            register_data_products(conn)

            print("\n" + "=" * 60)
            print("[完成] 测试数据准备完成!")
            print("=" * 60)
            print("\n可以开始测试以下 API 接口:")
            print(" 1. GET /api/dataservice/products - 获取数据产品列表")
            print(" 2. GET /api/dataservice/products/{id} - 获取产品详情")
            print(" 3. GET /api/dataservice/products/{id}/preview - 获取数据预览")
            print(" 4. GET /api/dataservice/products/{id}/download - 下载Excel")
            print(" 5. POST /api/dataservice/products/{id}/viewed - 标记已查看")
            print(" 6. POST /api/dataservice/products/{id}/refresh - 刷新统计信息")
        finally:
            conn.close()

    except Exception as e:
        print(f"\n[错误] 操作失败: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()