""" 清理并创建测试数据表脚本 删除所有旧的测试表,然后重新创建 """ from __future__ import annotations import random from datetime import datetime, timedelta from typing import Any import psycopg2 from loguru import logger # 生产环境数据库配置 DB_CONFIG = { "host": "192.168.3.143", "port": 5432, "database": "dataops", "user": "postgres", "password": "dataOps", } def get_connection(): """获取数据库连接""" return psycopg2.connect(**DB_CONFIG) def cleanup_all_test_tables(conn) -> None: """清理所有测试表(所有 schema)""" logger.info("Cleaning up all test tables...") with conn.cursor() as cur: # 查找所有 schema 中的测试表 cur.execute(""" SELECT table_schema, table_name FROM information_schema.tables WHERE table_name IN ('test_sales_data', 'test_user_statistics', 'test_product_inventory') """) tables = cur.fetchall() for schema, table in tables: logger.info(f"Dropping {schema}.{table}") cur.execute(f'DROP TABLE IF EXISTS "{schema}"."{table}" CASCADE') conn.commit() logger.info("Cleanup complete") def create_test_sales_data_table(conn) -> None: """创建销售数据分析表""" logger.info("Creating test_sales_data table...") with conn.cursor() as cur: cur.execute(""" CREATE TABLE public.test_sales_data ( id SERIAL PRIMARY KEY, order_id VARCHAR(50) NOT NULL, order_date DATE NOT NULL, customer_id VARCHAR(50) NOT NULL, customer_name VARCHAR(100), product_id VARCHAR(50) NOT NULL, product_name VARCHAR(200), category VARCHAR(100), quantity INTEGER NOT NULL, unit_price DECIMAL(10, 2) NOT NULL, total_amount DECIMAL(12, 2) NOT NULL, discount_rate DECIMAL(5, 2) DEFAULT 0, payment_method VARCHAR(50), region VARCHAR(100), city VARCHAR(100), status VARCHAR(50) DEFAULT 'completed', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cur.execute( "COMMENT ON TABLE public.test_sales_data IS 'Sales data table - test data'" ) conn.commit() logger.info("test_sales_data table created") def insert_test_sales_data(conn, num_records: int = 500) -> None: """插入销售测试数据""" logger.info(f"Inserting {num_records} sales records...") customers = [ ("C001", "Zhang San"), ("C002", "Li Si"), ("C003", "Wang Wu"), ("C004", "Zhao Liu"), ("C005", "Qian Qi"), ] products = [ ("P001", "iPhone 15 Pro", "Electronics", 7999.00), ("P002", "MacBook Pro", "Computers", 14999.00), ("P003", "AirPods Pro", "Accessories", 1899.00), ("P004", "iPad Pro", "Tablets", 8999.00), ("P005", "Apple Watch", "Wearables", 3299.00), ] regions = [ ("East", ["Shanghai", "Hangzhou", "Nanjing"]), ("North", ["Beijing", "Tianjin", "Shijiazhuang"]), ("South", ["Guangzhou", "Shenzhen", "Dongguan"]), ] payment_methods = ["Alipay", "WeChat Pay", "Bank Card", "Credit Card"] statuses = ["completed", "completed", "completed", "pending", "cancelled"] with conn.cursor() as cur: records: list[tuple[Any, ...]] = [] base_date = datetime.now() - timedelta(days=180) for i in range(num_records): order_id = f"ORD{datetime.now().strftime('%Y%m%d')}{i + 1:05d}" order_date = base_date + timedelta(days=random.randint(0, 180)) customer = random.choice(customers) product = random.choice(products) region_data = random.choice(regions) quantity = random.randint(1, 5) unit_price = product[3] discount_rate = random.choice([0, 0, 0, 0.05, 0.10, 0.15]) total_amount = round(quantity * unit_price * (1 - discount_rate), 2) records.append( ( order_id, order_date.date(), customer[0], customer[1], product[0], product[1], product[2], quantity, unit_price, total_amount, discount_rate, random.choice(payment_methods), region_data[0], random.choice(region_data[1]), random.choice(statuses), ) ) cur.executemany( """ INSERT INTO public.test_sales_data ( order_id, order_date, customer_id, customer_name, product_id, product_name, category, quantity, unit_price, total_amount, discount_rate, payment_method, region, city, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, records, ) conn.commit() logger.info(f"Inserted {num_records} sales records") def create_test_user_statistics_table(conn) -> None: """创建用户行为统计表""" logger.info("Creating test_user_statistics table...") with conn.cursor() as cur: cur.execute(""" CREATE TABLE public.test_user_statistics ( id SERIAL PRIMARY KEY, user_id VARCHAR(50) NOT NULL, username VARCHAR(100), email VARCHAR(200), register_date DATE, last_login_date TIMESTAMP, login_count INTEGER DEFAULT 0, total_orders INTEGER DEFAULT 0, total_amount DECIMAL(12, 2) DEFAULT 0, avg_order_amount DECIMAL(10, 2) DEFAULT 0, favorite_category VARCHAR(100), user_level VARCHAR(50), points INTEGER DEFAULT 0, is_vip BOOLEAN DEFAULT FALSE, device_type VARCHAR(50), platform VARCHAR(50), province VARCHAR(100), city VARCHAR(100), age_group VARCHAR(50), gender VARCHAR(20), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cur.execute( "COMMENT ON TABLE public.test_user_statistics IS 'User statistics table - test data'" ) conn.commit() logger.info("test_user_statistics table created") def insert_test_user_statistics(conn, num_records: int = 300) -> None: """插入用户统计测试数据""" logger.info(f"Inserting {num_records} user statistics records...") names = ["Alice", "Bob", "Charlie", "David", "Eva", "Frank", "Grace", "Henry"] categories = ["Electronics", "Computers", "Home", "Fashion", "Beauty", "Food"] levels = ["Regular", "Silver", "Gold", "Diamond", "VIP"] devices = ["iOS", "Android", "Windows", "macOS", "Web"] platforms = ["App", "Mini Program", "PC Web", "H5"] provinces = ["Beijing", "Shanghai", "Guangdong", "Zhejiang", "Jiangsu"] age_groups = ["18-25", "26-35", "36-45", "46-55", "55+"] genders = ["Male", "Female"] with conn.cursor() as cur: records: list[tuple[Any, ...]] = [] base_date = datetime.now() - timedelta(days=365) for i in range(num_records): user_id = f"U{100000 + i}" name = f"{random.choice(names)}{i}" register_date = base_date + timedelta(days=random.randint(0, 365)) last_login = register_date + timedelta( days=random.randint(0, (datetime.now() - register_date).days) ) login_count = random.randint(1, 500) total_orders = random.randint(0, 100) total_amount = round(random.uniform(0, 50000), 2) if total_orders > 0 else 0 avg_amount = ( round(total_amount / total_orders, 2) if total_orders > 0 else 0 ) points = random.randint(0, 10000) is_vip = points > 5000 records.append( ( user_id, name, f"{user_id.lower()}@example.com", register_date.date(), last_login, login_count, total_orders, total_amount, avg_amount, random.choice(categories), random.choice(levels), points, is_vip, random.choice(devices), random.choice(platforms), random.choice(provinces), f"{random.choice(provinces)} City", random.choice(age_groups), random.choice(genders), ) ) cur.executemany( """ INSERT INTO public.test_user_statistics ( user_id, username, email, register_date, last_login_date, login_count, total_orders, total_amount, avg_order_amount, favorite_category, user_level, points, is_vip, device_type, platform, province, city, age_group, gender ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, records, ) conn.commit() logger.info(f"Inserted {num_records} user statistics records") def create_test_product_inventory_table(conn) -> None: """创建商品库存表""" logger.info("Creating test_product_inventory table...") with conn.cursor() as cur: cur.execute(""" CREATE TABLE public.test_product_inventory ( id SERIAL PRIMARY KEY, sku VARCHAR(50) NOT NULL, product_name VARCHAR(200) NOT NULL, category VARCHAR(100), brand VARCHAR(100), supplier VARCHAR(200), warehouse VARCHAR(100), current_stock INTEGER DEFAULT 0, safety_stock INTEGER DEFAULT 0, max_stock INTEGER DEFAULT 0, unit_cost DECIMAL(10, 2), selling_price DECIMAL(10, 2), stock_status VARCHAR(50), last_inbound_date DATE, last_outbound_date DATE, inbound_quantity_30d INTEGER DEFAULT 0, outbound_quantity_30d INTEGER DEFAULT 0, turnover_rate DECIMAL(5, 2), is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cur.execute( "COMMENT ON TABLE public.test_product_inventory IS 'Product inventory table - test data'" ) conn.commit() logger.info("test_product_inventory table created") def insert_test_product_inventory(conn, num_records: int = 200) -> None: """插入商品库存测试数据""" logger.info(f"Inserting {num_records} product inventory records...") products = [ ("iPhone 15 Pro", "Electronics", "Apple"), ("MacBook Pro", "Computers", "Apple"), ("AirPods Pro", "Accessories", "Apple"), ("Huawei Mate 60", "Electronics", "Huawei"), ("Xiaomi 14 Pro", "Electronics", "Xiaomi"), ("Dyson Vacuum", "Home", "Dyson"), ("Sony TV", "Home", "Sony"), ("ThinkPad X1", "Computers", "Lenovo"), ] suppliers = ["Tech Co.", "Trade Inc.", "Electronics Ltd.", "Digital Corp."] warehouses = ["Beijing WH", "Shanghai WH", "Guangzhou WH", "Chengdu WH"] with conn.cursor() as cur: records: list[tuple[Any, ...]] = [] for i in range(num_records): product = random.choice(products) sku = f"SKU{100000 + i}" current_stock = random.randint(0, 1000) safety_stock = random.randint(50, 200) max_stock = random.randint(800, 2000) unit_cost = round(random.uniform(10, 5000), 2) selling_price = round(unit_cost * random.uniform(1.2, 2.0), 2) if current_stock == 0: stock_status = "Out of Stock" elif current_stock < safety_stock: stock_status = "Low Stock" elif current_stock > max_stock * 0.9: stock_status = "Overstocked" else: stock_status = "Normal" last_inbound = datetime.now() - timedelta(days=random.randint(1, 60)) last_outbound = datetime.now() - timedelta(days=random.randint(1, 30)) inbound_30d = random.randint(0, 500) outbound_30d = random.randint(0, 400) turnover = min(round(outbound_30d / max(current_stock, 1) * 30, 2), 999.99) records.append( ( sku, f"{product[0]} - Model {chr(65 + i % 26)}", product[1], product[2], random.choice(suppliers), random.choice(warehouses), current_stock, safety_stock, max_stock, unit_cost, selling_price, stock_status, last_inbound.date(), last_outbound.date(), inbound_30d, outbound_30d, turnover, random.choice([True, True, True, False]), ) ) cur.executemany( """ INSERT INTO public.test_product_inventory ( sku, product_name, category, brand, supplier, warehouse, current_stock, safety_stock, max_stock, unit_cost, selling_price, stock_status, last_inbound_date, last_outbound_date, inbound_quantity_30d, outbound_quantity_30d, turnover_rate, is_active ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, records, ) conn.commit() logger.info(f"Inserted {num_records} product inventory records") def update_data_products_stats(conn) -> None: """更新 data_products 表中的统计信息""" logger.info("Updating data_products statistics...") tables_info = [ ("test_sales_data", 17), ("test_user_statistics", 22), ("test_product_inventory", 21), ] with conn.cursor() as cur: for table_name, column_count in tables_info: cur.execute(f"SELECT COUNT(*) FROM public.{table_name}") record_count = cur.fetchone()[0] cur.execute( """ UPDATE public.data_products SET record_count = %s, column_count = %s, last_updated_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP, status = 'active' WHERE target_table = %s AND target_schema = 'public' """, (record_count, column_count, table_name), ) logger.info( f"Updated {table_name}: records={record_count}, columns={column_count}" ) conn.commit() logger.info("Statistics update complete") def main() -> None: """主函数""" logger.info("=" * 60) logger.info("Starting test data creation...") logger.info("=" * 60) try: conn = get_connection() logger.info("Database connected") # 清理所有旧表 cleanup_all_test_tables(conn) # 创建表和插入数据 create_test_sales_data_table(conn) insert_test_sales_data(conn, num_records=500) create_test_user_statistics_table(conn) insert_test_user_statistics(conn, num_records=300) create_test_product_inventory_table(conn) insert_test_product_inventory(conn, num_records=200) # 更新统计信息 update_data_products_stats(conn) conn.close() logger.info("=" * 60) logger.info("All test data created successfully!") logger.info("=" * 60) logger.info("Created tables:") logger.info(" 1. test_sales_data (500 records)") logger.info(" 2. test_user_statistics (300 records)") logger.info(" 3. test_product_inventory (200 records)") logger.info("=" * 60) except Exception as e: logger.error(f"Failed to create test data: {e}") raise if __name__ == "__main__": main()