Ver Fonte

Prepare to modify the translate module.

wangxq há 1 mês atrás
pai
commit
85034e23ba

+ 4 - 1
.gitignore

@@ -56,4 +56,7 @@ htmlcov/
 .coverage.*
 .cache
 coverage.xml
-*.cover 
+*.cover
+
+# Test directories
+app/tests/ 

+ 63 - 0
CHANGELOG.md

@@ -0,0 +1,63 @@
+# 更新日志
+
+本文档记录DataOps平台的所有重要变更。
+
+## [未发布]
+
+### 新增
+- 创建了初始项目结构
+- 添加了README.md文件
+
+### 修改
+- 尚无
+
+### 修复
+- 尚无
+
+## [0.1.0] - 2024-03-20
+
+### 新增
+- 初始化Flask应用结构
+- 创建了基本API蓝图:
+  - 元数据管理 (meta_data)
+  - 数据资源管理 (data_resource)
+  - 数据模型管理 (data_model)
+  - 数据接口管理 (data_interface)
+  - 数据指标管理 (data_metric)
+  - 生产线流程管理 (production_line)
+  - 图形化数据 (graph)
+  - 系统管理 (system)
+- 配置了日志系统
+- 配置了CORS支持
+- 添加了SQLAlchemy数据库连接
+
+### 修改
+- 优化了API路由结构,使用更简洁的前缀
+
+### 修复
+- 尚无
+
+## 格式说明
+
+变更记录格式如下:
+
+```
+## [版本号] - 发布日期
+
+### 新增
+- 新增的功能或文件
+
+### 修改
+- 修改的功能或文件
+
+### 修复
+- 修复的问题或缺陷
+
+### 移除
+- 移除的功能或文件
+```
+
+记录时请注明具体的修改内容,例如:
+- 修改了`app/services/data_service.py`中的`process_data`函数,增加了数据验证
+- 添加了`app/utils/validators.py`文件,实现数据验证功能
+- 修复了数据导入时的编码问题 

+ 43 - 3
app/api/production_line/routes.py

@@ -1,10 +1,14 @@
-from flask import request
+import json
+import logging
+from flask import request, jsonify
 from app.api.production_line import bp
 from app.models.result import success, failed
 from app.api.graph.routes import MyEncoder, connect_graph
 from app.core.production_line import production_draw_graph
-import json
+from app.core.production_line.production_line import execute_production_line
+
 
+logger = logging.getLogger(__name__)
 
 # 生产线列表
 @bp.route('/production/line/list', methods=['POST'])
@@ -90,4 +94,40 @@ def production_line_graph():
 
     except Exception as e:
         res = failed({}, {"error": f"{e}"})
-        return json.dumps(res, ensure_ascii=False, cls=MyEncoder) 
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+"""
+Manual execution API endpoint
+Author: paul
+Date: 2024-03-20
+"""
+
+@bp.route('/production/line/execute', methods=['POST'])
+def production_line_execute():
+    """
+    手动执行数据资源加载
+    
+    Args (通过JSON请求体):
+        id (int): 数据资源ID
+        
+    Returns:
+        JSON: 执行结果
+    """
+    try:
+        # 获取资源ID
+        resource_id = request.json.get('id')
+        if resource_id is None:  # 修改检查逻辑,只有当ID为None时才报错
+            return jsonify(failed("资源ID不能为空"))
+            
+        # 执行加载
+        result = execute_production_line(resource_id)
+        
+        if result['status'] == 'success':
+            return jsonify(success(result))
+        else:
+            return jsonify(failed(result['message']))
+            
+    except Exception as e:
+        logger.error(f"执行数据资源加载失败: {str(e)}")
+        return jsonify(failed(str(e))) 

+ 16 - 0
app/config/config.py

