| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518 |
- """
- 准备数据服务功能的测试数据
- 包括创建测试数据表和注册数据产品
- """
- import os
- import sys
- from datetime import datetime
- from pathlib import Path
# Force UTF-8 console output on Windows so the non-ASCII progress messages
# printed by this script are not mangled by the default console code page.
if sys.platform == "win32":
    import io

    # line_buffering=True keeps the replacement streams flushing on every
    # newline; without it the progress output stays in the wrapper's buffer
    # until it fills up or the process exits.
    sys.stdout = io.TextIOWrapper(
        sys.stdout.buffer, encoding="utf-8", line_buffering=True
    )
    sys.stderr = io.TextIOWrapper(
        sys.stderr.buffer, encoding="utf-8", line_buffering=True
    )

# Put the project root (the parent of this script's directory) at the front
# of sys.path so the `app` package imported below can be resolved.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
- import psycopg2
- from app.config.config import ProductionConfig
def get_db_connection():
    """Open a psycopg2 connection described by ``ProductionConfig``.

    ``SQLALCHEMY_DATABASE_URI`` is parsed with ``urllib.parse`` rather than
    by manual string splitting, so that:

    * ``postgres://`` and driver-qualified schemes such as
      ``postgresql+psycopg2://`` are recognised,
    * passwords containing ``:`` or ``@`` are kept intact,
    * percent-encoded credentials and database names are decoded,
    * a trailing ``?query`` part is not mistaken for the database name.

    The port defaults to 5432 and missing credentials default to empty
    strings.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        ValueError: If the URI is empty, is not a PostgreSQL URI, has no
            host, has an invalid port, or does not contain exactly one
            path segment for the database name.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import unquote, urlsplit

    config = ProductionConfig()
    db_uri = config.SQLALCHEMY_DATABASE_URI
    if not db_uri:
        raise ValueError(f"无效的数据库URI格式: {db_uri}")

    parts = urlsplit(db_uri)
    # "postgresql+psycopg2" -> "postgresql": the driver suffix is irrelevant.
    dialect = parts.scheme.split("+", 1)[0]
    path = parts.path
    if (
        dialect not in ("postgresql", "postgres")
        or not parts.hostname
        or not path.startswith("/")
        or "/" in path[1:]
    ):
        raise ValueError(f"无效的数据库URI格式: {db_uri}")

    try:
        # SplitResult.port raises ValueError for non-numeric/out-of-range ports.
        port = parts.port or 5432
    except ValueError as exc:
        raise ValueError(f"无效的数据库URI格式: {db_uri}") from exc

    return psycopg2.connect(
        host=parts.hostname,
        port=port,
        database=unquote(path[1:]),
        user=unquote(parts.username or ""),
        password=unquote(parts.password or ""),
    )
# Definitions of the test tables as (table name, table comment, CREATE
# statement) triples, in creation order.
_TEST_TABLES = (
    (
        "test_sales_data",
        "测试销售数据表",
        """
        CREATE TABLE test_sales_data (
            id SERIAL PRIMARY KEY,
            order_id VARCHAR(50) NOT NULL,
            customer_name VARCHAR(100),
            product_name VARCHAR(200),
            quantity INTEGER,
            unit_price DECIMAL(10, 2),
            total_amount DECIMAL(10, 2),
            order_date DATE,
            region VARCHAR(50),
            status VARCHAR(20),
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
    ),
    (
        "test_user_statistics",
        "测试用户统计表",
        """
        CREATE TABLE test_user_statistics (
            id SERIAL PRIMARY KEY,
            user_id VARCHAR(50) NOT NULL,
            username VARCHAR(100),
            email VARCHAR(200),
            registration_date DATE,
            last_login_date DATE,
            total_orders INTEGER DEFAULT 0,
            total_amount DECIMAL(10, 2) DEFAULT 0,
            user_level VARCHAR(20),
            is_active BOOLEAN DEFAULT TRUE,
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
    ),
    (
        "test_product_inventory",
        "测试产品库存表",
        """
        CREATE TABLE test_product_inventory (
            id SERIAL PRIMARY KEY,
            product_code VARCHAR(50) UNIQUE NOT NULL,
            product_name VARCHAR(200),
            category VARCHAR(100),
            current_stock INTEGER,
            min_stock INTEGER,
            max_stock INTEGER,
            unit_price DECIMAL(10, 2),
            supplier VARCHAR(200),
            last_restock_date DATE,
            status VARCHAR(20),
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
    ),
)


