||
- """
- 创建测试数据表脚本
- 为 data_products 表中注册的数据产品创建对应的数据表,并填充测试数据
- """
from __future__ import annotations

import os
import random
from datetime import datetime, timedelta
from typing import Any

import psycopg2
from loguru import logger
# Connection settings for the target PostgreSQL database (the original note
# labels this the production environment).
# Every value can be overridden through a DATAOPS_DB_* environment variable so
# the script can be pointed at another database, and the password supplied,
# without editing the source. The fallbacks are the original hard-coded values.
DB_CONFIG = {
    "host": os.environ.get("DATAOPS_DB_HOST", "192.168.3.143"),
    "port": int(os.environ.get("DATAOPS_DB_PORT", "5432")),
    "database": os.environ.get("DATAOPS_DB_NAME", "dataops"),
    "user": os.environ.get("DATAOPS_DB_USER", "postgres"),
    "password": os.environ.get("DATAOPS_DB_PASSWORD", "dataOps"),
}
def get_connection():
    """Open a new database connection using the settings in ``DB_CONFIG``.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def create_test_sales_data_table(conn) -> None:
    """(Re)create the ``public.test_sales_data`` table.

    Any existing table of that name is dropped (with CASCADE) first, then the
    table is created, a table comment is attached, and the transaction is
    committed.

    Args:
        conn: Open database connection; it is committed but not closed here.
    """
    logger.info("创建 test_sales_data 表...")

    # Executed in order: drop the old table, create the new one, add a comment.
    ddl_statements = (
        "DROP TABLE IF EXISTS public.test_sales_data CASCADE",
        """
            CREATE TABLE public.test_sales_data (
                id SERIAL PRIMARY KEY,
                order_id VARCHAR(50) NOT NULL,
                order_date DATE NOT NULL,
                customer_id VARCHAR(50) NOT NULL,
                customer_name VARCHAR(100),
                product_id VARCHAR(50) NOT NULL,
                product_name VARCHAR(200),
                category VARCHAR(100),
                quantity INTEGER NOT NULL,
                unit_price DECIMAL(10, 2) NOT NULL,
                total_amount DECIMAL(12, 2) NOT NULL,
                discount_rate DECIMAL(5, 2) DEFAULT 0,
                payment_method VARCHAR(50),
                region VARCHAR(100),
                city VARCHAR(100),
                status VARCHAR(50) DEFAULT 'completed',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """,
        "COMMENT ON TABLE public.test_sales_data IS '销售数据分析表 - 测试数据'",
    )

    with conn.cursor() as cur:
        for statement in ddl_statements:
            cur.execute(statement)

    conn.commit()
    logger.info("test_sales_data 表创建成功")
def insert_test_sales_data(conn, num_records: int = 500) -> None:
    """Insert randomly generated rows into ``public.test_sales_data``.

    Each row combines a random customer, product, region/city, payment method
    and status with an order date drawn from the last 180 days. The total
    amount is ``quantity * unit_price * (1 - discount_rate)`` rounded to two
    decimals. All rows are sent with a single ``executemany`` call and then
    committed.

    Args:
        conn: Open database connection; it is committed but not closed here.
        num_records: Number of rows to generate and insert.
    """
    logger.info(f"插入 {num_records} 条销售测试数据...")

    # Sample reference data: (customer_id, customer_name).
    customers = [
        ("C001", "张三"),
        ("C002", "李四"),
        ("C003", "王五"),
        ("C004", "赵六"),
        ("C005", "钱七"),
        ("C006", "孙八"),
        ("C007", "周九"),
        ("C008", "吴十"),
        ("C009", "郑明"),
        ("C010", "陈华"),
    ]
    # (product_id, product_name, category, unit_price).
    products = [
        ("P001", "iPhone 15 Pro", "手机数码", 7999.00),
        ("P002", "MacBook Pro 14", "电脑办公", 14999.00),
        ("P003", "AirPods Pro 2", "手机配件", 1899.00),
        ("P004", "iPad Pro 12.9", "平板电脑", 8999.00),
        ("P005", "Apple Watch S9", "智能穿戴", 3299.00),
        ("P006", "戴森吸尘器 V15", "家用电器", 4990.00),
        ("P007", "索尼降噪耳机", "音频设备", 2499.00),
        ("P008", "小米电视 75寸", "家用电器", 5999.00),
        ("P009", "华为 Mate 60 Pro", "手机数码", 6999.00),
        ("P010", "联想ThinkPad X1", "电脑办公", 12999.00),
    ]
    # (region, cities in that region).
    regions = [
        ("华东", ["上海", "杭州", "南京", "苏州", "无锡"]),
        ("华北", ["北京", "天津", "石家庄", "太原", "济南"]),
        ("华南", ["广州", "深圳", "东莞", "佛山", "珠海"]),
        ("西南", ["成都", "重庆", "昆明", "贵阳", "西安"]),
        ("华中", ["武汉", "长沙", "郑州", "合肥", "南昌"]),
    ]
    payment_methods = ["支付宝", "微信支付", "银行卡", "信用卡", "货到付款"]
    # "completed" is listed three times so it is drawn more often.
    statuses = ["completed", "completed", "completed", "pending", "cancelled"]

    # Take one timestamp for the whole batch so the order-id date prefix and
    # the 180-day window stay consistent even if the run crosses midnight.
    now = datetime.now()
    order_prefix = f"ORD{now.strftime('%Y%m%d')}"
    base_date = now - timedelta(days=180)

    records: list[tuple[Any, ...]] = []
    for i in range(num_records):
        order_id = f"{order_prefix}{i + 1:05d}"
        order_date = base_date + timedelta(days=random.randint(0, 180))
        customer = random.choice(customers)
        product = random.choice(products)
        region_data = random.choice(regions)
        quantity = random.randint(1, 5)
        unit_price = product[3]
        # Zero appears three times so most orders carry no discount.
        discount_rate = random.choice([0, 0, 0, 0.05, 0.10, 0.15, 0.20])
        total_amount = round(quantity * unit_price * (1 - discount_rate), 2)
        records.append(
            (
                order_id,
                order_date.date(),
                customer[0],
                customer[1],
                product[0],
                product[1],
                product[2],
                quantity,
                unit_price,
                total_amount,
                discount_rate,
                random.choice(payment_methods),
                region_data[0],
                random.choice(region_data[1]),
                random.choice(statuses),
            )
        )

    with conn.cursor() as cur:
        cur.executemany(
            """
            INSERT INTO public.test_sales_data (
                order_id, order_date, customer_id, customer_name,
                product_id, product_name, category, quantity,
                unit_price, total_amount, discount_rate,
                payment_method, region, city, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            records,
        )

    conn.commit()
    logger.info(f"成功插入 {num_records} 条销售数据")
def create_test_user_statistics_table(conn) -> None:
    """(Re)create the ``public.test_user_statistics`` table.

    Any existing table of that name is dropped (with CASCADE) first, then the
    table is created, a table comment is attached, and the transaction is
    committed.

    Args:
        conn: Open database connection; it is committed but not closed here.
    """
    logger.info("创建 test_user_statistics 表...")

    # Executed in order: drop the old table, create the new one, add a comment.
    ddl_statements = (
        "DROP TABLE IF EXISTS public.test_user_statistics CASCADE",
        """
            CREATE TABLE public.test_user_statistics (
                id SERIAL PRIMARY KEY,
                user_id VARCHAR(50) NOT NULL,
                username VARCHAR(100),
                email VARCHAR(200),
                register_date DATE,
                last_login_date TIMESTAMP,
                login_count INTEGER DEFAULT 0,
                total_orders INTEGER DEFAULT 0,
                total_amount DECIMAL(12, 2) DEFAULT 0,
                avg_order_amount DECIMAL(10, 2) DEFAULT 0,
                favorite_category VARCHAR(100),
                user_level VARCHAR(50),
                points INTEGER DEFAULT 0,
                is_vip BOOLEAN DEFAULT FALSE,
                device_type VARCHAR(50),
                platform VARCHAR(50),
                province VARCHAR(100),
                city VARCHAR(100),
                age_group VARCHAR(50),
                gender VARCHAR(20),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """,
        "COMMENT ON TABLE public.test_user_statistics IS '用户行为统计表 - 测试数据'",
    )

    with conn.cursor() as cur:
        for statement in ddl_statements:
            cur.execute(statement)

    conn.commit()
    logger.info("test_user_statistics 表创建成功")
def insert_test_user_statistics(conn, num_records: int = 300) -> None:
    """Insert randomly generated rows into ``public.test_user_statistics``.

    Users get sequential ids (``U100000``, ``U100001``, ...), a registration
    date within the last 365 days, and a last-login timestamp between the
    registration date and now. ``is_vip`` is true when ``points`` exceeds
    5000. All rows are sent with a single ``executemany`` call and then
    committed.

    The city value is derived from the same province that is stored in the
    ``province`` column, so the two columns of a row always agree.

    Args:
        conn: Open database connection; it is committed but not closed here.
        num_records: Number of rows to generate and insert.
    """
    logger.info(f"插入 {num_records} 条用户统计测试数据...")

    names = [
        "张伟",
        "王芳",
        "李娜",
        "刘洋",
        "陈明",
        "杨静",
        "赵强",
        "黄丽",
        "周杰",
        "吴敏",
        "徐涛",
        "孙燕",
        "马超",
        "朱婷",
        "胡磊",
        "郭琳",
        "林峰",
        "何雪",
        "高飞",
        "梁慧",
        "郑鹏",
        "谢雨",
        "韩冰",
        "唐昊",
    ]
    categories = [
        "手机数码",
        "电脑办公",
        "家用电器",
        "服装鞋帽",
        "美妆护肤",
        "食品生鲜",
    ]
    levels = ["普通用户", "银牌会员", "金牌会员", "钻石会员", "至尊会员"]
    devices = ["iOS", "Android", "Windows", "macOS", "Web"]
    platforms = ["App", "小程序", "PC网页", "H5"]
    provinces = ["北京", "上海", "广东", "浙江", "江苏", "四川", "湖北", "山东"]
    age_groups = ["18-25", "26-35", "36-45", "46-55", "55+"]
    genders = ["男", "女"]

    # One timestamp for the whole batch keeps the date arithmetic consistent.
    now = datetime.now()
    base_date = now - timedelta(days=365)

    records: list[tuple[Any, ...]] = []
    for i in range(num_records):
        user_id = f"U{100000 + i}"
        name = random.choice(names)
        register_date = base_date + timedelta(days=random.randint(0, 365))
        # Last login falls somewhere between registration and now.
        days_since_registration = (now - register_date).days
        last_login = register_date + timedelta(
            days=random.randint(0, days_since_registration)
        )
        login_count = random.randint(1, 500)
        total_orders = random.randint(0, 100)
        # Users without orders have no spend; this also avoids dividing by 0.
        total_amount = round(random.uniform(0, 50000), 2) if total_orders > 0 else 0
        avg_amount = round(total_amount / total_orders, 2) if total_orders > 0 else 0
        points = random.randint(0, 10000)
        is_vip = points > 5000
        favorite_category = random.choice(categories)
        user_level = random.choice(levels)
        device = random.choice(devices)
        platform = random.choice(platforms)
        # Pick the province once and reuse it for the city so both columns of
        # the row refer to the same place.
        province = random.choice(provinces)
        city = f"{province}市"
        records.append(
            (
                user_id,
                name,
                f"{user_id.lower()}@example.com",
                register_date.date(),
                last_login,
                login_count,
                total_orders,
                total_amount,
                avg_amount,
                favorite_category,
                user_level,
                points,
                is_vip,
                device,
                platform,
                province,
                city,
                random.choice(age_groups),
                random.choice(genders),
            )
        )

    with conn.cursor() as cur:
        cur.executemany(
            """
            INSERT INTO public.test_user_statistics (
                user_id, username, email, register_date, last_login_date,
                login_count, total_orders, total_amount, avg_order_amount,
                favorite_category, user_level, points, is_vip,
                device_type, platform, province, city, age_group, gender
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            records,
        )

    conn.commit()
    logger.info(f"成功插入 {num_records} 条用户统计数据")
def create_test_product_inventory_table(conn) -> None:
    """(Re)create the ``public.test_product_inventory`` table.

    Any existing table of that name is dropped (with CASCADE) first, then the
    table is created, a table comment is attached, and the transaction is
    committed.

    Args:
        conn: Open database connection; it is committed but not closed here.
    """
    logger.info("创建 test_product_inventory 表...")

    # Executed in order: drop the old table, create the new one, add a comment.
    ddl_statements = (
        "DROP TABLE IF EXISTS public.test_product_inventory CASCADE",
        """
            CREATE TABLE public.test_product_inventory (
                id SERIAL PRIMARY KEY,
                sku VARCHAR(50) NOT NULL,
                product_name VARCHAR(200) NOT NULL,
                category VARCHAR(100),
                brand VARCHAR(100),
                supplier VARCHAR(200),
                warehouse VARCHAR(100),
                current_stock INTEGER DEFAULT 0,
                safety_stock INTEGER DEFAULT 0,
                max_stock INTEGER DEFAULT 0,
                unit_cost DECIMAL(10, 2),
                selling_price DECIMAL(10, 2),
                stock_status VARCHAR(50),
                last_inbound_date DATE,
                last_outbound_date DATE,
                inbound_quantity_30d INTEGER DEFAULT 0,
                outbound_quantity_30d INTEGER DEFAULT 0,
                turnover_rate DECIMAL(5, 2),
                is_active BOOLEAN DEFAULT TRUE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """,
        "COMMENT ON TABLE public.test_product_inventory IS '商品库存表 - 测试数据'",
    )

    with conn.cursor() as cur:
        for statement in ddl_statements:
            cur.execute(statement)

    conn.commit()
    logger.info("test_product_inventory 表创建成功")
def insert_test_product_inventory(conn, num_records: int = 200) -> None:
    """Insert randomly generated rows into ``public.test_product_inventory``.

    Each row gets a sequential SKU (``SKU100000``, ...), random stock levels
    and prices, a stock status derived from those levels, and a turnover rate
    capped at 999.99 so it fits the ``DECIMAL(5, 2)`` column. All rows are
    sent with a single ``executemany`` call and then committed.

    Args:
        conn: Open database connection; it is committed but not closed here.
        num_records: Number of rows to generate and insert.
    """
    logger.info(f"插入 {num_records} 条商品库存测试数据...")

    # (product_name, category, brand).
    products = [
        ("iPhone 15 Pro", "手机数码", "Apple"),
        ("MacBook Pro", "电脑办公", "Apple"),
        ("AirPods Pro", "手机配件", "Apple"),
        ("华为Mate 60", "手机数码", "华为"),
        ("小米14 Pro", "手机数码", "小米"),
        ("戴森吸尘器", "家用电器", "戴森"),
        ("索尼电视", "家用电器", "索尼"),
        ("联想ThinkPad", "电脑办公", "联想"),
        ("Nike运动鞋", "服装鞋帽", "Nike"),
        ("Adidas外套", "服装鞋帽", "Adidas"),
        ("雅诗兰黛精华", "美妆护肤", "雅诗兰黛"),
        ("SK-II神仙水", "美妆护肤", "SK-II"),
        ("海蓝之谜面霜", "美妆护肤", "海蓝之谜"),
        ("飞利浦剃须刀", "个人护理", "飞利浦"),
        ("松下电饭煲", "家用电器", "松下"),
    ]
    suppliers = [
        "北京科技有限公司",
        "上海贸易有限公司",
        "广州电子有限公司",
        "深圳数码有限公司",
        "杭州商贸有限公司",
    ]
    warehouses = ["北京仓", "上海仓", "广州仓", "成都仓", "武汉仓"]

    # One timestamp for the whole batch instead of two clock reads per row.
    now = datetime.now()

    records: list[tuple[Any, ...]] = []
    for i in range(num_records):
        product = random.choice(products)
        sku = f"SKU{100000 + i}"
        current_stock = random.randint(0, 1000)
        safety_stock = random.randint(50, 200)
        max_stock = random.randint(800, 2000)
        unit_cost = round(random.uniform(10, 5000), 2)
        # Selling price is a 20%-100% markup over cost.
        selling_price = round(unit_cost * random.uniform(1.2, 2.0), 2)

        if current_stock == 0:
            stock_status = "缺货"
        elif current_stock < safety_stock:
            stock_status = "库存不足"
        elif current_stock > max_stock * 0.9:
            stock_status = "库存过剩"
        else:
            stock_status = "正常"

        last_inbound = now - timedelta(days=random.randint(1, 60))
        last_outbound = now - timedelta(days=random.randint(1, 30))
        inbound_30d = random.randint(0, 500)
        outbound_30d = random.randint(0, 400)
        # max(..., 1) avoids dividing by zero when out of stock; the cap keeps
        # the value within the DECIMAL(5, 2) range of the turnover_rate column.
        turnover = min(round(outbound_30d / max(current_stock, 1) * 30, 2), 999.99)

        records.append(
            (
                sku,
                # Suffix cycles through the letters A-Z.
                f"{product[0]} - 型号{chr(65 + i % 26)}",
                product[1],
                product[2],
                random.choice(suppliers),
                random.choice(warehouses),
                current_stock,
                safety_stock,
                max_stock,
                unit_cost,
                selling_price,
                stock_status,
                last_inbound.date(),
                last_outbound.date(),
                inbound_30d,
                outbound_30d,
                turnover,
                # True is listed three times so most items are active.
                random.choice([True, True, True, False]),
            )
        )

    with conn.cursor() as cur:
        cur.executemany(
            """
            INSERT INTO public.test_product_inventory (
                sku, product_name, category, brand, supplier, warehouse,
                current_stock, safety_stock, max_stock, unit_cost, selling_price,
                stock_status, last_inbound_date, last_outbound_date,
                inbound_quantity_30d, outbound_quantity_30d, turnover_rate, is_active
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            records,
        )

    conn.commit()
    logger.info(f"成功插入 {num_records} 条商品库存数据")
def update_data_products_stats(conn) -> None:
    """Refresh the statistics stored in ``public.data_products``.

    For each of the three test tables, the current row count is queried and
    written, together with a fixed column count, to the ``data_products`` row
    whose ``target_table`` matches and whose ``target_schema`` is ``public``.
    The row's timestamps are refreshed and its status is set to ``active``.
    If no row matches, a warning is logged instead of a success message.

    Args:
        conn: Open database connection; it is committed but not closed here.
    """
    logger.info("更新 data_products 表统计信息...")

    # (table name, number of columns in that table's CREATE TABLE statement).
    tables_info = [
        ("test_sales_data", 17),
        ("test_user_statistics", 22),
        ("test_product_inventory", 21),
    ]

    with conn.cursor() as cur:
        for table_name, column_count in tables_info:
            # Table names come from the fixed list above, never from user
            # input, so interpolating them into the statement is safe here.
            cur.execute(f"SELECT COUNT(*) FROM public.{table_name}")
            record_count = cur.fetchone()[0]

            cur.execute(
                """
                UPDATE public.data_products
                SET record_count = %s,
                    column_count = %s,
                    last_updated_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP,
                    status = 'active'
                WHERE target_table = %s AND target_schema = 'public'
                """,
                (record_count, column_count, table_name),
            )

            # rowcount is the number of rows the UPDATE touched; zero means
            # the table is not registered in data_products.
            if cur.rowcount == 0:
                logger.warning(
                    f"data_products 中未找到 {table_name} 的注册记录,统计信息未更新"
                )
            else:
                logger.info(
                    f"更新 {table_name}: record_count={record_count}, column_count={column_count}"
                )

    conn.commit()
    logger.info("data_products 统计信息更新完成")
def main() -> None:
    """Create the three test tables, fill them, and update the statistics.

    The connection is always closed, whether or not a step fails. Any failure
    is logged and re-raised; the summary is logged only after every step has
    succeeded.

    Raises:
        Exception: Whatever the connection or any create/insert/update step
            raised, after it has been logged.
    """
    # Row counts are named once so the insert calls and the summary agree.
    sales_count = 500
    user_count = 300
    inventory_count = 200

    logger.info("=" * 60)
    logger.info("开始创建测试数据表和数据...")
    logger.info("=" * 60)

    conn = None
    try:
        conn = get_connection()
        logger.info("数据库连接成功")

        # Create each table, then populate it.
        create_test_sales_data_table(conn)
        insert_test_sales_data(conn, num_records=sales_count)
        create_test_user_statistics_table(conn)
        insert_test_user_statistics(conn, num_records=user_count)
        create_test_product_inventory_table(conn)
        insert_test_product_inventory(conn, num_records=inventory_count)

        # Update the statistics in data_products.
        update_data_products_stats(conn)
    except Exception as e:
        logger.error(f"创建测试数据失败: {e}")
        raise
    finally:
        # conn is still None when get_connection() itself failed.
        if conn is not None:
            conn.close()

    # Reached only on success, because the except branch re-raises.
    logger.info("=" * 60)
    logger.info("所有测试数据创建完成!")
    logger.info("=" * 60)
    logger.info("已创建以下测试表:")
    logger.info(f" 1. test_sales_data ({sales_count}条) - 销售数据分析")
    logger.info(f" 2. test_user_statistics ({user_count}条) - 用户行为统计")
    logger.info(f" 3. test_product_inventory ({inventory_count}条) - 商品库存")
    logger.info("=" * 60)
# Run the workflow only when this file is executed directly as a script.
if __name__ == "__main__":
    main()
|