"""
Manual execution functions for production line
Author: paul
Date: 2024-03-20
"""
from app.core.graph.graph_operations import connect_graph
from flask import current_app
import os
import pandas as pd
from datetime import datetime
import psycopg2
from psycopg2 import sql
import logging
from app.services.neo4j_driver import neo4j_driver
import shutil
import re
from psycopg2.extras import execute_values
import time
from urllib.parse import urlparse, unquote, quote

# Logger for this module
logger = logging.getLogger(__name__)


def production_draw_graph(id, type):
    """
    Draw the production-line graph for a node, given its ID and type.

    Args:
        id: node ID
        type: node type (DataModel, DataResource, DataMetric)

    Returns:
        dict: graph data containing nodes, lines and the root node ID
    """
    # Get the Neo4j connection
    driver = connect_graph()
    if not driver:
        logger.error("无法连接到数据库")
        return {"nodes": [], "lines": [], "rootId": "", "error": "无法连接到数据库"}

    try:
        # First verify that the node exists
        with driver.session() as session:
            check_node_query = """
            MATCH (n) WHERE id(n) = $nodeId
            RETURN n, labels(n) as labels, n.name_zh as name_zh
            """
            check_result = session.run(check_node_query, nodeId=id).single()
            if not check_result:
                logger.error(f"节点不存在: ID={id}")
                return {"nodes": [], "lines": [], "rootId": "", "error": "节点不存在"}

            actual_type = check_result["labels"][0]  # the node's actual label
            node_name = check_result["name_zh"]

            # If the supplied type does not match the actual label, use the actual label
            if type.lower() != actual_type.lower():
                logger.warning(f"提供的类型({type})与实际类型({actual_type})不匹配,使用实际类型")
                type = actual_type

        # Data model
        if type.lower() == "datamodel":
            cql = """
            MATCH (n:DataModel)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
            OPTIONAL MATCH (n)-[r2:clean_model]-(d:data_standard)
            OPTIONAL MATCH (d)-[r3:clean_model]-(m)
            WITH collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS line1,
                 collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS line2,
                 collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS line3,
                 collect({id: toString(id(n)), text: n.name_zh, type: "model"}) AS node1,
                 collect({id: toString(id(m)), text: m.name}) AS node2,
                 collect({id: toString(id(d)), text: d.name, type: "standard"}) AS node3, n
            WITH apoc.coll.toSet(line1 + line2 + line3) AS lines,
                 apoc.coll.toSet(node1 + node2 + node3) AS nodes,
                 toString(id(n)) as res
            RETURN lines, nodes, res
            """
        # Data resource
        elif type.lower() == "dataresource":
            cql = """
            MATCH (n:DataResource)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
            OPTIONAL MATCH (n)-[r2:clean_resource]-(d:data_standard)
            OPTIONAL MATCH (d)-[r3:clean_resource]-(m)
            WITH collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS lines1,
                 collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS lines2,
                 collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS lines3,
                 collect({id: toString(id(n)), text: n.name_zh, type: "resource"}) AS nodes1,
                 collect({id: toString(id(m)), text: m.name}) AS nodes2,
                 collect({id: toString(id(d)), text: d.name, type: "standard"}) AS nodes3, n
            WITH apoc.coll.toSet(lines1 + lines2 + lines3) AS lines,
                 apoc.coll.toSet(nodes1 + nodes2 + nodes3) AS nodes,
                 toString(id(n)) AS res
            RETURN lines, nodes, res
            """
        # Data metric
        elif type.lower() == "datametric":
            cql = """
            MATCH (n:DataMetric)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
            WITH collect({from: toString(id(n)), to: toString(id(m)), text: "处理"}) AS line1,
                 collect({id: toString(id(n)), text: n.name_zh, type: "metric"}) AS node1,
                 collect({id: toString(id(m)), text: m.name}) AS node2, n
            WITH apoc.coll.toSet(line1) AS lines,
                 apoc.coll.toSet(node1 + node2) AS nodes,
                 toString(id(n)) as res
            RETURN lines, nodes, res
            """
        else:
            # Unknown node type: fall back to a generic one-hop expansion
            cql = """
            MATCH (n)
            WHERE id(n) = $nodeId
            OPTIONAL MATCH (n)-[r]-(m)
            WITH collect({from: toString(id(n)), to: toString(id(m)), text: type(r)}) AS lines,
                 collect({id: toString(id(n)), text: n.name_zh, type: labels(n)[0]}) AS nodes1,
                 collect({id: toString(id(m)), text: m.name, type: labels(m)[0]}) AS nodes2,
                 toString(id(n)) as res
            RETURN apoc.coll.toSet(lines) AS lines,
                   apoc.coll.toSet(nodes1 + nodes2) AS nodes,
                   res
            """

        with driver.session() as session:
            try:
                result = session.run(cql, nodeId=id)
                data = result.data()

                # If the query returned nothing, return the node itself
                if not data:
                    return {
                        "nodes": [{"id": str(id), "text": node_name, "type": type}],
                        "lines": [],
                        "rootId": str(id)
                    }

                res = {}
                for item in data:
                    res = {
                        "nodes": [record for record in item['nodes'] if record.get('id')],
                        "lines": [record for record in item['lines'] if record.get('from') and record.get('to')],
                        "rootId": item['res'],
                    }

                # Make sure the node list is never empty
                if not res.get("nodes"):
                    res["nodes"] = [{"id": str(id), "text": node_name, "type": type}]

                return res
            except Exception as e:
                logger.error(f"执行图谱查询失败: {str(e)}")
                return {
                    "nodes": [{"id": str(id), "text": node_name, "type": type}],
                    "lines": [],
                    "rootId": str(id),
                    "error": f"查询执行失败: {str(e)}"
                }
    except Exception as e:
        logger.error(f"生成图谱失败: {str(e)}")
        return {"nodes": [], "lines": [], "rootId": "", "error": str(e)}


# PostgreSQL configuration
def get_pg_config():
    """Read the PostgreSQL configuration from the app config, supporting passwords that contain special characters."""
    db_uri = current_app.config['SQLALCHEMY_DATABASE_URI']

    # Try to parse the URI with urlparse first
    uri = urlparse(db_uri)

    # If parsing failed, or special characters in the password broke it, fall back to manual parsing
    if uri.username is None or uri.password is None:
        # Manually parse a URI of the form postgresql://username:password@host:port/database
        scheme_end = db_uri.find('://')
        if scheme_end == -1:
            raise ValueError("Invalid database URI format")

        auth_and_host = db_uri[scheme_end + 3:]  # skip '://'
        at_pos = auth_and_host.rfind('@')  # find the last '@' from the right
        if at_pos == -1:
            raise ValueError("Invalid database URI: missing @ separator")

        auth_part = auth_and_host[:at_pos]
        host_part = auth_and_host[at_pos + 1:]

        # Parse username and password (which may contain special characters)
        colon_pos = auth_part.find(':')
        if colon_pos == -1:
            username = unquote(auth_part)
            password = None
        else:
            username = unquote(auth_part[:colon_pos])
            password = unquote(auth_part[colon_pos + 1:])

        # Parse host, port and database name
        slash_pos = host_part.find('/')
        if slash_pos == -1:
            raise ValueError("Invalid database URI: missing database name")

        host_port = host_part[:slash_pos]
        database = unquote(host_part[slash_pos + 1:])

        # Parse host and port
        colon_pos = host_port.find(':')
        if colon_pos == -1:
            hostname = host_port
            port = 5432
        else:
            hostname = host_port[:colon_pos]
            port = int(host_port[colon_pos + 1:])
    else:
        # urlparse succeeded; decode fields that may be URL-encoded
        username = unquote(uri.username) if uri.username else None
        password = unquote(uri.password) if uri.password else None
        database = unquote(uri.path[1:]) if uri.path and len(uri.path) > 1 else None
        hostname = uri.hostname
        port = uri.port or 5432

    if not all([username, password, database, hostname]):
        raise ValueError("Missing required database connection parameters")

    return {
        'dbname': database,
        'user': username,
        'password': password,
        'host': hostname,
        'port': str(port)
    }

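
# A hedged illustration of what get_pg_config() expects and returns. The URI
# below is a made-up example, not a real configuration value:
#
#   SQLALCHEMY_DATABASE_URI = "postgresql://dataops:p%40ss@localhost:5432/dataops_db"
#
# would yield:
#
#   {'dbname': 'dataops_db', 'user': 'dataops', 'password': 'p@ss',
#    'host': 'localhost', 'port': '5432'}
#
# i.e. URL-encoded credentials are decoded and the port is returned as a string.
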

def get_resource_storage_info(resource_id):
    """
    Get a data resource's storage location and metadata.

    Returns:
        tuple: (storage_location, name_zh, name_en, metadata_list)
            - storage_location: storage location
            - name_zh: Chinese name of the resource (used to locate the Excel file)
            - name_en: English name of the resource (used as the database table name)
            - metadata_list: list of metadata entries
    """
    try:
        with neo4j_driver.get_session() as session:
            # Fetch the basic resource information
            resource_query = """
            MATCH (n:DataResource)
            WHERE id(n) = $resource_id
            RETURN n.storage_location as storage_location,
                   n.name_zh as name_zh,
                   n.name_en as name_en
            """
            result = session.run(resource_query, resource_id=int(resource_id))
            resource_data = result.single()

            if not resource_data:
                raise ValueError(f"找不到ID为{resource_id}的数据资源")

            if not resource_data['storage_location']:
                raise ValueError("存储位置未配置")

            # Fetch the metadata nodes
            metadata_query = """
            MATCH (n:DataResource)-[:INCLUDES]->(m:DataMeta)
            WHERE id(n) = $resource_id
            RETURN m.name_zh as name, m.name_en as name_en, m.data_type as data_type
            """
            result = session.run(metadata_query, resource_id=int(resource_id))
            metadata_list = [dict(record) for record in result]

            # Warn if the metadata list is empty
            if not metadata_list:
                logger.warning(f"数据资源 {resource_id} 没有元数据节点,将尝试从Excel文件推断元数据")

            # The English name is required
            if not resource_data['name_en']:
                raise ValueError("数据资源的英文名不能为空")

            return (
                resource_data['storage_location'],
                resource_data['name_zh'],
                resource_data['name_en'],
                metadata_list
            )
    except Exception as e:
        logger.error(f"获取资源存储信息失败: {str(e)}")
        raise


def check_and_create_table(table_name, metadata_list):
    """
    Check for, and if necessary create, the target PostgreSQL table.

    Args:
        table_name: table name
        metadata_list: list of metadata entries
    """
    conn = None
    cur = None
    try:
        conn = psycopg2.connect(**get_pg_config())
        cur = conn.cursor()

        # Make sure the schema exists
        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")

        # Check whether the table exists
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'ods'
                AND table_name = %s
            );
        """, (table_name,))
        table_exists = cur.fetchone()[0]

        if not table_exists:
            # Without metadata the table cannot be created here
            if not metadata_list:
                logger.warning(f"元数据列表为空,无法创建表。将在加载数据时自动创建")
                return

            # Log the metadata list for debugging
            logger.info(f"元数据列表: {metadata_list}")

            # Build the CREATE TABLE statement
            columns = [
                f"{meta['name_en']} {meta['data_type']}"
                for meta in metadata_list
                if 'name_en' in meta and meta['name_en'] and 'data_type' in meta and meta['data_type']
            ]

            if not columns:
                logger.warning("没有有效的列定义,无法创建表")
                return

            create_sql = f"""
            CREATE TABLE ods.{table_name} (
                id SERIAL PRIMARY KEY,
                {", ".join(columns)},
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
            logger.info(f"创建表SQL: {create_sql}")
            cur.execute(create_sql)
            conn.commit()
            logger.info(f"表 ods.{table_name} 创建成功")
        else:
            logger.info(f"表 ods.{table_name} 已存在")

            # Check whether an insert_dt column exists; only created_at is used now
            cur.execute(f"""
                SELECT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_schema = 'ods'
                    AND table_name = '{table_name}'
                    AND column_name = 'insert_dt'
                );
            """)
            insert_dt_exists = cur.fetchone()[0]

            # If insert_dt exists, warn but do not drop it (dropping a column could lose data)
            if insert_dt_exists:
                logger.warning(f"表 ods.{table_name} 存在冗余的insert_dt列,请考虑后续手动删除")

            # Check whether a created_at column exists
            cur.execute(f"""
                SELECT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_schema = 'ods'
                    AND table_name = '{table_name}'
                    AND column_name = 'created_at'
                );
            """)
            created_at_exists = cur.fetchone()[0]

            # Add the created_at column if it is missing
            if not created_at_exists:
                alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
                logger.info(f"添加created_at列: {alter_sql}")
                cur.execute(alter_sql)
                conn.commit()

            # Check whether new columns need to be added
            if metadata_list:
                # Fetch the existing columns
                cur.execute(f"""
                    SELECT column_name
                    FROM information_schema.columns
                    WHERE table_schema = 'ods'
                    AND table_name = '{table_name}'
                """)
                existing_columns = [row[0] for row in cur.fetchall()]

                # Add any metadata entry that is not yet a column
                for meta in metadata_list:
                    if 'name_en' in meta and meta['name_en'] and meta['name_en'].lower() not in (col.lower() for col in existing_columns):
                        column_type = meta.get('data_type', 'VARCHAR(255)')
                        alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN {meta['name_en']} {column_type};"
                        logger.info(f"添加新列: {alter_sql}")
                        try:
                            cur.execute(alter_sql)
                            conn.commit()
                        except Exception as e:
                            logger.error(f"添加列失败: {str(e)}")
    except Exception as e:
        logger.error(f"创建表失败: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

logger.error(f"创建表失败: {str(e)}") conn.rollback() raise finally: if cur: cur.close() if conn: conn.close() def load_excel_to_postgresql(file_path, table_name, metadata_list): """ 加载Excel数据到PostgreSQL表 Args: file_path: Excel文件路径 table_name: 表名 metadata_list: 元数据列表 Returns: int: 加载的记录数 """ conn = None cur = None try: # 读取Excel数据 df = pd.read_excel(file_path) # 如果Excel文件为空,返回0 if df.empty: logger.warning(f"Excel文件 {file_path} 为空") return 0 # 如果元数据列表为空,尝试自动创建表 if not metadata_list: logger.warning("元数据列表为空,尝试根据Excel文件自动创建表") # 创建数据库连接 conn = psycopg2.connect(**get_pg_config()) cur = conn.cursor() # 检查schema是否存在 cur.execute("CREATE SCHEMA IF NOT EXISTS ods;") # 检查表是否存在 cur.execute(f""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'ods' AND table_name = '{table_name}' ); """) table_exists = cur.fetchone()[0] # 如果表不存在,根据DataFrame自动创建 if not table_exists: # 生成列定义 columns = [] for col_name in df.columns: # 生成有效的SQL列名 sql_col_name = re.sub(r'\W+', '_', col_name).lower() # 根据数据类型推断SQL类型 dtype = df[col_name].dtype if pd.api.types.is_integer_dtype(dtype): sql_type = 'INTEGER' elif pd.api.types.is_float_dtype(dtype): sql_type = 'NUMERIC(15,2)' elif pd.api.types.is_datetime64_dtype(dtype): sql_type = 'TIMESTAMP' elif pd.api.types.is_bool_dtype(dtype): sql_type = 'BOOLEAN' else: sql_type = 'VARCHAR(255)' columns.append(f"{sql_col_name} {sql_type}") # 创建表,只包含created_at时间戳字段 create_sql = f""" CREATE TABLE ods.{table_name} ( id SERIAL PRIMARY KEY, {', '.join(columns)}, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """ logger.info(f"自动生成的建表SQL: {create_sql}") cur.execute(create_sql) conn.commit() logger.info(f"表 ods.{table_name} 自动创建成功") else: # 检查是否存在created_at列 cur.execute(f""" SELECT EXISTS ( SELECT FROM information_schema.columns WHERE table_schema = 'ods' AND table_name = '{table_name}' AND column_name = 'created_at' ); """) created_at_exists = cur.fetchone()[0] # 如果created_at列不存在,添加它 if not created_at_exists: alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;" logger.info(f"添加created_at列: {alter_sql}") cur.execute(alter_sql) conn.commit() cur.close() conn.close() cur = None conn = None # 创建临时元数据列表用于插入数据 metadata_list = [] for col_name in df.columns: sql_col_name = re.sub(r'\W+', '_', col_name).lower() metadata_list.append({ 'name_zh': col_name, 'name_en': sql_col_name }) # 创建数据库连接 conn = psycopg2.connect(**get_pg_config()) cur = conn.cursor() # 准备插入数据 records = [] for _, row in df.iterrows(): record = {} for meta in metadata_list: if 'name_zh' in meta and meta['name_zh'] in df.columns and 'name_en' in meta: # 获取Excel中的值 value = row[meta['name_zh']] # 处理NaN和None值 if pd.isna(value): value = None record[meta['name_en']] = value records.append(record) # 如果没有有效记录,返回0 if not records: logger.warning("没有有效记录可插入") return 0 # 获取列名列表,只包括元数据列(不再包括insert_dt) columns = [meta['name_en'] for meta in metadata_list if 'name_en' in meta] if not columns: logger.warning("没有有效列名") return 0 # 正确使用execute_values的方式 insert_sql = f""" INSERT INTO ods.{table_name} ({", ".join(columns)}) VALUES %s """ # 准备要插入的数据元组 values = [] for record in records: # 只包含数据列的值,不再需要添加时间戳 row_values = tuple(record.get(col, None) for col in columns) values.append(row_values) # 执行批量插入 execute_values(cur, insert_sql, values) conn.commit() # 返回插入的记录数 return len(values) except Exception as e: logger.error(f"加载Excel数据到PostgreSQL失败: {str(e)}", exc_info=True) if conn: conn.rollback() raise finally: if cur: cur.close() if conn: conn.close() def 

def get_full_storage_path(relative_path):
    """
    Resolve the full storage path from a relative path.

    Args:
        relative_path: the relative path stored in Neo4j

    Returns:
        str: the full storage path
    """
    base_path = current_app.config['UPLOAD_BASE_PATH']
    # Strip any leading slashes
    relative_path = relative_path.lstrip('\\/')

    # Use the correct path separator for the operating system
    if os.name == 'nt':  # Windows
        full_path = os.path.join(base_path, relative_path.replace('/', '\\'))
    else:  # Linux/Unix
        full_path = os.path.join(base_path, relative_path.replace('\\', '/'))

    return os.path.normpath(full_path)


def get_archive_path():
    """
    Get the archive path for the current date.

    Returns:
        str: archive path
    """
    base_path = current_app.config['ARCHIVE_BASE_PATH']
    date_folder = datetime.now().strftime('%Y-%m-%d')
    archive_path = os.path.join(base_path, date_folder)

    # Make sure the archive directory exists
    os.makedirs(archive_path, exist_ok=True)

    return archive_path


def archive_excel_file(file_path):
    """
    Copy an Excel file to the archive directory, keeping its original file name.

    Args:
        file_path: full path of the Excel file

    Returns:
        str: path of the archived file
    """
    archive_path = get_archive_path()
    file_name = os.path.basename(file_path)
    archive_file_path = os.path.join(archive_path, file_name)

    # If the file already exists in the archive directory, replace it
    if os.path.exists(archive_file_path):
        os.remove(archive_file_path)
        logger.info(f"覆盖已存在的归档文件: {archive_file_path}")

    # Copy the file into the archive directory
    shutil.copy2(file_path, archive_file_path)
    logger.info(f"文件已归档: {archive_file_path}")

    # Remove the original file
    os.remove(file_path)
    logger.info(f"删除原始文件: {file_path}")

    return archive_file_path


def execute_production_line(resource_id):
    """
    Run the production-line data load for a resource.

    Args:
        resource_id: data resource ID

    Returns:
        dict: execution result
    """
    try:
        # First look up the resource type
        resource_type = get_resource_type(resource_id)

        # Dispatch to the appropriate loading logic
        if resource_type == 'ddl':
            # DDL resources are extracted from a source database
            return execute_ddl_extraction(resource_id)
        else:
            # Other types (structure, etc.) are loaded from Excel files
            return execute_excel_loading(resource_id)
    except Exception as e:
        logger.error(f"执行生产线失败: {str(e)}", exc_info=True)
        return {
            "status": "error",
            "message": str(e)
        }


def get_resource_type(resource_id):
    """
    Get the type of a resource.

    Args:
        resource_id: data resource ID

    Returns:
        str: resource type, e.g. 'ddl' or 'structure'
    """
    try:
        with neo4j_driver.get_session() as session:
            # Query the resource type
            cypher = """
            MATCH (n:DataResource)
            WHERE id(n) = $resource_id
            RETURN n.type as type
            """
            result = session.run(cypher, resource_id=int(resource_id))
            record = result.single()

            if not record:
                raise ValueError(f"找不到ID为{resource_id}的数据资源")

            return record["type"] or 'structure'  # default to 'structure'
    except Exception as e:
        logger.error(f"获取资源类型失败: {str(e)}")
        raise


def execute_excel_loading(resource_id):
    """
    Load data from Excel files (the original loading logic).

    Args:
        resource_id: data resource ID

    Returns:
        dict: execution result
    """
    try:
        # Fetch the PostgreSQL configuration (validates that it is available)
        pg_config = get_pg_config()

        # 1. Get the storage information
        storage_location, name_zh, name_en, metadata_list = get_resource_storage_info(resource_id)

        # 2. Check for and create the target table
        check_and_create_table(name_en, metadata_list)

        # 3. Resolve the full storage path and scan for Excel files
        full_storage_path = get_full_storage_path(storage_location)

        if not os.path.exists(full_storage_path):
            # Create the directory if it does not exist
            try:
                os.makedirs(full_storage_path, exist_ok=True)
                logger.info(f"创建目录: {full_storage_path}")
            except Exception as e:
                raise ValueError(f"无法创建存储路径: {full_storage_path}, 错误: {str(e)}")

        # Look for files by the Chinese name first
        excel_files = []
        if name_zh:
            excel_files = [
                f for f in os.listdir(full_storage_path)
                if f.startswith(name_zh) and f.endswith(('.xlsx', '.xls'))
            ]
            if excel_files:
                logger.info(f"使用中文名'{name_zh}'找到Excel文件: {excel_files}")

        # If nothing matched the Chinese name, try the English name
        if not excel_files and name_en:
            excel_files = [
                f for f in os.listdir(full_storage_path)
                if f.startswith(name_en) and f.endswith(('.xlsx', '.xls'))
            ]
            if excel_files:
                logger.info(f"使用英文名'{name_en}'找到Excel文件: {excel_files}")

        # If neither name matched, raise an error
        if not excel_files:
            error_msg = (
                f"未找到匹配的Excel文件\n"
                f"搜索路径: {full_storage_path}\n"
                f"尝试查找的文件名模式:\n"
                f"1. {name_zh}*.xlsx/xls (中文名)\n"
                f"2. {name_en}*.xlsx/xls (英文名)\n"
                f"请确认文件已上传到正确位置,且文件名以资源的中文名或英文名开头"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        # 4. Load the data and archive the files
        total_records = 0
        processed_files = []
        archived_files = []

        for excel_file in excel_files:
            file_path = os.path.join(full_storage_path, excel_file)
            try:
                # If there is no metadata, try to infer it from the Excel file
                if not metadata_list:
                    logger.info(f"尝试从Excel文件 {excel_file} 推断元数据")
                    metadata_list = extract_metadata_from_excel(file_path, name_en)
                    if metadata_list:
                        # Retry table creation with the inferred metadata
                        check_and_create_table(name_en, metadata_list)
                    else:
                        logger.warning("无法从Excel文件推断元数据,将尝试直接加载数据")

                # Load the data into PostgreSQL
                records = load_excel_to_postgresql(file_path, name_en, metadata_list)
                total_records += records
                processed_files.append(excel_file)

                # Archive the successfully processed file
                archived_path = archive_excel_file(file_path)
                archived_files.append(archived_path)

                logger.info(f"已处理并归档文件 {excel_file}, 加载 {records} 条记录")
            except Exception as e:
                logger.error(f"处理文件 {excel_file} 失败: {str(e)}", exc_info=True)
                raise

        return {
            "status": "success",
            "message": f"数据加载成功,共处理 {len(processed_files)} 个文件,加载 {total_records} 条记录",
            "total_records": total_records,
            "files_processed": processed_files,
            "files_archived": archived_files
        }
    except Exception as e:
        logger.error(f"执行Excel加载失败: {str(e)}", exc_info=True)
        return {
            "status": "error",
            "message": str(e)
        }


def extract_metadata_from_excel(file_path, table_name):
    """
    Extract metadata from an Excel file.

    Args:
        file_path: path to the Excel file
        table_name: table name (used when translating column names)

    Returns:
        list: metadata list
    """
    try:
        # Read only the header row to get the column names
        df = pd.read_excel(file_path, nrows=0)

        # With nrows=0 the frame never has rows, so check the columns instead
        if len(df.columns) == 0:
            logger.warning(f"Excel文件 {file_path} 为空")
            return []

        # Column names
        column_names = df.columns.tolist()

        # Translate the column names
        metadata_list = []
        for name in column_names:
            # Reuse the existing translation helpers
            try:
                from app.core.meta_data import translate_and_parse
                from app.core.meta_data import infer_column_type

                # Translate the column name
                name_en = translate_and_parse(name)[0] if name else f"column_{len(metadata_list)}"

                # Make sure the name is a valid SQL identifier
                name_en = re.sub(r'\W+', '_', name_en).lower()

                # Infer the data type from a sample of rows
                df_sample = pd.read_excel(file_path, nrows=10)
                col_index = column_names.index(name)
                col_types = infer_column_type(df_sample)
                data_type = col_types[col_index] if col_index < len(col_types) else 'VARCHAR(255)'

                metadata_list.append({
                    'name_zh': name,
                    'name_en': name_en,
                    'data_type': data_type
                })
            except Exception as e:
                logger.error(f"处理列 {name} 时出错: {str(e)}")
                # Fall back to a default column definition
                name_en = f"column_{len(metadata_list)}"
                metadata_list.append({
                    'name_zh': name,
                    'name_en': name_en,
                    'data_type': 'VARCHAR(255)'
                })

        logger.info(f"从Excel推断出的元数据: {metadata_list}")
        return metadata_list
    except Exception as e:
        logger.error(f"从Excel文件提取元数据失败: {str(e)}")
        return []


def execute_ddl_extraction(resource_id):
    """
    Run data extraction for a DDL resource.

    Args:
        resource_id: data resource ID

    Returns:
        dict: execution result
    """
    try:
        logger.info(f"开始执行DDL资源数据抽取,ID: {resource_id}")

        # 1. Get the resource details
        resource_data = get_resource_details(resource_id)
        if not resource_data:
            return {"status": "error", "message": f"资源不存在,ID: {resource_id}"}

        # 2. Get the resource metadata
        metadata_list = resource_data.get('meta_list', [])
        if not metadata_list:
            return {"status": "error", "message": "资源没有元数据信息,无法创建表"}

        # 3. Get the target table name
        target_table_name = resource_data.get('name_en')
        if not target_table_name:
            return {"status": "error", "message": "资源没有英文名称,无法确定目标表名"}

        # 4. Get the associated data source information
        data_source_info = get_resource_data_source(resource_id)
        if not data_source_info:
            return {"status": "error", "message": "无法获取关联的数据源信息"}

        # 5. Create the target table in PostgreSQL
        create_result = create_target_table(target_table_name, metadata_list)
        if not create_result["success"]:
            return {"status": "error", "message": f"创建目标表失败: {create_result['message']}"}

        # 6. Run the data extraction
        extract_result = extract_data_to_postgres(data_source_info, target_table_name, metadata_list)

        return {
            "status": "success",
            "message": f"数据抽取成功,从{extract_result['source_table']}表抽取到{extract_result['target_table']}表,共处理 {extract_result['total_records']} 条记录,执行了 {extract_result['execution_time']:.2f} 秒",
            "total_records": extract_result['total_records'],
            "source_table": extract_result['source_table'],
            "target_table": extract_result['target_table'],
            "execution_time": extract_result['execution_time']
        }
    except Exception as e:
        logger.error(f"DDL数据抽取失败: {str(e)}", exc_info=True)
        return {
            "status": "error",
            "message": str(e)
        }


def get_resource_details(resource_id):
    """
    Get the detailed information of a resource.

    Args:
        resource_id: data resource ID

    Returns:
        dict: resource details
    """
    from app.core.data_resource.resource import handle_id_resource
    return handle_id_resource(resource_id)


def get_resource_data_source(resource_id):
    """Get the data source information associated with a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Find the DataSource node connected to the DataResource node
            cypher = """
            MATCH (n:DataResource)-[:originates_from]->(ds:DataSource)
            WHERE id(n) = $resource_id
            RETURN ds
            """
            result = session.run(cypher, resource_id=int(resource_id))
            record = result.single()

            if not record:
                logger.warning(f"资源ID {resource_id} 没有关联的数据源")
                return None

            # Build the data source connection info
            data_source = dict(record["ds"])
            return {
                "type": data_source.get("type", "").lower(),
                "host": data_source.get("host"),
                "port": data_source.get("port"),
                "database": data_source.get("database"),
                "username": data_source.get("username"),
                "password": data_source.get("password")
                # Add further parameters here if needed
                # "param": data_source.get("param")
            }
    except Exception as e:
        logger.error(f"获取数据源信息失败: {str(e)}")
        return None

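
# A hedged sketch of the DataSource node properties this module reads. The
# property names follow get_resource_data_source() / extract_data_to_postgres();
# the concrete values below are made-up examples, not real configuration:
#
#   (:DataSource {type: "mysql", host: "10.0.0.5", port: 3306,
#                 database: "erp", username: "reader", password: "******",
#                 param: "charset=utf8mb4"})   # `param` is optional, MySQL only
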
if not columns: return {"success": False, "message": "没有有效的列定义"} # 构建建表SQL sql = f""" CREATE TABLE ods.{table_name} ( id SERIAL PRIMARY KEY, {", ".join(columns)}, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ logger.info(f"创建表SQL: {sql}") cur.execute(sql) conn.commit() logger.info(f"表 ods.{table_name} 创建成功") return {"success": True, "message": f"表 ods.{table_name} 创建成功"} except Exception as e: logger.error(f"创建目标表失败: {str(e)}") if 'conn' in locals() and conn: conn.rollback() return {"success": False, "message": str(e)} finally: if 'cur' in locals() and cur: cur.close() if 'conn' in locals() and conn: conn.close() def extract_data_to_postgres(source_conn_info, target_table, metadata_list): """ 从源数据库抽取数据到PostgreSQL Args: source_conn_info: 源数据库连接信息 target_table: 目标表名 metadata_list: 元数据列表 Returns: dict: 抽取结果 """ try: from sqlalchemy import create_engine, text import pandas as pd from flask import current_app # 源表名称与目标表相同 source_table = target_table # 批处理大小 batch_size = current_app.config.get('DATA_EXTRACT_BATCH_SIZE', 1000) # 源数据库连接字符串构建 db_type = source_conn_info["type"] if db_type == "mysql": # 对用户名、密码和数据库名进行URL编码,处理特殊字符 encoded_username = quote(source_conn_info['username'], safe='') encoded_password = quote(source_conn_info['password'], safe='') encoded_database = quote(source_conn_info['database'], safe='') connection_string = f"mysql+pymysql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}" # 检查是否存在param参数,如存在则添加到连接字符串中 if 'param' in source_conn_info and source_conn_info['param']: # 确保param参数以&开头 param = source_conn_info['param'] if not param.startswith('&'): param = '&' + param connection_string = f"{connection_string}?{param[1:]}" logger.debug(f"添加了数据源的param参数: {param}") elif db_type == "postgresql": # 对用户名、密码和数据库名进行URL编码,处理特殊字符 encoded_username = quote(source_conn_info['username'], safe='') encoded_password = quote(source_conn_info['password'], safe='') encoded_database = quote(source_conn_info['database'], safe='') connection_string = f"postgresql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}" else: raise ValueError(f"不支持的数据库类型: {db_type}") # 目标数据库连接参数 pg_config = get_pg_config() # 对用户名、密码和数据库名进行URL编码,处理特殊字符 encoded_user = quote(pg_config['user'], safe='') encoded_password = quote(pg_config['password'], safe='') encoded_dbname = quote(pg_config['dbname'], safe='') target_connection_string = f"postgresql://{encoded_user}:{encoded_password}@{pg_config['host']}:{pg_config['port']}/{encoded_dbname}" # 记录最终连接字符串 logger.debug(f"python连接源表的最终连接字符串: {connection_string}") # 连接源数据库 source_engine = create_engine(connection_string) # 连接目标数据库 target_engine = create_engine(target_connection_string) # 获取元数据列名,构建查询字段列表 column_names = [meta.get('name_en') for meta in metadata_list if meta.get('name_en')] if not column_names: raise ValueError("没有有效的列名") # 构建查询语句 select_columns = ", ".join(column_names) query = f"SELECT {select_columns} FROM {source_table}" # 获取记录总数 with source_engine.connect() as conn: count_result = conn.execute(text(f"SELECT COUNT(*) FROM {source_table}")) total_count = count_result.scalar() # 分批抽取数据 total_records = 0 offset = 0 # 计算开始时间 start_time = time.time() while offset < total_count: # 构建带有分页的查询 paginated_query = f"{query} LIMIT {batch_size} OFFSET {offset}" # 读取数据批次 df = pd.read_sql(paginated_query, source_engine) batch_count = len(df) if batch_count == 0: break # 写入目标数据库 df.to_sql( target_table, target_engine, schema='ods', 
            total_records += batch_count
            offset += batch_size

            logger.info(f"已抽取 {total_records}/{total_count} 条记录")

        # Compute the execution time
        end_time = time.time()
        execution_time = end_time - start_time

        logger.info(f"作业抽取了 {total_records} 条记录,执行了 {execution_time:.2f} 秒")

        return {
            "total_records": total_records,
            "source_table": source_table,
            "target_table": f"ods.{target_table}",
            "execution_time": execution_time
        }
    except Exception as e:
        logger.error(f"数据抽取失败: {str(e)}")
        raise
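
# ---------------------------------------------------------------------------
# Hedged usage sketch (not part of the production code path). The functions
# above rely on Flask's current_app for configuration, so a manual run needs
# an application context; `create_app` and the resource ID used here are
# hypothetical placeholders for illustration only.
#
# if __name__ == "__main__":
#     from app import create_app  # assumed application factory
#     app = create_app()
#     with app.app_context():
#         result = execute_production_line(resource_id=123)
#         print(result.get("status"), result.get("message"))
# ---------------------------------------------------------------------------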