def create_test_tables(conn):
    """Drop and re-create the three test tables.

    Each table is handled independently: the DROP is committed first, then
    the CREATE and the table comment are committed together. A failure on
    one table is reported and rolled back, and the remaining tables are
    still attempted. Afterwards ``information_schema.tables`` is queried to
    report which of the tables actually exist in the ``public`` schema.

    Args:
        conn: An open psycopg2 connection.

    Returns:
        ``True`` if at least one of the test tables exists afterwards,
        ``False`` if none of them does.
    """
    print("\n[1/3] 创建测试数据表...")

    with conn.cursor() as cursor:
        for table_name, comment, create_sql in _TEST_TABLES:
            try:
                # Table names come from the constant above, never from user
                # input, so interpolating them into the statement is safe.
                cursor.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE")
                conn.commit()

                cursor.execute(create_sql)
                cursor.execute(
                    f"COMMENT ON TABLE {table_name} IS %s", (comment,)
                )
                conn.commit()
                print(f" ✓ {table_name} 表创建成功")
            except Exception as e:
                # Best effort per table: report, reset the transaction and
                # carry on with the next table.
                print(f" ✗ {table_name} 表创建失败: {e}")
                conn.rollback()

        # Verify against the catalog instead of trusting the loop above.
        expected = tuple(name for name, _, _ in _TEST_TABLES)
        cursor.execute(
            """
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
              AND table_name IN %s
            ORDER BY table_name
            """,
            (expected,),
        )
        created_tables = [row[0] for row in cursor.fetchall()]

    if not created_tables:
        print(" [错误] 没有成功创建任何表,请检查错误信息")
        return False

    print("\n[成功] 测试数据表创建完成")
    print(f" 已创建表: {', '.join(created_tables)}")
    return True
def _build_sales_rows(count=250):
    """Return ``count`` deterministic rows for ``test_sales_data``."""
    regions = ["华东", "华南", "华北", "西南", "西北"]
    statuses = ["已完成", "处理中", "已取消"]
    products = ["笔记本电脑", "台式机", "显示器", "键盘", "鼠标", "耳机", "音响", "摄像头"]

    rows = []
    for i in range(count):
        # Days cycle through 1..28 so every month/day combination is valid.
        order_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        quantity = (i % 10) + 1
        unit_price = round(100.0 + (i % 5000), 2)
        rows.append((
            f"ORD{10000 + i}",
            f"客户{chr(65 + (i % 26))}{i}",
            products[i % len(products)],
            quantity,
            unit_price,
            quantity * unit_price,
            order_date,
            regions[i % len(regions)],
            statuses[i % len(statuses)],
        ))
    return rows


def _build_user_rows(count=200):
    """Return ``count`` deterministic rows for ``test_user_statistics``."""
    from datetime import timedelta

    levels = ["普通", "银卡", "金卡", "钻石"]

    rows = []
    for i in range(count):
        reg_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        # Offset forward from the registration date; wrapping the day number
        # modulo 28 could place the last login before the registration.
        login_date = reg_date + timedelta(days=i % 10)
        rows.append((
            f"USER{1000 + i}",
            f"user{i}",
            f"user{i}@example.com",
            reg_date,
            login_date,
            (i % 50) + 1,
            round(1000.0 + (i % 50000), 2),
            levels[i % len(levels)],
            (i % 10) != 0,  # every tenth user is inactive
        ))
    return rows


