import ast
import json
import logging
import os
import re
import traceback
from io import BytesIO, StringIO

import pandas as pd
from flask import request, jsonify, send_file, current_app
from minio import Minio

# NOTE: the relative order of the app.* imports below is kept as in the
# original file in case the blueprint package relies on it (circular imports).
from app.api.data_resource import bp
from app.models.result import success, failed
from app.core.graph.graph_operations import MyEncoder
from app.services.neo4j_driver import neo4j_driver
from app.core.data_resource.resource import (
    resource_list,
    handle_node,
    resource_kinship_graph,
    resource_impact_all_graph,
    model_resource_list,
    select_create_ddl,
    data_resource_edit,
    handle_id_resource,
    id_data_search_list,
    table_sql,
    select_sql,
    id_resource_graph,
    status_query,
)
from app.core.meta_data import (
    translate_and_parse,
    infer_column_type,
    text_resource_solve,
    get_file_content,
    get_formatted_time,
)
from app.core.system.auth import require_auth
from app.core.llm.ddl_parser import DDLParser

logger = logging.getLogger("app")

# Characters accepted by is_english(): ASCII letters, digits, whitespace and
# basic punctuation.
_ENGLISH_TEXT_RE = re.compile(r'[a-zA-Z0-9_\s.,;:!?()\'"-]+')

# File extensions (lower-case) that /save treats as structured uploads.
_STRUCTURED_EXTENSIONS = ('xlsx', 'xls', 'csv')

# Keys every item of /save/metadata's `data` list must provide.
_REQUIRED_META_KEYS = ('name', 'en_name', 'data_type')


def get_minio_client():
    """Build a MinIO client from the current Flask app's configuration."""
    return Minio(
        current_app.config['MINIO_HOST'],
        access_key=current_app.config['MINIO_USER'],
        secret_key=current_app.config['MINIO_PASSWORD'],
        secure=current_app.config['MINIO_SECURE'],
    )


def get_minio_config():
    """Return the bucket name, object prefix and allowed extensions from app config."""
    return {
        'bucket_name': current_app.config['BUCKET_NAME'],
        'prefix': current_app.config['PREFIX'],
        'allowed_extensions': current_app.config['ALLOWED_EXTENSIONS'],
    }


def is_english(text):
    """Return True if *text* is a non-empty string of ASCII letters, digits,
    whitespace and basic punctuation only.

    Non-string input returns False instead of raising.
    """
    if not isinstance(text, str):
        return False
    # fullmatch (not match + '$') so a trailing newline is not accepted
    return text.isascii() and _ENGLISH_TEXT_RE.fullmatch(text) is not None