@@ -93,6 +93,22 @@ class Config:
     NEO4J_ENCRYPTED = False  # 内网环境可关闭加密
     """
 
+      # File paths
+    if PLATFORM == 'windows':
+        FILE_PATH = os.environ.get('FILE_PATH') or 'C:/temp/'
+    elif PLATFORM == 'linux':
+        FILE_PATH = os.environ.get('FILE_PATH') or '/tmp/'
+    
+
+    # 数据资源,Windows 和 Linux 的文件上传和归档根路径配置
+    # 例如,当excle文件完成加载后,会被自动移动到归档路径。
+    if PLATFORM == 'windows':       
+        UPLOAD_BASE_PATH = 'C:\\tmp\\upload'
+        ARCHIVE_BASE_PATH = 'C:\\tmp\\archive'
+    elif PLATFORM == 'linux':
+        UPLOAD_BASE_PATH = '/data/upload'
+        ARCHIVE_BASE_PATH = '/data/archive'
+
 class DevelopmentConfig(Config):
     """Development configuration"""
     DEBUG = True

+ 344 - 1
app/core/production_line/production_line.py

@@ -1,4 +1,13 @@
 from app.core.graph.graph_operations import connect_graph
+from flask import current_app
+import os
+import pandas as pd
+from datetime import datetime
+import psycopg2
+from psycopg2 import sql
+import logging
+from app.services.neo4j_driver import neo4j_driver
+import shutil
 
 def production_draw_graph(id, type):
     """
@@ -95,4 +104,338 @@ def production_draw_graph(id, type):
                 "rootId": item['res'],
             }
 
