|
@@ -8,6 +8,8 @@ from psycopg2 import sql
|
|
|
import logging
|
|
|
from app.services.neo4j_driver import neo4j_driver
|
|
|
import shutil
|
|
|
+import re
|
|
|
+from psycopg2.extras import execute_values
|
|
|
|
|
|
def production_draw_graph(id, type):
|
|
|
"""
|
|
@@ -208,7 +210,10 @@ def get_resource_storage_info(resource_id):
|
|
|
result = session.run(resource_query, resource_id=int(resource_id))
|
|
|
resource_data = result.single()
|
|
|
|
|
|
- if not resource_data or not resource_data['storage_location']:
|
|
|
+ if not resource_data:
|
|
|
+ raise ValueError(f"找不到ID为{resource_id}的数据资源")
|
|
|
+
|
|
|
+ if not resource_data['storage_location']:
|
|
|
raise ValueError("存储位置未配置")
|
|
|
|
|
|
# 查询元数据节点
|
|
@@ -220,6 +225,10 @@ def get_resource_storage_info(resource_id):
|
|
|
result = session.run(metadata_query, resource_id=int(resource_id))
|
|
|
metadata_list = [dict(record) for record in result]
|
|
|
|
|
|
+ # 检查元数据列表是否为空
|
|
|
+ if not metadata_list:
|
|
|
+ logger.warning(f"数据资源 {resource_id} 没有元数据节点,将尝试从Excel文件推断元数据")
|
|
|
+
|
|
|
# 检查英文名是否存在
|
|
|
if not resource_data['en_name']:
|
|
|
raise ValueError("数据资源的英文名不能为空")
|
|
@@ -261,6 +270,11 @@ def check_and_create_table(table_name, metadata_list):
|
|
|
table_exists = cur.fetchone()[0]
|
|
|
|
|
|
if not table_exists:
|
|
|
+ # 如果元数据列表为空,无法创建表
|
|
|
+ if not metadata_list:
|
|
|
+ logger.warning(f"元数据列表为空,无法创建表。将在加载数据时自动创建")
|
|
|
+ return
|
|
|
+
|
|
|
# 打印元数据列表用于调试
|
|
|
logger.info(f"元数据列表: {metadata_list}")
|
|
|
|
|
@@ -268,89 +282,248 @@ def check_and_create_table(table_name, metadata_list):
|
|
|
columns = [
|
|
|
f"{meta['en_name']} {meta['type']}"
|
|
|
for meta in metadata_list
|
|
|
+ if 'en_name' in meta and meta['en_name'] and 'type' in meta and meta['type']
|
|
|
]
|
|
|
- columns.append("insert_dt timestamp")
|
|
|
|
|
|
- create_table_sql = sql.SQL("""
|
|
|
- CREATE TABLE ods.{} (
|
|
|
- {}
|
|
|
- );
|
|
|
- """).format(
|
|
|
- sql.Identifier(table_name),
|
|
|
- sql.SQL(', ').join(map(sql.SQL, columns))
|
|
|
+ if not columns:
|
|
|
+ logger.warning("没有有效的列定义,无法创建表")
|
|
|
+ return
|
|
|
+
|
|
|
+ sql = f"""
|
|
|
+ CREATE TABLE ods.{table_name} (
|
|
|
+ id SERIAL PRIMARY KEY,
|
|
|
+ {", ".join(columns)},
|
|
|
+ insert_dt TIMESTAMP,
|
|
|
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
|
)
|
|
|
+ """
|
|
|
|
|
|
- # 打印完整的建表SQL
|
|
|
- formatted_sql = create_table_sql.as_string(conn)
|
|
|
- logger.info(f"建表SQL: {formatted_sql}")
|
|
|
-
|
|
|
- cur.execute(create_table_sql)
|
|
|
+ logger.info(f"创建表SQL: {sql}")
|
|
|
+ cur.execute(sql)
|
|
|
+ conn.commit()
|
|
|
logger.info(f"表 ods.{table_name} 创建成功")
|
|
|
+ else:
|
|
|
+ logger.info(f"表 ods.{table_name} 已存在")
|
|
|
|
|
|
- conn.commit()
|
|
|
+ # 检查是否存在insert_dt列
|
|
|
+ cur.execute(f"""
|
|
|
+ SELECT EXISTS (
|
|
|
+ SELECT FROM information_schema.columns
|
|
|
+ WHERE table_schema = 'ods'
|
|
|
+ AND table_name = '{table_name}'
|
|
|
+ AND column_name = 'insert_dt'
|
|
|
+ );
|
|
|
+ """)
|
|
|
+ insert_dt_exists = cur.fetchone()[0]
|
|
|
+
|
|
|
+ # 如果insert_dt列不存在,添加它
|
|
|
+ if not insert_dt_exists:
|
|
|
+ alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN insert_dt TIMESTAMP;"
|
|
|
+ logger.info(f"添加insert_dt列: {alter_sql}")
|
|
|
+ cur.execute(alter_sql)
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+ # 检查是否需要添加新列
|
|
|
+ if metadata_list:
|
|
|
+ # 获取现有列
|
|
|
+ cur.execute(f"""
|
|
|
+ SELECT column_name
|
|
|
+ FROM information_schema.columns
|
|
|
+ WHERE table_schema = 'ods'
|
|
|
+ AND table_name = '{table_name}'
|
|
|
+ """)
|
|
|
+ existing_columns = [row[0] for row in cur.fetchall()]
|
|
|
+
|
|
|
+ # 检查每个元数据是否需要作为新列添加
|
|
|
+ for meta in metadata_list:
|
|
|
+ if 'en_name' in meta and meta['en_name'] and meta['en_name'].lower() not in (col.lower() for col in existing_columns):
|
|
|
+ column_type = meta.get('type', 'VARCHAR(255)')
|
|
|
+ alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN {meta['en_name']} {column_type};"
|
|
|
+ logger.info(f"添加新列: {alter_sql}")
|
|
|
+ try:
|
|
|
+ cur.execute(alter_sql)
|
|
|
+ conn.commit()
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"添加列失败: {str(e)}")
|
|
|
except Exception as e:
|
|
|
- logger.error(f"检查创建表失败: {str(e)}")
|
|
|
+ logger.error(f"创建表失败: {str(e)}")
|
|
|
+ conn.rollback()
|
|
|
raise
|
|
|
finally:
|
|
|
- cur.close()
|
|
|
- conn.close()
|
|
|
+ if cur:
|
|
|
+ cur.close()
|
|
|
+ if conn:
|
|
|
+ conn.close()
|
|
|
|
|
|
def load_excel_to_postgresql(file_path, table_name, metadata_list):
|
|
|
"""
|
|
|
- 加载Excel数据到PostgreSQL
|
|
|
+ 加载Excel数据到PostgreSQL表
|
|
|
+
|
|
|
+ Args:
|
|
|
+ file_path: Excel文件路径
|
|
|
+ table_name: 表名
|
|
|
+ metadata_list: 元数据列表
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ int: 加载的记录数
|
|
|
"""
|
|
|
conn = None
|
|
|
cur = None
|
|
|
try:
|
|
|
- # 读取Excel文件
|
|
|
+ # 读取Excel数据
|
|
|
df = pd.read_excel(file_path)
|
|
|
|
|
|
- # 构建字段映射关系(中文名到英文名的映射)
|
|
|
- field_mapping = {}
|
|
|
- for meta in metadata_list:
|
|
|
- # 优先使用中文名作为Excel中的列名
|
|
|
- excel_column = meta['name'] if meta['name'] else meta['en_name']
|
|
|
- field_mapping[excel_column] = meta['en_name']
|
|
|
+ # 如果Excel文件为空,返回0
|
|
|
+ if df.empty:
|
|
|
+ logger.warning(f"Excel文件 {file_path} 为空")
|
|
|
+ return 0
|
|
|
+
|
|
|
+ # 如果元数据列表为空,尝试自动创建表
|
|
|
+ if not metadata_list:
|
|
|
+ logger.warning("元数据列表为空,尝试根据Excel文件自动创建表")
|
|
|
+
|
|
|
+ # 创建数据库连接
|
|
|
+ conn = psycopg2.connect(**get_pg_config())
|
|
|
+ cur = conn.cursor()
|
|
|
+
|
|
|
+ # 检查schema是否存在
|
|
|
+ cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
|
|
|
+
|
|
|
+ # 检查表是否存在
|
|
|
+ cur.execute(f"""
|
|
|
+ SELECT EXISTS (
|
|
|
+ SELECT FROM information_schema.tables
|
|
|
+ WHERE table_schema = 'ods'
|
|
|
+ AND table_name = '{table_name}'
|
|
|
+ );
|
|
|
+ """)
|
|
|
+ table_exists = cur.fetchone()[0]
|
|
|
+
|
|
|
+ # 如果表不存在,根据DataFrame自动创建
|
|
|
+ if not table_exists:
|
|
|
+ # 生成列定义
|
|
|
+ columns = []
|
|
|
+ for col_name in df.columns:
|
|
|
+ # 生成有效的SQL列名
|
|
|
+ sql_col_name = re.sub(r'\W+', '_', col_name).lower()
|
|
|
+
|
|
|
+ # 根据数据类型推断SQL类型
|
|
|
+ dtype = df[col_name].dtype
|
|
|
+ if pd.api.types.is_integer_dtype(dtype):
|
|
|
+ sql_type = 'INTEGER'
|
|
|
+ elif pd.api.types.is_float_dtype(dtype):
|
|
|
+ sql_type = 'NUMERIC(15,2)'
|
|
|
+ elif pd.api.types.is_datetime64_dtype(dtype):
|
|
|
+ sql_type = 'TIMESTAMP'
|
|
|
+ elif pd.api.types.is_bool_dtype(dtype):
|
|
|
+ sql_type = 'BOOLEAN'
|
|
|
+ else:
|
|
|
+ sql_type = 'VARCHAR(255)'
|
|
|
+
|
|
|
+ columns.append(f"{sql_col_name} {sql_type}")
|
|
|
+
|
|
|
+ # 创建表,包含insert_dt时间戳字段
|
|
|
+ create_sql = f"""
|
|
|
+ CREATE TABLE ods.{table_name} (
|
|
|
+ id SERIAL PRIMARY KEY,
|
|
|
+ {', '.join(columns)},
|
|
|
+ insert_dt TIMESTAMP,
|
|
|
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
|
+ );
|
|
|
+ """
|
|
|
+ logger.info(f"自动生成的建表SQL: {create_sql}")
|
|
|
+ cur.execute(create_sql)
|
|
|
+ conn.commit()
|
|
|
+ logger.info(f"表 ods.{table_name} 自动创建成功")
|
|
|
+ else:
|
|
|
+ # 检查是否存在insert_dt列
|
|
|
+ cur.execute(f"""
|
|
|
+ SELECT EXISTS (
|
|
|
+ SELECT FROM information_schema.columns
|
|
|
+ WHERE table_schema = 'ods'
|
|
|
+ AND table_name = '{table_name}'
|
|
|
+ AND column_name = 'insert_dt'
|
|
|
+ );
|
|
|
+ """)
|
|
|
+ insert_dt_exists = cur.fetchone()[0]
|
|
|
+
|
|
|
+ # 如果insert_dt列不存在,添加它
|
|
|
+ if not insert_dt_exists:
|
|
|
+ alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN insert_dt TIMESTAMP;"
|
|
|
+ logger.info(f"添加insert_dt列: {alter_sql}")
|
|
|
+ cur.execute(alter_sql)
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+ cur.close()
|
|
|
+ conn.close()
|
|
|
+ cur = None
|
|
|
+ conn = None
|
|
|
+
|
|
|
+ # 创建临时元数据列表用于插入数据
|
|
|
+ metadata_list = []
|
|
|
+ for col_name in df.columns:
|
|
|
+ sql_col_name = re.sub(r'\W+', '_', col_name).lower()
|
|
|
+ metadata_list.append({
|
|
|
+ 'name': col_name,
|
|
|
+ 'en_name': sql_col_name
|
|
|
+ })
|
|
|
|
|
|
- # 验证列名
|
|
|
- excel_columns = set(df.columns)
|
|
|
- expected_columns = {(meta['name'] if meta['name'] else meta['en_name']) for meta in metadata_list}
|
|
|
+ # 创建数据库连接
|
|
|
+ conn = psycopg2.connect(**get_pg_config())
|
|
|
+ cur = conn.cursor()
|
|
|
|
|
|
- if not expected_columns.issubset(excel_columns):
|
|
|
- missing_columns = expected_columns - excel_columns
|
|
|
- raise ValueError(f"Excel文件缺少必要的列: {missing_columns}")
|
|
|
+ # 准备插入数据
|
|
|
+ records = []
|
|
|
+ for _, row in df.iterrows():
|
|
|
+ record = {}
|
|
|
+ for meta in metadata_list:
|
|
|
+ if 'name' in meta and meta['name'] in df.columns and 'en_name' in meta:
|
|
|
+ # 获取Excel中的值
|
|
|
+ value = row[meta['name']]
|
|
|
+ # 处理NaN和None值
|
|
|
+ if pd.isna(value):
|
|
|
+ value = None
|
|
|
+ record[meta['en_name']] = value
|
|
|
+ records.append(record)
|
|
|
|
|
|
- # 重命名列(从中文名到英文名)
|
|
|
- df = df.rename(columns=field_mapping)
|
|
|
+ # 如果没有有效记录,返回0
|
|
|
+ if not records:
|
|
|
+ logger.warning("没有有效记录可插入")
|
|
|
+ return 0
|
|
|
|
|
|
- # 添加insert_dt列
|
|
|
- df['insert_dt'] = datetime.now()
|
|
|
+ # 获取列名列表,包括所有元数据列和insert_dt
|
|
|
+ columns = [meta['en_name'] for meta in metadata_list if 'en_name' in meta]
|
|
|
+ if not columns:
|
|
|
+ logger.warning("没有有效列名")
|
|
|
+ return 0
|
|
|
+
|
|
|
+ # 添加insert_dt列名
|
|
|
+ columns.append('insert_dt')
|
|
|
|
|
|
- # 连接数据库
|
|
|
- conn = psycopg2.connect(**get_pg_config())
|
|
|
- cur = conn.cursor()
|
|
|
+ # 正确使用execute_values的方式
|
|
|
+ insert_sql = f"""
|
|
|
+ INSERT INTO ods.{table_name} ({", ".join(columns)})
|
|
|
+ VALUES %s
|
|
|
+ """
|
|
|
|
|
|
- # 构建INSERT语句
|
|
|
- columns = [meta['en_name'] for meta in metadata_list] + ['insert_dt']
|
|
|
- insert_sql = sql.SQL("""
|
|
|
- INSERT INTO ods.{} ({})
|
|
|
- VALUES ({})
|
|
|
- """).format(
|
|
|
- sql.Identifier(table_name),
|
|
|
- sql.SQL(', ').join(map(sql.Identifier, columns)),
|
|
|
- sql.SQL(', ').join(sql.Placeholder() * len(columns))
|
|
|
- )
|
|
|
-
|
|
|
- # 批量插入数据
|
|
|
- records = df[columns].values.tolist()
|
|
|
- cur.executemany(insert_sql, records)
|
|
|
+ # 准备要插入的数据元组,包括当前时间戳
|
|
|
+ current_timestamp = datetime.now()
|
|
|
+ values = []
|
|
|
+ for record in records:
|
|
|
+ # 为每条记录添加当前时间戳
|
|
|
+ row_values = tuple(list(record.get(col, None) for col in columns[:-1]) + [current_timestamp])
|
|
|
+ values.append(row_values)
|
|
|
|
|
|
+ # 执行批量插入
|
|
|
+ execute_values(cur, insert_sql, values)
|
|
|
conn.commit()
|
|
|
- return len(records)
|
|
|
+
|
|
|
+ # 返回插入的记录数
|
|
|
+ return len(values)
|
|
|
except Exception as e:
|
|
|
+ logger.error(f"加载Excel数据到PostgreSQL失败: {str(e)}", exc_info=True)
|
|
|
if conn:
|
|
|
conn.rollback()
|
|
|
- logger.error(f"加载数据失败: {str(e)}")
|
|
|
raise
|
|
|
finally:
|
|
|
if cur:
|
|
@@ -395,7 +568,7 @@ def get_archive_path():
|
|
|
|
|
|
def archive_excel_file(file_path):
|
|
|
"""
|
|
|
- 将Excel文件移动到归档目录
|
|
|
+ 将Excel文件复制到归档目录,保持原始文件名
|
|
|
|
|
|
Args:
|
|
|
file_path: Excel文件的完整路径
|
|
@@ -407,14 +580,19 @@ def archive_excel_file(file_path):
|
|
|
file_name = os.path.basename(file_path)
|
|
|
archive_file_path = os.path.join(archive_path, file_name)
|
|
|
|
|
|
- # 如果目标文件已存在,添加时间戳
|
|
|
+ # 如果文件已经存在于归档目录,替换它
|
|
|
if os.path.exists(archive_file_path):
|
|
|
- name, ext = os.path.splitext(file_name)
|
|
|
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
- archive_file_path = os.path.join(archive_path, f"{name}_{timestamp}{ext}")
|
|
|
+ os.remove(archive_file_path)
|
|
|
+ logger.info(f"覆盖已存在的归档文件: {archive_file_path}")
|
|
|
|
|
|
- shutil.move(file_path, archive_file_path)
|
|
|
+ # 复制文件到归档目录
|
|
|
+ shutil.copy2(file_path, archive_file_path)
|
|
|
logger.info(f"文件已归档: {archive_file_path}")
|
|
|
+
|
|
|
+ # 删除原始文件
|
|
|
+ os.remove(file_path)
|
|
|
+ logger.info(f"删除原始文件: {file_path}")
|
|
|
+
|
|
|
return archive_file_path
|
|
|
|
|
|
def execute_production_line(resource_id):
|
|
@@ -488,6 +666,16 @@ def execute_production_line(resource_id):
|
|
|
for excel_file in excel_files:
|
|
|
file_path = os.path.join(full_storage_path, excel_file)
|
|
|
try:
|
|
|
+ # 如果元数据为空,尝试从Excel文件中推断
|
|
|
+ if not metadata_list:
|
|
|
+ logger.info(f"尝试从Excel文件 {excel_file} 推断元数据")
|
|
|
+ metadata_list = extract_metadata_from_excel(file_path, en_name)
|
|
|
+ if metadata_list:
|
|
|
+ # 重新尝试创建表
|
|
|
+ check_and_create_table(en_name, metadata_list)
|
|
|
+ else:
|
|
|
+ logger.warning("无法从Excel文件推断元数据,将尝试直接加载数据")
|
|
|
+
|
|
|
# 加载数据到PostgreSQL
|
|
|
records = load_excel_to_postgresql(file_path, en_name, metadata_list)
|
|
|
total_records += records
|
|
@@ -499,7 +687,7 @@ def execute_production_line(resource_id):
|
|
|
|
|
|
logger.info(f"已处理并归档文件 {excel_file}, 加载 {records} 条记录")
|
|
|
except Exception as e:
|
|
|
- logger.error(f"处理文件 {excel_file} 失败: {str(e)}")
|
|
|
+ logger.error(f"处理文件 {excel_file} 失败: {str(e)}", exc_info=True)
|
|
|
raise
|
|
|
|
|
|
return {
|
|
@@ -511,8 +699,71 @@ def execute_production_line(resource_id):
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"执行失败: {str(e)}")
|
|
|
+ logger.error(f"执行失败: {str(e)}", exc_info=True)
|
|
|
return {
|
|
|
"status": "error",
|
|
|
"message": str(e)
|
|
|
- }
|
|
|
+ }
|
|
|
+
|
|
|
def extract_metadata_from_excel(file_path, table_name):
    """
    Extract column metadata (original name, translated English name, inferred
    SQL type) from an Excel file's header row and a small data sample.

    Args:
        file_path: path to the Excel file
        table_name: table name (kept for interface compatibility / translation context)

    Returns:
        list: dicts with keys 'name', 'en_name', 'type'; empty list when the
              file has no columns or extraction fails entirely
    """
    try:
        # Read only the header row to obtain the column names.
        df = pd.read_excel(file_path, nrows=0)

        # BUG FIX: a zero-row DataFrame is always `.empty` even when it has
        # columns, so the previous `if df.empty:` guard rejected EVERY file.
        # Test explicitly for the absence of columns instead.
        if len(df.columns) == 0:
            logger.warning(f"Excel文件 {file_path} 为空")
            return []

        column_names = df.columns.tolist()

        # PERF FIX: read the sample ONCE up front — the old code re-parsed the
        # whole sheet (pd.read_excel) inside the loop, once per column.
        try:
            df_sample = pd.read_excel(file_path, nrows=10)
        except Exception as e:
            logger.error(f"读取Excel样本数据失败: {str(e)}")
            df_sample = None

        # Infer all column types in a single pass; fall back to an empty list
        # so each column below degrades to VARCHAR(255) individually.
        col_types = []
        if df_sample is not None:
            try:
                from app.core.meta_data import infer_column_type
                col_types = infer_column_type(df_sample)
            except Exception as e:
                logger.error(f"推断列类型失败: {str(e)}")

        metadata_list = []
        for col_index, name in enumerate(column_names):
            try:
                # Imported lazily (and per column) so a missing translation
                # service only degrades the affected column, not the whole run.
                from app.core.meta_data import translate_and_parse

                # Translate the (possibly Chinese) column name to English.
                en_name = translate_and_parse(name)[0] if name else f"column_{len(metadata_list)}"

                # Sanitize the translation into a valid SQL identifier.
                en_name = re.sub(r'\W+', '_', en_name).lower()

                # enumerate() gives the position directly; the old
                # column_names.index(name) lookup was O(n) per column and
                # wrong in the presence of duplicate names.
                data_type = col_types[col_index] if col_index < len(col_types) else 'VARCHAR(255)'

                metadata_list.append({
                    'name': name,
                    'en_name': en_name,
                    'type': data_type
                })
            except Exception as e:
                logger.error(f"处理列 {name} 时出错: {str(e)}")
                # Fall back to a positional column name and a safe default type.
                en_name = f"column_{len(metadata_list)}"
                metadata_list.append({
                    'name': name,
                    'en_name': en_name,
                    'type': 'VARCHAR(255)'
                })

        logger.info(f"从Excel推断出的元数据: {metadata_list}")
        return metadata_list
    except Exception as e:
        logger.error(f"从Excel文件提取元数据失败: {str(e)}")
        return []
|