# production_line.py
"""
Manual execution functions for the production line.

Author: paul
Date: 2024-03-20
"""
from app.core.graph.graph_operations import connect_graph
from flask import current_app
import os
import pandas as pd
from datetime import datetime
import psycopg2
from psycopg2 import sql
import logging
from app.services.neo4j_driver import neo4j_driver
import shutil
import re
from psycopg2.extras import execute_values
import time
from urllib.parse import urlparse, unquote, quote

# Configure logging
logger = logging.getLogger(__name__)

def production_draw_graph(id, type):
    """
    Draw the production line graph for a node, given its ID and type.

    Args:
        id: node ID
        type: node type (DataModel, DataResource, DataMetric)

    Returns:
        dict: graph data containing the nodes, the lines and the root node ID
    """
    # Get the Neo4j connection
    driver = connect_graph()
    if not driver:
        logger.error("Unable to connect to the database")
        return {"nodes": [], "lines": [], "rootId": "", "error": "Unable to connect to the database"}

    try:
        # First verify that the node exists
        with driver.session() as session:
            check_node_query = """
            MATCH (n)
            WHERE id(n) = $nodeId
            RETURN n, labels(n) as labels, n.name_zh as name_zh
            """
            check_result = session.run(check_node_query, nodeId=id).single()
            if not check_result:
                logger.error(f"Node does not exist: ID={id}")
                return {"nodes": [], "lines": [], "rootId": "", "error": "Node does not exist"}

            actual_type = check_result["labels"][0]  # the node's actual type
            node_name = check_result["name_zh"]

            # If the provided type does not match the actual type, use the actual type
            if type.lower() != actual_type.lower():
                logger.warning(f"Provided type ({type}) does not match actual type ({actual_type}); using the actual type")
                type = actual_type

        # Data model (case-insensitive match)
        if type.lower() == "datamodel":
            cql = """
            MATCH (n:DataModel)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
            OPTIONAL MATCH (n)-[r2:clean_model]-(d:data_standard)
            OPTIONAL MATCH (d)-[r3:clean_model]-(m)
            WITH
                collect({from: toString(id(n)), to: toString(id(m)), text: "contains"}) AS line1,
                collect({from: toString(id(n)), to: toString(id(d)), text: "cleanses"}) AS line2,
                collect({from: toString(id(d)), to: toString(id(m)), text: "cleanses"}) AS line3,
                collect({id: toString(id(n)), text: n.name_zh, type: "model"}) AS node1,
                collect({id: toString(id(m)), text: m.name}) AS node2,
                collect({id: toString(id(d)), text: d.name, type: "standard"}) AS node3,
                n
            WITH apoc.coll.toSet(line1 + line2 + line3) AS lines,
                 apoc.coll.toSet(node1 + node2 + node3) AS nodes,
                 toString(id(n)) AS res
            RETURN lines, nodes, res
            """
        # Data resource
        elif type.lower() == "dataresource":
            cql = """
            MATCH (n:DataResource)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
            OPTIONAL MATCH (n)-[r2:clean_resource]-(d:data_standard)
            OPTIONAL MATCH (d)-[r3:clean_resource]-(m)
            WITH
                collect({from: toString(id(n)), to: toString(id(m)), text: "contains"}) AS lines1,
                collect({from: toString(id(n)), to: toString(id(d)), text: "cleanses"}) AS lines2,
                collect({from: toString(id(d)), to: toString(id(m)), text: "cleanses"}) AS lines3,
                collect({id: toString(id(n)), text: n.name_zh, type: "resource"}) AS nodes1,
                collect({id: toString(id(m)), text: m.name}) AS nodes2,
                collect({id: toString(id(d)), text: d.name, type: "standard"}) AS nodes3,
                n
            WITH
                apoc.coll.toSet(lines1 + lines2 + lines3) AS lines,
                apoc.coll.toSet(nodes1 + nodes2 + nodes3) AS nodes,
                toString(id(n)) AS res
            RETURN lines, nodes, res
            """
        # Data metric
        elif type.lower() == "datametric":
            cql = """
            MATCH (n:DataMetric)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
            WITH collect({from: toString(id(n)), to: toString(id(m)), text: "processes"}) AS line1,
                 collect({id: toString(id(n)), text: n.name_zh, type: "metric"}) AS node1,
                 collect({id: toString(id(m)), text: m.name}) AS node2,
                 n
            WITH apoc.coll.toSet(line1) AS lines,
                 apoc.coll.toSet(node1 + node2) AS nodes,
                 toString(id(n)) AS res
            RETURN lines, nodes, res
            """
        else:
            # Handle unknown node types
            cql = """
            MATCH (n)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r]-(m)
            WITH collect({from: toString(id(n)), to: toString(id(m)), text: type(r)}) AS lines,
                 collect({id: toString(id(n)), text: n.name_zh, type: labels(n)[0]}) AS nodes1,
                 collect({id: toString(id(m)), text: m.name, type: labels(m)[0]}) AS nodes2,
                 toString(id(n)) AS res
            RETURN apoc.coll.toSet(lines) AS lines,
                   apoc.coll.toSet(nodes1 + nodes2) AS nodes,
                   res
            """

        with driver.session() as session:
            try:
                result = session.run(cql, nodeId=id)
                data = result.data()

                # If there is no data, return just the node itself
                if not data:
                    return {
                        "nodes": [{"id": str(id), "text": node_name, "type": type}],
                        "lines": [],
                        "rootId": str(id)
                    }

                res = {}
                for item in data:
                    res = {
                        "nodes": [record for record in item['nodes'] if record.get('id')],
                        "lines": [record for record in item['lines'] if record.get('from') and record.get('to')],
                        "rootId": item['res'],
                    }

                # Make sure the node list is never empty
                if not res.get("nodes"):
                    res["nodes"] = [{"id": str(id), "text": node_name, "type": type}]

                return res
            except Exception as e:
                logger.error(f"Graph query execution failed: {str(e)}")
                return {
                    "nodes": [{"id": str(id), "text": node_name, "type": type}],
                    "lines": [],
                    "rootId": str(id),
                    "error": f"Query execution failed: {str(e)}"
                }
    except Exception as e:
        logger.error(f"Failed to generate the graph: {str(e)}")
        return {"nodes": [], "lines": [], "rootId": "", "error": str(e)}

  149. """
  150. Manual execution functions for production line
  151. Author: paul
  152. Date: 2024-03-20
  153. """
  154. # 配置日志
  155. logger = logging.getLogger(__name__)
# PostgreSQL configuration
def get_pg_config():
    """Read the PostgreSQL configuration from the app config; supports passwords containing special characters."""
    db_uri = current_app.config['SQLALCHEMY_DATABASE_URI']

    # Try to parse with urlparse first
    uri = urlparse(db_uri)

    # If parsing fails, or special characters in the password break it, fall back to manual parsing
    if uri.username is None or uri.password is None:
        # Manually parse a URI of the form: postgresql://username:password@host:port/database
        scheme_end = db_uri.find('://')
        if scheme_end == -1:
            raise ValueError("Invalid database URI format")

        auth_and_host = db_uri[scheme_end + 3:]  # skip '://'
        at_pos = auth_and_host.rfind('@')  # find the last '@' from the right
        if at_pos == -1:
            raise ValueError("Invalid database URI: missing @ separator")

        auth_part = auth_and_host[:at_pos]
        host_part = auth_and_host[at_pos + 1:]

        # Parse the username and password (may contain special characters)
        colon_pos = auth_part.find(':')
        if colon_pos == -1:
            username = unquote(auth_part)
            password = None
        else:
            username = unquote(auth_part[:colon_pos])
            password = unquote(auth_part[colon_pos + 1:])

        # Parse the host, port and database
        slash_pos = host_part.find('/')
        if slash_pos == -1:
            raise ValueError("Invalid database URI: missing database name")

        host_port = host_part[:slash_pos]
        database = unquote(host_part[slash_pos + 1:])

        # Parse the host and port
        colon_pos = host_port.find(':')
        if colon_pos == -1:
            hostname = host_port
            port = 5432
        else:
            hostname = host_port[:colon_pos]
            port = int(host_port[colon_pos + 1:])
    else:
        # urlparse succeeded; decode fields that may be URL-encoded
        username = unquote(uri.username) if uri.username else None
        password = unquote(uri.password) if uri.password else None
        database = unquote(uri.path[1:]) if uri.path and len(uri.path) > 1 else None
        hostname = uri.hostname
        port = uri.port or 5432

    if not all([username, password, database, hostname]):
        raise ValueError("Missing required database connection parameters")

    return {
        'dbname': database,
        'user': username,
        'password': password,
        'host': hostname,
        'port': str(port)
    }

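# Illustrative example (hypothetical URI, not taken from the real config): a value such as
#   SQLALCHEMY_DATABASE_URI = "postgresql://etl_user:p%40ss%2Fword@db.example.com:5432/datalake"
# would be returned by get_pg_config() as
#   {'dbname': 'datalake', 'user': 'etl_user', 'password': 'p@ss/word',
#    'host': 'db.example.com', 'port': '5432'}
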
def get_resource_storage_info(resource_id):
    """
    Get the storage location and metadata of a data resource.

    Returns:
        tuple: (storage_location, name_zh, name_en, metadata_list)
            - storage_location: storage location
            - name_zh: Chinese name of the resource (used to locate the Excel file)
            - name_en: English name of the resource (used as the database table name)
            - metadata_list: list of metadata
    """
    try:
        with neo4j_driver.get_session() as session:
            # Fetch the resource's basic information
            resource_query = """
            MATCH (n:DataResource)
            WHERE id(n) = $resource_id
            RETURN n.storage_location as storage_location,
                   n.name_zh as name_zh,
                   n.name_en as name_en
            """
            result = session.run(resource_query, resource_id=int(resource_id))
            resource_data = result.single()

            if not resource_data:
                raise ValueError(f"Data resource with ID {resource_id} not found")
            if not resource_data['storage_location']:
                raise ValueError("Storage location is not configured")

            # Query the metadata nodes (aliased as name_zh so the keys match what the loader expects)
            metadata_query = """
            MATCH (n:DataResource)-[:INCLUDES]->(m:DataMeta)
            WHERE id(n) = $resource_id
            RETURN m.name_zh as name_zh, m.name_en as name_en, m.data_type as data_type
            """
            result = session.run(metadata_query, resource_id=int(resource_id))
            metadata_list = [dict(record) for record in result]

            # Warn if the metadata list is empty
            if not metadata_list:
                logger.warning(f"Data resource {resource_id} has no metadata nodes; metadata will be inferred from the Excel file")

            # The English name must exist
            if not resource_data['name_en']:
                raise ValueError("The English name of the data resource must not be empty")

            return (
                resource_data['storage_location'],
                resource_data['name_zh'],
                resource_data['name_en'],
                metadata_list
            )
    except Exception as e:
        logger.error(f"Failed to get resource storage info: {str(e)}")
        raise

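# Illustrative shape of the metadata_list produced above and consumed by the table/loader
# helpers below (example values only, not real data):
#   [{'name_zh': '<Chinese column name>', 'name_en': 'customer_name', 'data_type': 'VARCHAR(255)'}, ...]
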
def check_and_create_table(table_name, metadata_list):
    """
    Check whether the PostgreSQL table exists and create or extend it if necessary.

    Args:
        table_name: table name
        metadata_list: list of metadata
    """
    conn = None
    cur = None
    try:
        conn = psycopg2.connect(**get_pg_config())
        cur = conn.cursor()

        # Make sure the schema exists
        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")

        # Check whether the table exists
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'ods'
                AND table_name = %s
            );
        """, (table_name,))
        table_exists = cur.fetchone()[0]

        if not table_exists:
            # Cannot create the table if the metadata list is empty
            if not metadata_list:
                logger.warning("Metadata list is empty; the table cannot be created now and will be created automatically when data is loaded")
                return

            # Log the metadata list for debugging
            logger.info(f"Metadata list: {metadata_list}")

            # Build the CREATE TABLE statement
            columns = [
                f"{meta['name_en']} {meta['data_type']}"
                for meta in metadata_list
                if 'name_en' in meta and meta['name_en'] and 'data_type' in meta and meta['data_type']
            ]
            if not columns:
                logger.warning("No valid column definitions; the table cannot be created")
                return

            create_sql = f"""
            CREATE TABLE ods.{table_name} (
                id SERIAL PRIMARY KEY,
                {", ".join(columns)},
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
            logger.info(f"CREATE TABLE SQL: {create_sql}")
            cur.execute(create_sql)
            conn.commit()
            logger.info(f"Table ods.{table_name} created successfully")
        else:
            logger.info(f"Table ods.{table_name} already exists")

            # Check for a legacy insert_dt column (only created_at is used now)
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_schema = 'ods'
                    AND table_name = %s
                    AND column_name = 'insert_dt'
                );
            """, (table_name,))
            insert_dt_exists = cur.fetchone()[0]

            # If insert_dt exists, warn but do not drop it (dropping a column could lose data)
            if insert_dt_exists:
                logger.warning(f"Table ods.{table_name} has a redundant insert_dt column; consider dropping it manually later")

            # Check whether the created_at column exists; add it if not
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_schema = 'ods'
                    AND table_name = %s
                    AND column_name = 'created_at'
                );
            """, (table_name,))
            created_at_exists = cur.fetchone()[0]

            if not created_at_exists:
                alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
                logger.info(f"Adding created_at column: {alter_sql}")
                cur.execute(alter_sql)
                conn.commit()

            # Check whether new columns need to be added
            if metadata_list:
                # Fetch the existing columns
                cur.execute("""
                    SELECT column_name
                    FROM information_schema.columns
                    WHERE table_schema = 'ods'
                    AND table_name = %s
                """, (table_name,))
                existing_columns = [row[0] for row in cur.fetchall()]

                # Add any metadata field that is not yet a column
                for meta in metadata_list:
                    if 'name_en' in meta and meta['name_en'] and meta['name_en'].lower() not in (col.lower() for col in existing_columns):
                        column_type = meta.get('data_type', 'VARCHAR(255)')
                        alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN {meta['name_en']} {column_type};"
                        logger.info(f"Adding new column: {alter_sql}")
                        try:
                            cur.execute(alter_sql)
                            conn.commit()
                        except Exception as e:
                            logger.error(f"Failed to add column: {str(e)}")
    except Exception as e:
        logger.error(f"Failed to create table: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

def load_excel_to_postgresql(file_path, table_name, metadata_list):
    """
    Load data from an Excel file into a PostgreSQL table.

    Args:
        file_path: path to the Excel file
        table_name: table name
        metadata_list: list of metadata

    Returns:
        int: number of records loaded
    """
    conn = None
    cur = None
    try:
        # Read the Excel data
        df = pd.read_excel(file_path)

        # If the Excel file is empty, return 0
        if df.empty:
            logger.warning(f"Excel file {file_path} is empty")
            return 0

        # If the metadata list is empty, try to create the table automatically
        if not metadata_list:
            logger.warning("Metadata list is empty; trying to create the table from the Excel file")

            # Open a database connection
            conn = psycopg2.connect(**get_pg_config())
            cur = conn.cursor()

            # Make sure the schema exists
            cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")

            # Check whether the table exists
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'ods'
                    AND table_name = %s
                );
            """, (table_name,))
            table_exists = cur.fetchone()[0]

            # If the table does not exist, create it from the DataFrame
            if not table_exists:
                # Generate the column definitions
                columns = []
                for col_name in df.columns:
                    # Build a valid SQL column name
                    sql_col_name = re.sub(r'\W+', '_', col_name).lower()

                    # Infer the SQL type from the pandas dtype
                    dtype = df[col_name].dtype
                    if pd.api.types.is_integer_dtype(dtype):
                        sql_type = 'INTEGER'
                    elif pd.api.types.is_float_dtype(dtype):
                        sql_type = 'NUMERIC(15,2)'
                    elif pd.api.types.is_datetime64_dtype(dtype):
                        sql_type = 'TIMESTAMP'
                    elif pd.api.types.is_bool_dtype(dtype):
                        sql_type = 'BOOLEAN'
                    else:
                        sql_type = 'VARCHAR(255)'

                    columns.append(f"{sql_col_name} {sql_type}")

                # Create the table; only a created_at timestamp column is added
                create_sql = f"""
                CREATE TABLE ods.{table_name} (
                    id SERIAL PRIMARY KEY,
                    {', '.join(columns)},
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                """
                logger.info(f"Auto-generated CREATE TABLE SQL: {create_sql}")
                cur.execute(create_sql)
                conn.commit()
                logger.info(f"Table ods.{table_name} created automatically")
            else:
                # Check whether the created_at column exists
                cur.execute("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.columns
                        WHERE table_schema = 'ods'
                        AND table_name = %s
                        AND column_name = 'created_at'
                    );
                """, (table_name,))
                created_at_exists = cur.fetchone()[0]

                # Add the created_at column if it does not exist
                if not created_at_exists:
                    alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
                    logger.info(f"Adding created_at column: {alter_sql}")
                    cur.execute(alter_sql)
                    conn.commit()

            cur.close()
            conn.close()
            cur = None
            conn = None

            # Build a temporary metadata list used for inserting the data
            metadata_list = []
            for col_name in df.columns:
                sql_col_name = re.sub(r'\W+', '_', col_name).lower()
                metadata_list.append({
                    'name_zh': col_name,
                    'name_en': sql_col_name
                })

        # Open a database connection
        conn = psycopg2.connect(**get_pg_config())
        cur = conn.cursor()

        # Prepare the records to insert
        records = []
        for _, row in df.iterrows():
            record = {}
            for meta in metadata_list:
                if 'name_zh' in meta and meta['name_zh'] in df.columns and 'name_en' in meta:
                    # Value from the Excel cell
                    value = row[meta['name_zh']]
                    # Normalize NaN/None values
                    if pd.isna(value):
                        value = None
                    record[meta['name_en']] = value
            records.append(record)

        # If there are no valid records, return 0
        if not records:
            logger.warning("No valid records to insert")
            return 0

        # Column name list: metadata columns only (insert_dt is no longer used)
        columns = [meta['name_en'] for meta in metadata_list if 'name_en' in meta]
        if not columns:
            logger.warning("No valid column names")
            return 0

        # Batch insert with execute_values
        insert_sql = f"""
        INSERT INTO ods.{table_name} ({", ".join(columns)})
        VALUES %s
        """

        # Build the tuples to insert
        values = []
        for record in records:
            # Only the data columns; the created_at timestamp is filled by its default
            row_values = tuple(record.get(col, None) for col in columns)
            values.append(row_values)

        # Run the batch insert
        execute_values(cur, insert_sql, values)
        conn.commit()

        # Number of inserted records
        return len(values)
    except Exception as e:
        logger.error(f"Failed to load Excel data into PostgreSQL: {str(e)}", exc_info=True)
        if conn:
            conn.rollback()
        raise
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

def get_full_storage_path(relative_path):
    """
    Resolve the full storage path from a relative path.

    Args:
        relative_path: the relative path stored in Neo4j

    Returns:
        str: the full storage path
    """
    base_path = current_app.config['UPLOAD_BASE_PATH']

    # Strip any leading slashes
    relative_path = relative_path.lstrip('\\/')

    # Use the path separator appropriate for the operating system
    if os.name == 'nt':  # Windows
        full_path = os.path.join(base_path, relative_path.replace('/', '\\'))
    else:  # Linux/Unix
        full_path = os.path.join(base_path, relative_path.replace('\\', '/'))

    return os.path.normpath(full_path)

def get_archive_path():
    """
    Get the archive path for the current date.

    Returns:
        str: archive path
    """
    base_path = current_app.config['ARCHIVE_BASE_PATH']
    date_folder = datetime.now().strftime('%Y-%m-%d')
    archive_path = os.path.join(base_path, date_folder)

    # Make sure the archive directory exists
    os.makedirs(archive_path, exist_ok=True)
    return archive_path

def archive_excel_file(file_path):
    """
    Copy an Excel file into the archive directory (keeping its original name) and remove the original.

    Args:
        file_path: full path of the Excel file

    Returns:
        str: path of the archived file
    """
    archive_path = get_archive_path()
    file_name = os.path.basename(file_path)
    archive_file_path = os.path.join(archive_path, file_name)

    # If the file already exists in the archive directory, replace it
    if os.path.exists(archive_file_path):
        os.remove(archive_file_path)
        logger.info(f"Overwriting existing archived file: {archive_file_path}")

    # Copy the file into the archive directory
    shutil.copy2(file_path, archive_file_path)
    logger.info(f"File archived: {archive_file_path}")

    # Remove the original file
    os.remove(file_path)
    logger.info(f"Removed original file: {file_path}")
    return archive_file_path

def execute_production_line(resource_id):
    """
    Execute the production line data load for a resource.

    Args:
        resource_id: data resource ID

    Returns:
        dict: execution result
    """
    try:
        # First determine the resource type
        resource_type = get_resource_type(resource_id)

        # Choose the loading logic based on the resource type
        if resource_type == 'ddl':
            # DDL resources are extracted from a source database
            return execute_ddl_extraction(resource_id)
        else:
            # Other types (e.g. structure) are loaded from Excel files
            return execute_excel_loading(resource_id)
    except Exception as e:
        logger.error(f"Failed to execute production line: {str(e)}", exc_info=True)
        return {
            "status": "error",
            "message": str(e)
        }

def get_resource_type(resource_id):
    """
    Get the type of a resource.

    Args:
        resource_id: data resource ID

    Returns:
        str: resource type, e.g. 'ddl' or 'structure'
    """
    try:
        with neo4j_driver.get_session() as session:
            # Query the resource type
            cypher = """
            MATCH (n:DataResource)
            WHERE id(n) = $resource_id
            RETURN n.type as type
            """
            result = session.run(cypher, resource_id=int(resource_id))
            record = result.single()

            if not record:
                raise ValueError(f"Data resource with ID {resource_id} not found")

            return record["type"] or 'structure'  # default to the structure type
    except Exception as e:
        logger.error(f"Failed to get resource type: {str(e)}")
        raise

def execute_excel_loading(resource_id):
    """
    Execute the Excel data loading flow (the original loading logic).

    Args:
        resource_id: data resource ID

    Returns:
        dict: execution result
    """
    try:
        # Get the PostgreSQL configuration
        pg_config = get_pg_config()

        # 1. Get the storage info
        storage_location, name_zh, name_en, metadata_list = get_resource_storage_info(resource_id)

        # 2. Check and create the table
        check_and_create_table(name_en, metadata_list)

        # 3. Resolve the full storage path and scan for Excel files
        full_storage_path = get_full_storage_path(storage_location)

        if not os.path.exists(full_storage_path):
            # Create the directory if it does not exist
            try:
                os.makedirs(full_storage_path, exist_ok=True)
                logger.info(f"Created directory: {full_storage_path}")
            except Exception as e:
                raise ValueError(f"Unable to create storage path: {full_storage_path}, error: {str(e)}")

        # First look for files by the Chinese name
        excel_files = []
        if name_zh:
            excel_files = [
                f for f in os.listdir(full_storage_path)
                if f.startswith(name_zh) and f.endswith(('.xlsx', '.xls'))
            ]
            if excel_files:
                logger.info(f"Found Excel files using the Chinese name '{name_zh}': {excel_files}")

        # If nothing was found by the Chinese name, try the English name
        if not excel_files and name_en:
            excel_files = [
                f for f in os.listdir(full_storage_path)
                if f.startswith(name_en) and f.endswith(('.xlsx', '.xls'))
            ]
            if excel_files:
                logger.info(f"Found Excel files using the English name '{name_en}': {excel_files}")

        # If neither lookup found anything, fail
        if not excel_files:
            error_msg = (
                f"No matching Excel file found\n"
                f"Search path: {full_storage_path}\n"
                f"File name patterns tried:\n"
                f"1. {name_zh}*.xlsx/xls (Chinese name)\n"
                f"2. {name_en}*.xlsx/xls (English name)\n"
                f"Please make sure the file has been uploaded to the right location and its name starts with the resource's Chinese or English name"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        # 4. Load the data and archive the files
        total_records = 0
        processed_files = []
        archived_files = []

        for excel_file in excel_files:
            file_path = os.path.join(full_storage_path, excel_file)
            try:
                # If the metadata is empty, try to infer it from the Excel file
                if not metadata_list:
                    logger.info(f"Trying to infer metadata from Excel file {excel_file}")
                    metadata_list = extract_metadata_from_excel(file_path, name_en)
                    if metadata_list:
                        # Retry table creation with the inferred metadata
                        check_and_create_table(name_en, metadata_list)
                    else:
                        logger.warning("Unable to infer metadata from the Excel file; data will be loaded directly")

                # Load the data into PostgreSQL
                records = load_excel_to_postgresql(file_path, name_en, metadata_list)
                total_records += records
                processed_files.append(excel_file)

                # Archive the successfully processed file
                archived_path = archive_excel_file(file_path)
                archived_files.append(archived_path)

                logger.info(f"Processed and archived file {excel_file}, loaded {records} records")
            except Exception as e:
                logger.error(f"Failed to process file {excel_file}: {str(e)}", exc_info=True)
                raise

        return {
            "status": "success",
            "message": f"Data loaded successfully: processed {len(processed_files)} file(s), loaded {total_records} record(s)",
            "total_records": total_records,
            "files_processed": processed_files,
            "files_archived": archived_files
        }
    except Exception as e:
        logger.error(f"Failed to execute Excel loading: {str(e)}", exc_info=True)
        return {
            "status": "error",
            "message": str(e)
        }

def extract_metadata_from_excel(file_path, table_name):
    """
    Extract metadata from an Excel file.

    Args:
        file_path: path to the Excel file
        table_name: table name (used when translating column names)

    Returns:
        list: list of metadata
    """
    try:
        # Read only the header row of the Excel file
        df = pd.read_excel(file_path, nrows=0)

        # With nrows=0 the frame never has rows, so check the columns instead of df.empty
        if len(df.columns) == 0:
            logger.warning(f"Excel file {file_path} has no header columns")
            return []

        # Column names
        column_names = df.columns.tolist()

        # Translate the column names
        metadata_list = []
        for name in column_names:
            # Use the existing translation helpers
            try:
                from app.core.meta_data import translate_and_parse
                from app.core.meta_data import infer_column_type

                # Translate the column name
                name_en = translate_and_parse(name)[0] if name else f"column_{len(metadata_list)}"

                # Make sure the column name is a valid SQL identifier
                name_en = re.sub(r'\W+', '_', name_en).lower()

                # Infer the data type from a sample of the data
                df_sample = pd.read_excel(file_path, nrows=10)
                col_index = column_names.index(name)
                col_types = infer_column_type(df_sample)
                data_type = col_types[col_index] if col_index < len(col_types) else 'VARCHAR(255)'

                metadata_list.append({
                    'name_zh': name,
                    'name_en': name_en,
                    'data_type': data_type
                })
            except Exception as e:
                logger.error(f"Error while processing column {name}: {str(e)}")
                # Fall back to default values
                name_en = f"column_{len(metadata_list)}"
                metadata_list.append({
                    'name_zh': name,
                    'name_en': name_en,
                    'data_type': 'VARCHAR(255)'
                })

        logger.info(f"Metadata inferred from Excel: {metadata_list}")
        return metadata_list
    except Exception as e:
        logger.error(f"Failed to extract metadata from the Excel file: {str(e)}")
        return []

def execute_ddl_extraction(resource_id):
    """
    Execute data extraction for a DDL resource.

    Args:
        resource_id: data resource ID

    Returns:
        dict: execution result
    """
    try:
        logger.info(f"Starting DDL resource data extraction, ID: {resource_id}")

        # 1. Get the resource details
        resource_data = get_resource_details(resource_id)
        if not resource_data:
            return {"status": "error", "message": f"Resource does not exist, ID: {resource_id}"}

        # 2. Get the resource metadata
        metadata_list = resource_data.get('meta_list', [])
        if not metadata_list:
            return {"status": "error", "message": "The resource has no metadata; the table cannot be created"}

        # 3. Get the target table name
        target_table_name = resource_data.get('name_en')
        if not target_table_name:
            return {"status": "error", "message": "The resource has no English name; the target table name cannot be determined"}

        # 4. Get the associated data source info
        data_source_info = get_resource_data_source(resource_id)
        if not data_source_info:
            return {"status": "error", "message": "Unable to get the associated data source info"}

        # 5. Create the target table in PostgreSQL
        create_result = create_target_table(target_table_name, metadata_list)
        if not create_result["success"]:
            return {"status": "error", "message": f"Failed to create the target table: {create_result['message']}"}

        # 6. Run the data extraction
        extract_result = extract_data_to_postgres(data_source_info, target_table_name, metadata_list)

        return {
            "status": "success",
            "message": (
                f"Data extraction succeeded: extracted from table {extract_result['source_table']} "
                f"into table {extract_result['target_table']}, processed {extract_result['total_records']} records "
                f"in {extract_result['execution_time']:.2f} seconds"
            ),
            "total_records": extract_result['total_records'],
            "source_table": extract_result['source_table'],
            "target_table": extract_result['target_table'],
            "execution_time": extract_result['execution_time']
        }
    except Exception as e:
        logger.error(f"DDL data extraction failed: {str(e)}", exc_info=True)
        return {
            "status": "error",
            "message": str(e)
        }

def get_resource_details(resource_id):
    """
    Get the detailed information of a resource.

    Args:
        resource_id: data resource ID

    Returns:
        dict: resource details
    """
    from app.core.data_resource.resource import handle_id_resource
    return handle_id_resource(resource_id)

def get_resource_data_source(resource_id):
    """Get the data source associated with a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Query the data source node connected to the data resource node
            cypher = """
            MATCH (n:DataResource)-[:originates_from]->(ds:DataSource)
            WHERE id(n) = $resource_id
            RETURN ds
            """
            result = session.run(cypher, resource_id=int(resource_id))
            record = result.single()

            if not record:
                logger.warning(f"Resource ID {resource_id} has no associated data source")
                return None

            # Build the data source connection info
            data_source = dict(record["ds"])
            return {
                "type": data_source.get("type", "").lower(),
                "host": data_source.get("host"),
                "port": data_source.get("port"),
                "database": data_source.get("database"),
                "username": data_source.get("username"),
                "password": data_source.get("password")
                # Additional parameters can be added here if needed
                # "param": data_source.get("param")
            }
    except Exception as e:
        logger.error(f"Failed to get data source info: {str(e)}")
        return None

def create_target_table(table_name, metadata_list):
    """
    Create the target table in PostgreSQL.

    Args:
        table_name: table name
        metadata_list: list of metadata

    Returns:
        dict: {"success": bool, "message": str}
    """
    conn = None
    cur = None
    try:
        # Get the PostgreSQL configuration
        pg_config = get_pg_config()
        conn = psycopg2.connect(**pg_config)
        cur = conn.cursor()

        # Make sure the schema exists
        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")

        # Check whether the table exists
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'ods'
                AND table_name = %s
            );
        """, (table_name,))
        table_exists = cur.fetchone()[0]

        if table_exists:
            logger.info(f"Table ods.{table_name} already exists; skipping creation")
            return {"success": True, "message": f"Table ods.{table_name} already exists"}

        # Build the column definitions
        columns = []
        for meta in metadata_list:
            column_name = meta.get('name_en')
            data_type = meta.get('data_type')
            if column_name and data_type:
                columns.append(f"{column_name} {data_type}")

        if not columns:
            return {"success": False, "message": "No valid column definitions"}

        # Build the CREATE TABLE statement
        create_sql = f"""
        CREATE TABLE ods.{table_name} (
            id SERIAL PRIMARY KEY,
            {", ".join(columns)},
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        logger.info(f"CREATE TABLE SQL: {create_sql}")
        cur.execute(create_sql)
        conn.commit()
        logger.info(f"Table ods.{table_name} created successfully")
        return {"success": True, "message": f"Table ods.{table_name} created successfully"}
    except Exception as e:
        logger.error(f"Failed to create target table: {str(e)}")
        if conn:
            conn.rollback()
        return {"success": False, "message": str(e)}
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
    """
    Extract data from a source database into PostgreSQL.

    Args:
        source_conn_info: source database connection info
        target_table: target table name
        metadata_list: list of metadata

    Returns:
        dict: extraction result
    """
    try:
        from sqlalchemy import create_engine, text

        # The source table has the same name as the target table
        source_table = target_table

        # Batch size
        batch_size = current_app.config.get('DATA_EXTRACT_BATCH_SIZE', 1000)

        # Build the source database connection string
        db_type = source_conn_info["type"]

        if db_type == "mysql":
            # URL-encode the username, password and database name to handle special characters
            encoded_username = quote(source_conn_info['username'], safe='')
            encoded_password = quote(source_conn_info['password'], safe='')
            encoded_database = quote(source_conn_info['database'], safe='')
            connection_string = f"mysql+pymysql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}"

            # If a param value exists on the data source, append it to the connection string
            if 'param' in source_conn_info and source_conn_info['param']:
                # Normalize the param value so it starts with '&'
                param = source_conn_info['param']
                if not param.startswith('&'):
                    param = '&' + param
                connection_string = f"{connection_string}?{param[1:]}"
                logger.debug(f"Appended the data source's param value: {param}")
        elif db_type == "postgresql":
            # URL-encode the username, password and database name to handle special characters
            encoded_username = quote(source_conn_info['username'], safe='')
            encoded_password = quote(source_conn_info['password'], safe='')
            encoded_database = quote(source_conn_info['database'], safe='')
            connection_string = f"postgresql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}"
        else:
            raise ValueError(f"Unsupported database type: {db_type}")

        # Target database connection parameters
        pg_config = get_pg_config()

        # URL-encode the username, password and database name to handle special characters
        encoded_user = quote(pg_config['user'], safe='')
        encoded_password = quote(pg_config['password'], safe='')
        encoded_dbname = quote(pg_config['dbname'], safe='')
        target_connection_string = f"postgresql://{encoded_user}:{encoded_password}@{pg_config['host']}:{pg_config['port']}/{encoded_dbname}"

        # Log the final source connection string
        logger.debug(f"Final connection string for the source table: {connection_string}")

        # Connect to the source database
        source_engine = create_engine(connection_string)

        # Connect to the target database
        target_engine = create_engine(target_connection_string)

        # Column names from the metadata, used to build the SELECT list
        column_names = [meta.get('name_en') for meta in metadata_list if meta.get('name_en')]
        if not column_names:
            raise ValueError("No valid column names")

        # Build the query
        select_columns = ", ".join(column_names)
        query = f"SELECT {select_columns} FROM {source_table}"

        # Get the total record count
        with source_engine.connect() as conn:
            count_result = conn.execute(text(f"SELECT COUNT(*) FROM {source_table}"))
            total_count = count_result.scalar()

        # Extract the data in batches
        total_records = 0
        offset = 0

        # Start time
        start_time = time.time()

        while offset < total_count:
            # Paginated query for this batch
            paginated_query = f"{query} LIMIT {batch_size} OFFSET {offset}"

            # Read one batch
            df = pd.read_sql(paginated_query, source_engine)
            batch_count = len(df)
            if batch_count == 0:
                break

            # Write the batch into the target database
            df.to_sql(
                target_table,
                target_engine,
                schema='ods',
                if_exists='append',
                index=False
            )

            total_records += batch_count
            offset += batch_size
            logger.info(f"Extracted {total_records}/{total_count} records")

        # Execution time
        end_time = time.time()
        execution_time = end_time - start_time

        logger.info(f"The job extracted {total_records} records in {execution_time:.2f} seconds")

        return {
            "total_records": total_records,
            "source_table": source_table,
            "target_table": f"ods.{target_table}",
            "execution_time": execution_time
        }
    except Exception as e:
        logger.error(f"Data extraction failed: {str(e)}")
        raise
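

# Minimal usage sketch (assumptions: a Flask application factory named create_app exists in
# the app package, and resource ID 123 is a placeholder; neither is taken from this module).
if __name__ == "__main__":
    from app import create_app  # hypothetical factory; adjust to the real application entry point

    app = create_app()
    with app.app_context():
        # Run the production line load for one data resource and print the result dict
        print(execute_production_line(123))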