-        return res 
+        return res 
+
+"""
+Manual execution functions for production line
+Author: paul
+Date: 2024-03-20
+"""
+
+# 配置日志
+logger = logging.getLogger(__name__)
+
+# PostgreSQL配置
+def get_pg_config():
+    """从配置文件获取PostgreSQL配置"""
+    db_uri = current_app.config['SQLALCHEMY_DATABASE_URI']
+    # 解析数据库URI: postgresql://postgres:postgres@192.168.67.138:5432/dataops
+    parts = db_uri.replace('postgresql://', '').split('@')
+    user_pass = parts[0].split(':')
+    host_port_db = parts[1].split('/')
+    host_port = host_port_db[0].split(':')
+    
+    return {
+        'dbname': host_port_db[1],
+        'user': user_pass[0],
+        'password': user_pass[1],
+        'host': host_port[0],
+        'port': host_port[1]
+    }
+
+def get_resource_storage_info(resource_id):
+    """
+    获取数据资源的存储位置和元数据信息
+    
+    Returns:
+        tuple: (storage_location, table_name, db_table_name, metadata_list)
+        - storage_location: 存储位置
+        - table_name: 用于查找Excel文件的名称(优先使用中文名)
+        - db_table_name: 数据库表名(使用英文名)
+        - metadata_list: 元数据列表
+    """
+    try:
+        with neo4j_driver.get_session() as session:
+            # 修改查询,增加name(中文名)的获取
+            resource_query = """
+            MATCH (n:data_resource)
+            WHERE id(n) = $resource_id
+            RETURN n.storage_location as storage_location, 
+                   n.name as name,
+                   n.en_name as en_name
+            """
+            result = session.run(resource_query, resource_id=int(resource_id))
+            resource_data = result.single()
+            
+            if not resource_data or not resource_data['storage_location']:
+                raise ValueError("存储位置未配置")
+                
+            # 查询元数据节点,同时获取中文名和英文名
+            metadata_query = """
+            MATCH (n:data_resource)-[:contain]->(m:Metadata)
+            WHERE id(n) = $resource_id
+            RETURN m.name as name, m.en_name as en_name, m.type as type
+            """
+            result = session.run(metadata_query, resource_id=int(resource_id))
+            metadata_list = [dict(record) for record in result]
+            
+            # 用于查找Excel文件的名称(优先使用中文名)
+            table_name = resource_data['name'] if resource_data['name'] else resource_data['en_name']
+            # 数据库表名(使用英文名)
+            db_table_name = resource_data['en_name']
+            
+            if not db_table_name:
+                raise ValueError("数据资源的英文名不能为空")
+            
+            return resource_data['storage_location'], table_name, db_table_name, metadata_list
+    except Exception as e:
+        logger.error(f"获取资源存储信息失败: {str(e)}")
+        raise
+
+def check_and_create_table(table_name, metadata_list):
+    """
+    检查并创建PostgreSQL表
+    
+    Args:
+        table_name: 表名
+        metadata_list: 元数据列表
+    """
+    try:
+        conn = psycopg2.connect(**get_pg_config())
+        cur = conn.cursor()
+        
+        # 检查schema是否存在
+        cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
+        
+        # 检查表是否存在
+        cur.execute("""
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables 
+                WHERE table_schema = 'ods' 
+                AND table_name = %s
+            );
+        """, (table_name,))
+        
+        table_exists = cur.fetchone()[0]
+        
+        if not table_exists:
+            # 构建建表SQL
+            columns = [
+                f"{meta['en_name']} {meta['type']}"
+                for meta in metadata_list
+            ]
+            columns.append("insert_dt timestamp")
+            
+            create_table_sql = sql.SQL("""
+                CREATE TABLE ods.{} (
+                    {}
+                );
+            """).format(
+                sql.Identifier(table_name),
+                sql.SQL(', ').join(map(sql.SQL, columns))
+            )
+            
+            cur.execute(create_table_sql)
+            logger.info(f"表 ods.{table_name} 创建成功")
+            
+        conn.commit()
+    except Exception as e:
+        logger.error(f"检查创建表失败: {str(e)}")
+        raise
+    finally:
+        cur.close()
+        conn.close()
+
+def load_excel_to_postgresql(file_path, table_name, metadata_list):
+    """
+    加载Excel数据到PostgreSQL
+    """
+    conn = None
+    cur = None
+    try:
+        # 读取Excel文件
+        df = pd.read_excel(file_path)
+        
+        # 构建字段映射关系(中文名到英文名的映射)
+        field_mapping = {}
+        for meta in metadata_list:
+            # 优先使用中文名作为Excel中的列名
+            excel_column = meta['name'] if meta['name'] else meta['en_name']
+            field_mapping[excel_column] = meta['en_name']
+        
+        # 验证列名
+        excel_columns = set(df.columns)
+        expected_columns = {(meta['name'] if meta['name'] else meta['en_name']) for meta in metadata_list}
+        
+        if not expected_columns.issubset(excel_columns):
+            missing_columns = expected_columns - excel_columns
+            raise ValueError(f"Excel文件缺少必要的列: {missing_columns}")
+        
+        # 重命名列(从中文名到英文名)
+        df = df.rename(columns=field_mapping)
+        
+        # 添加insert_dt列
+        df['insert_dt'] = datetime.now()
+        
+        # 连接数据库
+        conn = psycopg2.connect(**get_pg_config())
+        cur = conn.cursor()
+        
+        # 构建INSERT语句
+        columns = [meta['en_name'] for meta in metadata_list] + ['insert_dt']
+        insert_sql = sql.SQL("""
+            INSERT INTO ods.{} ({})
+            VALUES ({})
+        """).format(
+            sql.Identifier(table_name),
+            sql.SQL(', ').join(map(sql.Identifier, columns)),
+            sql.SQL(', ').join(sql.Placeholder() * len(columns))
+        )
+        
+        # 批量插入数据
+        records = df[columns].values.tolist()
+        cur.executemany(insert_sql, records)
+        
+        conn.commit()
+        return len(records)
+    except Exception as e:
+        if conn:
+            conn.rollback()
+        logger.error(f"加载数据失败: {str(e)}")
+        raise
+    finally:
+        if cur:
+            cur.close()
+        if conn:
+            conn.close()
+
+def get_full_storage_path(relative_path):
+    """
+    根据相对路径获取完整的存储路径
+    
+    Args:
+        relative_path: Neo4j中存储的相对路径
+        
+    Returns:
+        str: 完整的存储路径
+    """
+    base_path = current_app.config['UPLOAD_BASE_PATH']
+    # 移除路径开头的斜杠(如果有)
+    relative_path = relative_path.lstrip('\\/')
+    # 根据操作系统使用正确的路径分隔符
+    if os.name == 'nt':  # Windows
+        full_path = os.path.join(base_path, relative_path.replace('/', '\\'))
+    else:  # Linux/Unix
+        full_path = os.path.join(base_path, relative_path.replace('\\', '/'))
+    return os.path.normpath(full_path)
+
+def get_archive_path():
+    """
+    获取当前日期的归档路径
+    
+    Returns:
+        str: 归档路径
+    """
+    base_path = current_app.config['ARCHIVE_BASE_PATH']
+    date_folder = datetime.now().strftime('%Y-%m-%d')
+    archive_path = os.path.join(base_path, date_folder)
+    
+    # 确保归档目录存在
+    os.makedirs(archive_path, exist_ok=True)
+    return archive_path
+
+def archive_excel_file(file_path):
+    """
+    将Excel文件移动到归档目录
+    
+    Args:
+        file_path: Excel文件的完整路径
+        
+    Returns:
+        str: 归档后的文件路径
+    """
+    archive_path = get_archive_path()
+    file_name = os.path.basename(file_path)
+    archive_file_path = os.path.join(archive_path, file_name)
+    
+    # 如果目标文件已存在,添加时间戳
+    if os.path.exists(archive_file_path):
+        name, ext = os.path.splitext(file_name)
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        archive_file_path = os.path.join(archive_path, f"{name}_{timestamp}{ext}")
+    
+    shutil.move(file_path, archive_file_path)
+    logger.info(f"文件已归档: {archive_file_path}")
+    return archive_file_path
+
+def execute_production_line(resource_id):
+    """
+    执行生产线数据加载
+    
+    Args:
+        resource_id: 数据资源ID
+        
+    Returns:
+        dict: 执行结果
+    """
+    try:
+        # 获取PostgreSQL配置
+        pg_config = get_pg_config()
+        
+        # 1. 获取存储信息
+        storage_location, table_name, db_table_name, metadata_list = get_resource_storage_info(resource_id)
+        
+        # 2. 检查并创建表
+        check_and_create_table(db_table_name, metadata_list)
+        
+        # 3. 获取完整的存储路径并扫描Excel文件
+        full_storage_path = get_full_storage_path(storage_location)
+        
+        if not os.path.exists(full_storage_path):
+            # 如果目录不存在,创建它
+            try:
+                os.makedirs(full_storage_path, exist_ok=True)
+                logger.info(f"创建目录: {full_storage_path}")
+            except Exception as e:
+                raise ValueError(f"无法创建存储路径: {full_storage_path}, 错误: {str(e)}")
+            
+        excel_files = [
+            f for f in os.listdir(full_storage_path)
+            if f.startswith(table_name) and f.endswith(('.xlsx', '.xls'))
+        ]
+        
+        if not excel_files:
+            error_msg = (
+                f"未找到匹配的Excel文件: {table_name}*.xlsx/xls\n"
+                f"搜索路径: {full_storage_path}\n"
+                f"请确认文件已上传到正确位置,且文件名以'{table_name}'开头"
+            )
+            logger.error(error_msg)
+            raise ValueError(error_msg)
+        
+        # 4. 加载数据并归档文件
+        total_records = 0
+        processed_files = []
+        archived_files = []
+        
+        for excel_file in excel_files:
+            file_path = os.path.join(full_storage_path, excel_file)
+            try:
+                # 加载数据到PostgreSQL
+                records = load_excel_to_postgresql(file_path, db_table_name, metadata_list)
+                total_records += records
+                processed_files.append(excel_file)
+                
+                # 归档成功处理的文件
+                archived_path = archive_excel_file(file_path)
+                archived_files.append(archived_path)
+                
+                logger.info(f"已处理并归档文件 {excel_file}, 加载 {records} 条记录")
+            except Exception as e:
+                logger.error(f"处理文件 {excel_file} 失败: {str(e)}")
+                raise
+            
+        return {
+            "status": "success",
+            "message": f"数据加载成功,共处理 {len(processed_files)} 个文件,加载 {total_records} 条记录",
+            "total_records": total_records,
+            "files_processed": processed_files,
+            "files_archived": archived_files
+        }
+        
+    except Exception as e:
+        logger.error(f"执行失败: {str(e)}")
+        return {
+            "status": "error",
+            "message": str(e)
+        } 

+ 1 - 0
application.py

@@ -1,5 +1,6 @@
 from app import create_app
 from app.config.config import Config, DevelopmentConfig  # 新增配置导入
+from app.core.graph.graph_operations import MyEncoder
 
 app = create_app(DevelopmentConfig)