def _json_body():
    """Return the request body as a dict, or ``{}`` if it is absent, not JSON,
    or not a JSON object.

    Used instead of ``request.json.get`` so that a missing or non-JSON body
    reaches the endpoint's own validation messages rather than raising.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _parse_resource_id(raw_id):
    """Validate a resource id taken from a request body.

    Returns:
        ``(resource_id, None)`` with an int id on success, or
        ``(None, response)`` where ``response`` is a ready-to-return error.
        Note that 0 is accepted: it is a valid Neo4j internal node id.
    """
    if raw_id is None or raw_id == '':
        return None, jsonify(failed("资源ID不能为空"))
    try:
        return int(raw_id), None
    except (ValueError, TypeError):
        return None, jsonify(failed(f"资源ID必须为整数, 收到的是: {raw_id}"))


def _parse_meta_data(raw):
    """Parse the ``meta_data`` form field into a list of column-name strings.

    Parsing tries, in order: a JSON value, a Python literal, and finally a
    bracketed comma-separated string. An empty field, or a non-bracketed
    string that is not valid JSON, gives ``[]``.
    """
    if not raw:
        logger.warning("meta_data为空,将使用空列表")
        return []

    # Normalise quote characters that break json.loads: the mojibake byte 'â'
    # (kept from the original behaviour) and curly double quotes.
    cleaned = raw.replace('â', '"').replace('\u201c', '"').replace('\u201d', '"')
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError as e:
        logger.error(f"解析meta_data失败: {cleaned}, 错误: {str(e)}")
        if not (cleaned.startswith('[') and cleaned.endswith(']')):
            return []
        try:
            # Fallback for Python-style lists, e.g. single-quoted items
            parsed = ast.literal_eval(cleaned)
        except Exception:
            # Last resort: naive comma split of the bracketed content
            parsed = [item.strip().strip('"\'') for item in cleaned.strip('[]').split(',')]

    if parsed is None:
        return []
    if not isinstance(parsed, (list, tuple)):
        parsed = [parsed]
    # is_english / translation expect strings
    return [str(item) for item in parsed]


def _first_translation(text):
    """Return the first result of ``translate_and_parse(text)``, or *text*
    itself when that call returns nothing."""
    translated = translate_and_parse(text)
    if translated and len(translated) > 0:
        return translated[0]
    return text


def _translate_column_name(name):
    """Return *name* unchanged if it is already English, otherwise its translation."""
    return name if is_english(name) else _first_translation(name)


def _read_tabular_file(filename, content):
    """Load uploaded bytes into a DataFrame.

    Files named ``*.csv`` are read with ``pd.read_csv``; everything else with
    ``pd.read_excel``.
    """
    if filename and filename.lower().endswith('.csv'):
        return pd.read_csv(BytesIO(content))
    return pd.read_excel(BytesIO(content))


@bp.route('/translate', methods=['POST'])
def data_resource_translate():
    """Translate a data resource name and its column names for an uploaded file.

    Form fields:
        ``data_resource`` (required), ``meta_data`` (optional column-name
        list), and the ``file`` upload (required).

    If ``meta_data`` is empty, the file's column names are used instead.
    Returns ``head_data`` (name, en_name and inferred data_type per column)
    and ``data_resource`` (name / en_name).
    """
    data_resource = request.form.get('data_resource')
    meta_data = request.form.get('meta_data')
    file = request.files.get('file')

    if not data_resource or not file:
        return jsonify(failed("缺少必要参数:data_resource 或文件"))

    meta_data_list = _parse_meta_data(meta_data)

    try:
        # Read and validate the file before making (potentially expensive)
        # translation calls.
        file_content = file.read()
        file.seek(0)
        try:
            df = _read_tabular_file(file.filename, file_content)
        except Exception as e:
            return jsonify(failed(f"文件格式错误: {str(e)}"))

        # Without explicit meta_data, fall back to the file's own column names
        if not meta_data_list and not df.empty:
            meta_data_list = [str(col) for col in df.columns]

        translated_meta_data_list = [_translate_column_name(name) for name in meta_data_list]
        resource = {"name": data_resource, "en_name": _first_translation(data_resource)}

        columns_and_types = infer_column_type(df)
        parsed_data = []
        for i, (zh, en) in enumerate(zip(meta_data_list, translated_meta_data_list)):
            # Columns listed in meta_data beyond the file's width get a default type
            data_type = columns_and_types[i] if i < len(columns_and_types) else "varchar(255)"
            parsed_data.append({"name": zh, "en_name": en, "data_type": data_type})

        response_data = {
            "head_data": parsed_data,
            "data_resource": resource,
        }
        return jsonify(success(response_data, "success"))
    except Exception as e:
        logger.error(f"翻译处理失败: {str(e)}", exc_info=True)
        return jsonify(failed(str(e)))


@bp.route('/save', methods=['POST'])
def data_resource_save():
    """Create a data resource node from a JSON ``receiver`` payload.

    Spreadsheet/CSV URLs (which need ``storage_location``) are created with
    ``resource_type='structure'``. ``.sql`` URLs (which need
    ``additional_info.data_source`` with an ``en_name``) are created with
    ``resource_type='ddl'``.
    """
    try:
        receiver = _json_body()
        if not receiver:
            return jsonify(failed("参数不完整:缺少receiver"))

        if 'url' not in receiver:
            return jsonify(failed("参数不完整:缺少url"))

        additional_info = receiver.get('additional_info')
        if not additional_info:
            return jsonify(failed("参数不完整: 缺少additional_info"))

        if 'head_data' not in additional_info:
            return jsonify(failed("参数不完整: 缺少head_data"))
        head_data = additional_info['head_data']

        # Extension match is case-insensitive so "X.XLSX" is accepted too
        file_extension = str(receiver['url']).rsplit('.', 1)[-1].lower()

        if file_extension in _STRUCTURED_EXTENSIONS:
            storage_location = receiver.get('storage_location', '')
            if not storage_location:
                return jsonify(failed("参数不完整:缺少storage_location或storage_location为空"))
            resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')
        elif file_extension == 'sql':
            data_source = additional_info.get('data_source')
            if not data_source or (isinstance(data_source, dict) and not data_source.get("en_name")):
                return jsonify(failed("数据源信息不完整或无效"))
            resource_id = handle_node(receiver, head_data, data_source, resource_type='ddl')
        else:
            return jsonify(failed("文件格式错误"))

        return jsonify(success({"id": resource_id}))
    except Exception as e:
        logger.error(f"保存数据资源失败: {str(e)}")
        logger.error(f"错误详情: {traceback.format_exc()}")
        return jsonify(failed(str(e)))


@bp.route('/delete', methods=['POST'])
def data_resource_delete():
    """Delete a ``data_resource`` node and all of its relationships by Neo4j id."""
    try:
        resource_id, error = _parse_resource_id(_json_body().get('id'))
        if error is not None:
            return error

        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            DETACH DELETE n
            """
            session.run(cypher, resource_id=resource_id)

        return jsonify(success({"message": "数据资源删除成功"}))
    except Exception as e:
        logger.error(f"删除数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/update', methods=['POST'])
def data_resource_update():
    """Update a data resource; the JSON body must include ``id``."""
    try:
        data = _json_body()
        if not data or "id" not in data:
            return jsonify(failed("参数不完整"))

        updated_data = data_resource_edit(data)
        return jsonify(success(updated_data))
    except Exception as e:
        logger.error(f"更新数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))


# Regex-based DDL parsing: no translation and no comment recognition.
# Called when a data resource is created from DDL.
@bp.route('/ddl', methods=['POST'])
def id_data_ddl():
    """Parse the CREATE TABLE statements in ``sql`` into a dict keyed by table name."""
    try:
        sql_content = _json_body().get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        logger.debug(f"原始SQL: {sql_content}")

        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        # table_sql returns {"table_name": {"exist": ..., "meta": [...], "table_comment": ...}}
        tables_dict = {}
        for ddl in create_ddl_list:
            table_info = table_sql(ddl)
            if table_info:
                tables_dict.update(table_info)

        if not tables_dict:
            return jsonify(failed("解析表结构失败"))

        logger.debug(f"解析结果: {json.dumps(tables_dict, ensure_ascii=False)}")
        return jsonify(success(tables_dict))
    except Exception as e:
        logger.error(f"解析DDL失败: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify(failed(str(e)))


@bp.route('/list', methods=['POST'])
def data_resource_list():
    """Return one page of data resources, filtered by the optional body fields."""
    try:
        body = _json_body()
        page = int(body.get('current', 1))
        page_size = int(body.get('size', 10))

        resources, total_count = resource_list(
            page,
            page_size,
            body.get('en_name'),
            body.get('name'),
            body.get('type', 'all'),
            body.get('category'),
            body.get('tag'),
        )

        return jsonify(success({
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page,
        }))
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/search', methods=['POST'])
def id_data_search():
    """Return one page of the metadata linked to the data resource ``id``."""
    try:
        body = _json_body()
        page = int(body.get('current', 1))
        page_size = int(body.get('size', 10))

        resource_id, error = _parse_resource_id(body.get('id'))
        if error is not None:
            return error

        logger.info(f"获取资源关联元数据请求,ID: {resource_id}")

        metadata_list, total_count = id_data_search_list(
            resource_id,
            page,
            page_size,
            body.get('en_name'),
            body.get('name'),
            body.get('category'),
            body.get('tag'),
        )

        return jsonify(success({
            "records": metadata_list,
            "total": total_count,
            "size": page_size,
            "current": page,
        }))
    except Exception as e:
        logger.error(f"数据资源关联元数据搜索失败: {str(e)}")
        return jsonify(failed(str(e)))


def dynamic_type_conversion(value, target_type):
    """Convert *value* to the type named by *target_type*.

    Type names are matched case-insensitively:
    ``int``; ``float``/``double``; ``bool``/``boolean`` (strings are truthy
    when one of true/yes/1/t/y). Any other type name gives ``str(value)``.
    ``None`` is returned unchanged.
    """
    if value is None:
        return None
    kind = target_type.lower() if isinstance(target_type, str) else target_type
    if kind == "int":
        return int(value)
    if kind in ("float", "double"):
        return float(value)
    if kind in ("bool", "boolean"):
        if isinstance(value, str):
            return value.lower() in ('true', 'yes', '1', 't', 'y')
        return bool(value)
    return str(value)


@bp.route('/graph/all', methods=['POST'])
def data_resource_graph_all():
    """Return the full impact graph of a data resource (``meta`` defaults to True)."""
    try:
        body = _json_body()
        meta = body.get('meta', True)
        resource_id, error = _parse_resource_id(body.get('id'))
        if error is not None:
            return error

        graph_data = resource_impact_all_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取数据资源完整图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/graph', methods=['POST'])
def data_resource_list_graph():
    """Return the kinship graph of a data resource (``meta`` defaults to True)."""
    try:
        body = _json_body()
        meta = body.get('meta', True)
        resource_id, error = _parse_resource_id(body.get('id'))
        if error is not None:
            return error

        logger.info(f"获取图谱请求,ID: {resource_id}")

        graph_data = resource_kinship_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/save/metadata', methods=['POST'])
def id_data_save():
    """Replace the ``contain`` relationships from a data resource to its metadata.

    Body: ``id`` (the data_resource node id) and ``data`` (a list of dicts,
    each with ``name``, ``en_name`` and ``data_type``). ``meta_data`` nodes
    are MERGEd by ``name``.
    """
    try:
        body = _json_body()
        resource_id, error = _parse_resource_id(body.get('id'))
        if error is not None:
            return error

        metadata_list = body.get('data', [])
        if not metadata_list:
            return jsonify(failed("元数据列表不能为空"))

        # Validate everything up front: the existing relationships are deleted
        # below, so failing halfway would leave the resource with no metadata.
        for meta in metadata_list:
            if not isinstance(meta, dict):
                return jsonify(failed("元数据格式错误:每一项必须是对象"))
            missing = [key for key in _REQUIRED_META_KEYS if key not in meta]
            if missing:
                return jsonify(failed(f"元数据缺少必要字段: {', '.join(missing)}"))

        with neo4j_driver.get_session() as session:
            resource_query = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            RETURN n.name as resource_name
            """
            resource_record = session.run(resource_query, resource_id=resource_id).single()
            if not resource_record:
                return jsonify(failed(f"未找到ID为{resource_id}的数据资源"))
            resource_name = resource_record["resource_name"]

            cypher_delete = """
            MATCH (n:data_resource)-[r:contain]->()
            WHERE id(n) = $resource_id
            DELETE r
            """
            session.run(cypher_delete, resource_id=resource_id)

            # Match the resource by its (int) node id rather than by name, so
            # resources that share a name are not cross-linked.
            link_cypher = """
            MATCH (a:data_resource)
            WHERE id(a) = $resource_id
            MERGE (m:meta_data {name: $name})
            ON CREATE SET m.en_name = $en_name, m.createTime = $create_time, m.data_type = $type
            ON MATCH SET m.data_type = $type
            MERGE (a)-[r:contain]->(m)
            RETURN r
            """
            for meta in metadata_list:
                link_record = session.run(
                    link_cypher,
                    resource_id=resource_id,
                    name=meta["name"],
                    en_name=meta["en_name"],
                    create_time=get_formatted_time(),
                    type=meta["data_type"],
                ).single()
                if link_record:
                    logger.info(f"成功创建关系: {resource_name} -> {meta['name']}")
                else:
                    logger.warning("关系创建结果为空")

        return jsonify(success({"message": "元数据保存成功"}))
    except Exception as e:
        logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/sql/test', methods=['POST'])
def sql_test():
    """Parse the SQL query in ``sql`` with ``select_sql`` and return the result."""
    try:
        sql_query = _json_body().get('sql', '')
        if not sql_query:
            return jsonify(failed("SQL查询不能为空"))

        parsed_sql = select_sql(sql_query)
        if not parsed_sql:
            return jsonify(failed("解析SQL失败"))

        return jsonify(success(parsed_sql))
    except Exception as e:
        logger.error(f"测试SQL查询失败: {str(e)}")
        return jsonify(failed(str(e)))


def _mark_data_source_exists(data_source):
    """Set ``data_source["exist"]`` to whether a ``data_source`` node with the
    same ``en_name`` exists in Neo4j. The flag is False if the query fails."""
    try:
        with neo4j_driver.get_session() as session:
            # OPTIONAL MATCH + count always yields exactly one row
            source_query = """
            OPTIONAL MATCH (n:data_source {en_name: $name})
            RETURN count(n) > 0 AS exists
            """
            record = session.run(source_query, name=data_source["en_name"]).single()
            data_source["exist"] = bool(record["exists"]) if record else False
    except Exception as e:
        logger.error(f"检查数据源存在状态失败: {str(e)}")
        data_source["exist"] = False


def _mark_tables_exist(tables):
    """Set ``exist`` on each entry of *tables*, a dict keyed by table en_name,
    according to whether a matching ``data_resource`` node exists.

    Every entry is set to False if the query fails.
    """
    table_names = list(tables.keys())
    if not table_names:
        return
    try:
        with neo4j_driver.get_session() as session:
            # count() collapses duplicate matches into one row per name
            table_query = """
            UNWIND $names AS name
            OPTIONAL MATCH (n:data_resource {en_name: name})
            RETURN name, count(n) > 0 AS exists
            """
            for record in session.run(table_query, names=table_names):
                table_name = record["name"]
                if table_name in tables:
                    tables[table_name]["exist"] = record["exists"]
        for table_name in table_names:
            tables[table_name].setdefault("exist", False)
    except Exception as e:
        logger.error(f"检查表存在状态失败: {str(e)}")
        for table_name in table_names:
            tables[table_name]["exist"] = False


# LLM-based DDL recognition, replacing the older regex approach.
# Used when creating data resources: /api/resource/ddl/parse
@bp.route('/ddl/parse', methods=['POST'])
def ddl_identify():
    """Recognise DDL with DDLParser and annotate what already exists in Neo4j.

    SQL comes either from an uploaded ``.sql`` file or from the JSON field
    ``sql``. When the parser returns a dict, ``data_source`` and each entry
    in ``tables`` get an ``exist`` flag.
    """
    try:
        sql_content = ''

        if 'file' in request.files:
            file = request.files['file']
            if file and file.filename:
                if not file.filename.lower().endswith('.sql'):
                    return jsonify(failed("只接受SQL文件"))
                # utf-8-sig also strips a leading BOM that editors may add
                sql_content = file.read().decode('utf-8-sig')
                logger.info(f"从上传的文件中读取SQL内容,文件名: {file.filename}")
        elif request.is_json:
            sql_content = _json_body().get('sql', '')

        if not sql_content:
            return jsonify(failed("SQL内容不能为空,请上传SQL文件或提供SQL内容"))

        parser = DDLParser()
        ddl_list = parser.parse_ddl(sql_content)
        if not ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        data_source = None
        if isinstance(ddl_list, dict):
            data_source = ddl_list.pop("data_source", None)
            if data_source:
                if not isinstance(data_source, dict) or "en_name" not in data_source:
                    logger.debug(f"data_source内容: {json.dumps(data_source, ensure_ascii=False, default=str)}")
                    return jsonify(failed("数据源信息不完整:缺少en_name字段"))
                _mark_data_source_exists(data_source)

            tables = ddl_list.get("tables")
            if isinstance(tables, dict):
                _mark_tables_exist(tables)

        result = ddl_list
        # data_source can only be set when ddl_list is a dict
        if data_source:
            result["data_source"] = data_source

        logger.debug(f"识别到的DDL语句: {result}")
        return jsonify(success(result))
    except Exception as e:
        logger.error(f"识别DDL语句失败: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify(failed(str(e)))


# Deprecated: a simplified variant of the /ddl endpoint.
@bp.route('/ddl/identify', methods=['POST'])
def sql_ddl_identify():
    """Return how many CREATE TABLE statements ``sql`` contains."""
    try:
        sql_content = _json_body().get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        return jsonify(success({"count": len(create_ddl_list)}))
    except Exception as e:
        logger.error(f"识别DDL语句失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/model/list', methods=['POST'])
def resource_model_list():
    """Return one page of model resources, optionally filtered by ``name``."""
    try:
        body = _json_body()
        page = int(body.get('current', 1))
        page_size = int(body.get('size', 10))

        resources, total_count = model_resource_list(page, page_size, body.get('name'))

        return jsonify(success({
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page,
        }))
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/detail', methods=['POST'])
def data_resource_detail():
    """Return the details of the data resource ``id``."""
    try:
        resource_id, error = _parse_resource_id(_json_body().get('id'))
        if error is not None:
            return error

        logger.info(f"获取资源详情请求,ID: {resource_id}")

        resource_data = handle_id_resource(resource_id)
        if not resource_data:
            logger.error(f"资源不存在,ID: {resource_id}")
            return jsonify(failed("资源不存在"))

        return jsonify(success(resource_data))
    except Exception as e:
        logger.error(f"获取数据资源详情失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/config', methods=['GET'])
@require_auth
def get_resource_config():
    """Return the upload configuration: allowed extensions, bucket and prefix."""
    config = get_minio_config()
    return jsonify({
        'allowed_extensions': list(config['allowed_extensions']),
        'bucket_name': config['bucket_name'],
        'prefix': config['prefix'],
    })