def _build_inventory_rows(count=150):
    """Return ``count`` deterministic rows for ``test_product_inventory``."""
    categories = ["电子产品", "办公用品", "家具", "服装", "食品"]
    suppliers = ["供应商A", "供应商B", "供应商C", "供应商D"]
    min_stock = 50
    max_stock = 1000

    rows = []
    for i in range(count):
        current_stock = (i % 500) + 10
        rows.append((
            f"PROD{10000 + i}",
            f"产品{i}",
            categories[i % len(categories)],
            current_stock,
            min_stock,
            max_stock,
            round(50.0 + (i % 500), 2),
            suppliers[i % len(suppliers)],
            datetime(2024, 1, (i % 28) + 1),
            "正常" if current_stock > min_stock else "缺货",
        ))
    return rows


def insert_test_data(conn):
    """Fill the three test tables with deterministic sample rows.

    Inserts 250 sales rows, 200 user rows and 150 inventory rows in a single
    transaction and commits it. If any insert fails the transaction is
    rolled back and the exception is re-raised.

    Args:
        conn: An open psycopg2 connection; the test tables must exist.
    """
    print("\n[2/3] 插入测试数据...")

    sales_rows = _build_sales_rows()
    user_rows = _build_user_rows()
    inventory_rows = _build_inventory_rows()

    try:
        with conn.cursor() as cursor:
            cursor.executemany("""
                INSERT INTO test_sales_data
                (order_id, customer_name, product_name, quantity, unit_price,
                 total_amount, order_date, region, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, sales_rows)
            cursor.executemany("""
                INSERT INTO test_user_statistics
                (user_id, username, email, registration_date, last_login_date,
                 total_orders, total_amount, user_level, is_active)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, user_rows)
            cursor.executemany("""
                INSERT INTO test_product_inventory
                (product_code, product_name, category, current_stock, min_stock,
                 max_stock, unit_price, supplier, last_restock_date, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, inventory_rows)
        conn.commit()
    except Exception:
        # Do not leave a half-filled, aborted transaction on the connection.
        conn.rollback()
        raise

    # Report the real row counts instead of repeating hard-coded numbers.
    print("[成功] 测试数据插入完成:")
    print(f" - test_sales_data: {len(sales_rows)} 条")
    print(f" - test_user_statistics: {len(user_rows)} 条")
    print(f" - test_product_inventory: {len(inventory_rows)} 条")
def register_data_products(conn):
    """Register the three test tables in the ``data_products`` table.

    For every product definition the target table is looked up in
    ``information_schema.tables``; missing tables are skipped. Otherwise the
    current row count and column count are collected and the matching
    ``data_products`` row (matched on ``target_schema`` + ``target_table``)
    is updated, or inserted if it does not exist yet. All changes are
    committed once at the end.

    Args:
        conn: An open psycopg2 connection.
    """
    # psycopg2.sql composes quoted identifiers safely; imported here so the
    # block does not depend on additional module-level imports.
    from psycopg2 import sql

    print("\n[3/3] 注册数据产品...")
    products = [
        {
            "product_name": "销售数据分析",
            "product_name_en": "test_sales_data",
            "target_table": "test_sales_data",
            "target_schema": "public",
            "description": "销售订单数据分析,包含订单详情、客户信息、产品信息等",
            "source_dataflow_id": 1001,
            "source_dataflow_name": "销售数据加工流程",
        },
        {
            "product_name": "用户行为统计",
            "product_name_en": "test_user_statistics",
            "target_table": "test_user_statistics",
            "target_schema": "public",
            "description": "用户注册、登录、订单统计等行为数据分析",
            "source_dataflow_id": 1002,
            "source_dataflow_name": "用户数据加工流程",
        },
        {
            "product_name": "产品库存管理",
            "product_name_en": "test_product_inventory",
            "target_table": "test_product_inventory",
            "target_schema": "public",
            "description": "产品库存信息,包括库存数量、价格、供应商等信息",
            "source_dataflow_id": 1003,
            "source_dataflow_name": "库存数据加工流程",
        },
    ]

    with conn.cursor() as cursor:
        for product in products:
            schema = product['target_schema']
            table = product['target_table']

            # Skip products whose target table does not exist.
            cursor.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = %s AND table_name = %s
                )
            """, (schema, table))
            if not cursor.fetchone()[0]:
                print(f" [跳过] 表 {table} 不存在,跳过注册")
                continue

            # Count rows in the schema-qualified table; identifiers are
            # quoted by psycopg2 rather than spliced in by hand, and the
            # schema matches the one used by the catalog queries.
            cursor.execute(
                sql.SQL("SELECT COUNT(*) FROM {}.{}").format(
                    sql.Identifier(schema), sql.Identifier(table)
                )
            )
            record_count = cursor.fetchone()[0]

            cursor.execute("""
                SELECT COUNT(*)
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
            """, (schema, table))
            column_count = cursor.fetchone()[0]

            # Is this table already registered?
            cursor.execute("""
                SELECT id FROM data_products
                WHERE target_schema = %s AND target_table = %s
            """, (schema, table))
            existing = cursor.fetchone()

            if existing:
                cursor.execute("""
                    UPDATE data_products SET
                        product_name = %s,
                        product_name_en = %s,
                        description = %s,
                        source_dataflow_id = %s,
                        source_dataflow_name = %s,
                        record_count = %s,
                        column_count = %s,
                        last_updated_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = %s
                """, (
                    product['product_name'],
                    product['product_name_en'],
                    product['description'],
                    product['source_dataflow_id'],
                    product['source_dataflow_name'],
                    record_count,
                    column_count,
                    existing[0],
                ))
                print(f" [更新] {product['product_name']} (ID: {existing[0]})")
            else:
                cursor.execute("""
                    INSERT INTO data_products
                    (product_name, product_name_en, description, source_dataflow_id,
                     source_dataflow_name, target_table, target_schema, record_count,
                     column_count, last_updated_at, created_by, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, 'test_script', 'active')
                    RETURNING id
                """, (
                    product['product_name'],
                    product['product_name_en'],
                    product['description'],
                    product['source_dataflow_id'],
                    product['source_dataflow_name'],
                    table,
                    schema,
                    record_count,
                    column_count,
                ))
                product_id = cursor.fetchone()[0]
                print(f" [创建] {product['product_name']} (ID: {product_id}, 记录数: {record_count})")

    conn.commit()
    print("[成功] 数据产品注册完成")
