create_test_data_tables.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. """
  2. 创建测试数据表脚本
  3. 为 data_products 表中注册的数据产品创建对应的数据表,并填充测试数据
  4. """
from __future__ import annotations

import os
import random
from datetime import datetime, timedelta
from typing import Any

import psycopg2
from loguru import logger
  11. # 生产环境数据库配置
  12. DB_CONFIG = {
  13. "host": "192.168.3.143",
  14. "port": 5432,
  15. "database": "dataops",
  16. "user": "postgres",
  17. "password": "dataOps",
  18. }
  19. def get_connection():
  20. """获取数据库连接"""
  21. return psycopg2.connect(**DB_CONFIG)
  22. def create_test_sales_data_table(conn) -> None:
  23. """
  24. 创建销售数据分析表 test_sales_data
  25. 模拟电商销售数据
  26. """
  27. logger.info("创建 test_sales_data 表...")
  28. with conn.cursor() as cur:
  29. # 删除已存在的表
  30. cur.execute("DROP TABLE IF EXISTS public.test_sales_data CASCADE")
  31. # 创建表
  32. cur.execute("""
  33. CREATE TABLE public.test_sales_data (
  34. id SERIAL PRIMARY KEY,
  35. order_id VARCHAR(50) NOT NULL,
  36. order_date DATE NOT NULL,
  37. customer_id VARCHAR(50) NOT NULL,
  38. customer_name VARCHAR(100),
  39. product_id VARCHAR(50) NOT NULL,
  40. product_name VARCHAR(200),
  41. category VARCHAR(100),
  42. quantity INTEGER NOT NULL,
  43. unit_price DECIMAL(10, 2) NOT NULL,
  44. total_amount DECIMAL(12, 2) NOT NULL,
  45. discount_rate DECIMAL(5, 2) DEFAULT 0,
  46. payment_method VARCHAR(50),
  47. region VARCHAR(100),
  48. city VARCHAR(100),
  49. status VARCHAR(50) DEFAULT 'completed',
  50. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  51. )
  52. """)
  53. # 添加注释
  54. cur.execute(
  55. "COMMENT ON TABLE public.test_sales_data IS '销售数据分析表 - 测试数据'"
  56. )
  57. conn.commit()
  58. logger.info("test_sales_data 表创建成功")
  59. def insert_test_sales_data(conn, num_records: int = 500) -> None:
  60. """
  61. 插入销售测试数据
  62. Args:
  63. conn: 数据库连接
  64. num_records: 要插入的记录数
  65. """
  66. logger.info(f"插入 {num_records} 条销售测试数据...")
  67. # 模拟数据
  68. customers = [
  69. ("C001", "张三"),
  70. ("C002", "李四"),
  71. ("C003", "王五"),
  72. ("C004", "赵六"),
  73. ("C005", "钱七"),
  74. ("C006", "孙八"),
  75. ("C007", "周九"),
  76. ("C008", "吴十"),
  77. ("C009", "郑明"),
  78. ("C010", "陈华"),
  79. ]
  80. products = [
  81. ("P001", "iPhone 15 Pro", "手机数码", 7999.00),
  82. ("P002", "MacBook Pro 14", "电脑办公", 14999.00),
  83. ("P003", "AirPods Pro 2", "手机配件", 1899.00),
  84. ("P004", "iPad Pro 12.9", "平板电脑", 8999.00),
  85. ("P005", "Apple Watch S9", "智能穿戴", 3299.00),
  86. ("P006", "戴森吸尘器 V15", "家用电器", 4990.00),
  87. ("P007", "索尼降噪耳机", "音频设备", 2499.00),
  88. ("P008", "小米电视 75寸", "家用电器", 5999.00),
  89. ("P009", "华为 Mate 60 Pro", "手机数码", 6999.00),
  90. ("P010", "联想ThinkPad X1", "电脑办公", 12999.00),
  91. ]
  92. regions = [
  93. ("华东", ["上海", "杭州", "南京", "苏州", "无锡"]),
  94. ("华北", ["北京", "天津", "石家庄", "太原", "济南"]),
  95. ("华南", ["广州", "深圳", "东莞", "佛山", "珠海"]),
  96. ("西南", ["成都", "重庆", "昆明", "贵阳", "西安"]),
  97. ("华中", ["武汉", "长沙", "郑州", "合肥", "南昌"]),
  98. ]
  99. payment_methods = ["支付宝", "微信支付", "银行卡", "信用卡", "货到付款"]
  100. statuses = ["completed", "completed", "completed", "pending", "cancelled"]
  101. with conn.cursor() as cur:
  102. records: list[tuple[Any, ...]] = []
  103. base_date = datetime.now() - timedelta(days=180)
  104. for i in range(num_records):
  105. order_id = f"ORD{datetime.now().strftime('%Y%m%d')}{i + 1:05d}"
  106. order_date = base_date + timedelta(days=random.randint(0, 180))
  107. customer = random.choice(customers)
  108. product = random.choice(products)
  109. region_data = random.choice(regions)
  110. quantity = random.randint(1, 5)
  111. unit_price = product[3]
  112. discount_rate = random.choice([0, 0, 0, 0.05, 0.10, 0.15, 0.20])
  113. total_amount = round(quantity * unit_price * (1 - discount_rate), 2)
  114. records.append(
  115. (
  116. order_id,
  117. order_date.date(),
  118. customer[0],
  119. customer[1],
  120. product[0],
  121. product[1],
  122. product[2],
  123. quantity,
  124. unit_price,
  125. total_amount,
  126. discount_rate,
  127. random.choice(payment_methods),
  128. region_data[0],
  129. random.choice(region_data[1]),
  130. random.choice(statuses),
  131. )
  132. )
  133. cur.executemany(
  134. """
  135. INSERT INTO public.test_sales_data (
  136. order_id, order_date, customer_id, customer_name,
  137. product_id, product_name, category, quantity,
  138. unit_price, total_amount, discount_rate,
  139. payment_method, region, city, status
  140. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  141. """,
  142. records,
  143. )
  144. conn.commit()
  145. logger.info(f"成功插入 {num_records} 条销售数据")
  146. def create_test_user_statistics_table(conn) -> None:
  147. """
  148. 创建用户行为统计表 test_user_statistics
  149. 模拟用户活跃度和行为数据
  150. """
  151. logger.info("创建 test_user_statistics 表...")
  152. with conn.cursor() as cur:
  153. cur.execute("DROP TABLE IF EXISTS public.test_user_statistics CASCADE")
  154. cur.execute("""
  155. CREATE TABLE public.test_user_statistics (
  156. id SERIAL PRIMARY KEY,
  157. user_id VARCHAR(50) NOT NULL,
  158. username VARCHAR(100),
  159. email VARCHAR(200),
  160. register_date DATE,
  161. last_login_date TIMESTAMP,
  162. login_count INTEGER DEFAULT 0,
  163. total_orders INTEGER DEFAULT 0,
  164. total_amount DECIMAL(12, 2) DEFAULT 0,
  165. avg_order_amount DECIMAL(10, 2) DEFAULT 0,
  166. favorite_category VARCHAR(100),
  167. user_level VARCHAR(50),
  168. points INTEGER DEFAULT 0,
  169. is_vip BOOLEAN DEFAULT FALSE,
  170. device_type VARCHAR(50),
  171. platform VARCHAR(50),
  172. province VARCHAR(100),
  173. city VARCHAR(100),
  174. age_group VARCHAR(50),
  175. gender VARCHAR(20),
  176. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  177. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  178. )
  179. """)
  180. cur.execute(
  181. "COMMENT ON TABLE public.test_user_statistics IS '用户行为统计表 - 测试数据'"
  182. )
  183. conn.commit()
  184. logger.info("test_user_statistics 表创建成功")
  185. def insert_test_user_statistics(conn, num_records: int = 300) -> None:
  186. """
  187. 插入用户统计测试数据
  188. Args:
  189. conn: 数据库连接
  190. num_records: 要插入的记录数
  191. """
  192. logger.info(f"插入 {num_records} 条用户统计测试数据...")
  193. names = [
  194. "张伟",
  195. "王芳",
  196. "李娜",
  197. "刘洋",
  198. "陈明",
  199. "杨静",
  200. "赵强",
  201. "黄丽",
  202. "周杰",
  203. "吴敏",
  204. "徐涛",
  205. "孙燕",
  206. "马超",
  207. "朱婷",
  208. "胡磊",
  209. "郭琳",
  210. "林峰",
  211. "何雪",
  212. "高飞",
  213. "梁慧",
  214. "郑鹏",
  215. "谢雨",
  216. "韩冰",
  217. "唐昊",
  218. ]
  219. categories = [
  220. "手机数码",
  221. "电脑办公",
  222. "家用电器",
  223. "服装鞋帽",
  224. "美妆护肤",
  225. "食品生鲜",
  226. ]
  227. levels = ["普通用户", "银牌会员", "金牌会员", "钻石会员", "至尊会员"]
  228. devices = ["iOS", "Android", "Windows", "macOS", "Web"]
  229. platforms = ["App", "小程序", "PC网页", "H5"]
  230. provinces = ["北京", "上海", "广东", "浙江", "江苏", "四川", "湖北", "山东"]
  231. age_groups = ["18-25", "26-35", "36-45", "46-55", "55+"]
  232. genders = ["男", "女"]
  233. with conn.cursor() as cur:
  234. records: list[tuple[Any, ...]] = []
  235. base_date = datetime.now() - timedelta(days=365)
  236. for i in range(num_records):
  237. user_id = f"U{100000 + i}"
  238. name = random.choice(names)
  239. register_date = base_date + timedelta(days=random.randint(0, 365))
  240. last_login = register_date + timedelta(
  241. days=random.randint(0, (datetime.now() - register_date).days)
  242. )
  243. login_count = random.randint(1, 500)
  244. total_orders = random.randint(0, 100)
  245. total_amount = round(random.uniform(0, 50000), 2) if total_orders > 0 else 0
  246. avg_amount = (
  247. round(total_amount / total_orders, 2) if total_orders > 0 else 0
  248. )
  249. points = random.randint(0, 10000)
  250. is_vip = points > 5000
  251. records.append(
  252. (
  253. user_id,
  254. name,
  255. f"{user_id.lower()}@example.com",
  256. register_date.date(),
  257. last_login,
  258. login_count,
  259. total_orders,
  260. total_amount,
  261. avg_amount,
  262. random.choice(categories),
  263. random.choice(levels),
  264. points,
  265. is_vip,
  266. random.choice(devices),
  267. random.choice(platforms),
  268. random.choice(provinces),
  269. f"{random.choice(provinces)}市",
  270. random.choice(age_groups),
  271. random.choice(genders),
  272. )
  273. )
  274. cur.executemany(
  275. """
  276. INSERT INTO public.test_user_statistics (
  277. user_id, username, email, register_date, last_login_date,
  278. login_count, total_orders, total_amount, avg_order_amount,
  279. favorite_category, user_level, points, is_vip,
  280. device_type, platform, province, city, age_group, gender
  281. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  282. """,
  283. records,
  284. )
  285. conn.commit()
  286. logger.info(f"成功插入 {num_records} 条用户统计数据")
  287. def create_test_product_inventory_table(conn) -> None:
  288. """
  289. 创建商品库存表 test_product_inventory
  290. 模拟商品库存和进销存数据
  291. """
  292. logger.info("创建 test_product_inventory 表...")
  293. with conn.cursor() as cur:
  294. cur.execute("DROP TABLE IF EXISTS public.test_product_inventory CASCADE")
  295. cur.execute("""
  296. CREATE TABLE public.test_product_inventory (
  297. id SERIAL PRIMARY KEY,
  298. sku VARCHAR(50) NOT NULL,
  299. product_name VARCHAR(200) NOT NULL,
  300. category VARCHAR(100),
  301. brand VARCHAR(100),
  302. supplier VARCHAR(200),
  303. warehouse VARCHAR(100),
  304. current_stock INTEGER DEFAULT 0,
  305. safety_stock INTEGER DEFAULT 0,
  306. max_stock INTEGER DEFAULT 0,
  307. unit_cost DECIMAL(10, 2),
  308. selling_price DECIMAL(10, 2),
  309. stock_status VARCHAR(50),
  310. last_inbound_date DATE,
  311. last_outbound_date DATE,
  312. inbound_quantity_30d INTEGER DEFAULT 0,
  313. outbound_quantity_30d INTEGER DEFAULT 0,
  314. turnover_rate DECIMAL(5, 2),
  315. is_active BOOLEAN DEFAULT TRUE,
  316. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  317. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  318. )
  319. """)
  320. cur.execute(
  321. "COMMENT ON TABLE public.test_product_inventory IS '商品库存表 - 测试数据'"
  322. )
  323. conn.commit()
  324. logger.info("test_product_inventory 表创建成功")
  325. def insert_test_product_inventory(conn, num_records: int = 200) -> None:
  326. """
  327. 插入商品库存测试数据
  328. Args:
  329. conn: 数据库连接
  330. num_records: 要插入的记录数
  331. """
  332. logger.info(f"插入 {num_records} 条商品库存测试数据...")
  333. products = [
  334. ("iPhone 15 Pro", "手机数码", "Apple"),
  335. ("MacBook Pro", "电脑办公", "Apple"),
  336. ("AirPods Pro", "手机配件", "Apple"),
  337. ("华为Mate 60", "手机数码", "华为"),
  338. ("小米14 Pro", "手机数码", "小米"),
  339. ("戴森吸尘器", "家用电器", "戴森"),
  340. ("索尼电视", "家用电器", "索尼"),
  341. ("联想ThinkPad", "电脑办公", "联想"),
  342. ("Nike运动鞋", "服装鞋帽", "Nike"),
  343. ("Adidas外套", "服装鞋帽", "Adidas"),
  344. ("雅诗兰黛精华", "美妆护肤", "雅诗兰黛"),
  345. ("SK-II神仙水", "美妆护肤", "SK-II"),
  346. ("海蓝之谜面霜", "美妆护肤", "海蓝之谜"),
  347. ("飞利浦剃须刀", "个人护理", "飞利浦"),
  348. ("松下电饭煲", "家用电器", "松下"),
  349. ]
  350. suppliers = [
  351. "北京科技有限公司",
  352. "上海贸易有限公司",
  353. "广州电子有限公司",
  354. "深圳数码有限公司",
  355. "杭州商贸有限公司",
  356. ]
  357. warehouses = ["北京仓", "上海仓", "广州仓", "成都仓", "武汉仓"]
  358. with conn.cursor() as cur:
  359. records: list[tuple[Any, ...]] = []
  360. for i in range(num_records):
  361. product = random.choice(products)
  362. sku = f"SKU{100000 + i}"
  363. current_stock = random.randint(0, 1000)
  364. safety_stock = random.randint(50, 200)
  365. max_stock = random.randint(800, 2000)
  366. unit_cost = round(random.uniform(10, 5000), 2)
  367. selling_price = round(unit_cost * random.uniform(1.2, 2.0), 2)
  368. if current_stock == 0:
  369. stock_status = "缺货"
  370. elif current_stock < safety_stock:
  371. stock_status = "库存不足"
  372. elif current_stock > max_stock * 0.9:
  373. stock_status = "库存过剩"
  374. else:
  375. stock_status = "正常"
  376. last_inbound = datetime.now() - timedelta(days=random.randint(1, 60))
  377. last_outbound = datetime.now() - timedelta(days=random.randint(1, 30))
  378. inbound_30d = random.randint(0, 500)
  379. outbound_30d = random.randint(0, 400)
  380. turnover = min(round(outbound_30d / max(current_stock, 1) * 30, 2), 999.99)
  381. records.append(
  382. (
  383. sku,
  384. f"{product[0]} - 型号{chr(65 + i % 26)}",
  385. product[1],
  386. product[2],
  387. random.choice(suppliers),
  388. random.choice(warehouses),
  389. current_stock,
  390. safety_stock,
  391. max_stock,
  392. unit_cost,
  393. selling_price,
  394. stock_status,
  395. last_inbound.date(),
  396. last_outbound.date(),
  397. inbound_30d,
  398. outbound_30d,
  399. turnover,
  400. random.choice([True, True, True, False]),
  401. )
  402. )
  403. cur.executemany(
  404. """
  405. INSERT INTO public.test_product_inventory (
  406. sku, product_name, category, brand, supplier, warehouse,
  407. current_stock, safety_stock, max_stock, unit_cost, selling_price,
  408. stock_status, last_inbound_date, last_outbound_date,
  409. inbound_quantity_30d, outbound_quantity_30d, turnover_rate, is_active
  410. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  411. """,
  412. records,
  413. )
  414. conn.commit()
  415. logger.info(f"成功插入 {num_records} 条商品库存数据")
  416. def update_data_products_stats(conn) -> None:
  417. """更新 data_products 表中的统计信息"""
  418. logger.info("更新 data_products 表统计信息...")
  419. tables_info = [
  420. ("test_sales_data", 17),
  421. ("test_user_statistics", 22),
  422. ("test_product_inventory", 21),
  423. ]
  424. with conn.cursor() as cur:
  425. for table_name, column_count in tables_info:
  426. # 获取记录数
  427. cur.execute(f"SELECT COUNT(*) FROM public.{table_name}")
  428. record_count = cur.fetchone()[0]
  429. # 更新 data_products 表
  430. cur.execute(
  431. """
  432. UPDATE public.data_products
  433. SET record_count = %s,
  434. column_count = %s,
  435. last_updated_at = CURRENT_TIMESTAMP,
  436. updated_at = CURRENT_TIMESTAMP,
  437. status = 'active'
  438. WHERE target_table = %s AND target_schema = 'public'
  439. """,
  440. (record_count, column_count, table_name),
  441. )
  442. logger.info(
  443. f"更新 {table_name}: record_count={record_count}, column_count={column_count}"
  444. )
  445. conn.commit()
  446. logger.info("data_products 统计信息更新完成")
  447. def main() -> None:
  448. """主函数"""
  449. logger.info("=" * 60)
  450. logger.info("开始创建测试数据表和数据...")
  451. logger.info("=" * 60)
  452. try:
  453. conn = get_connection()
  454. logger.info("数据库连接成功")
  455. # 创建表和插入数据
  456. create_test_sales_data_table(conn)
  457. insert_test_sales_data(conn, num_records=500)
  458. create_test_user_statistics_table(conn)
  459. insert_test_user_statistics(conn, num_records=300)
  460. create_test_product_inventory_table(conn)
  461. insert_test_product_inventory(conn, num_records=200)
  462. # 更新 data_products 统计信息
  463. update_data_products_stats(conn)
  464. conn.close()
  465. logger.info("=" * 60)
  466. logger.info("所有测试数据创建完成!")
  467. logger.info("=" * 60)
  468. logger.info("已创建以下测试表:")
  469. logger.info(" 1. test_sales_data (500条) - 销售数据分析")
  470. logger.info(" 2. test_user_statistics (300条) - 用户行为统计")
  471. logger.info(" 3. test_product_inventory (200条) - 商品库存")
  472. logger.info("=" * 60)
  473. except Exception as e:
  474. logger.error(f"创建测试数据失败: {e}")
  475. raise
  476. if __name__ == "__main__":
  477. main()