from io import BytesIO, StringIO
import os
import pandas as pd
from flask import request, jsonify, send_file, current_app
from app.api.data_resource import bp
from app.models.result import success, failed
import logging
import json
import re
from minio import Minio
from app.core.graph.graph_operations import MyEncoder
from app.services.neo4j_driver import neo4j_driver
from app.core.data_resource.resource import (
    resource_list,
    handle_node,
    resource_kinship_graph,
    resource_impact_all_graph,
    model_resource_list,
    select_create_ddl,
    data_resource_edit,
    handle_id_resource,
    id_data_search_list,
    table_sql,
    select_sql,
    id_resource_graph
)
from app.core.meta_data import (
    translate_and_parse,
    infer_column_type,
    text_resource_solve,
    get_file_content,
    get_formatted_time
)
import traceback
from app.core.system.auth import require_auth

logger = logging.getLogger("app")


def get_minio_client():
    """Build a MinIO client from the current Flask app's configuration."""
    return Minio(
        current_app.config['MINIO_HOST'],
        access_key=current_app.config['MINIO_USER'],
        secret_key=current_app.config['MINIO_PASSWORD'],
        secure=current_app.config['MINIO_SECURE']
    )


def get_minio_config():
    """Return the MinIO-related configuration values as a dict."""
    return {
        'bucket_name': current_app.config['BUCKET_NAME'],
        'prefix': current_app.config['PREFIX'],
        'allowed_extensions': current_app.config['ALLOWED_EXTENSIONS']
    }


def is_english(text):
    """Return True if *text* is pure-ASCII English (letters, digits, common punctuation)."""
    return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))


@bp.route('/translate', methods=['POST'])
def data_resource_translate():
    """Translate a resource name and its metadata names to English and infer column types.

    Expects form fields ``data_resource`` (name), ``meta_data`` (JSON list of
    column names) and an uploaded Excel ``file``. Returns the translated names
    together with a data type inferred from the file for each column.
    """
    # Fetch form data first, then validate BEFORE parsing: the original code
    # called json.loads(meta_data) before the presence check, which raised a
    # TypeError instead of returning the intended error response.
    data_resource = request.form.get('data_resource')
    meta_data = request.form.get('meta_data')
    file = request.files.get('file')

    if not data_resource or not meta_data or not file:
        return jsonify(failed("缺少必要参数"))

    meta_data_list = json.loads(meta_data)

    # Build the translated metadata names; English entries pass through as-is.
    translated_meta_data_list = []
    for meta_item in meta_data_list:
        if is_english(meta_item):
            translated_meta_data_list.append(meta_item)
        else:
            # Fall back to the original value on an empty translation result,
            # consistent with the data_resource handling below (the original
            # indexed [0] unconditionally and could raise IndexError).
            translated = translate_and_parse(meta_item)
            translated_meta_data_list.append(translated[0] if translated else meta_item)

    # Translate the resource name itself, falling back to the original on failure.
    translated_data_resource = translate_and_parse(data_resource)
    if translated_data_resource and len(translated_data_resource) > 0:
        translated_data_resource = translated_data_resource[0]
    else:
        translated_data_resource = data_resource  # use original value when translation fails

    try:
        resource = {"name": data_resource, "en_name": translated_data_resource}
        parsed_data = []

        # Read the file content, then rewind so the stream stays reusable.
        file_content = file.read()
        file.seek(0)

        try:
            df = pd.read_excel(BytesIO(file_content))
        except Exception as e:
            return jsonify(failed(f"文件格式错误: {str(e)}"))

        # Infer a data type per column; pad with a default when the file has
        # fewer columns than the supplied metadata list.
        columns_and_types = infer_column_type(df)
        for i, zh in enumerate(meta_data_list):
            en = translated_meta_data_list[i]
            data_type = columns_and_types[i] if i < len(columns_and_types) else "varchar(255)"
            parsed_data.append({"name": zh, "en_name": en, "data_type": data_type})

        response_data = {
            "head_data": parsed_data,
            "data_resource": resource
        }
        return jsonify(success(response_data, "success"))
    except Exception as e:
        return jsonify(failed({}, str(e)))


