# prepare_data_service_test_data_fixed.py
"""
Prepare test data for the data service feature (fixed version).

Creates the test data tables and registers the data products.
"""
  5. import os
  6. import sys
  7. from datetime import datetime
  8. from pathlib import Path
  9. # 设置控制台编码为 UTF-8(Windows)
  10. if sys.platform == "win32":
  11. import io
  12. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
  13. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
  14. # 添加项目根目录到路径
  15. project_root = Path(__file__).parent.parent
  16. sys.path.insert(0, str(project_root))
  17. import psycopg2
  18. from app.config.config import ProductionConfig
  19. def get_db_connection():
  20. """获取数据库连接"""
  21. config = ProductionConfig()
  22. db_uri = config.SQLALCHEMY_DATABASE_URI
  23. uri_parts = db_uri.replace("postgresql://", "").split("@")
  24. user_pass = uri_parts[0].split(":")
  25. username = user_pass[0]
  26. password = user_pass[1] if len(user_pass) > 1 else ""
  27. host_db = uri_parts[1].split("/")
  28. host_port = host_db[0].split(":")
  29. hostname = host_port[0]
  30. port = int(host_port[1]) if len(host_port) > 1 else 5432
  31. database = host_db[1]
  32. return psycopg2.connect(
  33. host=hostname,
  34. port=port,
  35. database=database,
  36. user=username,
  37. password=password,
  38. )
  39. def create_test_tables(conn):
  40. """创建测试数据表"""
  41. cursor = conn.cursor()
  42. print("\n[1/3] 创建测试数据表...")
  43. tables_created = []
  44. # 表1: 销售数据表
  45. try:
  46. cursor.execute("DROP TABLE IF EXISTS test_sales_data CASCADE")
  47. conn.commit()
  48. cursor.execute("""
  49. CREATE TABLE test_sales_data (
  50. id SERIAL PRIMARY KEY,
  51. order_id VARCHAR(50) NOT NULL,
  52. customer_name VARCHAR(100),
  53. product_name VARCHAR(200),
  54. quantity INTEGER,
  55. unit_price DECIMAL(10, 2),
  56. total_amount DECIMAL(10, 2),
  57. order_date DATE,
  58. region VARCHAR(50),
  59. status VARCHAR(20),
  60. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  61. )
  62. """)
  63. cursor.execute("COMMENT ON TABLE test_sales_data IS '测试销售数据表'")
  64. conn.commit()
  65. tables_created.append("test_sales_data")
  66. print(" ✓ test_sales_data 表创建成功")
  67. except Exception as e:
  68. print(f" ✗ test_sales_data 表创建失败: {e}")
  69. conn.rollback()
  70. # 表2: 用户统计表
  71. try:
  72. cursor.execute("DROP TABLE IF EXISTS test_user_statistics CASCADE")
  73. conn.commit()
  74. cursor.execute("""
  75. CREATE TABLE test_user_statistics (
  76. id SERIAL PRIMARY KEY,
  77. user_id VARCHAR(50) NOT NULL,
  78. username VARCHAR(100),
  79. email VARCHAR(200),
  80. registration_date DATE,
  81. last_login_date DATE,
  82. total_orders INTEGER DEFAULT 0,
  83. total_amount DECIMAL(10, 2) DEFAULT 0,
  84. user_level VARCHAR(20),
  85. is_active BOOLEAN DEFAULT TRUE,
  86. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  87. )
  88. """)
  89. cursor.execute("COMMENT ON TABLE test_user_statistics IS '测试用户统计表'")
  90. conn.commit()
  91. tables_created.append("test_user_statistics")
  92. print(" ✓ test_user_statistics 表创建成功")
  93. except Exception as e:
  94. print(f" ✗ test_user_statistics 表创建失败: {e}")
  95. conn.rollback()
  96. # 表3: 产品库存表
  97. try:
  98. cursor.execute("DROP TABLE IF EXISTS test_product_inventory CASCADE")
  99. conn.commit()
  100. cursor.execute("""
  101. CREATE TABLE test_product_inventory (
  102. id SERIAL PRIMARY KEY,
  103. product_code VARCHAR(50) UNIQUE NOT NULL,
  104. product_name VARCHAR(200),
  105. category VARCHAR(100),
  106. current_stock INTEGER,
  107. min_stock INTEGER,
  108. max_stock INTEGER,
  109. unit_price DECIMAL(10, 2),
  110. supplier VARCHAR(200),
  111. last_restock_date DATE,
  112. status VARCHAR(20),
  113. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  114. )
  115. """)
  116. cursor.execute("COMMENT ON TABLE test_product_inventory IS '测试产品库存表'")
  117. conn.commit()
  118. tables_created.append("test_product_inventory")
  119. print(" ✓ test_product_inventory 表创建成功")
  120. except Exception as e:
  121. print(f" ✗ test_product_inventory 表创建失败: {e}")
  122. conn.rollback()
  123. # 验证表是否创建成功(刷新连接以查看最新状态)
  124. conn.rollback() # 确保任何未提交的事务回滚
  125. cursor.execute("""
  126. SELECT table_name FROM information_schema.tables
  127. WHERE table_schema = 'public'
  128. AND table_name IN ('test_sales_data', 'test_user_statistics', 'test_product_inventory')
  129. ORDER BY table_name
  130. """)
  131. created_tables = [row[0] for row in cursor.fetchall()]
  132. print(f"\n[成功] 测试数据表创建完成")
  133. print(f" 已创建表: {', '.join(created_tables) if created_tables else '无'}")
  134. return len(created_tables) > 0
  135. def insert_test_data(conn):
  136. """插入测试数据"""
  137. cursor = conn.cursor()
  138. print("\n[2/3] 插入测试数据...")
  139. # 插入销售数据 (250条)
  140. sales_data = []
  141. regions = ["华东", "华南", "华北", "西南", "西北"]
  142. statuses = ["已完成", "处理中", "已取消"]
  143. products = ["笔记本电脑", "台式机", "显示器", "键盘", "鼠标", "耳机", "音响", "摄像头"]
  144. for i in range(250):
  145. order_date = datetime(2024, 1, 1).replace(day=(i % 28) + 1, month=(i // 28) % 12 + 1)
  146. quantity = (i % 10) + 1
  147. unit_price = round(100.0 + (i % 5000), 2)
  148. total_amount = quantity * unit_price
  149. sales_data.append((
  150. f"ORD{10000 + i}",
  151. f"客户{chr(65 + (i % 26))}{i}",
  152. products[i % len(products)],
  153. quantity,
  154. unit_price,
  155. total_amount,
  156. order_date,
  157. regions[i % len(regions)],
  158. statuses[i % len(statuses)],
  159. ))
  160. cursor.executemany("""
  161. INSERT INTO test_sales_data
  162. (order_id, customer_name, product_name, quantity, unit_price,
  163. total_amount, order_date, region, status)
  164. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  165. """, sales_data)
  166. # 插入用户统计数据 (200条)
  167. user_data = []
  168. levels = ["普通", "银卡", "金卡", "钻石"]
  169. for i in range(200):
  170. reg_date = datetime(2024, 1, 1).replace(day=(i % 28) + 1, month=(i // 28) % 12 + 1)
  171. login_date = reg_date.replace(day=(reg_date.day + (i % 10)) % 28 + 1)
  172. user_data.append((
  173. f"USER{1000 + i}",
  174. f"user{i}",
  175. f"user{i}@example.com",
  176. reg_date,
  177. login_date,
  178. (i % 50) + 1,
  179. round(1000.0 + (i % 50000), 2),
  180. levels[i % len(levels)],
  181. (i % 10) != 0,
  182. ))
  183. cursor.executemany("""
  184. INSERT INTO test_user_statistics
  185. (user_id, username, email, registration_date, last_login_date,
  186. total_orders, total_amount, user_level, is_active)
  187. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  188. """, user_data)
  189. # 插入产品库存数据 (150条)
  190. inventory_data = []
  191. categories = ["电子产品", "办公用品", "家具", "服装", "食品"]
  192. suppliers = ["供应商A", "供应商B", "供应商C", "供应商D"]
  193. for i in range(150):
  194. current_stock = (i % 500) + 10
  195. min_stock = 50
  196. max_stock = 1000
  197. inventory_data.append((
  198. f"PROD{10000 + i}",
  199. f"产品{i}",
  200. categories[i % len(categories)],
  201. current_stock,
  202. min_stock,
  203. max_stock,
  204. round(50.0 + (i % 500), 2),
  205. suppliers[i % len(suppliers)],
  206. datetime(2024, 1, 1).replace(day=(i % 28) + 1),
  207. "正常" if current_stock > min_stock else "缺货",
  208. ))
  209. cursor.executemany("""
  210. INSERT INTO test_product_inventory
  211. (product_code, product_name, category, current_stock, min_stock,
  212. max_stock, unit_price, supplier, last_restock_date, status)
  213. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  214. """, inventory_data)
  215. conn.commit()
  216. print(f"[成功] 测试数据插入完成:")
  217. print(f" - test_sales_data: 250 条")
  218. print(f" - test_user_statistics: 200 条")
  219. print(f" - test_product_inventory: 150 条")
  220. def register_data_products(conn):
  221. """注册数据产品到 data_products 表"""
  222. cursor = conn.cursor()
  223. print("\n[3/3] 注册数据产品...")
  224. products = [
  225. {
  226. "product_name": "销售数据分析",
  227. "product_name_en": "test_sales_data",
  228. "target_table": "test_sales_data",
  229. "target_schema": "public",
  230. "description": "销售订单数据分析,包含订单详情、客户信息、产品信息等",
  231. "source_dataflow_id": 1001,
  232. "source_dataflow_name": "销售数据加工流程",
  233. },
  234. {
  235. "product_name": "用户行为统计",
  236. "product_name_en": "test_user_statistics",
  237. "target_table": "test_user_statistics",
  238. "target_schema": "public",
  239. "description": "用户注册、登录、订单统计等行为数据分析",
  240. "source_dataflow_id": 1002,
  241. "source_dataflow_name": "用户数据加工流程",
  242. },
  243. {
  244. "product_name": "产品库存管理",
  245. "product_name_en": "test_product_inventory",
  246. "target_table": "test_product_inventory",
  247. "target_schema": "public",
  248. "description": "产品库存信息,包括库存数量、价格、供应商等信息",
  249. "source_dataflow_id": 1003,
  250. "source_dataflow_name": "库存数据加工流程",
  251. },
  252. ]
  253. for product in products:
  254. table_name = product['target_table']
  255. # 直接尝试查询表(更可靠的方式)
  256. try:
  257. cursor.execute(f'SELECT COUNT(*) FROM "{table_name}"')
  258. except Exception as e:
  259. print(f" [跳过] 表 {table_name} 不存在或无法访问: {e}")
  260. continue
  261. # 获取表的记录数和列数
  262. record_count = cursor.fetchone()[0]
  263. cursor.execute("""
  264. SELECT COUNT(*)
  265. FROM information_schema.columns
  266. WHERE table_schema = %s AND table_name = %s
  267. """, (product['target_schema'], product['target_table']))
  268. column_count = cursor.fetchone()[0]
  269. # 检查是否已存在
  270. cursor.execute("""
  271. SELECT id FROM data_products
  272. WHERE target_schema = %s AND target_table = %s
  273. """, (product['target_schema'], product['target_table']))
  274. existing = cursor.fetchone()
  275. if existing:
  276. cursor.execute("""
  277. UPDATE data_products SET
  278. product_name = %s,
  279. product_name_en = %s,
  280. description = %s,
  281. source_dataflow_id = %s,
  282. source_dataflow_name = %s,
  283. record_count = %s,
  284. column_count = %s,
  285. last_updated_at = CURRENT_TIMESTAMP,
  286. updated_at = CURRENT_TIMESTAMP
  287. WHERE id = %s
  288. """, (
  289. product['product_name'],
  290. product['product_name_en'],
  291. product['description'],
  292. product['source_dataflow_id'],
  293. product['source_dataflow_name'],
  294. record_count,
  295. column_count,
  296. existing[0],
  297. ))
  298. print(f" [更新] {product['product_name']} (ID: {existing[0]})")
  299. else:
  300. cursor.execute("""
  301. INSERT INTO data_products
  302. (product_name, product_name_en, description, source_dataflow_id,
  303. source_dataflow_name, target_table, target_schema, record_count,
  304. column_count, last_updated_at, created_by, status)
  305. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, 'test_script', 'active')
  306. RETURNING id
  307. """, (
  308. product['product_name'],
  309. product['product_name_en'],
  310. product['description'],
  311. product['source_dataflow_id'],
  312. product['source_dataflow_name'],
  313. product['target_table'],
  314. product['target_schema'],
  315. record_count,
  316. column_count,
  317. ))
  318. product_id = cursor.fetchone()[0]
  319. print(f" [创建] {product['product_name']} (ID: {product_id}, 记录数: {record_count})")
  320. conn.commit()
  321. print("[成功] 数据产品注册完成")
  322. def main():
  323. """主函数"""
  324. print("=" * 60)
  325. print("准备数据服务功能测试数据")
  326. print("=" * 60)
  327. env = os.environ.get("FLASK_ENV", "production")
  328. print(f"\n当前环境: {env}")
  329. if env != "production":
  330. response = input("\n警告: 当前不是生产环境,是否继续?(yes/no): ")
  331. if response.lower() != "yes":
  332. print("已取消操作")
  333. return
  334. try:
  335. conn = get_db_connection()
  336. conn.autocommit = False
  337. try:
  338. # 尝试创建表
  339. create_test_tables(conn)
  340. # 直接尝试插入数据(如果表已存在或创建成功,都能继续)
  341. try:
  342. insert_test_data(conn)
  343. register_data_products(conn)
  344. except Exception as e:
  345. print(f"\n[错误] 插入数据或注册产品失败: {e}")
  346. print(" 可能原因: 表不存在或数据已存在")
  347. raise
  348. print("\n" + "=" * 60)
  349. print("[完成] 测试数据准备完成!")
  350. print("=" * 60)
  351. print("\n可以开始测试以下 API 接口:")
  352. print(" 1. GET /api/dataservice/products - 获取数据产品列表")
  353. print(" 2. GET /api/dataservice/products/{id} - 获取产品详情")
  354. print(" 3. GET /api/dataservice/products/{id}/preview - 获取数据预览")
  355. print(" 4. GET /api/dataservice/products/{id}/download - 下载Excel")
  356. print(" 5. POST /api/dataservice/products/{id}/viewed - 标记已查看")
  357. print(" 6. POST /api/dataservice/products/{id}/refresh - 刷新统计信息")
  358. else:
  359. print("\n[警告] 表创建可能失败,但将继续尝试插入数据和注册产品")
  360. finally:
  361. conn.close()
  362. except Exception as e:
  363. print(f"\n[错误] 操作失败: {e}")
  364. import traceback
  365. traceback.print_exc()
  366. sys.exit(1)
  367. if __name__ == "__main__":
  368. main()