- from app.core.graph.graph_operations import connect_graph
- from flask import current_app
- import os
- import pandas as pd
- from datetime import datetime
- import psycopg2
- from psycopg2 import sql
- import logging
- from app.services.neo4j_driver import neo4j_driver
- import shutil
- import re
- from psycopg2.extras import execute_values
- import time
- from urllib.parse import urlparse, unquote, quote
- def production_draw_graph(id, type):
- """
- 根据节点ID和类型绘制生产线图谱
-
- Args:
- id: 节点ID
- type: 节点类型(DataModel, DataResource, DataMetric)
-
- Returns:
- dict: 包含节点、连线和根节点ID的图谱数据
- """
- # 获取Neo4j连接
- driver = connect_graph()
- if not driver:
- logger.error("无法连接到数据库")
- return {"nodes": [], "lines": [], "rootId": "", "error": "无法连接到数据库"}
-
- try:
- # 首先验证节点是否存在
- with driver.session() as session:
- check_node_query = """
- MATCH (n)
- WHERE id(n) = $nodeId
- RETURN n, labels(n) as labels, n.name_zh as name_zh
- """
- check_result = session.run(check_node_query, nodeId=id).single()
-
- if not check_result:
- logger.error(f"节点不存在: ID={id}")
- return {"nodes": [], "lines": [], "rootId": "", "error": "节点不存在"}
-
- actual_type = check_result["labels"][0] # 获取实际的节点类型
- node_name = check_result["name_zh"]
-
- # 如果提供的类型与实际类型不匹配,使用实际类型
- if type.lower() != actual_type.lower():
- logger.warning(f"提供的类型({type})与实际类型({actual_type})不匹配,使用实际类型")
- type = actual_type
-
- # 数据模型
- if type.lower() == "datamodel":
- cql = """
- MATCH (n:DataModel)
- WHERE id(n) = $nodeId
- OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
- OPTIONAL MATCH (n)-[r2:clean_model]-(d:data_standard)
- OPTIONAL MATCH (d)-[r3:clean_model]-(m)
- WITH
- collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS line1,
- collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS line2,
- collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS line3,
- collect({id: toString(id(n)), text: n.name_zh, type: "model"}) AS node1,
- collect({id: toString(id(m)), text: m.name}) AS node2,
- collect({id: toString(id(d)), text: d.name, type: "standard"}) AS node3,n
- WITH apoc.coll.toSet(line1 + line2 + line3) AS lines,
- apoc.coll.toSet(node1 + node2 + node3) AS nodes,
- toString(id(n)) as res
- RETURN lines,nodes,res
- """
- # 数据资源
- elif type.lower() == "dataresource":
- cql = """
- MATCH (n:DataResource)
- WHERE id(n) = $nodeId
- OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
- OPTIONAL MATCH (n)-[r2:clean_resource]-(d:data_standard)
- OPTIONAL MATCH (d)-[r3:clean_resource]-(m)
- WITH
- collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS lines1,
- collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS lines2,
- collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS lines3,
- collect({id: toString(id(n)), text: n.name_zh, type: "resource"}) AS nodes1,
- collect({id: toString(id(m)), text: m.name}) AS nodes2,
- collect({id: toString(id(d)), text: d.name, type: "standard"}) AS nodes3,n
- WITH
- apoc.coll.toSet(lines1 + lines2 + lines3) AS lines,
- apoc.coll.toSet(nodes1 + nodes2 + nodes3) AS nodes,
- toString(id(n)) AS res
- RETURN lines, nodes, res
- """
- # 数据指标
- elif type.lower() == "datametric":
- cql = """
- MATCH (n:DataMetric)
- WHERE id(n) = $nodeId
- OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
- WITH collect({from: toString(id(n)), to: toString(id(m)), text: "处理"}) AS line1,
- collect({id: toString(id(n)), text: n.name_zh, type: "metric"}) AS node1,
- collect({id: toString(id(m)), text: m.name}) AS node2,n
- WITH apoc.coll.toSet(line1) AS lines,
- apoc.coll.toSet(node1 + node2) AS nodes,
- toString(id(n)) as res
- RETURN lines,nodes,res
- """
- else:
- # 处理未知节点类型
- cql = """
- MATCH (n)
- WHERE id(n) = $nodeId
- OPTIONAL MATCH (n)-[r]-(m)
- WITH collect({from: toString(id(n)), to: toString(id(m)), text: type(r)}) AS lines,
- collect({id: toString(id(n)), text: n.name_zh, type: labels(n)[0]}) AS nodes1,
- collect({id: toString(id(m)), text: m.name, type: labels(m)[0]}) AS nodes2,
- toString(id(n)) as res
- RETURN apoc.coll.toSet(lines) AS lines,
- apoc.coll.toSet(nodes1 + nodes2) AS nodes,
- res
- """
-
- with driver.session() as session:
- try:
- result = session.run(cql, nodeId=id)
- data = result.data()
-
- # 如果没有数据,返回节点自身
- if not data:
- return {
- "nodes": [{"id": str(id), "text": node_name, "type": type}],
- "lines": [],
- "rootId": str(id)
- }
-
- res = {}
- for item in data:
- res = {
- "nodes": [record for record in item['nodes'] if record.get('id')],
- "lines": [record for record in item['lines'] if record.get('from') and record.get('to')],
- "rootId": item['res'],
- }
-
- # 确保节点列表不为空
- if not res.get("nodes"):
- res["nodes"] = [{"id": str(id), "text": node_name, "type": type}]
-
- return res
- except Exception as e:
- logger.error(f"执行图谱查询失败: {str(e)}")
- return {
- "nodes": [{"id": str(id), "text": node_name, "type": type}],
- "lines": [],
- "rootId": str(id),
- "error": f"查询执行失败: {str(e)}"
- }
-
- except Exception as e:
- logger.error(f"生成图谱失败: {str(e)}")
- return {"nodes": [], "lines": [], "rootId": "", "error": str(e)}
- """
- Manual execution functions for production line
- Author: paul
- Date: 2024-03-20
- """
- # 配置日志
- logger = logging.getLogger(__name__)
- # PostgreSQL配置
- def get_pg_config():
- """从配置文件获取PostgreSQL配置,支持包含特殊字符的密码"""
- db_uri = current_app.config['SQLALCHEMY_DATABASE_URI']
-
- # 尝试使用urlparse解析
- uri = urlparse(db_uri)
-
- # 如果解析失败或密码包含特殊字符导致解析错误,使用手动解析
- if uri.username is None or uri.password is None:
- # 手动解析URI: postgresql://username:password@host:port/database
- scheme_end = db_uri.find('://')
- if scheme_end == -1:
- raise ValueError("Invalid database URI format")
-
- auth_and_host = db_uri[scheme_end + 3:] # 跳过 '://'
- at_pos = auth_and_host.rfind('@') # 从右向左查找最后一个@
-
- if at_pos == -1:
- raise ValueError("Invalid database URI: missing @ separator")
-
- auth_part = auth_and_host[:at_pos]
- host_part = auth_and_host[at_pos + 1:]
-
- # 解析用户名和密码(可能包含特殊字符)
- colon_pos = auth_part.find(':')
- if colon_pos == -1:
- username = unquote(auth_part)
- password = None
- else:
- username = unquote(auth_part[:colon_pos])
- password = unquote(auth_part[colon_pos + 1:])
-
- # 解析主机、端口和数据库
- slash_pos = host_part.find('/')
- if slash_pos == -1:
- raise ValueError("Invalid database URI: missing database name")
-
- host_port = host_part[:slash_pos]
- database = unquote(host_part[slash_pos + 1:])
-
- # 解析主机和端口
- colon_pos = host_port.find(':')
- if colon_pos == -1:
- hostname = host_port
- port = 5432
- else:
- hostname = host_port[:colon_pos]
- port = int(host_port[colon_pos + 1:])
- else:
- # urlparse解析成功,解码可能被URL编码的字段
- username = unquote(uri.username) if uri.username else None
- password = unquote(uri.password) if uri.password else None
- database = unquote(uri.path[1:]) if uri.path and len(uri.path) > 1 else None
- hostname = uri.hostname
- port = uri.port or 5432
-
- if not all([username, password, database, hostname]):
- raise ValueError("Missing required database connection parameters")
-
- return {
- 'dbname': database,
- 'user': username,
- 'password': password,
- 'host': hostname,
- 'port': str(port)
- }
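- # Minimal sketch of the happy path above, with made-up (not real) credentials:
- # a URL-encoded password parses cleanly via urlparse, and unquote() restores
- # the raw value before it is handed to psycopg2. Wrapped in a function so it
- # never runs at import time.
- def _example_pg_uri_parsing():
-     uri = urlparse("postgresql://etl:p%40ss%3Aword@db.internal:5433/dataops")
-     # password decodes to "p@ss:word"; host/port come out as db.internal / 5433
-     return unquote(uri.password), uri.hostname, uri.port or 5432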
- def get_resource_storage_info(resource_id):
- """
- 获取数据资源的存储位置和元数据信息
-
- Returns:
- tuple: (storage_location, name_zh, name_en, metadata_list)
- - storage_location: 存储位置
- - name_zh: 资源中文名(用于查找Excel文件)
- - name_en: 资源英文名(用于数据库表名)
- - metadata_list: 元数据列表
- """
- try:
- with neo4j_driver.get_session() as session:
- # 获取资源基本信息
- resource_query = """
- MATCH (n:DataResource)
- WHERE id(n) = $resource_id
- RETURN n.storage_location as storage_location,
- n.name_zh as name_zh,
- n.name_en as name_en
- """
- result = session.run(resource_query, resource_id=int(resource_id))
- resource_data = result.single()
-
- if not resource_data:
- raise ValueError(f"找不到ID为{resource_id}的数据资源")
-
- if not resource_data['storage_location']:
- raise ValueError("存储位置未配置")
-
- # 查询元数据节点
- metadata_query = """
- MATCH (n:DataResource)-[:INCLUDES]->(m:DataMeta)
- WHERE id(n) = $resource_id
- RETURN m.name_zh as name, m.name_en as name_en, m.data_type as data_type
- """
- result = session.run(metadata_query, resource_id=int(resource_id))
- metadata_list = [dict(record) for record in result]
-
- # 检查元数据列表是否为空
- if not metadata_list:
- logger.warning(f"数据资源 {resource_id} 没有元数据节点,将尝试从Excel文件推断元数据")
-
- # 检查英文名是否存在
- if not resource_data['name_en']:
- raise ValueError("数据资源的英文名不能为空")
-
- return (
- resource_data['storage_location'],
- resource_data['name_zh'],
- resource_data['name_en'],
- metadata_list
- )
- except Exception as e:
- logger.error(f"获取资源存储信息失败: {str(e)}")
- raise
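- # Shape of the tuple returned above, with illustrative values only:
- # ("resources/sales", "销售明细", "sales_detail",
- #  [{"name": "订单编号", "name_en": "order_id", "data_type": "VARCHAR(255)"}])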
- def check_and_create_table(table_name, metadata_list):
- """
- 检查并创建PostgreSQL表
-
- Args:
- table_name: 表名
- metadata_list: 元数据列表
- """
- conn = None
- cur = None
- try:
- conn = psycopg2.connect(**get_pg_config())
- cur = conn.cursor()
-
- # 检查schema是否存在
- cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
-
- # 检查表是否存在
- cur.execute("""
- SELECT EXISTS (
- SELECT FROM information_schema.tables
- WHERE table_schema = 'ods'
- AND table_name = %s
- );
- """, (table_name,))
-
- table_exists = cur.fetchone()[0]
-
- if not table_exists:
- # 如果元数据列表为空,无法创建表
- if not metadata_list:
- logger.warning(f"元数据列表为空,无法创建表。将在加载数据时自动创建")
- return
-
- # 打印元数据列表用于调试
- logger.info(f"元数据列表: {metadata_list}")
-
- # 构建建表SQL
- columns = [
- f"{meta['name_en']} {meta['data_type']}"
- for meta in metadata_list
- if 'name_en' in meta and meta['name_en'] and 'data_type' in meta and meta['data_type']
- ]
-
- if not columns:
- logger.warning("没有有效的列定义,无法创建表")
- return
-
- sql = f"""
- CREATE TABLE ods.{table_name} (
- id SERIAL PRIMARY KEY,
- {", ".join(columns)},
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- )
- """
-
- logger.info(f"创建表SQL: {sql}")
- cur.execute(sql)
- conn.commit()
- logger.info(f"表 ods.{table_name} 创建成功")
- else:
- logger.info(f"表 ods.{table_name} 已存在")
-
- # 检查是否存在insert_dt列,如果存在,移除它(因为我们只使用created_at)
- cur.execute(f"""
- SELECT EXISTS (
- SELECT FROM information_schema.columns
- WHERE table_schema = 'ods'
- AND table_name = '{table_name}'
- AND column_name = 'insert_dt'
- );
- """)
- insert_dt_exists = cur.fetchone()[0]
-
- # 如果insert_dt列存在,记录警告但不进行删除(删除列可能导致数据丢失)
- if insert_dt_exists:
- logger.warning(f"表 ods.{table_name} 存在冗余的insert_dt列,请考虑后续手动删除")
-
- # 检查是否存在created_at列,如果不存在,添加它
- cur.execute(f"""
- SELECT EXISTS (
- SELECT FROM information_schema.columns
- WHERE table_schema = 'ods'
- AND table_name = '{table_name}'
- AND column_name = 'created_at'
- );
- """)
- created_at_exists = cur.fetchone()[0]
-
- # 如果created_at列不存在,添加它
- if not created_at_exists:
- alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
- logger.info(f"添加created_at列: {alter_sql}")
- cur.execute(alter_sql)
- conn.commit()
-
- # 检查是否需要添加新列
- if metadata_list:
- # 获取现有列
- cur.execute(f"""
- SELECT column_name
- FROM information_schema.columns
- WHERE table_schema = 'ods'
- AND table_name = '{table_name}'
- """)
- existing_columns = [row[0] for row in cur.fetchall()]
-
- # 检查每个元数据是否需要作为新列添加
- for meta in metadata_list:
- if 'name_en' in meta and meta['name_en'] and meta['name_en'].lower() not in (col.lower() for col in existing_columns):
- column_type = meta.get('data_type', 'VARCHAR(255)')
- alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN {meta['name_en']} {column_type};"
- logger.info(f"添加新列: {alter_sql}")
- try:
- cur.execute(alter_sql)
- conn.commit()
- except Exception as e:
- logger.error(f"添加列失败: {str(e)}")
- except Exception as e:
- logger.error(f"创建表失败: {str(e)}")
- if conn:
- conn.rollback()
- raise
- finally:
- if cur:
- cur.close()
- if conn:
- conn.close()
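- # Hypothetical call for illustration (the table and column definitions are
- # assumptions, not taken from real metadata); wrapped in a function so nothing
- # executes at import time.
- def _example_create_sales_table():
-     check_and_create_table("sales_detail", [
-         {"name_en": "order_id", "data_type": "VARCHAR(64)"},
-         {"name_en": "amount", "data_type": "NUMERIC(15,2)"},
-     ])
-     # For a new table this generates roughly:
-     #   CREATE TABLE ods.sales_detail (
-     #       id SERIAL PRIMARY KEY,
-     #       order_id VARCHAR(64),
-     #       amount NUMERIC(15,2),
-     #       created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
-     #   )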
- def load_excel_to_postgresql(file_path, table_name, metadata_list):
- """
- 加载Excel数据到PostgreSQL表
-
- Args:
- file_path: Excel文件路径
- table_name: 表名
- metadata_list: 元数据列表
-
- Returns:
- int: 加载的记录数
- """
- conn = None
- cur = None
- try:
- # 读取Excel数据
- df = pd.read_excel(file_path)
-
- # 如果Excel文件为空,返回0
- if df.empty:
- logger.warning(f"Excel文件 {file_path} 为空")
- return 0
-
- # 如果元数据列表为空,尝试自动创建表
- if not metadata_list:
- logger.warning("元数据列表为空,尝试根据Excel文件自动创建表")
-
- # 创建数据库连接
- conn = psycopg2.connect(**get_pg_config())
- cur = conn.cursor()
-
- # 检查schema是否存在
- cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
-
- # 检查表是否存在
- cur.execute(f"""
- SELECT EXISTS (
- SELECT FROM information_schema.tables
- WHERE table_schema = 'ods'
- AND table_name = '{table_name}'
- );
- """)
- table_exists = cur.fetchone()[0]
-
- # 如果表不存在,根据DataFrame自动创建
- if not table_exists:
- # 生成列定义
- columns = []
- for col_name in df.columns:
- # 生成有效的SQL列名
- sql_col_name = re.sub(r'\W+', '_', col_name).lower()
-
- # 根据数据类型推断SQL类型
- dtype = df[col_name].dtype
- if pd.api.types.is_integer_dtype(dtype):
- sql_type = 'INTEGER'
- elif pd.api.types.is_float_dtype(dtype):
- sql_type = 'NUMERIC(15,2)'
- elif pd.api.types.is_datetime64_dtype(dtype):
- sql_type = 'TIMESTAMP'
- elif pd.api.types.is_bool_dtype(dtype):
- sql_type = 'BOOLEAN'
- else:
- sql_type = 'VARCHAR(255)'
-
- columns.append(f"{sql_col_name} {sql_type}")
-
- # 创建表,只包含created_at时间戳字段
- create_sql = f"""
- CREATE TABLE ods.{table_name} (
- id SERIAL PRIMARY KEY,
- {', '.join(columns)},
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- );
- """
- logger.info(f"自动生成的建表SQL: {create_sql}")
- cur.execute(create_sql)
- conn.commit()
- logger.info(f"表 ods.{table_name} 自动创建成功")
- else:
- # 检查是否存在created_at列
- cur.execute(f"""
- SELECT EXISTS (
- SELECT FROM information_schema.columns
- WHERE table_schema = 'ods'
- AND table_name = '{table_name}'
- AND column_name = 'created_at'
- );
- """)
- created_at_exists = cur.fetchone()[0]
-
- # 如果created_at列不存在,添加它
- if not created_at_exists:
- alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
- logger.info(f"添加created_at列: {alter_sql}")
- cur.execute(alter_sql)
- conn.commit()
-
- cur.close()
- conn.close()
- cur = None
- conn = None
-
- # 创建临时元数据列表用于插入数据
- metadata_list = []
- for col_name in df.columns:
- sql_col_name = re.sub(r'\W+', '_', col_name).lower()
- metadata_list.append({
- 'name_zh': col_name,
- 'name_en': sql_col_name
- })
-
- # 创建数据库连接
- conn = psycopg2.connect(**get_pg_config())
- cur = conn.cursor()
-
- # 准备插入数据
- records = []
- for _, row in df.iterrows():
- record = {}
- for meta in metadata_list:
- if 'name_zh' in meta and meta['name_zh'] in df.columns and 'name_en' in meta:
- # 获取Excel中的值
- value = row[meta['name_zh']]
- # 处理NaN和None值
- if pd.isna(value):
- value = None
- record[meta['name_en']] = value
- records.append(record)
-
- # 如果没有有效记录,返回0
- if not records:
- logger.warning("没有有效记录可插入")
- return 0
-
- # 获取列名列表,只包括元数据列(不再包括insert_dt)
- columns = [meta['name_en'] for meta in metadata_list if 'name_en' in meta]
- if not columns:
- logger.warning("没有有效列名")
- return 0
-
- # 正确使用execute_values的方式
- insert_sql = f"""
- INSERT INTO ods.{table_name} ({", ".join(columns)})
- VALUES %s
- """
-
- # 准备要插入的数据元组
- values = []
- for record in records:
- # 只包含数据列的值,不再需要添加时间戳
- row_values = tuple(record.get(col, None) for col in columns)
- values.append(row_values)
-
- # 执行批量插入
- execute_values(cur, insert_sql, values)
- conn.commit()
-
- # 返回插入的记录数
- return len(values)
- except Exception as e:
- logger.error(f"加载Excel数据到PostgreSQL失败: {str(e)}", exc_info=True)
- if conn:
- conn.rollback()
- raise
- finally:
- if cur:
- cur.close()
- if conn:
- conn.close()
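- # How the batch insert above lays out its data, on made-up rows: execute_values
- # expands the single "VALUES %s" placeholder into one tuple per record, and NaN
- # cells arrive as None / NULL.
- # columns = ["order_id", "amount"]
- # values  = [("A001", 12.5), ("A002", None)]
- # => INSERT INTO ods.sales_detail (order_id, amount) VALUES ('A001', 12.5), ('A002', NULL)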
- def get_full_storage_path(relative_path):
- """
- 根据相对路径获取完整的存储路径
-
- Args:
- relative_path: Neo4j中存储的相对路径
-
- Returns:
- str: 完整的存储路径
- """
- base_path = current_app.config['UPLOAD_BASE_PATH']
- # 移除路径开头的斜杠(如果有)
- relative_path = relative_path.lstrip('\\/')
- # 根据操作系统使用正确的路径分隔符
- if os.name == 'nt': # Windows
- full_path = os.path.join(base_path, relative_path.replace('/', '\\'))
- else: # Linux/Unix
- full_path = os.path.join(base_path, relative_path.replace('\\', '/'))
- return os.path.normpath(full_path)
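- # Path resolution example (the UPLOAD_BASE_PATH values are hypothetical):
- # Linux:   base '/data/uploads' + 'resources\sales' -> '/data/uploads/resources/sales'
- # Windows: base 'D:\uploads'    + 'resources/sales' -> 'D:\uploads\resources\sales'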
- def get_archive_path():
- """
- 获取当前日期的归档路径
-
- Returns:
- str: 归档路径
- """
- base_path = current_app.config['ARCHIVE_BASE_PATH']
- date_folder = datetime.now().strftime('%Y-%m-%d')
- archive_path = os.path.join(base_path, date_folder)
-
- # 确保归档目录存在
- os.makedirs(archive_path, exist_ok=True)
- return archive_path
- def archive_excel_file(file_path):
- """
- 将Excel文件复制到归档目录,保持原始文件名
-
- Args:
- file_path: Excel文件的完整路径
-
- Returns:
- str: 归档后的文件路径
- """
- archive_path = get_archive_path()
- file_name = os.path.basename(file_path)
- archive_file_path = os.path.join(archive_path, file_name)
-
- # 如果文件已经存在于归档目录,替换它
- if os.path.exists(archive_file_path):
- os.remove(archive_file_path)
- logger.info(f"覆盖已存在的归档文件: {archive_file_path}")
-
- # 复制文件到归档目录
- shutil.copy2(file_path, archive_file_path)
- logger.info(f"文件已归档: {archive_file_path}")
-
- # 删除原始文件
- os.remove(file_path)
- logger.info(f"删除原始文件: {file_path}")
-
- return archive_file_path
- def execute_production_line(resource_id):
- """
- 执行生产线数据加载
-
- Args:
- resource_id: 数据资源ID
-
- Returns:
- dict: 执行结果
- """
- try:
- # 首先获取资源信息,判断类型
- resource_type = get_resource_type(resource_id)
-
- # 根据资源类型执行不同的加载逻辑
- if resource_type == 'ddl':
- # DDL类型资源,执行数据库抽取
- return execute_ddl_extraction(resource_id)
- else:
- # 其他类型(structure等),执行Excel数据加载
- return execute_excel_loading(resource_id)
- except Exception as e:
- logger.error(f"执行生产线失败: {str(e)}", exc_info=True)
- return {
- "status": "error",
- "message": str(e)
- }
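- # Dispatch summary: a resource whose type property is 'ddl' goes through
- # execute_ddl_extraction, anything else through execute_excel_loading; both
- # return a dict containing at least {"status": "success"|"error", "message": ...}.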
- def get_resource_type(resource_id):
- """
- 获取资源类型
-
- Args:
- resource_id: 数据资源ID
-
- Returns:
- str: 资源类型,如'ddl'或'structure'
- """
- try:
- with neo4j_driver.get_session() as session:
- # 查询资源类型
- cypher = """
- MATCH (n:DataResource)
- WHERE id(n) = $resource_id
- RETURN n.type as type
- """
- result = session.run(cypher, resource_id=int(resource_id))
- record = result.single()
-
- if not record:
- raise ValueError(f"找不到ID为{resource_id}的数据资源")
-
- return record["type"] or 'structure' # 默认为structure类型
- except Exception as e:
- logger.error(f"获取资源类型失败: {str(e)}")
- raise
- def execute_excel_loading(resource_id):
- """
- 执行Excel文件数据加载(原有的加载逻辑)
-
- Args:
- resource_id: 数据资源ID
-
- Returns:
- dict: 执行结果
- """
- try:
- # 获取PostgreSQL配置
- pg_config = get_pg_config()
-
- # 1. 获取存储信息
- storage_location, name_zh, name_en, metadata_list = get_resource_storage_info(resource_id)
-
- # 2. 检查并创建表
- check_and_create_table(name_en, metadata_list)
-
- # 3. 获取完整的存储路径并扫描Excel文件
- full_storage_path = get_full_storage_path(storage_location)
-
- if not os.path.exists(full_storage_path):
- # 如果目录不存在,创建它
- try:
- os.makedirs(full_storage_path, exist_ok=True)
- logger.info(f"创建目录: {full_storage_path}")
- except Exception as e:
- raise ValueError(f"无法创建存储路径: {full_storage_path}, 错误: {str(e)}")
-
- # 首先使用中文名查找文件
- excel_files = []
- if name_zh:
- excel_files = [
- f for f in os.listdir(full_storage_path)
- if f.startswith(name_zh) and f.endswith(('.xlsx', '.xls'))
- ]
- if excel_files:
- logger.info(f"使用中文名'{name_zh}'找到Excel文件: {excel_files}")
-
- # 如果使用中文名没找到文件,尝试使用英文名
- if not excel_files and name_en:
- excel_files = [
- f for f in os.listdir(full_storage_path)
- if f.startswith(name_en) and f.endswith(('.xlsx', '.xls'))
- ]
- if excel_files:
- logger.info(f"使用英文名'{name_en}'找到Excel文件: {excel_files}")
-
- # 如果两种方式都没找到文件,报错
- if not excel_files:
- error_msg = (
- f"未找到匹配的Excel文件\n"
- f"搜索路径: {full_storage_path}\n"
- f"尝试查找的文件名模式:\n"
- f"1. {name_zh}*.xlsx/xls (中文名)\n"
- f"2. {name_en}*.xlsx/xls (英文名)\n"
- f"请确认文件已上传到正确位置,且文件名以资源的中文名或英文名开头"
- )
- logger.error(error_msg)
- raise ValueError(error_msg)
-
- # 4. 加载数据并归档文件
- total_records = 0
- processed_files = []
- archived_files = []
-
- for excel_file in excel_files:
- file_path = os.path.join(full_storage_path, excel_file)
- try:
- # 如果元数据为空,尝试从Excel文件中推断
- if not metadata_list:
- logger.info(f"尝试从Excel文件 {excel_file} 推断元数据")
- metadata_list = extract_metadata_from_excel(file_path, name_en)
- if metadata_list:
- # 重新尝试创建表
- check_and_create_table(name_en, metadata_list)
- else:
- logger.warning("无法从Excel文件推断元数据,将尝试直接加载数据")
-
- # 加载数据到PostgreSQL
- records = load_excel_to_postgresql(file_path, name_en, metadata_list)
- total_records += records
- processed_files.append(excel_file)
-
- # 归档成功处理的文件
- archived_path = archive_excel_file(file_path)
- archived_files.append(archived_path)
-
- logger.info(f"已处理并归档文件 {excel_file}, 加载 {records} 条记录")
- except Exception as e:
- logger.error(f"处理文件 {excel_file} 失败: {str(e)}", exc_info=True)
- raise
-
- return {
- "status": "success",
- "message": f"数据加载成功,共处理 {len(processed_files)} 个文件,加载 {total_records} 条记录",
- "total_records": total_records,
- "files_processed": processed_files,
- "files_archived": archived_files
- }
-
- except Exception as e:
- logger.error(f"执行Excel加载失败: {str(e)}", exc_info=True)
- return {
- "status": "error",
- "message": str(e)
- }
- def extract_metadata_from_excel(file_path, table_name):
- """
- 从Excel文件中提取元数据
-
- Args:
- file_path: Excel文件路径
- table_name: 表名(用于翻译列名)
-
- Returns:
- list: 元数据列表
- """
- try:
- # 读取Excel文件的第一行作为列名
- df = pd.read_excel(file_path, nrows=0)
-
- if df.empty:
- logger.warning(f"Excel文件 {file_path} 为空")
- return []
-
- # 获取列名
- column_names = df.columns.tolist()
-
- # 翻译列名
- metadata_list = []
- for name in column_names:
- # 使用已有的翻译功能
- try:
- from app.core.meta_data import translate_and_parse
- from app.core.meta_data import infer_column_type
-
- # 翻译列名
- name_en = translate_and_parse(name)[0] if name else f"column_{len(metadata_list)}"
-
- # 确保列名是合法的SQL标识符
- name_en = re.sub(r'\W+', '_', name_en).lower()
-
- # 推断数据类型
- df_sample = pd.read_excel(file_path, nrows=10)
- col_index = column_names.index(name)
- col_types = infer_column_type(df_sample)
- data_type = col_types[col_index] if col_index < len(col_types) else 'VARCHAR(255)'
-
- metadata_list.append({
- 'name_zh': name,
- 'name_en': name_en,
- 'data_type': data_type
- })
- except Exception as e:
- logger.error(f"处理列 {name} 时出错: {str(e)}")
- # 使用默认值
- name_en = f"column_{len(metadata_list)}"
- metadata_list.append({
- 'name_zh': name,
- 'name_en': name_en,
- 'data_type': 'VARCHAR(255)'
- })
-
- logger.info(f"从Excel推断出的元数据: {metadata_list}")
- return metadata_list
- except Exception as e:
- logger.error(f"从Excel文件提取元数据失败: {str(e)}")
- return []
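- # Illustrative output for a sheet whose headers are 订单编号 and 金额 (the English
- # names depend on translate_and_parse, so they are assumptions here):
- # [{"name_zh": "订单编号", "name_en": "order_id", "data_type": "VARCHAR(255)"},
- #  {"name_zh": "金额", "name_en": "amount", "data_type": "NUMERIC(15,2)"}]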
- def execute_ddl_extraction(resource_id):
- """
- 执行DDL资源数据抽取
-
- Args:
- resource_id: 数据资源ID
-
- Returns:
- dict: 执行结果
- """
- try:
- from sqlalchemy import create_engine, text
- import pandas as pd
-
- logger.info(f"开始执行DDL资源数据抽取,ID: {resource_id}")
-
- # 1. 获取资源详情
- resource_data = get_resource_details(resource_id)
- if not resource_data:
- return {"status": "error", "message": f"资源不存在,ID: {resource_id}"}
-
- # 2. 获取资源元数据
- metadata_list = resource_data.get('meta_list', [])
- if not metadata_list:
- return {"status": "error", "message": "资源没有元数据信息,无法创建表"}
-
- # 3. 获取资源表名
- target_table_name = resource_data.get('name_en')
- if not target_table_name:
- return {"status": "error", "message": "资源没有英文名称,无法确定目标表名"}
-
- # 4. 获取关联的数据源信息
- data_source_info = get_resource_data_source(resource_id)
- if not data_source_info:
- return {"status": "error", "message": "无法获取关联的数据源信息"}
-
- # 5. 在PostgreSQL中创建目标表
- create_result = create_target_table(target_table_name, metadata_list)
- if not create_result["success"]:
- return {"status": "error", "message": f"创建目标表失败: {create_result['message']}"}
-
- # 6. 执行数据抽取
- extract_result = extract_data_to_postgres(data_source_info, target_table_name, metadata_list)
-
- return {
- "status": "success",
- "message": f"数据抽取成功,从{extract_result['source_table']}表抽取到{extract_result['target_table']}表,共处理 {extract_result['total_records']} 条记录,执行了 {extract_result['execution_time']:.2f} 秒",
- "total_records": extract_result['total_records'],
- "source_table": extract_result['source_table'],
- "target_table": extract_result['target_table'],
- "execution_time": extract_result['execution_time']
- }
-
- except Exception as e:
- logger.error(f"DDL数据抽取失败: {str(e)}", exc_info=True)
- return {
- "status": "error",
- "message": str(e)
- }
- def get_resource_details(resource_id):
- """
- 获取资源详细信息
-
- Args:
- resource_id: 数据资源ID
-
- Returns:
- dict: 资源详情
- """
- from app.core.data_resource.resource import handle_id_resource
- return handle_id_resource(resource_id)
- def get_resource_data_source(resource_id):
- """获取数据资源关联的数据源信息"""
- try:
- with neo4j_driver.get_session() as session:
- # 查询数据资源节点连接的数据源节点
- cypher = """
- MATCH (n:DataResource)-[:originates_from]->(ds:DataSource)
- WHERE id(n) = $resource_id
- RETURN ds
- """
-
- result = session.run(cypher, resource_id=int(resource_id))
- record = result.single()
-
- if not record:
- logger.warning(f"资源ID {resource_id} 没有关联的数据源")
- return None
-
- # 构建数据源连接信息
- data_source = dict(record["ds"])
- return {
- "type": data_source.get("type", "").lower(),
- "host": data_source.get("host"),
- "port": data_source.get("port"),
- "database": data_source.get("database"),
- "username": data_source.get("username"),
- "password": data_source.get("password")
- # 如果需要其他参数可以添加
- # "param": data_source.get("param")
- }
- except Exception as e:
- logger.error(f"获取数据源信息失败: {str(e)}")
- return None
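- # Shape of the connection dict returned above (all values are placeholders):
- # {"type": "mysql", "host": "10.0.0.5", "port": 3306, "database": "erp",
- #  "username": "reader", "password": "***", "param": "charset=utf8mb4"}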
- def create_target_table(table_name, metadata_list):
- """
- 在PostgreSQL中创建目标表
-
- Args:
- table_name: 表名
- metadata_list: 元数据列表
-
- Returns:
- dict: {"success": bool, "message": str}
- """
- try:
- import psycopg2
- from flask import current_app
-
- # 获取PostgreSQL配置
- pg_config = get_pg_config()
-
- conn = psycopg2.connect(**pg_config)
- cur = conn.cursor()
-
- # 检查schema是否存在
- cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
-
- # 检查表是否存在
- cur.execute("""
- SELECT EXISTS (
- SELECT FROM information_schema.tables
- WHERE table_schema = 'ods'
- AND table_name = %s
- );
- """, (table_name,))
-
- table_exists = cur.fetchone()[0]
-
- if table_exists:
- logger.info(f"表 ods.{table_name} 已存在,将跳过创建")
- return {"success": True, "message": f"表 ods.{table_name} 已存在"}
-
- # 构建列定义
- columns = []
- for meta in metadata_list:
- column_name = meta.get('name_en')
- data_type = meta.get('data_type')
-
- if column_name and data_type:
- columns.append(f"{column_name} {data_type}")
-
- if not columns:
- return {"success": False, "message": "没有有效的列定义"}
-
- # 构建建表SQL
- sql = f"""
- CREATE TABLE ods.{table_name} (
- id SERIAL PRIMARY KEY,
- {", ".join(columns)},
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- )
- """
-
- logger.info(f"创建表SQL: {sql}")
- cur.execute(sql)
- conn.commit()
- logger.info(f"表 ods.{table_name} 创建成功")
-
- return {"success": True, "message": f"表 ods.{table_name} 创建成功"}
-
- except Exception as e:
- logger.error(f"创建目标表失败: {str(e)}")
- if 'conn' in locals() and conn:
- conn.rollback()
- return {"success": False, "message": str(e)}
- finally:
- if 'cur' in locals() and cur:
- cur.close()
- if 'conn' in locals() and conn:
- conn.close()
- def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
- """
- 从源数据库抽取数据到PostgreSQL
-
- Args:
- source_conn_info: 源数据库连接信息
- target_table: 目标表名
- metadata_list: 元数据列表
-
- Returns:
- dict: 抽取结果
- """
- try:
- from sqlalchemy import create_engine, text
- import pandas as pd
- from flask import current_app
-
- # 源表名称与目标表相同
- source_table = target_table
-
- # 批处理大小
- batch_size = current_app.config.get('DATA_EXTRACT_BATCH_SIZE', 1000)
-
- # 源数据库连接字符串构建
- db_type = source_conn_info["type"]
- if db_type == "mysql":
- # 对用户名、密码和数据库名进行URL编码,处理特殊字符
- encoded_username = quote(source_conn_info['username'], safe='')
- encoded_password = quote(source_conn_info['password'], safe='')
- encoded_database = quote(source_conn_info['database'], safe='')
- connection_string = f"mysql+pymysql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}"
-
- # 检查是否存在param参数,如存在则添加到连接字符串中
- if 'param' in source_conn_info and source_conn_info['param']:
- # strip any leading '&' and append the extra options after '?'
- param = source_conn_info['param'].lstrip('&')
- connection_string = f"{connection_string}?{param}"
- logger.debug(f"添加了数据源的param参数: {param}")
-
- elif db_type == "postgresql":
- # 对用户名、密码和数据库名进行URL编码,处理特殊字符
- encoded_username = quote(source_conn_info['username'], safe='')
- encoded_password = quote(source_conn_info['password'], safe='')
- encoded_database = quote(source_conn_info['database'], safe='')
- connection_string = f"postgresql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}"
- else:
- raise ValueError(f"不支持的数据库类型: {db_type}")
-
- # 目标数据库连接参数
- pg_config = get_pg_config()
- # 对用户名、密码和数据库名进行URL编码,处理特殊字符
- encoded_user = quote(pg_config['user'], safe='')
- encoded_password = quote(pg_config['password'], safe='')
- encoded_dbname = quote(pg_config['dbname'], safe='')
- target_connection_string = f"postgresql://{encoded_user}:{encoded_password}@{pg_config['host']}:{pg_config['port']}/{encoded_dbname}"
-
- # 记录最终连接字符串
- logger.debug(f"python连接源表的最终连接字符串: {connection_string}")
-
- # 连接源数据库
- source_engine = create_engine(connection_string)
-
- # 连接目标数据库
- target_engine = create_engine(target_connection_string)
-
- # 获取元数据列名,构建查询字段列表
- column_names = [meta.get('name_en') for meta in metadata_list if meta.get('name_en')]
- if not column_names:
- raise ValueError("没有有效的列名")
-
- # 构建查询语句
- select_columns = ", ".join(column_names)
- query = f"SELECT {select_columns} FROM {source_table}"
-
- # 获取记录总数
- with source_engine.connect() as conn:
- count_result = conn.execute(text(f"SELECT COUNT(*) FROM {source_table}"))
- total_count = count_result.scalar()
-
- # 分批抽取数据
- total_records = 0
- offset = 0
-
- # 计算开始时间
- start_time = time.time()
-
- while offset < total_count:
- # 构建带有分页的查询
- paginated_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
-
- # 读取数据批次
- df = pd.read_sql(paginated_query, source_engine)
- batch_count = len(df)
-
- if batch_count == 0:
- break
-
- # 写入目标数据库
- df.to_sql(
- target_table,
- target_engine,
- schema='ods',
- if_exists='append',
- index=False
- )
-
- total_records += batch_count
- offset += batch_size
-
- logger.info(f"已抽取 {total_records}/{total_count} 条记录")
-
- # 计算执行时间
- end_time = time.time()
- execution_time = end_time - start_time
- logger.info(f"作业抽取了 {total_records} 条记录,执行了 {execution_time:.2f} 秒")
-
- return {
- "total_records": total_records,
- "source_table": source_table,
- "target_table": f"ods.{target_table}",
- "execution_time": execution_time
- }
-
- except Exception as e:
- logger.error(f"数据抽取失败: {str(e)}")
- raise
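- # Paging sketch for the extraction loop above (numbers are illustrative): with
- # total_count = 2500 and batch_size = 1000 the loop issues
- #   SELECT ... LIMIT 1000 OFFSET 0 / 1000 / 2000
- # and stops once offset >= total_count. Note that LIMIT/OFFSET paging without an
- # ORDER BY gives no guaranteed row order, so rows can repeat or be missed if the
- # source table changes while the extraction is running.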