@bp.route('/save', methods=['POST'])
def data_resource_save():
    """Save a data resource described by the JSON request body."""
    try:
        receiver = request.get_json()

        # Validate BEFORE dereferencing: the original code read
        # receiver['additional_info'] ahead of these checks, so a missing body
        # raised TypeError/KeyError instead of returning the intended message.
        if not receiver:
            return jsonify(failed("参数不完整:缺少receiver"))
        if 'url' not in receiver:
            return jsonify(failed("参数不完整:缺少url"))

        additional_info = receiver['additional_info']

        file_extension = receiver['url'].split('.')[-1]
        if file_extension in ('xlsx', 'xls', 'csv'):
            head_data = additional_info['head_data']
            data_resource = additional_info['data_resource']
            if not receiver or not data_resource:
                return jsonify(failed("参数不完整"))
            # Delegate node creation to the business layer.
            resource_id = handle_node(receiver, head_data, data_resource)
        else:
            return jsonify(failed("文件格式错误"))

        return jsonify(success({"id": resource_id}))
    except Exception as e:
        logger.error(f"保存数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/delete', methods=['POST'])
def data_resource_delete():
    """Delete a data resource node (and all its relationships) by Neo4j node id."""
    try:
        resource_id = request.json.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        with neo4j_driver.get_session() as session:
            # DETACH DELETE removes the node together with every relationship.
            cypher = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            DETACH DELETE n
            """
            session.run(cypher, resource_id=int(resource_id))

        return jsonify(success({"message": "数据资源删除成功"}))
    except Exception as e:
        logger.error(f"删除数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/update', methods=['POST'])
def data_resource_update():
    """Update a data resource; the JSON body must carry an ``id`` field."""
    try:
        data = request.json
        if not data or "id" not in data:
            return jsonify(failed("参数不完整"))

        # Delegate the actual update to the business layer.
        updated_data = data_resource_edit(data)
        return jsonify(success(updated_data))
    except Exception as e:
        logger.error(f"更新数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/ddl', methods=['POST'])
def id_data_ddl():
    """Parse CREATE TABLE DDL statements out of the submitted SQL text."""
    try:
        sql_content = request.json.get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        # Log the raw SQL to aid debugging of parse failures.
        logger.debug(f"原始SQL: {sql_content}")

        # Extract each CREATE TABLE statement from the input.
        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        # Parse every table definition and merge them into one dict.
        tables_dict = {}
        for ddl in create_ddl_list:
            table_info = table_sql(ddl)
            if table_info:
                # table_info format:
                # {"table_name": {"exist": bool, "meta": [...], "table_comment": "..."}}
                tables_dict.update(table_info)

        if not tables_dict:
            return jsonify(failed("解析表结构失败"))

        logger.debug(f"解析结果: {json.dumps(tables_dict, ensure_ascii=False)}")
        return jsonify(success(tables_dict))
    except Exception as e:
        logger.error(f"解析DDL失败: {str(e)}")
        logger.error(traceback.format_exc())  # full stack for diagnosis
        return jsonify(failed(str(e)))


@bp.route('/list', methods=['POST'])
def data_resource_list():
    """Return a paginated, filterable list of data resources."""
    try:
        # Pagination and filter parameters.
        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))
        en_name_filter = request.json.get('en_name')
        name_filter = request.json.get('name')
        type_filter = request.json.get('type', 'all')
        category_filter = request.json.get('category')
        tag_filter = request.json.get('tag')

        resources, total_count = resource_list(
            page, page_size, en_name_filter, name_filter,
            type_filter, category_filter, tag_filter
        )

        return jsonify(success({
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page
        }))
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/search', methods=['POST'])
def id_data_search():
    """Search the metadata associated with one data resource (paginated)."""
    try:
        resource_id = request.json.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))
        en_name_filter = request.json.get('en_name')
        name_filter = request.json.get('name')
        category_filter = request.json.get('category')
        tag_filter = request.json.get('tag')

        metadata_list, total_count = id_data_search_list(
            resource_id, page, page_size, en_name_filter,
            name_filter, category_filter, tag_filter
        )

        return jsonify(success({
            "records": metadata_list,
            "total": total_count,
            "size": page_size,
            "current": page
        }))
    except Exception as e:
        logger.error(f"搜索数据资源关联的元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


def dynamic_type_conversion(value, target_type):
    """Convert *value* to *target_type* (case-insensitive type name).

    Supported names: int, float/double, bool/boolean; anything else falls
    back to ``str``. ``None`` passes through unchanged.
    """
    if value is None:
        return None

    # Normalize the type name once instead of enumerating case variants.
    type_name = target_type.lower() if isinstance(target_type, str) else ""

    if type_name == "int":
        return int(value)
    elif type_name in ("float", "double"):
        return float(value)
    elif type_name in ("bool", "boolean"):
        if isinstance(value, str):
            return value.lower() in ('true', 'yes', '1', 't', 'y')
        return bool(value)
    else:
        return str(value)


@bp.route('/graph/all', methods=['POST'])
def data_resource_graph_all():
    """Return the full impact graph for a data resource."""
    try:
        resource_id = request.json.get('id')
        meta = request.json.get('meta', True)  # include metadata nodes by default
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        graph_data = resource_impact_all_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取数据资源完整图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/graph', methods=['POST'])
def data_resource_list_graph():
    """Return the kinship (lineage) graph for a data resource."""
    try:
        resource_id = request.json.get('id')
        meta = request.json.get('meta', True)  # include metadata nodes by default
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        graph_data = resource_kinship_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/save/metadata', methods=['POST'])
def id_data_save():
    """Replace the metadata attached to a data resource.

    Deletes all existing ``contain`` relationships of the resource, then
    merges each metadata node and re-creates the relationships.
    """
    try:
        resource_id = request.json.get('id')
        metadata_list = request.json.get('data', [])

        if not resource_id:
            return jsonify(failed("资源ID不能为空"))
        if not metadata_list:
            return jsonify(failed("元数据列表不能为空"))

        with neo4j_driver.get_session() as session:
            # Remove the current relationships before rebuilding them.
            cypher_delete = """
            MATCH (n:data_resource)-[r:contain]->()
            WHERE id(n) = $resource_id
            DELETE r
            """
            session.run(cypher_delete, resource_id=int(resource_id))

            for meta in metadata_list:
                # Create the metadata node if absent; MERGE keys on name only.
                meta_cypher = """
                MERGE (m:Metadata {name: $name})
                ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
                RETURN m
                """
                meta_result = session.run(
                    meta_cypher,
                    name=meta["name"],
                    en_name=meta["en_name"],
                    create_time=meta.get("createTime", get_formatted_time())
                )
                meta_node = meta_result.single()["m"]

                # Link the resource to the metadata node.
                rel_cypher = """
                MATCH (n:data_resource), (m:Metadata)
                WHERE id(n) = $resource_id AND id(m) = $meta_id
                CREATE (n)-[r:contain]->(m)
                RETURN r
                """
                session.run(
                    rel_cypher,
                    resource_id=int(resource_id),
                    meta_id=meta_node.id
                )

        return jsonify(success({"message": "元数据保存成功"}))
    except Exception as e:
        logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/sql/test', methods=['POST'])
def sql_test():
    """Parse a SQL query and return the parse result (dry run, no execution)."""
    try:
        sql_query = request.json.get('sql', '')
        if not sql_query:
            return jsonify(failed("SQL查询不能为空"))

        parsed_sql = select_sql(sql_query)
        if not parsed_sql:
            return jsonify(failed("解析SQL失败"))

        return jsonify(success(parsed_sql))
    except Exception as e:
        logger.error(f"测试SQL查询失败: {str(e)}")
        return jsonify(failed(str(e)))


# Deprecated DDL-identification endpoint: overlaps with the /ddl API above,
# but only reports the number of CREATE TABLE statements found.
@bp.route('/ddl/identify', methods=['POST'])
def sql_ddl_identify():
    """Count the CREATE TABLE statements contained in the submitted SQL."""
    try:
        sql_content = request.json.get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        return jsonify(success({"count": len(create_ddl_list)}))
    except Exception as e:
        logger.error(f"识别DDL语句失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/model/list', methods=['POST'])
def resource_model_list():
    """Return a paginated list of model resources, optionally filtered by name."""
    try:
        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))
        name_filter = request.json.get('name')

        resources, total_count = model_resource_list(page, page_size, name_filter)

        return jsonify(success({
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page
        }))
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/detail', methods=['POST'])
def data_resource_detail():
    """Return the detail record of one data resource."""
    try:
        resource_id = request.json.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        resource_data = handle_id_resource(resource_id)
        if not resource_data:
            return jsonify(failed("资源不存在"))

        return jsonify(success(resource_data))
    except Exception as e:
        logger.error(f"获取数据资源详情失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/config', methods=['GET'])
@require_auth
def get_resource_config():
    """Expose the MinIO upload configuration (extensions, bucket, prefix)."""
    config = get_minio_config()
    return jsonify({
        'allowed_extensions': list(config['allowed_extensions']),
        'bucket_name': config['bucket_name'],
        'prefix': config['prefix']
    })


def parse_table_definition(sql):
    """Parse a CREATE TABLE statement, with or without a schema qualifier.

    Returns a dict ``{"table_name": ..., "fields": [...]}`` (plus ``"schema"``
    when present), or ``None`` when the statement cannot be parsed.

    NOTE(review): the original source lost this function's ``def`` line; the
    name and one-argument signature are reconstructed from the body (it reads
    a single ``sql`` string) — confirm against callers before relying on the
    name.
    """
    try:
        # Supported forms:
        # 1. CREATE TABLE tablename
        # 2. CREATE TABLE "tablename"
        # 3. CREATE TABLE schema.tablename
        # 4. CREATE TABLE "schema"."tablename"
        table_name_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|([^"\s\.]+))\.)?(?:"([^"]+)"|([^"\s\(]+))'
        table_name_match = re.search(table_name_pattern, sql, re.IGNORECASE)

        if not table_name_match:
            return None

        # Prefer the quoted capture; fall back to the unquoted one.
        schema = table_name_match.group(1) or table_name_match.group(2)  # schema is optional
        table_name = table_name_match.group(3) or table_name_match.group(4)  # actual table name

        # Extract the parenthesized field-definition list.
        fields_pattern = r'CREATE\s+TABLE[^(]*\(\s*(.*?)\s*\)'
        fields_match = re.search(fields_pattern, sql, re.DOTALL | re.IGNORECASE)
        if not fields_match:
            return None

        fields_text = fields_match.group(1)

        # Split on top-level commas only; commas inside type parentheses
        # (e.g. DECIMAL(10,2)) must not split a definition.
        field_definitions = []
        in_parenthesis = 0
        current_field = ""

        for char in fields_text:
            if char == '(':
                in_parenthesis += 1
                current_field += char
            elif char == ')':
                in_parenthesis -= 1
                current_field += char
            elif char == ',' and in_parenthesis == 0:
                field_definitions.append(current_field.strip())
                current_field = ""
            else:
                current_field += char

        if current_field.strip():
            field_definitions.append(current_field.strip())

        # Parse each definition into a field record.
        fields = []
        primary_keys = []

        for field_def in field_definitions:
            # Table-level constraints: capture the PRIMARY KEY column, skip the rest.
            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
                pk_pattern = r'PRIMARY\s+KEY\s*\(\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s*\)'
                pk_match = re.search(pk_pattern, field_def, re.IGNORECASE)
                if pk_match:
                    pk = next((g for g in pk_match.groups() if g is not None), "")
                    primary_keys.append(pk)
                continue

            # Regular column definition: name (possibly quoted) followed by a type.
            field_pattern = r'^\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s+([A-Za-z0-9_]+(?:\s*\([^)]*\))?)'
            field_match = re.search(field_pattern, field_def)

            if field_match:
                field_name = next((g for g in field_match.groups()[:4] if g is not None), "")
                field_type = field_match.group(5)

                # Inline PRIMARY KEY on the column itself.
                is_primary = "PRIMARY KEY" in field_def.upper()
                if is_primary:
                    primary_keys.append(field_name)

                not_null = "NOT NULL" in field_def.upper()

                default_match = re.search(r'DEFAULT\s+([^,\s]+)', field_def, re.IGNORECASE)
                default_value = default_match.group(1) if default_match else None

                field_info = {
                    "name": field_name,
                    # NOTE(review): clean_type is not defined or imported in
                    # this module — presumably provided elsewhere; verify.
                    "type": clean_type(field_type),
                    "is_primary": is_primary,
                    "not_null": not_null
                }
                if default_value:
                    field_info["default"] = default_value

                fields.append(field_info)

        # Propagate table-level primary keys onto the field records.
        for field in fields:
            if field["name"] in primary_keys and not field["is_primary"]:
                field["is_primary"] = True

        result = {
            "table_name": table_name,
            "fields": fields
        }
        if schema:
            result["schema"] = schema

        return result
    except Exception as e:
        logger.error(f"解析表定义SQL失败: {str(e)}")
        return None