from io import BytesIO, StringIO
import os
import pandas as pd
from flask import request, jsonify, send_file
from app.api.data_resource import bp
from app.models.result import success, failed
import logging
import json
import re
from minio import Minio
from app.config.config import Config
from app.core.graph.graph_operations import MyEncoder
from app.services.neo4j_driver import neo4j_driver
from app.core.data_resource.resource import (
    resource_list,
    handle_node,
    resource_kinship_graph,
    resource_impact_all_graph,
    model_resource_list,
    select_create_ddl,
    data_resource_edit,
    handle_id_resource,
    id_data_search_list,
    table_sql,
    select_sql,
    id_resource_graph,
)
from app.core.meta_data import (
    translate_and_parse,
    infer_column_type,
    text_resource_solve,
    get_file_content,
    get_formatted_time,
)

logger = logging.getLogger("app")

# MinIO client configuration
minio_client = Minio(
    Config.MINIO_HOST,
    access_key=Config.MINIO_USER,
    secret_key=Config.MINIO_PASSWORD,
    secure=True,
)

# File-upload related configuration
UPLOAD_FOLDER = Config.UPLOAD_FOLDER
bucket_name = Config.BUCKET_NAME
prefix = Config.PREFIX

# Characters allowed in a value that is treated as "already English".
_ENGLISH_TEXT_RE = re.compile(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$')


def is_english(text):
    """Return True if *text* is an ASCII-only, English-looking name.

    A value qualifies when it is a non-empty string made only of ASCII
    letters, digits, underscores, whitespace and common punctuation.
    Non-string values return False instead of raising.
    """
    if not isinstance(text, str):
        return False
    return text.isascii() and _ENGLISH_TEXT_RE.match(text) is not None


def _translate_or_keep(text):
    """Translate *text* via ``translate_and_parse``, falling back to *text*.

    The first element of the translation result is used; an empty or falsy
    result is treated as a failed translation and the input is returned.
    """
    translated = translate_and_parse(text)
    if translated:
        return translated[0]
    return text


@bp.route('/translate', methods=['POST'])
def data_resource_translate():
    """Translate a data resource name and its column names, inferring column types.

    Form fields:
        data_resource: name of the data resource (translated unconditionally).
        meta_data: JSON-encoded list of column names.
        file: uploaded Excel file; its columns are used for type inference.

    Returns a JSON envelope whose data holds ``head_data`` (one dict per
    column with ``name``, ``en_name``, ``data_type``) and ``data_resource``
    (``name``, ``en_name``).
    """
    data_resource = request.form.get('data_resource')
    meta_data = request.form.get('meta_data')
    file = request.files.get('file')

    # Validate before parsing: json.loads(None) would otherwise raise outside
    # any error handling and turn a missing field into an HTTP 500.
    if not data_resource or not meta_data or not file:
        return jsonify(failed("缺少必要参数"))

    try:
        meta_data_list = json.loads(meta_data)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return jsonify(failed("meta_data 格式错误"))
    if not isinstance(meta_data_list, list):
        return jsonify(failed("meta_data 格式错误"))

    try:
        # Parse the upload first so a bad file is rejected before any
        # translation work is done.
        try:
            df = pd.read_excel(BytesIO(file.read()))
        except Exception as e:
            return jsonify(failed(f"文件格式错误: {str(e)}"))

        columns_and_types = infer_column_type(df)

        resource = {
            "name": data_resource,
            "en_name": _translate_or_keep(data_resource),
        }

        parsed_data = []
        for i, zh in enumerate(meta_data_list):
            # Column names that are already English are kept verbatim.
            en = zh if is_english(zh) else _translate_or_keep(zh)
            # Columns beyond those reported by infer_column_type fall back
            # to a generic varchar type.
            data_type = (
                columns_and_types[i] if i < len(columns_and_types) else "varchar(255)"
            )
            parsed_data.append({"name": zh, "en_name": en, "data_type": data_type})

        response_data = {
            "head_data": parsed_data,
            "data_resource": resource,
        }
        return jsonify(success(response_data, "success"))
    except Exception as e:
        logger.exception("数据资源翻译失败: %s", e)
        return jsonify(failed(str(e)))
def _json_body():
    """Return the request's JSON body as a dict, or an empty dict.

    ``request.get_json(silent=True)`` returns None instead of raising when
    the body is missing, not declared as JSON, or unparsable. Non-object
    payloads (lists, scalars) are also mapped to ``{}`` so every handler can
    call ``.get`` unconditionally and report a normal parameter error.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _pagination(params):
    """Return ``(page, page_size)`` from ``current``/``size`` (defaults 1 and 10)."""
    return int(params.get('current', 1)), int(params.get('size', 10))


def _page_response(records, total_count, page_size, page):
    """Wrap one page of records in the success envelope used by list endpoints."""
    return jsonify(success({
        "records": records,
        "total": total_count,
        "size": page_size,
        "current": page,
    }))


def _format_ddl_table(table):
    """Convert one ``table_sql`` result into the structure sent to the front end."""
    fields = []
    for field in table["fields"]:
        field_info = {
            "name": field["name"],
            "en_name": field["name"],  # initially the same as the source name
            "type": field["type"],
            "is_primary": field["is_primary"],
            "nullable": not field.get("not_null", False),
        }
        if "default" in field:
            field_info["default"] = field["default"]
        fields.append(field_info)
    return {
        "name": table["table_name"],
        "en_name": table["table_name"],  # initially the same as the source name
        "fields": fields,
    }


@bp.route('/save', methods=['POST'])
def data_resource_save():
    """Create a data resource from the posted ``receiver``/``head_data``/``data_resource``."""
    try:
        params = _json_body()
        receiver = params.get('receiver', {})
        head_data = params.get('head_data', [])
        data_resource = params.get('data_resource', {})

        if not receiver or not data_resource:
            return jsonify(failed("参数不完整"))

        resource_id = handle_node(receiver, head_data, data_resource)
        return jsonify(success({"id": resource_id}))
    except Exception as e:
        logger.exception("保存数据资源失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/delete', methods=['POST'])
def data_resource_delete():
    """Delete the ``data_resource`` node with the given ``id`` and all its relationships."""
    try:
        resource_id = _json_body().get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        with neo4j_driver.get_session() as session:
            # DETACH DELETE removes the node together with its relationships.
            # consume() makes the write complete (and surface errors) here.
            session.run(
                """
                MATCH (n:data_resource)
                WHERE id(n) = $resource_id
                DETACH DELETE n
                """,
                resource_id=int(resource_id),
            ).consume()

        return jsonify(success({"message": "数据资源删除成功"}))
    except Exception as e:
        logger.exception("删除数据资源失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/update', methods=['POST'])
def data_resource_update():
    """Update a data resource; the whole JSON body (which must contain ``id``) is forwarded."""
    try:
        data = _json_body()
        if not data or "id" not in data:
            return jsonify(failed("参数不完整"))

        updated_data = data_resource_edit(data)
        return jsonify(success(updated_data))
    except Exception as e:
        logger.exception("更新数据资源失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/ddl', methods=['POST'])
def id_data_ddl():
    """Parse the CREATE TABLE statements in the posted ``sql`` into table/field lists."""
    try:
        sql_content = _json_body().get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        # Keep only the statements table_sql could parse.
        tables = [info for info in map(table_sql, create_ddl_list) if info]
        if not tables:
            return jsonify(failed("解析表结构失败"))

        return jsonify(success([_format_ddl_table(table) for table in tables]))
    except Exception as e:
        logger.exception("解析DDL失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/list', methods=['POST'])
def data_resource_list():
    """Return one page of data resources, filtered by the optional request fields."""
    try:
        params = _json_body()
        page, page_size = _pagination(params)

        resources, total_count = resource_list(
            page,
            page_size,
            params.get('en_name'),
            params.get('name'),
            params.get('type', 'all'),
            params.get('category'),
            params.get('tag'),
        )
        return _page_response(resources, total_count, page_size, page)
    except Exception as e:
        logger.exception("获取数据资源列表失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/search', methods=['POST'])
def id_data_search():
    """Return one page of the metadata linked to the data resource ``id``."""
    try:
        params = _json_body()
        resource_id = params.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        page, page_size = _pagination(params)

        metadata_list, total_count = id_data_search_list(
            resource_id,
            page,
            page_size,
            params.get('en_name'),
            params.get('name'),
            params.get('category'),
            params.get('tag'),
        )
        return _page_response(metadata_list, total_count, page_size, page)
    except Exception as e:
        logger.exception("搜索数据资源关联的元数据失败: %s", e)
        return jsonify(failed(str(e)))


def dynamic_type_conversion(value, target_type):
    """Convert *value* to the Python type named by *target_type*.

    Recognised names (case-insensitive): ``int``; ``float``/``double``;
    ``bool``/``boolean``. Any other name converts to ``str``. ``None`` is
    returned unchanged. For booleans, strings are True only when they are
    'true', 'yes', '1', 't' or 'y' (case-insensitive); other values use
    ``bool(value)``.
    """
    if value is None:
        return None

    type_name = target_type.lower() if isinstance(target_type, str) else target_type
    if type_name == "int":
        return int(value)
    if type_name in ("float", "double"):
        return float(value)
    if type_name in ("bool", "boolean"):
        if isinstance(value, str):
            return value.lower() in ('true', 'yes', '1', 't', 'y')
        return bool(value)
    return str(value)


@bp.route('/graph/all', methods=['POST'])
def data_resource_graph_all():
    """Return the full impact graph of data resource ``id`` (``meta`` defaults to True)."""
    try:
        params = _json_body()
        resource_id = params.get('id')
        meta = params.get('meta', True)
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        graph_data = resource_impact_all_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.exception("获取数据资源完整图谱失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/graph', methods=['POST'])
def data_resource_list_graph():
    """Return the kinship graph of data resource ``id`` (``meta`` defaults to True)."""
    try:
        params = _json_body()
        resource_id = params.get('id')
        meta = params.get('meta', True)
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        graph_data = resource_kinship_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.exception("获取数据资源亲缘关系图谱失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/save/metadata', methods=['POST'])
def id_data_save():
    """Replace the metadata linked to a data resource.

    Request JSON:
        id: node id of the ``data_resource`` node.
        data: list of dicts with ``name``, ``en_name`` and optional ``createTime``.

    The old ``contain`` relationships are deleted and new ones created
    inside one explicit transaction. Previously each statement committed on
    its own, so an error part-way through (e.g. an item without ``en_name``)
    left the resource with its old links deleted and only some new ones
    created.
    """
    try:
        params = _json_body()
        resource_id = params.get('id')
        metadata_list = params.get('data', [])

        if not resource_id:
            return jsonify(failed("资源ID不能为空"))
        if not metadata_list:
            return jsonify(failed("元数据列表不能为空"))

        resource_id = int(resource_id)

        unlink_cypher = """
        MATCH (n:data_resource)-[r:contain]->()
        WHERE id(n) = $resource_id
        DELETE r
        """
        # MERGE the metadata node by name (en_name/createTime are set only
        # when it is created), then link it in the same statement, so the
        # node id never has to be read back into Python.
        link_cypher = """
        MERGE (m:Metadata {name: $name})
        ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
        WITH m
        MATCH (n:data_resource)
        WHERE id(n) = $resource_id
        CREATE (n)-[:contain]->(m)
        """

        with neo4j_driver.get_session() as session:
            with session.begin_transaction() as tx:
                tx.run(unlink_cypher, resource_id=resource_id)
                for meta in metadata_list:
                    tx.run(
                        link_cypher,
                        resource_id=resource_id,
                        name=meta["name"],
                        en_name=meta["en_name"],
                        create_time=meta.get("createTime", get_formatted_time()),
                    )
                # Commit explicitly; leaving the block through an exception
                # rolls the whole replacement back instead.
                tx.commit()

        return jsonify(success({"message": "元数据保存成功"}))
    except Exception as e:
        logger.exception("保存数据资源关联的元数据失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/sql/test', methods=['POST'])
def sql_test():
    """Parse the posted ``sql`` query with ``select_sql`` and return the result."""
    try:
        sql_query = _json_body().get('sql', '')
        if not sql_query:
            return jsonify(failed("SQL查询不能为空"))

        parsed_sql = select_sql(sql_query)
        if not parsed_sql:
            return jsonify(failed("解析SQL失败"))

        return jsonify(success(parsed_sql))
    except Exception as e:
        logger.exception("测试SQL查询失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/ddl/identify', methods=['POST'])
def sql_ddl_identify():
    """Count the CREATE TABLE statements found in the posted ``sql``."""
    try:
        sql_content = _json_body().get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        return jsonify(success({"count": len(create_ddl_list)}))
    except Exception as e:
        logger.exception("识别DDL语句失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/model/list', methods=['POST'])
def resource_model_list():
    """Return one page of model resources, optionally filtered by ``name``."""
    try:
        params = _json_body()
        page, page_size = _pagination(params)

        resources, total_count = model_resource_list(page, page_size, params.get('name'))
        return _page_response(resources, total_count, page_size, page)
    except Exception as e:
        logger.exception("获取模型资源列表失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route('/detail', methods=['POST'])
def data_resource_detail():
    """Return the details of data resource ``id``, or an error if it does not exist."""
    try:
        resource_id = _json_body().get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        resource_data = handle_id_resource(resource_id)
        if not resource_data:
            return jsonify(failed("资源不存在"))

        return jsonify(success(resource_data))
    except Exception as e:
        logger.exception("获取数据资源详情失败: %s", e)
        return jsonify(failed(str(e)))