# prepare_data_service_test_data.py
  1. """
Prepare test data for the data service feature.

This includes creating the test data tables and registering the data products.
  4. """
  5. import os
  6. import sys
  7. from datetime import datetime
  8. from pathlib import Path
  9. # 设置控制台编码为 UTF-8(Windows)
  10. if sys.platform == "win32":
  11. import io
  12. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
  13. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
  14. # 添加项目根目录到路径
  15. project_root = Path(__file__).parent.parent
  16. sys.path.insert(0, str(project_root))
  17. import psycopg2
  18. from app.config.config import ProductionConfig
  19. def get_db_connection():
  20. """获取数据库连接"""
  21. config = ProductionConfig()
  22. db_uri = config.SQLALCHEMY_DATABASE_URI
  23. # 解析数据库连接URI
  24. uri_parts = db_uri.replace("postgresql://", "").split("@")
  25. if len(uri_parts) != 2:
  26. raise ValueError(f"无效的数据库URI格式: {db_uri}")
  27. user_pass = uri_parts[0].split(":")
  28. username = user_pass[0]
  29. password = user_pass[1] if len(user_pass) > 1 else ""
  30. host_db = uri_parts[1].split("/")
  31. if len(host_db) != 2:
  32. raise ValueError(f"无效的数据库URI格式: {db_uri}")
  33. host_port = host_db[0].split(":")
  34. hostname = host_port[0]
  35. port = int(host_port[1]) if len(host_port) > 1 else 5432
  36. database = host_db[1]
  37. return psycopg2.connect(
  38. host=hostname,
  39. port=port,
  40. database=database,
  41. user=username,
  42. password=password,
  43. )
  44. def create_test_tables(conn):
  45. """创建测试数据表"""
  46. cursor = conn.cursor()
  47. print("\n[1/3] 创建测试数据表...")
  48. # 直接使用代码创建表(更可靠)
  49. tables_created = []
  50. # 表1: 销售数据表
  51. try:
  52. cursor.execute("DROP TABLE IF EXISTS test_sales_data CASCADE")
  53. conn.commit()
  54. cursor.execute("""
  55. CREATE TABLE test_sales_data (
  56. id SERIAL PRIMARY KEY,
  57. order_id VARCHAR(50) NOT NULL,
  58. customer_name VARCHAR(100),
  59. product_name VARCHAR(200),
  60. quantity INTEGER,
  61. unit_price DECIMAL(10, 2),
  62. total_amount DECIMAL(10, 2),
  63. order_date DATE,
  64. region VARCHAR(50),
  65. status VARCHAR(20),
  66. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  67. )
  68. """)
  69. cursor.execute("COMMENT ON TABLE test_sales_data IS '测试销售数据表'")
  70. conn.commit()
  71. tables_created.append("test_sales_data")
  72. print(" ✓ test_sales_data 表创建成功")
  73. except Exception as e:
  74. print(f" ✗ test_sales_data 表创建失败: {e}")
  75. conn.rollback()
  76. # 表2: 用户统计表
  77. try:
  78. cursor.execute("DROP TABLE IF EXISTS test_user_statistics CASCADE")
  79. conn.commit()
  80. cursor.execute("""
  81. CREATE TABLE test_user_statistics (
  82. id SERIAL PRIMARY KEY,
  83. user_id VARCHAR(50) NOT NULL,
  84. username VARCHAR(100),
  85. email VARCHAR(200),
  86. registration_date DATE,
  87. last_login_date DATE,
  88. total_orders INTEGER DEFAULT 0,
  89. total_amount DECIMAL(10, 2) DEFAULT 0,
  90. user_level VARCHAR(20),
  91. is_active BOOLEAN DEFAULT TRUE,
  92. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  93. )
  94. """)
  95. cursor.execute("COMMENT ON TABLE test_user_statistics IS '测试用户统计表'")
  96. conn.commit()
  97. tables_created.append("test_user_statistics")
  98. print(" ✓ test_user_statistics 表创建成功")
  99. except Exception as e:
  100. print(f" ✗ test_user_statistics 表创建失败: {e}")
  101. conn.rollback()
  102. # 表3: 产品库存表
  103. try:
  104. cursor.execute("DROP TABLE IF EXISTS test_product_inventory CASCADE")
  105. conn.commit()
  106. cursor.execute("""
  107. CREATE TABLE test_product_inventory (
  108. id SERIAL PRIMARY KEY,
  109. product_code VARCHAR(50) UNIQUE NOT NULL,
  110. product_name VARCHAR(200),
  111. category VARCHAR(100),
  112. current_stock INTEGER,
  113. min_stock INTEGER,
  114. max_stock INTEGER,
  115. unit_price DECIMAL(10, 2),
  116. supplier VARCHAR(200),
  117. last_restock_date DATE,
  118. status VARCHAR(20),
  119. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  120. )
  121. """)
  122. cursor.execute("COMMENT ON TABLE test_product_inventory IS '测试产品库存表'")
  123. conn.commit()
  124. tables_created.append("test_product_inventory")
  125. print(" ✓ test_product_inventory 表创建成功")
  126. except Exception as e:
  127. print(f" ✗ test_product_inventory 表创建失败: {e}")
  128. conn.rollback()
  129. else:
  130. # 如果SQL文件不存在,使用代码创建
  131. print(" SQL文件不存在,使用代码创建表...")
  132. # 表1: 销售数据表
  133. try:
  134. cursor.execute("DROP TABLE IF EXISTS test_sales_data CASCADE")
  135. cursor.execute("""
  136. CREATE TABLE test_sales_data (
  137. id SERIAL PRIMARY KEY,
  138. order_id VARCHAR(50) NOT NULL,
  139. customer_name VARCHAR(100),
  140. product_name VARCHAR(200),
  141. quantity INTEGER,
  142. unit_price DECIMAL(10, 2),
  143. total_amount DECIMAL(10, 2),
  144. order_date DATE,
  145. region VARCHAR(50),
  146. status VARCHAR(20),
  147. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  148. )
  149. """)
  150. cursor.execute("COMMENT ON TABLE test_sales_data IS '测试销售数据表'")
  151. print(" ✓ test_sales_data 表创建成功")
  152. except Exception as e:
  153. print(f" ✗ test_sales_data 表创建失败: {e}")
  154. conn.rollback()
  155. # 表2: 用户统计表
  156. try:
  157. cursor.execute("DROP TABLE IF EXISTS test_user_statistics CASCADE")
  158. cursor.execute("""
  159. CREATE TABLE test_user_statistics (
  160. id SERIAL PRIMARY KEY,
  161. user_id VARCHAR(50) NOT NULL,
  162. username VARCHAR(100),
  163. email VARCHAR(200),
  164. registration_date DATE,
  165. last_login_date DATE,
  166. total_orders INTEGER DEFAULT 0,
  167. total_amount DECIMAL(10, 2) DEFAULT 0,
  168. user_level VARCHAR(20),
  169. is_active BOOLEAN DEFAULT TRUE,
  170. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  171. )
  172. """)
  173. cursor.execute("COMMENT ON TABLE test_user_statistics IS '测试用户统计表'")
  174. print(" ✓ test_user_statistics 表创建成功")
  175. except Exception as e:
  176. print(f" ✗ test_user_statistics 表创建失败: {e}")
  177. conn.rollback()
  178. # 表3: 产品库存表
  179. try:
  180. cursor.execute("DROP TABLE IF EXISTS test_product_inventory CASCADE")
  181. cursor.execute("""
  182. CREATE TABLE test_product_inventory (
  183. id SERIAL PRIMARY KEY,
  184. product_code VARCHAR(50) UNIQUE NOT NULL,
  185. product_name VARCHAR(200),
  186. category VARCHAR(100),
  187. current_stock INTEGER,
  188. min_stock INTEGER,
  189. max_stock INTEGER,
  190. unit_price DECIMAL(10, 2),
  191. supplier VARCHAR(200),
  192. last_restock_date DATE,
  193. status VARCHAR(20),
  194. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  195. )
  196. """)
  197. cursor.execute("COMMENT ON TABLE test_product_inventory IS '测试产品库存表'")
  198. print(" ✓ test_product_inventory 表创建成功")
  199. except Exception as e:
  200. print(f" ✗ test_product_inventory 表创建失败: {e}")
  201. conn.rollback()
  202. conn.commit()
  203. # 验证表是否创建成功
  204. cursor.execute("""
  205. SELECT table_name FROM information_schema.tables
  206. WHERE table_schema = 'public'
  207. AND table_name IN ('test_sales_data', 'test_user_statistics', 'test_product_inventory')
  208. ORDER BY table_name
  209. """)
  210. created_tables = [row[0] for row in cursor.fetchall()]
  211. print(f"\n[成功] 测试数据表创建完成")
  212. print(f" 已创建表: {', '.join(created_tables) if created_tables else '无'}")
  213. if not created_tables:
  214. print(" [错误] 没有成功创建任何表,请检查错误信息")
  215. return False
  216. return True
  217. def insert_test_data(conn):
  218. """插入测试数据"""
  219. cursor = conn.cursor()
  220. print("\n[2/3] 插入测试数据...")
  221. # 插入销售数据 (250条)
  222. sales_data = []
  223. regions = ["华东", "华南", "华北", "西南", "西北"]
  224. statuses = ["已完成", "处理中", "已取消"]
  225. products = ["笔记本电脑", "台式机", "显示器", "键盘", "鼠标", "耳机", "音响", "摄像头"]
  226. for i in range(250):
  227. order_date = datetime(2024, 1, 1).replace(day=(i % 28) + 1, month=(i // 28) % 12 + 1)
  228. quantity = (i % 10) + 1
  229. unit_price = round(100.0 + (i % 5000), 2)
  230. total_amount = quantity * unit_price
  231. sales_data.append((
  232. f"ORD{10000 + i}",
  233. f"客户{chr(65 + (i % 26))}{i}",
  234. products[i % len(products)],
  235. quantity,
  236. unit_price,
  237. total_amount,
  238. order_date,
  239. regions[i % len(regions)],
  240. statuses[i % len(statuses)],
  241. ))
  242. cursor.executemany("""
  243. INSERT INTO test_sales_data
  244. (order_id, customer_name, product_name, quantity, unit_price,
  245. total_amount, order_date, region, status)
  246. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  247. """, sales_data)
  248. # 插入用户统计数据 (200条)
  249. user_data = []
  250. levels = ["普通", "银卡", "金卡", "钻石"]
  251. for i in range(200):
  252. reg_date = datetime(2024, 1, 1).replace(day=(i % 28) + 1, month=(i // 28) % 12 + 1)
  253. login_date = reg_date.replace(day=(reg_date.day + (i % 10)) % 28 + 1)
  254. user_data.append((
  255. f"USER{1000 + i}",
  256. f"user{i}",
  257. f"user{i}@example.com",
  258. reg_date,
  259. login_date,
  260. (i % 50) + 1,
  261. round(1000.0 + (i % 50000), 2),
  262. levels[i % len(levels)],
  263. (i % 10) != 0, # 每10个用户有一个不活跃
  264. ))
  265. cursor.executemany("""
  266. INSERT INTO test_user_statistics
  267. (user_id, username, email, registration_date, last_login_date,
  268. total_orders, total_amount, user_level, is_active)
  269. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  270. """, user_data)
  271. # 插入产品库存数据 (150条)
  272. inventory_data = []
  273. categories = ["电子产品", "办公用品", "家具", "服装", "食品"]
  274. suppliers = ["供应商A", "供应商B", "供应商C", "供应商D"]
  275. for i in range(150):
  276. current_stock = (i % 500) + 10
  277. min_stock = 50
  278. max_stock = 1000
  279. inventory_data.append((
  280. f"PROD{10000 + i}",
  281. f"产品{i}",
  282. categories[i % len(categories)],
  283. current_stock,
  284. min_stock,
  285. max_stock,
  286. round(50.0 + (i % 500), 2),
  287. suppliers[i % len(suppliers)],
  288. datetime(2024, 1, 1).replace(day=(i % 28) + 1),
  289. "正常" if current_stock > min_stock else "缺货",
  290. ))
  291. cursor.executemany("""
  292. INSERT INTO test_product_inventory
  293. (product_code, product_name, category, current_stock, min_stock,
  294. max_stock, unit_price, supplier, last_restock_date, status)
  295. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  296. """, inventory_data)
  297. conn.commit()
  298. print(f"[成功] 测试数据插入完成:")
  299. print(f" - test_sales_data: 250 条")
  300. print(f" - test_user_statistics: 200 条")
  301. print(f" - test_product_inventory: 150 条")
  302. def register_data_products(conn):
  303. """注册数据产品到 data_products 表"""
  304. cursor = conn.cursor()
  305. print("\n[3/3] 注册数据产品...")
  306. products = [
  307. {
  308. "product_name": "销售数据分析",
  309. "product_name_en": "test_sales_data",
  310. "target_table": "test_sales_data",
  311. "target_schema": "public",
  312. "description": "销售订单数据分析,包含订单详情、客户信息、产品信息等",
  313. "source_dataflow_id": 1001,
  314. "source_dataflow_name": "销售数据加工流程",
  315. },
  316. {
  317. "product_name": "用户行为统计",
  318. "product_name_en": "test_user_statistics",
  319. "target_table": "test_user_statistics",
  320. "target_schema": "public",
  321. "description": "用户注册、登录、订单统计等行为数据分析",
  322. "source_dataflow_id": 1002,
  323. "source_dataflow_name": "用户数据加工流程",
  324. },
  325. {
  326. "product_name": "产品库存管理",
  327. "product_name_en": "test_product_inventory",
  328. "target_table": "test_product_inventory",
  329. "target_schema": "public",
  330. "description": "产品库存信息,包括库存数量、价格、供应商等信息",
  331. "source_dataflow_id": 1003,
  332. "source_dataflow_name": "库存数据加工流程",
  333. },
  334. ]
  335. for product in products:
  336. # 先检查表是否存在
  337. cursor.execute("""
  338. SELECT EXISTS (
  339. SELECT FROM information_schema.tables
  340. WHERE table_schema = %s AND table_name = %s
  341. )
  342. """, (product['target_schema'], product['target_table']))
  343. table_exists = cursor.fetchone()[0]
  344. if not table_exists:
  345. print(f" [跳过] 表 {product['target_table']} 不存在,跳过注册")
  346. continue
  347. # 获取表的记录数和列数
  348. table_name = product['target_table']
  349. cursor.execute(f'SELECT COUNT(*) FROM "{table_name}"')
  350. record_count = cursor.fetchone()[0]
  351. cursor.execute("""
  352. SELECT COUNT(*)
  353. FROM information_schema.columns
  354. WHERE table_schema = %s AND table_name = %s
  355. """, (product['target_schema'], product['target_table']))
  356. column_count = cursor.fetchone()[0]
  357. # 检查是否已存在
  358. cursor.execute("""
  359. SELECT id FROM data_products
  360. WHERE target_schema = %s AND target_table = %s
  361. """, (product['target_schema'], product['target_table']))
  362. existing = cursor.fetchone()
  363. if existing:
  364. # 更新现有记录
  365. cursor.execute("""
  366. UPDATE data_products SET
  367. product_name = %s,
  368. product_name_en = %s,
  369. description = %s,
  370. source_dataflow_id = %s,
  371. source_dataflow_name = %s,
  372. record_count = %s,
  373. column_count = %s,
  374. last_updated_at = CURRENT_TIMESTAMP,
  375. updated_at = CURRENT_TIMESTAMP
  376. WHERE id = %s
  377. """, (
  378. product['product_name'],
  379. product['product_name_en'],
  380. product['description'],
  381. product['source_dataflow_id'],
  382. product['source_dataflow_name'],
  383. record_count,
  384. column_count,
  385. existing[0],
  386. ))
  387. print(f" [更新] {product['product_name']} (ID: {existing[0]})")
  388. else:
  389. # 插入新记录
  390. cursor.execute("""
  391. INSERT INTO data_products
  392. (product_name, product_name_en, description, source_dataflow_id,
  393. source_dataflow_name, target_table, target_schema, record_count,
  394. column_count, last_updated_at, created_by, status)
  395. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, 'test_script', 'active')
  396. RETURNING id
  397. """, (
  398. product['product_name'],
  399. product['product_name_en'],
  400. product['description'],
  401. product['source_dataflow_id'],
  402. product['source_dataflow_name'],
  403. product['target_table'],
  404. product['target_schema'],
  405. record_count,
  406. column_count,
  407. ))
  408. product_id = cursor.fetchone()[0]
  409. print(f" [创建] {product['product_name']} (ID: {product_id}, 记录数: {record_count})")
  410. conn.commit()
  411. print("[成功] 数据产品注册完成")
  412. def main():
  413. """主函数"""
  414. print("=" * 60)
  415. print("准备数据服务功能测试数据")
  416. print("=" * 60)
  417. env = os.environ.get("FLASK_ENV", "production")
  418. print(f"\n当前环境: {env}")
  419. if env != "production":
  420. response = input("\n警告: 当前不是生产环境,是否继续?(yes/no): ")
  421. if response.lower() != "yes":
  422. print("已取消操作")
  423. return
  424. try:
  425. conn = get_db_connection()
  426. conn.autocommit = False
  427. try:
  428. create_test_tables(conn)
  429. insert_test_data(conn)
  430. register_data_products(conn)
  431. print("\n" + "=" * 60)
  432. print("[完成] 测试数据准备完成!")
  433. print("=" * 60)
  434. print("\n可以开始测试以下 API 接口:")
  435. print(" 1. GET /api/dataservice/products - 获取数据产品列表")
  436. print(" 2. GET /api/dataservice/products/{id} - 获取产品详情")
  437. print(" 3. GET /api/dataservice/products/{id}/preview - 获取数据预览")
  438. print(" 4. GET /api/dataservice/products/{id}/download - 下载Excel")
  439. print(" 5. POST /api/dataservice/products/{id}/viewed - 标记已查看")
  440. print(" 6. POST /api/dataservice/products/{id}/refresh - 刷新统计信息")
  441. finally:
  442. conn.close()
  443. except Exception as e:
  444. print(f"\n[错误] 操作失败: {e}")
  445. import traceback
  446. traceback.print_exc()
  447. sys.exit(1)
  448. if __name__ == "__main__":
  449. main()