def main():
    """Run the three preparation steps against the configured database.

    Prints a banner and the value of ``FLASK_ENV`` (default ``production``),
    asks for confirmation when that value is not ``production``, then
    creates the test tables, inserts the sample rows and registers the data
    products. The connection is always closed. Any failure is printed with
    its traceback and the process exits with status 1.
    """
    print("=" * 60)
    print("准备数据服务功能测试数据")
    print("=" * 60)

    env = os.environ.get("FLASK_ENV", "production")
    print(f"\n当前环境: {env}")
    # NOTE(review): get_db_connection() always uses ProductionConfig, so this
    # prompt fires when FLASK_ENV disagrees with the database actually being
    # modified — confirm that this is the intended direction of the check.
    if env != "production":
        response = input("\n警告: 当前不是生产环境,是否继续?(yes/no): ")
        # Tolerate surrounding whitespace in the typed answer.
        if response.strip().lower() != "yes":
            print("已取消操作")
            return

    try:
        conn = get_db_connection()
        conn.autocommit = False
        try:
            # create_test_tables() signals failure through its return value;
            # stop here rather than inserting into tables that do not exist.
            if not create_test_tables(conn):
                raise RuntimeError("测试数据表创建失败")
            insert_test_data(conn)
            register_data_products(conn)

            print("\n" + "=" * 60)
            print("[完成] 测试数据准备完成!")
            print("=" * 60)
            print("\n可以开始测试以下 API 接口:")
            print(" 1. GET /api/dataservice/products - 获取数据产品列表")
            print(" 2. GET /api/dataservice/products/{id} - 获取产品详情")
            print(" 3. GET /api/dataservice/products/{id}/preview - 获取数据预览")
            print(" 4. GET /api/dataservice/products/{id}/download - 下载Excel")
            print(" 5. POST /api/dataservice/products/{id}/viewed - 标记已查看")
            print(" 6. POST /api/dataservice/products/{id}/refresh - 刷新统计信息")
        finally:
            conn.close()
    except Exception as e:
        print(f"\n[错误] 操作失败: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
# Run the preparation steps only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|