| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- """
- 清理并创建测试数据表脚本
- 删除所有旧的测试表,然后重新创建
- """
from __future__ import annotations

import os
import random
from datetime import datetime, timedelta
from typing import Any

import psycopg2
from loguru import logger
# Database connection settings (the original comment labels this the
# production environment). Each value can be overridden with a DATAOPS_DB_*
# environment variable so the script can target another instance without a
# source edit; the defaults keep the previously hard-coded values.
# NOTE(review): a default password in source is a credential leak — consider
# dropping the default once every caller sets DATAOPS_DB_PASSWORD.
DB_CONFIG = {
    "host": os.environ.get("DATAOPS_DB_HOST", "192.168.3.143"),
    "port": int(os.environ.get("DATAOPS_DB_PORT", "5432")),
    "database": os.environ.get("DATAOPS_DB_NAME", "dataops"),
    "user": os.environ.get("DATAOPS_DB_USER", "postgres"),
    "password": os.environ.get("DATAOPS_DB_PASSWORD", "dataOps"),
}
def get_connection():
    """Open a new PostgreSQL connection using the settings in ``DB_CONFIG``.

    Returns:
        The connection object produced by ``psycopg2.connect``. This function
        does not close it; that is left to the caller.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def cleanup_all_test_tables(conn) -> None:
    """Drop every existing copy of the three test tables, in any schema.

    Looks up ``test_sales_data``, ``test_user_statistics`` and
    ``test_product_inventory`` in ``information_schema.tables``, issues a
    ``DROP TABLE IF EXISTS ... CASCADE`` for each match, then commits.

    Args:
        conn: Open psycopg2 connection; the drops are committed on it.
    """
    # Local import: psycopg2 is already a dependency of this script, and the
    # sql submodule is only needed here.
    from psycopg2 import sql

    logger.info("Cleaning up all test tables...")
    with conn.cursor() as cur:
        # Find the test tables in every schema.
        cur.execute("""
            SELECT table_schema, table_name
            FROM information_schema.tables
            WHERE table_name IN ('test_sales_data', 'test_user_statistics', 'test_product_inventory')
        """)
        for schema, table in cur.fetchall():
            logger.info(f"Dropping {schema}.{table}")
            # Compose identifiers with psycopg2.sql instead of hand-written
            # quotes so names containing a double quote are escaped correctly.
            cur.execute(
                sql.SQL("DROP TABLE IF EXISTS {}.{} CASCADE").format(
                    sql.Identifier(schema),
                    sql.Identifier(table),
                )
            )
    conn.commit()
    logger.info("Cleanup complete")
def create_test_sales_data_table(conn) -> None:
    """Create ``public.test_sales_data`` and attach its table comment.

    Args:
        conn: Open psycopg2 connection; both statements are committed on it.
    """
    logger.info("Creating test_sales_data table...")

    ddl = """
        CREATE TABLE public.test_sales_data (
            id SERIAL PRIMARY KEY,
            order_id VARCHAR(50) NOT NULL,
            order_date DATE NOT NULL,
            customer_id VARCHAR(50) NOT NULL,
            customer_name VARCHAR(100),
            product_id VARCHAR(50) NOT NULL,
            product_name VARCHAR(200),
            category VARCHAR(100),
            quantity INTEGER NOT NULL,
            unit_price DECIMAL(10, 2) NOT NULL,
            total_amount DECIMAL(12, 2) NOT NULL,
            discount_rate DECIMAL(5, 2) DEFAULT 0,
            payment_method VARCHAR(50),
            region VARCHAR(100),
            city VARCHAR(100),
            status VARCHAR(50) DEFAULT 'completed',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    table_comment = (
        "COMMENT ON TABLE public.test_sales_data IS 'Sales data table - test data'"
    )

    # Run the CREATE first, then the COMMENT, inside one transaction.
    with conn.cursor() as cursor:
        for statement in (ddl, table_comment):
            cursor.execute(statement)
    conn.commit()
    logger.info("test_sales_data table created")
def insert_test_sales_data(conn, num_records: int = 500) -> None:
    """Fill ``public.test_sales_data`` with randomly generated orders.

    Each row combines a random customer, product, region/city, payment method
    and status. Order dates fall within the 180 days before now, and
    ``total_amount`` is quantity x unit price with the drawn discount applied.

    Args:
        conn: Open psycopg2 connection; the inserts are committed on it.
        num_records: Number of rows to generate.
    """
    logger.info(f"Inserting {num_records} sales records...")

    customer_pool = [
        ("C001", "Zhang San"),
        ("C002", "Li Si"),
        ("C003", "Wang Wu"),
        ("C004", "Zhao Liu"),
        ("C005", "Qian Qi"),
    ]
    # (product_id, product_name, category, unit_price)
    product_pool = [
        ("P001", "iPhone 15 Pro", "Electronics", 7999.00),
        ("P002", "MacBook Pro", "Computers", 14999.00),
        ("P003", "AirPods Pro", "Accessories", 1899.00),
        ("P004", "iPad Pro", "Tablets", 8999.00),
        ("P005", "Apple Watch", "Wearables", 3299.00),
    ]
    # (region, cities belonging to that region)
    region_pool = [
        ("East", ["Shanghai", "Hangzhou", "Nanjing"]),
        ("North", ["Beijing", "Tianjin", "Shijiazhuang"]),
        ("South", ["Guangzhou", "Shenzhen", "Dongguan"]),
    ]
    payment_pool = ["Alipay", "WeChat Pay", "Bank Card", "Credit Card"]
    # "completed" is listed three times so it is drawn more often.
    status_pool = ["completed", "completed", "completed", "pending", "cancelled"]

    insert_sql = """
        INSERT INTO public.test_sales_data (
            order_id, order_date, customer_id, customer_name,
            product_id, product_name, category, quantity,
            unit_price, total_amount, discount_rate,
            payment_method, region, city, status
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    window_start = datetime.now() - timedelta(days=180)
    rows: list[tuple[Any, ...]] = []
    for seq in range(1, num_records + 1):
        # Order id: "ORD" + today's date + zero-padded 1-based sequence number.
        order_id = f"ORD{datetime.now().strftime('%Y%m%d')}{seq:05d}"
        order_day = window_start + timedelta(days=random.randint(0, 180))
        customer_id, customer_name = random.choice(customer_pool)
        product_id, product_name, category, unit_price = random.choice(product_pool)
        region_name, region_cities = random.choice(region_pool)
        quantity = random.randint(1, 5)
        discount_rate = random.choice([0, 0, 0, 0.05, 0.10, 0.15])
        total_amount = round(quantity * unit_price * (1 - discount_rate), 2)
        payment_method = random.choice(payment_pool)
        city = random.choice(region_cities)
        status = random.choice(status_pool)
        rows.append(
            (
                order_id,
                order_day.date(),
                customer_id,
                customer_name,
                product_id,
                product_name,
                category,
                quantity,
                unit_price,
                total_amount,
                discount_rate,
                payment_method,
                region_name,
                city,
                status,
            )
        )

    with conn.cursor() as cursor:
        cursor.executemany(insert_sql, rows)
    conn.commit()
    logger.info(f"Inserted {num_records} sales records")
def create_test_user_statistics_table(conn) -> None:
    """Create ``public.test_user_statistics`` and attach its table comment.

    Args:
        conn: Open psycopg2 connection; both statements are committed on it.
    """
    logger.info("Creating test_user_statistics table...")

    ddl = """
        CREATE TABLE public.test_user_statistics (
            id SERIAL PRIMARY KEY,
            user_id VARCHAR(50) NOT NULL,
            username VARCHAR(100),
            email VARCHAR(200),
            register_date DATE,
            last_login_date TIMESTAMP,
            login_count INTEGER DEFAULT 0,
            total_orders INTEGER DEFAULT 0,
            total_amount DECIMAL(12, 2) DEFAULT 0,
            avg_order_amount DECIMAL(10, 2) DEFAULT 0,
            favorite_category VARCHAR(100),
            user_level VARCHAR(50),
            points INTEGER DEFAULT 0,
            is_vip BOOLEAN DEFAULT FALSE,
            device_type VARCHAR(50),
            platform VARCHAR(50),
            province VARCHAR(100),
            city VARCHAR(100),
            age_group VARCHAR(50),
            gender VARCHAR(20),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    table_comment = (
        "COMMENT ON TABLE public.test_user_statistics IS 'User statistics table - test data'"
    )

    # Run the CREATE first, then the COMMENT, inside one transaction.
    with conn.cursor() as cursor:
        for statement in (ddl, table_comment):
            cursor.execute(statement)
    conn.commit()
    logger.info("test_user_statistics table created")
def insert_test_user_statistics(conn, num_records: int = 300) -> None:
    """Fill ``public.test_user_statistics`` with randomly generated users.

    Registration dates fall within the 365 days before now, and the last login
    lies between registration and now. ``avg_order_amount`` is derived from
    ``total_amount`` and ``total_orders``; ``is_vip`` is true when points
    exceed 5000. ``city`` is derived from the same province that is stored in
    ``province`` so the two columns never contradict each other.

    Args:
        conn: Open psycopg2 connection; the inserts are committed on it.
        num_records: Number of rows to generate.
    """
    logger.info(f"Inserting {num_records} user statistics records...")

    names = ["Alice", "Bob", "Charlie", "David", "Eva", "Frank", "Grace", "Henry"]
    categories = ["Electronics", "Computers", "Home", "Fashion", "Beauty", "Food"]
    levels = ["Regular", "Silver", "Gold", "Diamond", "VIP"]
    devices = ["iOS", "Android", "Windows", "macOS", "Web"]
    platforms = ["App", "Mini Program", "PC Web", "H5"]
    provinces = ["Beijing", "Shanghai", "Guangdong", "Zhejiang", "Jiangsu"]
    age_groups = ["18-25", "26-35", "36-45", "46-55", "55+"]
    genders = ["Male", "Female"]

    # One reference time for the whole batch, so the registration window and
    # the last-login window are measured against the same instant.
    now = datetime.now()
    base_date = now - timedelta(days=365)

    records: list[tuple[Any, ...]] = []
    for i in range(num_records):
        user_id = f"U{100000 + i}"
        name = f"{random.choice(names)}{i}"
        register_date = base_date + timedelta(days=random.randint(0, 365))
        # register_date never exceeds `now`, so the day span is always >= 0.
        days_since_registration = (now - register_date).days
        last_login = register_date + timedelta(
            days=random.randint(0, days_since_registration)
        )
        login_count = random.randint(1, 500)
        total_orders = random.randint(0, 100)
        # Users without orders have no spend and no average.
        total_amount = round(random.uniform(0, 50000), 2) if total_orders > 0 else 0
        avg_amount = round(total_amount / total_orders, 2) if total_orders > 0 else 0
        points = random.randint(0, 10000)
        is_vip = points > 5000
        # Draw the province once and reuse it for the city; previously the
        # city came from a second, independent draw.
        province = random.choice(provinces)
        records.append(
            (
                user_id,
                name,
                f"{user_id.lower()}@example.com",
                register_date.date(),
                last_login,
                login_count,
                total_orders,
                total_amount,
                avg_amount,
                random.choice(categories),
                random.choice(levels),
                points,
                is_vip,
                random.choice(devices),
                random.choice(platforms),
                province,
                f"{province} City",
                random.choice(age_groups),
                random.choice(genders),
            )
        )

    with conn.cursor() as cur:
        cur.executemany(
            """
            INSERT INTO public.test_user_statistics (
                user_id, username, email, register_date, last_login_date,
                login_count, total_orders, total_amount, avg_order_amount,
                favorite_category, user_level, points, is_vip,
                device_type, platform, province, city, age_group, gender
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            records,
        )
    conn.commit()
    logger.info(f"Inserted {num_records} user statistics records")
def create_test_product_inventory_table(conn) -> None:
    """Create ``public.test_product_inventory`` and attach its table comment.

    Args:
        conn: Open psycopg2 connection; both statements are committed on it.
    """
    logger.info("Creating test_product_inventory table...")

    ddl = """
        CREATE TABLE public.test_product_inventory (
            id SERIAL PRIMARY KEY,
            sku VARCHAR(50) NOT NULL,
            product_name VARCHAR(200) NOT NULL,
            category VARCHAR(100),
            brand VARCHAR(100),
            supplier VARCHAR(200),
            warehouse VARCHAR(100),
            current_stock INTEGER DEFAULT 0,
            safety_stock INTEGER DEFAULT 0,
            max_stock INTEGER DEFAULT 0,
            unit_cost DECIMAL(10, 2),
            selling_price DECIMAL(10, 2),
            stock_status VARCHAR(50),
            last_inbound_date DATE,
            last_outbound_date DATE,
            inbound_quantity_30d INTEGER DEFAULT 0,
            outbound_quantity_30d INTEGER DEFAULT 0,
            turnover_rate DECIMAL(5, 2),
            is_active BOOLEAN DEFAULT TRUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    table_comment = (
        "COMMENT ON TABLE public.test_product_inventory IS 'Product inventory table - test data'"
    )

    # Run the CREATE first, then the COMMENT, inside one transaction.
    with conn.cursor() as cursor:
        for statement in (ddl, table_comment):
            cursor.execute(statement)
    conn.commit()
    logger.info("test_product_inventory table created")
def insert_test_product_inventory(conn, num_records: int = 200) -> None:
    """Fill ``public.test_product_inventory`` with randomly generated SKUs.

    Each row gets random stock levels, a cost, and a selling price of
    1.2x-2.0x that cost. The stock status label is derived from the levels,
    and the turnover figure is capped at 999.99.

    Args:
        conn: Open psycopg2 connection; the inserts are committed on it.
        num_records: Number of rows to generate.
    """
    logger.info(f"Inserting {num_records} product inventory records...")

    # (product name, category, brand)
    catalog = [
        ("iPhone 15 Pro", "Electronics", "Apple"),
        ("MacBook Pro", "Computers", "Apple"),
        ("AirPods Pro", "Accessories", "Apple"),
        ("Huawei Mate 60", "Electronics", "Huawei"),
        ("Xiaomi 14 Pro", "Electronics", "Xiaomi"),
        ("Dyson Vacuum", "Home", "Dyson"),
        ("Sony TV", "Home", "Sony"),
        ("ThinkPad X1", "Computers", "Lenovo"),
    ]
    supplier_pool = ["Tech Co.", "Trade Inc.", "Electronics Ltd.", "Digital Corp."]
    warehouse_pool = ["Beijing WH", "Shanghai WH", "Guangzhou WH", "Chengdu WH"]

    def classify_stock(on_hand: int, safety: int, ceiling: int) -> str:
        """Return the status label for the given stock levels."""
        if on_hand == 0:
            return "Out of Stock"
        if on_hand < safety:
            return "Low Stock"
        if on_hand > ceiling * 0.9:
            return "Overstocked"
        return "Normal"

    insert_sql = """
        INSERT INTO public.test_product_inventory (
            sku, product_name, category, brand, supplier, warehouse,
            current_stock, safety_stock, max_stock, unit_cost, selling_price,
            stock_status, last_inbound_date, last_outbound_date,
            inbound_quantity_30d, outbound_quantity_30d, turnover_rate, is_active
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    rows: list[tuple[Any, ...]] = []
    for idx in range(num_records):
        base_name, category, brand = random.choice(catalog)
        sku = f"SKU{100000 + idx}"
        current_stock = random.randint(0, 1000)
        safety_stock = random.randint(50, 200)
        max_stock = random.randint(800, 2000)
        unit_cost = round(random.uniform(10, 5000), 2)
        selling_price = round(unit_cost * random.uniform(1.2, 2.0), 2)
        stock_status = classify_stock(current_stock, safety_stock, max_stock)
        last_inbound = datetime.now() - timedelta(days=random.randint(1, 60))
        last_outbound = datetime.now() - timedelta(days=random.randint(1, 30))
        inbound_30d = random.randint(0, 500)
        outbound_30d = random.randint(0, 400)
        # max(..., 1) avoids dividing by zero for out-of-stock items; the cap
        # keeps the value at or below 999.99.
        turnover = min(round(outbound_30d / max(current_stock, 1) * 30, 2), 999.99)
        supplier = random.choice(supplier_pool)
        warehouse = random.choice(warehouse_pool)
        is_active = random.choice([True, True, True, False])
        rows.append(
            (
                sku,
                # Model letter cycles A-Z with the row index.
                f"{base_name} - Model {chr(65 + idx % 26)}",
                category,
                brand,
                supplier,
                warehouse,
                current_stock,
                safety_stock,
                max_stock,
                unit_cost,
                selling_price,
                stock_status,
                last_inbound.date(),
                last_outbound.date(),
                inbound_30d,
                outbound_30d,
                turnover,
                is_active,
            )
        )

    with conn.cursor() as cursor:
        cursor.executemany(insert_sql, rows)
    conn.commit()
    logger.info(f"Inserted {num_records} product inventory records")
def update_data_products_stats(conn) -> None:
    """Refresh row and column counts for the test tables in ``data_products``.

    For each test table, counts its rows and updates the ``public.data_products``
    row whose ``target_table`` matches and whose ``target_schema`` is
    ``public``, also marking it ``active``. A warning is logged when no such
    row exists, so a silent no-op is not reported as a successful update.

    Args:
        conn: Open psycopg2 connection; the updates are committed on it.
    """
    logger.info("Updating data_products statistics...")

    # (table name, number of columns defined in its CREATE TABLE statement)
    tables_info = [
        ("test_sales_data", 17),
        ("test_user_statistics", 22),
        ("test_product_inventory", 21),
    ]

    with conn.cursor() as cur:
        for table_name, column_count in tables_info:
            # table_name comes from the constant list above, so interpolating
            # it into the statement is safe.
            cur.execute(f"SELECT COUNT(*) FROM public.{table_name}")
            record_count = cur.fetchone()[0]
            cur.execute(
                """
                UPDATE public.data_products
                SET record_count = %s,
                    column_count = %s,
                    last_updated_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP,
                    status = 'active'
                WHERE target_table = %s AND target_schema = 'public'
                """,
                (record_count, column_count, table_name),
            )
            # rowcount is the number of rows the UPDATE touched; zero means
            # data_products has no entry for this table.
            if cur.rowcount == 0:
                logger.warning(
                    f"No data_products row for public.{table_name}; statistics not updated"
                )
                continue
            logger.info(
                f"Updated {table_name}: records={record_count}, columns={column_count}"
            )
    conn.commit()
    logger.info("Statistics update complete")
def main() -> None:
    """Rebuild all test tables and refresh their ``data_products`` statistics.

    Connects to the database, drops any old test tables, recreates and
    populates the three test tables, then updates the statistics rows. The
    connection is always closed, including when a step fails.

    Raises:
        Exception: Any error from connecting or from a step is logged and
            re-raised unchanged.
    """
    logger.info("=" * 60)
    logger.info("Starting test data creation...")
    logger.info("=" * 60)

    conn = None
    try:
        conn = get_connection()
        logger.info("Database connected")

        # Remove all old tables.
        cleanup_all_test_tables(conn)

        # Create the tables and insert data.
        create_test_sales_data_table(conn)
        insert_test_sales_data(conn, num_records=500)
        create_test_user_statistics_table(conn)
        insert_test_user_statistics(conn, num_records=300)
        create_test_product_inventory_table(conn)
        insert_test_product_inventory(conn, num_records=200)

        # Update the statistics.
        update_data_products_stats(conn)
    except Exception as e:
        logger.error(f"Failed to create test data: {e}")
        raise
    finally:
        # Close on every path; previously a failing step left the connection
        # open because close() was only reached on success.
        if conn is not None:
            conn.close()

    logger.info("=" * 60)
    logger.info("All test data created successfully!")
    logger.info("=" * 60)
    logger.info("Created tables:")
    logger.info(" 1. test_sales_data (500 records)")
    logger.info(" 2. test_user_statistics (300 records)")
    logger.info(" 3. test_product_inventory (200 records)")
    logger.info("=" * 60)
# Run the cleanup/create/insert sequence only when executed as a script.
if __name__ == "__main__":
    main()
|