@@ -0,0 +1,1201 @@
+from app.core.graph.graph_operations import connect_graph
+from flask import current_app
+import os
+import pandas as pd
+from datetime import datetime
+import psycopg2
+from psycopg2 import sql
+import logging
+from app.services.neo4j_driver import neo4j_driver
+import shutil
+import re
+from psycopg2.extras import execute_values
+import time
+from urllib.parse import urlparse, unquote, quote
+
+def production_draw_graph(id, type):
+    """
+    Draw the production line graph for a node, based on its ID and type.
+
+    Args:
+        id: node ID
+        type: node type (DataModel, DataResource, DataMetric)
+
+    Returns:
+        dict: graph data containing the nodes, lines and root node ID
+    """
+    # Get the Neo4j connection
+    driver = connect_graph()
+    if not driver:
+        logger.error("无法连接到数据库")
+        return {"nodes": [], "lines": [], "rootId": "", "error": "无法连接到数据库"}
+
+    try:
+        # First verify that the node exists
+        with driver.session() as session:
+            check_node_query = """
+            MATCH (n)
+            WHERE id(n) = $nodeId
+            RETURN n, labels(n) as labels, n.name_zh as name_zh
+            """
+            check_result = session.run(check_node_query, nodeId=id).single()
+
+            if not check_result:
+                logger.error(f"节点不存在: ID={id}")
+                return {"nodes": [], "lines": [], "rootId": "", "error": "节点不存在"}
+
+            actual_type = check_result["labels"][0]  # actual node label
+            node_name = check_result["name_zh"]
+
+            # If the provided type does not match the actual type, use the actual type
+            if type.lower() != actual_type.lower():
+                logger.warning(f"提供的类型({type})与实际类型({actual_type})不匹配,使用实际类型")
+                type = actual_type
+
+        # Data model
+        if type.lower() == "datamodel":
+            cql = """
+            MATCH (n:DataModel)
+            WHERE id(n) = $nodeId
+            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
+            OPTIONAL MATCH (n)-[r2:clean_model]-(d:data_standard)
+            OPTIONAL MATCH (d)-[r3:clean_model]-(m)
+            WITH
+                collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS line1,
+                collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS line2,
+                collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS line3,
+                collect({id: toString(id(n)), text: n.name_zh, type: "model"}) AS node1,
+                collect({id: toString(id(m)), text: m.name}) AS node2,
+                collect({id: toString(id(d)), text: d.name, type: "standard"}) AS node3, n
+            WITH apoc.coll.toSet(line1 + line2 + line3) AS lines,
+                 apoc.coll.toSet(node1 + node2 + node3) AS nodes,
+                 toString(id(n)) as res
+            RETURN lines, nodes, res
+            """
+        # Data resource
+        elif type.lower() == "dataresource":
+            cql = """
+            MATCH (n:DataResource)
+            WHERE id(n) = $nodeId
+            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
+            OPTIONAL MATCH (n)-[r2:clean_resource]-(d:data_standard)
+            OPTIONAL MATCH (d)-[r3:clean_resource]-(m)
+            WITH
+                collect({from: toString(id(n)), to: toString(id(m)), text: "包含"}) AS lines1,
+                collect({from: toString(id(n)), to: toString(id(d)), text: "清洗"}) AS lines2,
+                collect({from: toString(id(d)), to: toString(id(m)), text: "清洗"}) AS lines3,
+                collect({id: toString(id(n)), text: n.name_zh, type: "resource"}) AS nodes1,
+                collect({id: toString(id(m)), text: m.name}) AS nodes2,
+                collect({id: toString(id(d)), text: d.name, type: "standard"}) AS nodes3, n
+            WITH
+                apoc.coll.toSet(lines1 + lines2 + lines3) AS lines,
+                apoc.coll.toSet(nodes1 + nodes2 + nodes3) AS nodes,
+                toString(id(n)) AS res
+            RETURN lines, nodes, res
+            """
+        # Data metric
+        elif type.lower() == "datametric":
+            cql = """
+            MATCH (n:DataMetric)
+            WHERE id(n) = $nodeId
+            OPTIONAL MATCH (n)-[r:connection]-(m:meta_node)
+            WITH collect({from: toString(id(n)), to: toString(id(m)), text: "处理"}) AS line1,
+                 collect({id: toString(id(n)), text: n.name_zh, type: "metric"}) AS node1,
+                 collect({id: toString(id(m)), text: m.name}) AS node2, n
+            WITH apoc.coll.toSet(line1) AS lines,
+                 apoc.coll.toSet(node1 + node2) AS nodes,
+                 toString(id(n)) as res
+            RETURN lines, nodes, res
+            """
+        else:
+            # Handle unknown node types
+            cql = """
+            MATCH (n)
+            WHERE id(n) = $nodeId
+            OPTIONAL MATCH (n)-[r]-(m)
+            WITH collect({from: toString(id(n)), to: toString(id(m)), text: type(r)}) AS lines,
+                 collect({id: toString(id(n)), text: n.name_zh, type: labels(n)[0]}) AS nodes1,
+                 collect({id: toString(id(m)), text: m.name, type: labels(m)[0]}) AS nodes2,
+                 toString(id(n)) as res
+            RETURN apoc.coll.toSet(lines) AS lines,
+                   apoc.coll.toSet(nodes1 + nodes2) AS nodes,
+                   res
+            """
+
+        with driver.session() as session:
+            try:
+                result = session.run(cql, nodeId=id)
+                data = result.data()
+
+                # If the query returned no data, fall back to the node itself
+                if not data:
+                    return {
+                        "nodes": [{"id": str(id), "text": node_name, "type": type}],
+                        "lines": [],
+                        "rootId": str(id)
+                    }
+
+                res = {}
+                for item in data:
+                    res = {
+                        "nodes": [record for record in item['nodes'] if record.get('id')],
+                        "lines": [record for record in item['lines'] if record.get('from') and record.get('to')],
+                        "rootId": item['res'],
+                    }
+
+                # Make sure the node list is not empty
+                if not res.get("nodes"):
+                    res["nodes"] = [{"id": str(id), "text": node_name, "type": type}]
+
+                return res
+            except Exception as e:
+                logger.error(f"执行图谱查询失败: {str(e)}")
+                return {
+                    "nodes": [{"id": str(id), "text": node_name, "type": type}],
+                    "lines": [],
+                    "rootId": str(id),
+                    "error": f"查询执行失败: {str(e)}"
+                }
+
+    except Exception as e:
+        logger.error(f"生成图谱失败: {str(e)}")
+        return {"nodes": [], "lines": [], "rootId": "", "error": str(e)}
+
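+# Illustrative sketch only (IDs and names below are made-up values): for a DataModel
+# node with a single attached meta_node, production_draw_graph would be expected to
+# return a payload shaped roughly like this.
+#
+#   production_draw_graph(123, "DataModel")
+#   # => {
+#   #      "nodes": [{"id": "123", "text": "订单模型", "type": "model"},
+#   #                {"id": "456", "text": "order_id"}],
+#   #      "lines": [{"from": "123", "to": "456", "text": "包含"}],
+#   #      "rootId": "123"
+#   #    }
+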
+"""
|
|
|
+Manual execution functions for production line
|
|
|
+Author: paul
|
|
|
+Date: 2024-03-20
|
|
|
+"""
|
|
|
+
|
|
|
+# 配置日志
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+# PostgreSQL configuration
+def get_pg_config():
+    """Read the PostgreSQL configuration from the app config, supporting passwords that contain special characters."""
+    db_uri = current_app.config['SQLALCHEMY_DATABASE_URI']
+
+    # Try to parse the URI with urlparse first
+    uri = urlparse(db_uri)
+
+    # If parsing failed, or special characters in the password broke it, fall back to manual parsing
+    if uri.username is None or uri.password is None:
+        # Manually parse the URI: postgresql://username:password@host:port/database
+        scheme_end = db_uri.find('://')
+        if scheme_end == -1:
+            raise ValueError("Invalid database URI format")
+
+        auth_and_host = db_uri[scheme_end + 3:]  # skip '://'
+        at_pos = auth_and_host.rfind('@')  # find the last '@' from the right
+
+        if at_pos == -1:
+            raise ValueError("Invalid database URI: missing @ separator")
+
+        auth_part = auth_and_host[:at_pos]
+        host_part = auth_and_host[at_pos + 1:]
+
+        # Parse username and password (may contain special characters)
+        colon_pos = auth_part.find(':')
+        if colon_pos == -1:
+            username = unquote(auth_part)
+            password = None
+        else:
+            username = unquote(auth_part[:colon_pos])
+            password = unquote(auth_part[colon_pos + 1:])
+
+        # Parse host, port and database
+        slash_pos = host_part.find('/')
+        if slash_pos == -1:
+            raise ValueError("Invalid database URI: missing database name")
+
+        host_port = host_part[:slash_pos]
+        database = unquote(host_part[slash_pos + 1:])
+
+        # Parse host and port
+        colon_pos = host_port.find(':')
+        if colon_pos == -1:
+            hostname = host_port
+            port = 5432
+        else:
+            hostname = host_port[:colon_pos]
+            port = int(host_port[colon_pos + 1:])
+    else:
+        # urlparse succeeded; decode fields that may be URL-encoded
+        username = unquote(uri.username) if uri.username else None
+        password = unquote(uri.password) if uri.password else None
+        database = unquote(uri.path[1:]) if uri.path and len(uri.path) > 1 else None
+        hostname = uri.hostname
+        port = uri.port or 5432
+
+    if not all([username, password, database, hostname]):
+        raise ValueError("Missing required database connection parameters")
+
+    return {
+        'dbname': database,
+        'user': username,
+        'password': password,
+        'host': hostname,
+        'port': str(port)
+    }
+
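+# Illustrative sketch (values are made up): for a config such as
+#   SQLALCHEMY_DATABASE_URI = "postgresql://dbuser:p%40ss%2Fword@10.0.0.5:5432/dataops"
+# get_pg_config() would be expected to return
+#   {'dbname': 'dataops', 'user': 'dbuser', 'password': 'p@ss/word',
+#    'host': '10.0.0.5', 'port': '5432'}
+# i.e. URL-encoded characters in the credentials are decoded before use.
+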
+def get_resource_storage_info(resource_id):
+    """
+    Get the storage location and metadata of a data resource.
+
+    Returns:
+        tuple: (storage_location, name_zh, name_en, metadata_list)
+            - storage_location: storage location
+            - name_zh: Chinese name of the resource (used to locate the Excel file)
+            - name_en: English name of the resource (used as the database table name)
+            - metadata_list: list of metadata entries
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # Fetch basic resource information
+            resource_query = """
+            MATCH (n:DataResource)
+            WHERE id(n) = $resource_id
+            RETURN n.storage_location as storage_location,
+                   n.name_zh as name_zh,
+                   n.name_en as name_en
+            """
+            result = session.run(resource_query, resource_id=int(resource_id))
+            resource_data = result.single()
+
+            if not resource_data:
+                raise ValueError(f"找不到ID为{resource_id}的数据资源")
+
+            if not resource_data['storage_location']:
+                raise ValueError("存储位置未配置")
+
+            # Query the metadata nodes (alias name_zh so the Excel loading code can map columns)
+            metadata_query = """
+            MATCH (n:DataResource)-[:INCLUDES]->(m:DataMeta)
+            WHERE id(n) = $resource_id
+            RETURN m.name_zh as name_zh, m.name_en as name_en, m.data_type as data_type
+            """
+            result = session.run(metadata_query, resource_id=int(resource_id))
+            metadata_list = [dict(record) for record in result]
+
+            # Check whether the metadata list is empty
+            if not metadata_list:
+                logger.warning(f"数据资源 {resource_id} 没有元数据节点,将尝试从Excel文件推断元数据")
+
+            # Make sure the English name exists
+            if not resource_data['name_en']:
+                raise ValueError("数据资源的英文名不能为空")
+
+            return (
+                resource_data['storage_location'],
+                resource_data['name_zh'],
+                resource_data['name_en'],
+                metadata_list
+            )
+    except Exception as e:
+        logger.error(f"获取资源存储信息失败: {str(e)}")
+        raise
+
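+# Illustrative sketch (field values are made up): each metadata_list entry is a dict
+# like {'name_zh': '订单编号', 'name_en': 'order_id', 'data_type': 'VARCHAR(255)'};
+# name_en/data_type drive table creation below, while name_zh maps Excel columns.
+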
+def check_and_create_table(table_name, metadata_list):
+    """
+    Check for the target PostgreSQL table and create it if it does not exist.
+
+    Args:
+        table_name: table name
+        metadata_list: list of metadata entries
+    """
+    conn = None
+    cur = None
+    try:
+        conn = psycopg2.connect(**get_pg_config())
+        cur = conn.cursor()
+
+        # Make sure the schema exists
+        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
+
+        # Check whether the table exists
+        cur.execute("""
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables
+                WHERE table_schema = 'ods'
+                AND table_name = %s
+            );
+        """, (table_name,))
+
+        table_exists = cur.fetchone()[0]
+
+        if not table_exists:
+            # If the metadata list is empty, the table cannot be created here
+            if not metadata_list:
+                logger.warning(f"元数据列表为空,无法创建表。将在加载数据时自动创建")
+                return
+
+            # Log the metadata list for debugging
+            logger.info(f"元数据列表: {metadata_list}")
+
+            # Build the CREATE TABLE statement
+            columns = [
+                f"{meta['name_en']} {meta['data_type']}"
+                for meta in metadata_list
+                if 'name_en' in meta and meta['name_en'] and 'data_type' in meta and meta['data_type']
+            ]
+
+            if not columns:
+                logger.warning("没有有效的列定义,无法创建表")
+                return
+
+            create_sql = f"""
+                CREATE TABLE ods.{table_name} (
+                    id SERIAL PRIMARY KEY,
+                    {", ".join(columns)},
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                )
+            """
+
+            logger.info(f"创建表SQL: {create_sql}")
+            cur.execute(create_sql)
+            conn.commit()
+            logger.info(f"表 ods.{table_name} 创建成功")
+        else:
+            logger.info(f"表 ods.{table_name} 已存在")
+
+            # Check whether a legacy insert_dt column exists (only created_at is used now)
+            cur.execute(f"""
+                SELECT EXISTS (
+                    SELECT FROM information_schema.columns
+                    WHERE table_schema = 'ods'
+                    AND table_name = '{table_name}'
+                    AND column_name = 'insert_dt'
+                );
+            """)
+            insert_dt_exists = cur.fetchone()[0]
+
+            # If insert_dt exists, log a warning but do not drop it (dropping the column could lose data)
+            if insert_dt_exists:
+                logger.warning(f"表 ods.{table_name} 存在冗余的insert_dt列,请考虑后续手动删除")
+
+            # Check whether the created_at column exists
+            cur.execute(f"""
+                SELECT EXISTS (
+                    SELECT FROM information_schema.columns
+                    WHERE table_schema = 'ods'
+                    AND table_name = '{table_name}'
+                    AND column_name = 'created_at'
+                );
+            """)
+            created_at_exists = cur.fetchone()[0]
+
+            # Add created_at if it does not exist
+            if not created_at_exists:
+                alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
+                logger.info(f"添加created_at列: {alter_sql}")
+                cur.execute(alter_sql)
+                conn.commit()
+
+            # Check whether new columns need to be added
+            if metadata_list:
+                # Get the existing columns
+                cur.execute(f"""
+                    SELECT column_name
+                    FROM information_schema.columns
+                    WHERE table_schema = 'ods'
+                    AND table_name = '{table_name}'
+                """)
+                existing_columns = [row[0] for row in cur.fetchall()]
+
+                # Add any metadata field that is not yet a column
+                for meta in metadata_list:
+                    if 'name_en' in meta and meta['name_en'] and meta['name_en'].lower() not in (col.lower() for col in existing_columns):
+                        column_type = meta.get('data_type', 'VARCHAR(255)')
+                        alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN {meta['name_en']} {column_type};"
+                        logger.info(f"添加新列: {alter_sql}")
+                        try:
+                            cur.execute(alter_sql)
+                            conn.commit()
+                        except Exception as e:
+                            logger.error(f"添加列失败: {str(e)}")
+    except Exception as e:
+        logger.error(f"创建表失败: {str(e)}")
+        if conn:
+            conn.rollback()
+        raise
+    finally:
+        if cur:
+            cur.close()
+        if conn:
+            conn.close()
+
+def load_excel_to_postgresql(file_path, table_name, metadata_list):
+    """
+    Load Excel data into a PostgreSQL table.
+
+    Args:
+        file_path: path of the Excel file
+        table_name: table name
+        metadata_list: list of metadata entries
+
+    Returns:
+        int: number of records loaded
+    """
+    conn = None
+    cur = None
+    try:
+        # Read the Excel data
+        df = pd.read_excel(file_path)
+
+        # If the Excel file is empty, return 0
+        if df.empty:
+            logger.warning(f"Excel文件 {file_path} 为空")
+            return 0
+
+        # If the metadata list is empty, try to create the table automatically
+        if not metadata_list:
+            logger.warning("元数据列表为空,尝试根据Excel文件自动创建表")
+
+            # Open a database connection
+            conn = psycopg2.connect(**get_pg_config())
+            cur = conn.cursor()
+
+            # Make sure the schema exists
+            cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
+
+            # Check whether the table exists
+            cur.execute(f"""
+                SELECT EXISTS (
+                    SELECT FROM information_schema.tables
+                    WHERE table_schema = 'ods'
+                    AND table_name = '{table_name}'
+                );
+            """)
+            table_exists = cur.fetchone()[0]
+
+            # If the table does not exist, create it from the DataFrame
+            if not table_exists:
+                # Build the column definitions
+                columns = []
+                for col_name in df.columns:
+                    # Derive a valid SQL column name
+                    sql_col_name = re.sub(r'\W+', '_', col_name).lower()
+
+                    # Infer the SQL type from the pandas dtype
+                    dtype = df[col_name].dtype
+                    if pd.api.types.is_integer_dtype(dtype):
+                        sql_type = 'INTEGER'
+                    elif pd.api.types.is_float_dtype(dtype):
+                        sql_type = 'NUMERIC(15,2)'
+                    elif pd.api.types.is_datetime64_dtype(dtype):
+                        sql_type = 'TIMESTAMP'
+                    elif pd.api.types.is_bool_dtype(dtype):
+                        sql_type = 'BOOLEAN'
+                    else:
+                        sql_type = 'VARCHAR(255)'
+
+                    columns.append(f"{sql_col_name} {sql_type}")
+
+                # Create the table, adding only a created_at timestamp column
+                create_sql = f"""
+                    CREATE TABLE ods.{table_name} (
+                        id SERIAL PRIMARY KEY,
+                        {', '.join(columns)},
+                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                    );
+                """
+                logger.info(f"自动生成的建表SQL: {create_sql}")
+                cur.execute(create_sql)
+                conn.commit()
+                logger.info(f"表 ods.{table_name} 自动创建成功")
+            else:
+                # Check whether the created_at column exists
+                cur.execute(f"""
+                    SELECT EXISTS (
+                        SELECT FROM information_schema.columns
+                        WHERE table_schema = 'ods'
+                        AND table_name = '{table_name}'
+                        AND column_name = 'created_at'
+                    );
+                """)
+                created_at_exists = cur.fetchone()[0]
+
+                # Add created_at if it does not exist
+                if not created_at_exists:
+                    alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
+                    logger.info(f"添加created_at列: {alter_sql}")
+                    cur.execute(alter_sql)
+                    conn.commit()
+
+            cur.close()
+            conn.close()
+            cur = None
+            conn = None
+
+            # Build a temporary metadata list for the insert step
+            metadata_list = []
+            for col_name in df.columns:
+                sql_col_name = re.sub(r'\W+', '_', col_name).lower()
+                metadata_list.append({
+                    'name_zh': col_name,
+                    'name_en': sql_col_name
+                })
+
+        # Open a database connection
+        conn = psycopg2.connect(**get_pg_config())
+        cur = conn.cursor()
+
+        # Prepare the rows to insert
+        records = []
+        for _, row in df.iterrows():
+            record = {}
+            for meta in metadata_list:
+                if 'name_zh' in meta and meta['name_zh'] in df.columns and 'name_en' in meta:
+                    # Value from the Excel cell
+                    value = row[meta['name_zh']]
+                    # Normalize NaN/None values
+                    if pd.isna(value):
+                        value = None
+                    record[meta['name_en']] = value
+            records.append(record)
+
+        # If there are no valid records, return 0
+        if not records:
+            logger.warning("没有有效记录可插入")
+            return 0
+
+        # Column list contains only the metadata columns (insert_dt is no longer included)
+        columns = [meta['name_en'] for meta in metadata_list if 'name_en' in meta]
+        if not columns:
+            logger.warning("没有有效列名")
+            return 0
+
+        # Batch insert via execute_values
+        insert_sql = f"""
+            INSERT INTO ods.{table_name} ({", ".join(columns)})
+            VALUES %s
+        """
+
+        # Build the value tuples to insert
+        values = []
+        for record in records:
+            # Only data column values; no timestamp needs to be appended
+            row_values = tuple(record.get(col, None) for col in columns)
+            values.append(row_values)
+
+        # Execute the batch insert
+        execute_values(cur, insert_sql, values)
+        conn.commit()
+
+        # Return the number of inserted records
+        return len(values)
+    except Exception as e:
+        logger.error(f"加载Excel数据到PostgreSQL失败: {str(e)}", exc_info=True)
+        if conn:
+            conn.rollback()
+        raise
+    finally:
+        if cur:
+            cur.close()
+        if conn:
+            conn.close()
+
+def get_full_storage_path(relative_path):
+    """
+    Resolve the full storage path from a relative path.
+
+    Args:
+        relative_path: relative path stored in Neo4j
+
+    Returns:
+        str: full storage path
+    """
+    base_path = current_app.config['UPLOAD_BASE_PATH']
+    # Strip any leading slashes
+    relative_path = relative_path.lstrip('\\/')
+    # Use the path separator appropriate for the operating system
+    if os.name == 'nt':  # Windows
+        full_path = os.path.join(base_path, relative_path.replace('/', '\\'))
+    else:  # Linux/Unix
+        full_path = os.path.join(base_path, relative_path.replace('\\', '/'))
+    return os.path.normpath(full_path)
+
+def get_archive_path():
+    """
+    Get the archive path for the current date.
+
+    Returns:
+        str: archive path
+    """
+    base_path = current_app.config['ARCHIVE_BASE_PATH']
+    date_folder = datetime.now().strftime('%Y-%m-%d')
+    archive_path = os.path.join(base_path, date_folder)
+
+    # Make sure the archive directory exists
+    os.makedirs(archive_path, exist_ok=True)
+    return archive_path
+
+def archive_excel_file(file_path):
+    """
+    Copy an Excel file into the archive directory, keeping the original file name.
+
+    Args:
+        file_path: full path of the Excel file
+
+    Returns:
+        str: path of the archived file
+    """
+    archive_path = get_archive_path()
+    file_name = os.path.basename(file_path)
+    archive_file_path = os.path.join(archive_path, file_name)
+
+    # If the file already exists in the archive directory, replace it
+    if os.path.exists(archive_file_path):
+        os.remove(archive_file_path)
+        logger.info(f"覆盖已存在的归档文件: {archive_file_path}")
+
+    # Copy the file into the archive directory
+    shutil.copy2(file_path, archive_file_path)
+    logger.info(f"文件已归档: {archive_file_path}")
+
+    # Remove the original file
+    os.remove(file_path)
+    logger.info(f"删除原始文件: {file_path}")
+
+    return archive_file_path
+
+def execute_production_line(resource_id):
+    """
+    Run the production line data load.
+
+    Args:
+        resource_id: data resource ID
+
+    Returns:
+        dict: execution result
+    """
+    try:
+        # First fetch the resource info to determine its type
+        resource_type = get_resource_type(resource_id)
+
+        # Dispatch to the appropriate loading logic by resource type
+        if resource_type == 'ddl':
+            # DDL resources: extract from the source database
+            return execute_ddl_extraction(resource_id)
+        else:
+            # Other types (structure, etc.): load from Excel files
+            return execute_excel_loading(resource_id)
+    except Exception as e:
+        logger.error(f"执行生产线失败: {str(e)}", exc_info=True)
+        return {
+            "status": "error",
+            "message": str(e)
+        }
+
+def get_resource_type(resource_id):
+    """
+    Get the resource type.
+
+    Args:
+        resource_id: data resource ID
+
+    Returns:
+        str: resource type, e.g. 'ddl' or 'structure'
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # Query the resource type
+            cypher = """
+            MATCH (n:DataResource)
+            WHERE id(n) = $resource_id
+            RETURN n.type as type
+            """
+            result = session.run(cypher, resource_id=int(resource_id))
+            record = result.single()
+
+            if not record:
+                raise ValueError(f"找不到ID为{resource_id}的数据资源")
+
+            return record["type"] or 'structure'  # default to 'structure'
+    except Exception as e:
+        logger.error(f"获取资源类型失败: {str(e)}")
+        raise
+
+def execute_excel_loading(resource_id):
+    """
+    Load data from Excel files (the original loading logic).
+
+    Args:
+        resource_id: data resource ID
+
+    Returns:
+        dict: execution result
+    """
+    try:
+        # Get the PostgreSQL configuration
+        pg_config = get_pg_config()
+
+        # 1. Get the storage info
+        storage_location, name_zh, name_en, metadata_list = get_resource_storage_info(resource_id)
+
+        # 2. Check and create the table
+        check_and_create_table(name_en, metadata_list)
+
+        # 3. Resolve the full storage path and scan for Excel files
+        full_storage_path = get_full_storage_path(storage_location)
+
+        if not os.path.exists(full_storage_path):
+            # Create the directory if it does not exist
+            try:
+                os.makedirs(full_storage_path, exist_ok=True)
+                logger.info(f"创建目录: {full_storage_path}")
+            except Exception as e:
+                raise ValueError(f"无法创建存储路径: {full_storage_path}, 错误: {str(e)}")
+
+        # First look for files by the Chinese name
+        excel_files = []
+        if name_zh:
+            excel_files = [
+                f for f in os.listdir(full_storage_path)
+                if f.startswith(name_zh) and f.endswith(('.xlsx', '.xls'))
+            ]
+            if excel_files:
+                logger.info(f"使用中文名'{name_zh}'找到Excel文件: {excel_files}")
+
+        # If nothing matched the Chinese name, try the English name
+        if not excel_files and name_en:
+            excel_files = [
+                f for f in os.listdir(full_storage_path)
+                if f.startswith(name_en) and f.endswith(('.xlsx', '.xls'))
+            ]
+            if excel_files:
+                logger.info(f"使用英文名'{name_en}'找到Excel文件: {excel_files}")
+
+        # If neither lookup found a file, raise an error
+        if not excel_files:
+            error_msg = (
+                f"未找到匹配的Excel文件\n"
+                f"搜索路径: {full_storage_path}\n"
+                f"尝试查找的文件名模式:\n"
+                f"1. {name_zh}*.xlsx/xls (中文名)\n"
+                f"2. {name_en}*.xlsx/xls (英文名)\n"
+                f"请确认文件已上传到正确位置,且文件名以资源的中文名或英文名开头"
+            )
+            logger.error(error_msg)
+            raise ValueError(error_msg)
+
+        # 4. Load the data and archive the files
+        total_records = 0
+        processed_files = []
+        archived_files = []
+
+        for excel_file in excel_files:
+            file_path = os.path.join(full_storage_path, excel_file)
+            try:
+                # If the metadata is empty, try to infer it from the Excel file
+                if not metadata_list:
+                    logger.info(f"尝试从Excel文件 {excel_file} 推断元数据")
+                    metadata_list = extract_metadata_from_excel(file_path, name_en)
+                    if metadata_list:
+                        # Retry table creation
+                        check_and_create_table(name_en, metadata_list)
+                    else:
+                        logger.warning("无法从Excel文件推断元数据,将尝试直接加载数据")
+
+                # Load the data into PostgreSQL
+                records = load_excel_to_postgresql(file_path, name_en, metadata_list)
+                total_records += records
+                processed_files.append(excel_file)
+
+                # Archive the successfully processed file
+                archived_path = archive_excel_file(file_path)
+                archived_files.append(archived_path)
+
+                logger.info(f"已处理并归档文件 {excel_file}, 加载 {records} 条记录")
+            except Exception as e:
+                logger.error(f"处理文件 {excel_file} 失败: {str(e)}", exc_info=True)
+                raise
+
+        return {
+            "status": "success",
+            "message": f"数据加载成功,共处理 {len(processed_files)} 个文件,加载 {total_records} 条记录",
+            "total_records": total_records,
+            "files_processed": processed_files,
+            "files_archived": archived_files
+        }
+
+    except Exception as e:
+        logger.error(f"执行Excel加载失败: {str(e)}", exc_info=True)
+        return {
+            "status": "error",
+            "message": str(e)
+        }
+
+def extract_metadata_from_excel(file_path, table_name):
+    """
+    Extract metadata from an Excel file.
+
+    Args:
+        file_path: path of the Excel file
+        table_name: table name (used when translating column names)
+
+    Returns:
+        list: list of metadata entries
+    """
+    try:
+        # Read only the header row of the Excel file for the column names
+        df = pd.read_excel(file_path, nrows=0)
+
+        # With nrows=0 the frame always has zero rows, so check the columns instead
+        if len(df.columns) == 0:
+            logger.warning(f"Excel文件 {file_path} 为空")
+            return []
+
+        # Get the column names
+        column_names = df.columns.tolist()
+
+        # Translate the column names
+        metadata_list = []
+        for name in column_names:
+            # Reuse the existing translation helpers
+            try:
+                from app.core.meta_data import translate_and_parse
+                from app.core.meta_data import infer_column_type
+
+                # Translate the column name
+                name_en = translate_and_parse(name)[0] if name else f"column_{len(metadata_list)}"
+
+                # Make sure the column name is a valid SQL identifier
+                name_en = re.sub(r'\W+', '_', name_en).lower()
+
+                # Infer the data type
+                df_sample = pd.read_excel(file_path, nrows=10)
+                col_index = column_names.index(name)
+                col_types = infer_column_type(df_sample)
+                data_type = col_types[col_index] if col_index < len(col_types) else 'VARCHAR(255)'
+
+                metadata_list.append({
+                    'name_zh': name,
+                    'name_en': name_en,
+                    'data_type': data_type
+                })
+            except Exception as e:
+                logger.error(f"处理列 {name} 时出错: {str(e)}")
+                # Fall back to defaults
+                name_en = f"column_{len(metadata_list)}"
+                metadata_list.append({
+                    'name_zh': name,
+                    'name_en': name_en,
+                    'data_type': 'VARCHAR(255)'
+                })
+
+        logger.info(f"从Excel推断出的元数据: {metadata_list}")
+        return metadata_list
+    except Exception as e:
+        logger.error(f"从Excel文件提取元数据失败: {str(e)}")
+        return []
+
+def execute_ddl_extraction(resource_id):
+    """
+    Run data extraction for a DDL-type resource.
+
+    Args:
+        resource_id: data resource ID
+
+    Returns:
+        dict: execution result
+    """
+    try:
+        from sqlalchemy import create_engine, text
+        import pandas as pd
+
+        logger.info(f"开始执行DDL资源数据抽取,ID: {resource_id}")
+
+        # 1. Get the resource details
+        resource_data = get_resource_details(resource_id)
+        if not resource_data:
+            return {"status": "error", "message": f"资源不存在,ID: {resource_id}"}
+
+        # 2. Get the resource metadata
+        metadata_list = resource_data.get('meta_list', [])
+        if not metadata_list:
+            return {"status": "error", "message": "资源没有元数据信息,无法创建表"}
+
+        # 3. Get the target table name
+        target_table_name = resource_data.get('name_en')
+        if not target_table_name:
+            return {"status": "error", "message": "资源没有英文名称,无法确定目标表名"}
+
+        # 4. Get the associated data source info
+        data_source_info = get_resource_data_source(resource_id)
+        if not data_source_info:
+            return {"status": "error", "message": "无法获取关联的数据源信息"}
+
+        # 5. Create the target table in PostgreSQL
+        create_result = create_target_table(target_table_name, metadata_list)
+        if not create_result["success"]:
+            return {"status": "error", "message": f"创建目标表失败: {create_result['message']}"}
+
+        # 6. Run the data extraction
+        extract_result = extract_data_to_postgres(data_source_info, target_table_name, metadata_list)
+
+        return {
+            "status": "success",
+            "message": f"数据抽取成功,从{extract_result['source_table']}表抽取到{extract_result['target_table']}表,共处理 {extract_result['total_records']} 条记录,执行了 {extract_result['execution_time']:.2f} 秒",
+            "total_records": extract_result['total_records'],
+            "source_table": extract_result['source_table'],
+            "target_table": extract_result['target_table'],
+            "execution_time": extract_result['execution_time']
+        }
+
+    except Exception as e:
+        logger.error(f"DDL数据抽取失败: {str(e)}", exc_info=True)
+        return {
+            "status": "error",
+            "message": str(e)
+        }
+
+def get_resource_details(resource_id):
+    """
+    Get the detailed information of a resource.
+
+    Args:
+        resource_id: data resource ID
+
+    Returns:
+        dict: resource details
+    """
+    from app.core.data_resource.resource import handle_id_resource
+    return handle_id_resource(resource_id)
+
+def get_resource_data_source(resource_id):
+    """Get the data source info associated with a data resource."""
+    try:
+        with neo4j_driver.get_session() as session:
+            # Query the data source node linked to the data resource node
+            cypher = """
+            MATCH (n:DataResource)-[:originates_from]->(ds:DataSource)
+            WHERE id(n) = $resource_id
+            RETURN ds
+            """
+
+            result = session.run(cypher, resource_id=int(resource_id))
+            record = result.single()
+
+            if not record:
+                logger.warning(f"资源ID {resource_id} 没有关联的数据源")
+                return None
+
+            # Build the data source connection info
+            data_source = dict(record["ds"])
+            return {
+                "type": data_source.get("type", "").lower(),
+                "host": data_source.get("host"),
+                "port": data_source.get("port"),
+                "database": data_source.get("database"),
+                "username": data_source.get("username"),
+                "password": data_source.get("password")
+                # Additional parameters can be added here if needed
+                # "param": data_source.get("param")
+            }
+    except Exception as e:
+        logger.error(f"获取数据源信息失败: {str(e)}")
+        return None
+
+def create_target_table(table_name, metadata_list):
+    """
+    Create the target table in PostgreSQL.
+
+    Args:
+        table_name: table name
+        metadata_list: list of metadata entries
+
+    Returns:
+        dict: {"success": bool, "message": str}
+    """
+    try:
+        import psycopg2
+        from flask import current_app
+
+        # Get the PostgreSQL configuration
+        pg_config = get_pg_config()
+
+        conn = psycopg2.connect(**pg_config)
+        cur = conn.cursor()
+
+        # Make sure the schema exists
+        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
+
+        # Check whether the table exists
+        cur.execute("""
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables
+                WHERE table_schema = 'ods'
+                AND table_name = %s
+            );
+        """, (table_name,))
+
+        table_exists = cur.fetchone()[0]
+
+        if table_exists:
+            logger.info(f"表 ods.{table_name} 已存在,将跳过创建")
+            return {"success": True, "message": f"表 ods.{table_name} 已存在"}
+
+        # Build the column definitions
+        columns = []
+        for meta in metadata_list:
+            column_name = meta.get('name_en')
+            data_type = meta.get('data_type')
+
+            if column_name and data_type:
+                columns.append(f"{column_name} {data_type}")
+
+        if not columns:
+            return {"success": False, "message": "没有有效的列定义"}
+
+        # Build the CREATE TABLE statement
+        create_sql = f"""
+            CREATE TABLE ods.{table_name} (
+                id SERIAL PRIMARY KEY,
+                {", ".join(columns)},
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+        """
+
+        logger.info(f"创建表SQL: {create_sql}")
+        cur.execute(create_sql)
+        conn.commit()
+        logger.info(f"表 ods.{table_name} 创建成功")
+
+        return {"success": True, "message": f"表 ods.{table_name} 创建成功"}
+
+    except Exception as e:
+        logger.error(f"创建目标表失败: {str(e)}")
+        if 'conn' in locals() and conn:
+            conn.rollback()
+        return {"success": False, "message": str(e)}
+    finally:
+        if 'cur' in locals() and cur:
+            cur.close()
+        if 'conn' in locals() and conn:
+            conn.close()
+
+def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
+    """
+    Extract data from the source database into PostgreSQL.
+
+    Args:
+        source_conn_info: connection info of the source database
+        target_table: target table name
+        metadata_list: list of metadata entries
+
+    Returns:
+        dict: extraction result
+    """
+    try:
+        from sqlalchemy import create_engine, text
+        import pandas as pd
+        from flask import current_app
+
+        # The source table has the same name as the target table
+        source_table = target_table
+
+        # Batch size
+        batch_size = current_app.config.get('DATA_EXTRACT_BATCH_SIZE', 1000)
+
+        # Build the source database connection string
+        db_type = source_conn_info["type"]
+        if db_type == "mysql":
+            # URL-encode username, password and database name to handle special characters
+            encoded_username = quote(source_conn_info['username'], safe='')
+            encoded_password = quote(source_conn_info['password'], safe='')
+            encoded_database = quote(source_conn_info['database'], safe='')
+            connection_string = f"mysql+pymysql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}"
+
+            # If a 'param' value is present, append it to the connection string
+            if 'param' in source_conn_info and source_conn_info['param']:
+                # Make sure the param string starts with '&'
+                param = source_conn_info['param']
+                if not param.startswith('&'):
+                    param = '&' + param
+                connection_string = f"{connection_string}?{param[1:]}"
+                logger.debug(f"添加了数据源的param参数: {param}")
+
+        elif db_type == "postgresql":
+            # URL-encode username, password and database name to handle special characters
+            encoded_username = quote(source_conn_info['username'], safe='')
+            encoded_password = quote(source_conn_info['password'], safe='')
+            encoded_database = quote(source_conn_info['database'], safe='')
+            connection_string = f"postgresql://{encoded_username}:{encoded_password}@{source_conn_info['host']}:{source_conn_info['port']}/{encoded_database}"
+        else:
+            raise ValueError(f"不支持的数据库类型: {db_type}")
+
+        # Target database connection parameters
+        pg_config = get_pg_config()
+        # URL-encode username, password and database name to handle special characters
+        encoded_user = quote(pg_config['user'], safe='')
+        encoded_password = quote(pg_config['password'], safe='')
+        encoded_dbname = quote(pg_config['dbname'], safe='')
+        target_connection_string = f"postgresql://{encoded_user}:{encoded_password}@{pg_config['host']}:{pg_config['port']}/{encoded_dbname}"
+
+        # Log the final connection string
+        logger.debug(f"python连接源表的最终连接字符串: {connection_string}")
+
+        # Connect to the source database
+        source_engine = create_engine(connection_string)
+
+        # Connect to the target database
+        target_engine = create_engine(target_connection_string)
+
+        # Build the list of columns to select from the metadata
+        column_names = [meta.get('name_en') for meta in metadata_list if meta.get('name_en')]
+        if not column_names:
+            raise ValueError("没有有效的列名")
+
+        # Build the query
+        select_columns = ", ".join(column_names)
+        query = f"SELECT {select_columns} FROM {source_table}"
+
+        # Get the total record count
+        with source_engine.connect() as conn:
+            count_result = conn.execute(text(f"SELECT COUNT(*) FROM {source_table}"))
+            total_count = count_result.scalar()
+
+        # Extract the data in batches
+        total_records = 0
+        offset = 0
+
+        # Record the start time
+        start_time = time.time()
+
+        while offset < total_count:
+            # Build the paginated query
+            paginated_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
+
+            # Read one batch of data
+            df = pd.read_sql(paginated_query, source_engine)
+            batch_count = len(df)
+
+            if batch_count == 0:
+                break
+
+            # Write to the target database
+            df.to_sql(
+                target_table,
+                target_engine,
+                schema='ods',
+                if_exists='append',
+                index=False
+            )
+
+            total_records += batch_count
+            offset += batch_size
+
+            logger.info(f"已抽取 {total_records}/{total_count} 条记录")
+
+        # Compute the execution time
+        end_time = time.time()
+        execution_time = end_time - start_time
+        logger.info(f"作业抽取了 {total_records} 条记录,执行了 {execution_time:.2f} 秒")
+
+        return {
+            "total_records": total_records,
+            "source_table": source_table,
+            "target_table": f"ods.{target_table}",
+            "execution_time": execution_time
+        }
+
+    except Exception as e:
+        logger.error(f"数据抽取失败: {str(e)}")
+        raise
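+
+# Minimal usage sketch (the resource ID below is a made-up example): a manual
+# production line run is driven entirely by execute_production_line, which dispatches
+# to Excel loading or DDL extraction based on the resource's type in Neo4j.
+#
+#   result = execute_production_line(123)
+#   if result["status"] == "success":
+#       logger.info(result["message"])
+#   else:
+#       logger.error(result["message"])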