@@ -608,6 +608,63 @@ def execute_production_line(resource_id):
     """
     Execute production line data loading
+
+    Args:
+        resource_id: Data resource ID
+
+    Returns:
+        dict: Execution result
+    """
+    try:
+        # First fetch the resource info to determine its type
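+        # get_resource_type raises if the resource cannot be found; the except below turns that into an error response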
+        resource_type = get_resource_type(resource_id)
+
+        # Dispatch to the appropriate loading logic based on the resource type
+        if resource_type == 'ddl':
+            # DDL resource: extract data from the source database
+            return execute_ddl_extraction(resource_id)
+        else:
+            # Other types (e.g. structure): load data from an Excel file
+            return execute_excel_loading(resource_id)
+    except Exception as e:
+        logger.error(f"Failed to execute production line: {str(e)}", exc_info=True)
+        return {
+            "status": "error",
+            "message": str(e)
+        }
+
+def get_resource_type(resource_id):
+    """
+    Get the resource type
+
+    Args:
+        resource_id: Data resource ID
+
+    Returns:
+        str: Resource type, e.g. 'ddl' or 'structure'
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # Query the resource type
+            cypher = """
+            MATCH (n:data_resource)
+            WHERE id(n) = $resource_id
+            RETURN n.type as type
+            """
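+            # id(n) matches on Neo4j's internal numeric node id, hence the int() cast of resource_id below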
+            result = session.run(cypher, resource_id=int(resource_id))
+            record = result.single()
+
+            if not record:
+                raise ValueError(f"Data resource with ID {resource_id} not found")
+
+            return record["type"] or 'structure'  # Default to the structure type
+    except Exception as e:
+        logger.error(f"Failed to get resource type: {str(e)}")
+        raise
+
+def execute_excel_loading(resource_id):
+    """
+    Execute Excel file data loading (the original loading logic)
+
     Args:
         resource_id: Data resource ID
@@ -708,7 +765,7 @@ def execute_production_line(resource_id):
         }
 
     except Exception as e:
-        logger.error(f"Execution failed: {str(e)}", exc_info=True)
+        logger.error(f"Excel loading failed: {str(e)}", exc_info=True)
         return {
             "status": "error",
             "message": str(e)
@@ -775,4 +832,297 @@ def extract_metadata_from_excel(file_path, table_name):
         return metadata_list
     except Exception as e:
         logger.error(f"Failed to extract metadata from the Excel file: {str(e)}")
-        return []
+        return []
+
+def execute_ddl_extraction(resource_id):
+    """
+    Execute DDL resource data extraction
+
+    Args:
+        resource_id: Data resource ID
+
+    Returns:
+        dict: Execution result
+    """
+    try:
+        from sqlalchemy import create_engine, text
+        import pandas as pd
+
+        logger.info(f"Starting DDL resource data extraction, ID: {resource_id}")
+
+        # 1. Fetch the resource details
+        resource_data = get_resource_details(resource_id)
+        if not resource_data:
+            return {"status": "error", "message": f"Resource not found, ID: {resource_id}"}
+
+        # 2. Fetch the resource metadata
+        metadata_list = resource_data.get('meta_list', [])
+        if not metadata_list:
+            return {"status": "error", "message": "Resource has no metadata, cannot create the table"}
+
+        # 3. Determine the target table name
+        target_table_name = resource_data.get('en_name')
+        if not target_table_name:
+            return {"status": "error", "message": "Resource has no English name, cannot determine the target table name"}
+
+        # 4. Fetch the associated data source info
+        data_source_info = get_resource_data_source(resource_id)
+        if not data_source_info:
+            return {"status": "error", "message": "Unable to get the associated data source info"}
+
+        # 5. Create the target table in PostgreSQL
+        create_result = create_target_table(target_table_name, metadata_list)
+        if not create_result["success"]:
+            return {"status": "error", "message": f"Failed to create the target table: {create_result['message']}"}
+
+        # 6. Run the data extraction
+        extract_result = extract_data_to_postgres(data_source_info, target_table_name, metadata_list)
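+        # extract_data_to_postgres raises on any failure, so reaching this point means the extraction completed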
+
+        return {
+            "status": "success",
+            "message": f"Data extraction succeeded: {extract_result['total_records']} records extracted from {extract_result['source_table']} into {extract_result['target_table']}",
+            "total_records": extract_result['total_records'],
+            "source_table": extract_result['source_table'],
+            "target_table": extract_result['target_table']
+        }
+
+    except Exception as e:
+        logger.error(f"DDL data extraction failed: {str(e)}", exc_info=True)
+        return {
+            "status": "error",
+            "message": str(e)
+        }
+
+def get_resource_details(resource_id):
+    """
+    Get the resource details
+
+    Args:
+        resource_id: Data resource ID
+
+    Returns:
+        dict: Resource details
+    """
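+    # Imported inside the function rather than at module level, presumably to avoid a circular import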
+    from app.core.data_resource.resource import handle_id_resource
+    return handle_id_resource(resource_id)
+
+def get_resource_data_source(resource_id):
+    """
+    Get the data source info associated with a data resource
+
+    Args:
+        resource_id: Data resource ID
+
+    Returns:
+        dict: Data source connection info
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # Query the data_source node associated with this data resource
+            cypher = """
+            MATCH (n:data_resource)-[:isbelongto]->(ds:data_source)
+            WHERE id(n) = $resource_id
+            RETURN ds
+            """
+            result = session.run(cypher, resource_id=int(resource_id))
+            record = result.single()
+
+            if not record:
+                logger.warning(f"Resource {resource_id} has no associated data source node")
+                return None
+
+            data_source = dict(record["ds"])
+
+            # Build the connection info
+            connection_info = {
+                "type": data_source.get("type", "").lower(),
+                "host": data_source.get("host"),
+                "port": data_source.get("port"),
+                "database": data_source.get("database"),
+                "username": data_source.get("username"),
+                "password": data_source.get("password")
+            }
+
+            # Validate the required fields
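+            # (port and password are treated as optional here)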
+            if not all([connection_info["type"], connection_info["host"],
+                        connection_info["database"], connection_info["username"]]):
+                logger.error(f"Incomplete data source info: {connection_info}")
+                return None
+
+            return connection_info
+    except Exception as e:
+        logger.error(f"Failed to get data source info: {str(e)}")
+        return None
+
+def create_target_table(table_name, metadata_list):
+    """
+    Create the target table in PostgreSQL
+
+    Args:
+        table_name: Table name
+        metadata_list: Metadata list
+
+    Returns:
+        dict: {"success": bool, "message": str}
+    """
+    try:
+        import psycopg2
+        from flask import current_app
+
+        # Get the PostgreSQL configuration
+        pg_config = get_pg_config()
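+        # get_pg_config() is expected to return psycopg2 connection kwargs (host, port, dbname, user, password)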
+
+        conn = psycopg2.connect(**pg_config)
+        cur = conn.cursor()
+
+        # Ensure the ods schema exists
+        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
+
+        # Check whether the table already exists
+        cur.execute("""
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables
+                WHERE table_schema = 'ods'
+                AND table_name = %s
+            );
+        """, (table_name,))
+
+        table_exists = cur.fetchone()[0]
+
+        if table_exists:
+            logger.info(f"Table ods.{table_name} already exists, skipping creation")
+            return {"success": True, "message": f"Table ods.{table_name} already exists"}
+
+        # Build the column definitions
+        columns = []
+        for meta in metadata_list:
+            column_name = meta.get('en_name')
+            data_type = meta.get('data_type')
+
+            if column_name and data_type:
+                columns.append(f"{column_name} {data_type}")
+
+        if not columns:
+            return {"success": False, "message": "No valid column definitions"}
+
+        # Build the CREATE TABLE statement
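+        # A surrogate id primary key and a created_at timestamp are added alongside the metadata-defined
+        # columns; column names and types are interpolated verbatim from the resource metadata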
+        sql = f"""
+        CREATE TABLE ods.{table_name} (
+            id SERIAL PRIMARY KEY,
+            {", ".join(columns)},
+            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+        )
+        """
+
+        logger.info(f"CREATE TABLE SQL: {sql}")
+        cur.execute(sql)
+        conn.commit()
+        logger.info(f"Table ods.{table_name} created successfully")
+
+        return {"success": True, "message": f"Table ods.{table_name} created successfully"}
+
+    except Exception as e:
+        logger.error(f"Failed to create the target table: {str(e)}")
+        if 'conn' in locals() and conn:
+            conn.rollback()
+        return {"success": False, "message": str(e)}
+    finally:
+        if 'cur' in locals() and cur:
+            cur.close()
+        if 'conn' in locals() and conn:
+            conn.close()
+
+def extract_data_to_postgres(source_conn_info, target_table, metadata_list):
+    """
+    Extract data from the source database into PostgreSQL
+
+    Args:
+        source_conn_info: Source database connection info
+        target_table: Target table name
+        metadata_list: Metadata list
+
+    Returns:
+        dict: Extraction result
+    """
+    try:
+        from sqlalchemy import create_engine, text
+        import pandas as pd
+        from flask import current_app
+
+        # The source table has the same name as the target table
+        source_table = target_table
+
+        # Batch size
+        batch_size = current_app.config.get('DATA_EXTRACT_BATCH_SIZE', 1000)
+
+        # Build the source database connection string
+        db_type = source_conn_info["type"]
+        if db_type == "mysql":
+            connection_string = f"mysql+pymysql://{source_conn_info['username']}:{source_conn_info['password']}@{source_conn_info['host']}:{source_conn_info['port']}/{source_conn_info['database']}"
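+            # (the mysql+pymysql URL assumes the PyMySQL driver is installed)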
+        elif db_type == "postgresql":
+            connection_string = f"postgresql://{source_conn_info['username']}:{source_conn_info['password']}@{source_conn_info['host']}:{source_conn_info['port']}/{source_conn_info['database']}"
+        else:
+            raise ValueError(f"Unsupported database type: {db_type}")
+
+        # Target database connection parameters
+        pg_config = get_pg_config()
+        target_connection_string = f"postgresql://{pg_config['user']}:{pg_config['password']}@{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
+
+        # Connect to the source database
+        source_engine = create_engine(connection_string)
+
+        # Connect to the target database
+        target_engine = create_engine(target_connection_string)
+
+        # Get the metadata column names to build the select list
+        column_names = [meta.get('en_name') for meta in metadata_list if meta.get('en_name')]
+        if not column_names:
+            raise ValueError("No valid column names")
+
+        # Build the query
+        select_columns = ", ".join(column_names)
+        query = f"SELECT {select_columns} FROM {source_table}"
+
+        # Get the total record count
+        with source_engine.connect() as conn:
+            count_result = conn.execute(text(f"SELECT COUNT(*) FROM {source_table}"))
+            total_count = count_result.scalar()
+
+        # Extract the data in batches
+        total_records = 0
+        offset = 0
+
+        while offset < total_count:
+            # Build the paginated query
+            paginated_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
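+            # LIMIT/OFFSET pagination is valid for both supported source types (MySQL and PostgreSQL)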
+
+            # Read one batch of data
+            df = pd.read_sql(paginated_query, source_engine)
+            batch_count = len(df)
+
+            if batch_count == 0:
+                break
+
+            # Write the batch to the target database
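+            # if_exists='append' adds the batch to the existing ods table rather than replacing it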
+            df.to_sql(
+                target_table,
+                target_engine,
+                schema='ods',
+                if_exists='append',
+                index=False
+            )
+
+            total_records += batch_count
+            offset += batch_size
+
+            logger.info(f"Extracted {total_records}/{total_count} records")
+
+        return {
+            "total_records": total_records,
+            "source_table": source_table,
+            "target_table": f"ods.{target_table}"
+        }
+
+    except Exception as e:
+        logger.error(f"Data extraction failed: {str(e)}")
+        raise