|
- from io import BytesIO, StringIO
- import os
- import pandas as pd
- from flask import request, jsonify, send_file, current_app
- from app.api.data_resource import bp
- from app.models.result import success, failed
- import logging
- import json
- import re
- from minio import Minio
- from app.core.graph.graph_operations import MyEncoder
- from app.services.neo4j_driver import neo4j_driver
- from app.core.data_resource.resource import (
- resource_list,
- handle_node,
- resource_kinship_graph,
- resource_impact_all_graph,
- model_resource_list,
- select_create_ddl,
- data_resource_edit,
- handle_id_resource,
- id_data_search_list,
- table_sql,
- select_sql,
- id_resource_graph,
- status_query
- )
- from app.core.meta_data import (
- translate_and_parse,
- infer_column_type,
- text_resource_solve,
- get_file_content,
- get_formatted_time
- )
- import traceback
- from app.core.system.auth import require_auth
- from app.core.llm.ddl_parser import DDLParser
- logger = logging.getLogger("app")
def get_minio_client():
    """Build a MinIO client from the current Flask application's config.

    Reads ``MINIO_HOST``, ``MINIO_USER``, ``MINIO_PASSWORD`` and
    ``MINIO_SECURE`` from ``current_app.config`` and passes them to
    ``Minio`` as endpoint, access key, secret key and ``secure`` flag.
    """
    settings = current_app.config
    endpoint = settings['MINIO_HOST']
    return Minio(
        endpoint,
        access_key=settings['MINIO_USER'],
        secret_key=settings['MINIO_PASSWORD'],
        secure=settings['MINIO_SECURE'],
    )
def get_minio_config():
    """Return the MinIO storage settings of the current Flask application.

    The result is a dict with the keys ``bucket_name``, ``prefix`` and
    ``allowed_extensions``, taken from ``BUCKET_NAME``, ``PREFIX`` and
    ``ALLOWED_EXTENSIONS`` in ``current_app.config``.
    """
    settings = current_app.config
    # (key exposed to callers, key in the Flask config), in lookup order.
    key_map = (
        ('bucket_name', 'BUCKET_NAME'),
        ('prefix', 'PREFIX'),
        ('allowed_extensions', 'ALLOWED_EXTENSIONS'),
    )
    return {public_key: settings[config_key] for public_key, config_key in key_map}
def is_english(text):
    """Report whether ``text`` consists solely of plain English characters.

    Args:
        text: The value to inspect. Callers in this module pass items parsed
            from JSON and DataFrame column labels, which are not guaranteed
            to be strings.

    Returns:
        ``True`` when ``text`` is a non-empty ASCII string made up only of
        letters, digits, underscores, whitespace and the punctuation
        ``.,;:!?()'"-``; ``False`` otherwise, including for non-string input.
    """
    # Non-string values have no ``isascii``; treat them as "not English"
    # instead of raising AttributeError.
    if not isinstance(text, str):
        return False
    return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))
def _parse_meta_data(raw_meta_data):
    """Turn the raw ``meta_data`` form field into a list of field names."""
    if not raw_meta_data:
        logger.warning("meta_data为空,将使用空列表")
        return []

    # Normalise typographic double quotes (U+201C / U+201D) to ASCII quotes so
    # JSON containing them still parses. Ordinary letters are left alone.
    normalized = raw_meta_data.replace('\u201c', '"').replace('\u201d', '"')
    try:
        return json.loads(normalized)
    except json.JSONDecodeError as e:
        logger.error(f"解析meta_data失败: {normalized}, 错误: {str(e)}")

    # Fallbacks only apply to text that at least looks like an array.
    if not (normalized.startswith('[') and normalized.endswith(']')):
        return []
    try:
        # literal_eval only evaluates Python literals, e.g. single-quoted lists.
        import ast
        return ast.literal_eval(normalized)
    except Exception:
        # Last resort: plain comma split. Strip whitespace first so the quotes
        # around every item (not just the first one) are removed.
        inner = normalized.strip('[]')
        return [item.strip().strip('"\'') for item in inner.split(',')]


def _translate_names(names):
    """Return the English name for each entry, keeping English entries as-is."""
    translated = []
    for name in names:
        if is_english(name):
            translated.append(name)
            continue
        candidates = translate_and_parse(name)
        # Fall back to the original name when translation yields nothing,
        # the same way the resource name itself is handled.
        translated.append(candidates[0] if candidates else name)
    return translated


@bp.route('/translate', methods=['POST'])
def data_resource_translate():
    """Translate a data resource name and its field names.

    Form fields:
        data_resource: Name of the data resource (required).
        meta_data: Optional JSON array of field names. When it is empty the
            column labels of the uploaded file are used instead.
        file: Uploaded file (required); it is read with ``pandas.read_excel``.

    Returns:
        A JSON response. On success the data holds ``head_data`` (one
        ``{"name", "en_name", "data_type"}`` entry per field) and
        ``data_resource`` (``{"name", "en_name"}``); otherwise a failure
        message.
    """
    data_resource = request.form.get('data_resource')
    meta_data = request.form.get('meta_data')
    file = request.files.get('file')
    if not data_resource or not file:
        return jsonify(failed("缺少必要参数:data_resource 或文件"))

    meta_data_list = _parse_meta_data(meta_data)
    translated_meta_data_list = _translate_names(meta_data_list)

    # Translate the resource name; keep the original when translation fails.
    resource_candidates = translate_and_parse(data_resource)
    translated_data_resource = resource_candidates[0] if resource_candidates else data_resource

    try:
        resource = {"name": data_resource, "en_name": translated_data_resource}

        file_content = file.read()
        # Rewind the upload stream after reading it.
        file.seek(0)
        try:
            df = pd.read_excel(BytesIO(file_content))
        except Exception as e:
            return jsonify(failed(f"文件格式错误: {str(e)}"))

        # Without explicit meta_data, fall back to the DataFrame's column labels.
        if not meta_data_list and not df.empty:
            # Labels are not necessarily strings; stringify them so they can
            # be checked, translated and serialised like explicit names.
            meta_data_list = [str(label) for label in df.columns]
            translated_meta_data_list = _translate_names(meta_data_list)

        columns_and_types = infer_column_type(df)
        parsed_data = []
        for position, (zh, en) in enumerate(zip(meta_data_list, translated_meta_data_list)):
            # Fields beyond the inferred columns default to varchar(255).
            if position < len(columns_and_types):
                data_type = columns_and_types[position]
            else:
                data_type = "varchar(255)"
            parsed_data.append({"name": zh, "en_name": en, "data_type": data_type})

        response_data = {
            "head_data": parsed_data,
            "data_resource": resource,
        }
        return jsonify(success(response_data, "success"))
    except Exception as e:
        logger.error(f"翻译处理失败: {str(e)}", exc_info=True)
        # Pass the message as the first argument, like every other handler here.
        return jsonify(failed(str(e)))
-
@bp.route('/save', methods=['POST'])
def data_resource_save():
    """Create a data resource from the posted JSON description.

    JSON body:
        url: Optional file url; its extension selects the processing branch.
        storage_location: Required for ``xlsx``/``xls``/``csv`` urls.
        additional_info: Required dict; ``head_data`` is read from it, and
            ``data_source`` for the DDL branch.

    Branches:
        * ``xlsx``/``xls``/``csv``: saved with ``resource_type='structure'``.
        * ``sql`` or no url: ``structure`` when ``storage_location`` is set,
          otherwise ``ddl`` when a valid ``data_source`` is given.
        * anything else: rejected.

    Returns:
        A JSON response with the new resource ``id`` on success, otherwise a
        failure message.
    """
    try:
        receiver = request.get_json()
        if not receiver:
            return jsonify(failed("参数不完整:缺少receiver"))

        url = receiver.get('url')
        if not url:
            # A missing url is allowed; it is handled like the SQL case below.
            logger.debug("url 为空")

        additional_info = receiver.get('additional_info')
        if not additional_info:
            return jsonify(failed("参数不完整: 缺少additional_info"))

        head_data = additional_info.get('head_data')
        # Compare extensions case-insensitively so "data.XLSX" is accepted.
        file_extension = url.split('.')[-1].lower() if url else ""
        storage_location = receiver.get('storage_location', '')

        if file_extension in ('xlsx', 'xls', 'csv'):
            # Excel/CSV resources must state where the data is stored.
            if not storage_location:
                return jsonify(failed("参数不完整:缺少storage_location或storage_location为空"))
            resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')

        elif file_extension in ('sql', ''):
            data_source = additional_info.get('data_source', '')
            if storage_location:
                # A storage location means the resource is structured data.
                resource_id = handle_node(receiver, head_data, data_source=None, resource_type='structure')
            elif data_source:
                # Otherwise it is DDL, which needs a data source with an en_name.
                if not isinstance(data_source, dict) or not data_source.get("en_name"):
                    return jsonify(failed("数据源信息不完整或无效"))
                resource_id = handle_node(receiver, head_data, data_source, resource_type='ddl')
            else:
                return jsonify(failed("SQL文件处理需要提供storage_location或有效的data_source信息"))

        else:
            return jsonify(failed("文件格式错误"))

        return jsonify(success({"id": resource_id}))
    except Exception as e:
        logger.error(f"保存数据资源失败: {str(e)}")
        error_traceback = traceback.format_exc()
        logger.error(f"错误详情: {error_traceback}")
        return jsonify(failed(str(e)))
@bp.route('/delete', methods=['POST'])
def data_resource_delete():
    """Delete a data resource node together with its relationships.

    JSON body:
        id: Node id of the ``data_resource`` to delete (integer or numeric
            string).

    Returns:
        A JSON response with a confirmation message on success, otherwise a
        failure message.
    """
    try:
        resource_id = request.json.get('id')
        # Only a missing or blank id counts as empty. A truthiness test would
        # also reject the id 0, which the other handlers in this file accept.
        if resource_id is None or resource_id == '':
            return jsonify(failed("资源ID不能为空"))

        # Validate the id the same way the other handlers do.
        try:
            if isinstance(resource_id, bool):
                # int(True)/int(False) would silently become 1/0.
                raise TypeError("boolean is not a valid id")
            resource_id = int(resource_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"资源ID必须为整数, 收到的是: {resource_id}"))

        with neo4j_driver.get_session() as session:
            # DETACH DELETE removes the node and every relationship attached to it.
            cypher = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            DETACH DELETE n
            """
            session.run(cypher, resource_id=resource_id)

        return jsonify(success({"message": "数据资源删除成功"}))
    except Exception as e:
        logger.error(f"删除数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/update', methods=['POST'])
def data_resource_update():
    """Update a data resource from the posted JSON body.

    The body must be non-empty and contain an ``id`` key; it is passed
    unchanged to ``data_resource_edit`` and the result is returned in a
    success response. Any exception is logged and reported as a failure.
    """
    try:
        payload = request.json

        has_id = bool(payload) and "id" in payload
        if not has_id:
            return jsonify(failed("参数不完整"))

        # The business layer performs the actual update.
        return jsonify(success(data_resource_edit(payload)))
    except Exception as e:
        logger.error(f"更新数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))
# Parses DDL with regular-expression matching only: nothing is translated
# and comments are not recognised.
# This API is called when a data resource is created from DDL.
@bp.route('/ddl', methods=['POST'])
def id_data_ddl():
    """Parse the CREATE TABLE statements found in the posted SQL.

    Expects a JSON body with an ``sql`` string. The statements returned by
    ``select_create_ddl`` are parsed one by one with ``table_sql`` and the
    non-empty results are merged into a single dict, which is returned in a
    success response. Failures are reported with a message.
    """
    try:
        raw_sql = request.json.get('sql', '')
        if not raw_sql:
            return jsonify(failed("SQL内容不能为空"))

        # Keep the untouched SQL in the debug log for troubleshooting.
        logger.debug(f"原始SQL: {raw_sql}")

        statements = select_create_ddl(raw_sql)
        if not statements:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        # Per the original author's note, each table_sql() result looks like
        # {"table_name": {"exist": bool, "meta": [...], "table_comment": "..."}}.
        tables_dict = {}
        for statement in statements:
            parsed_table = table_sql(statement)
            if not parsed_table:
                continue
            tables_dict.update(parsed_table)

        if not tables_dict:
            return jsonify(failed("解析表结构失败"))

        logger.debug(f"解析结果: {json.dumps(tables_dict, ensure_ascii=False)}")

        return jsonify(success(tables_dict))

    except Exception as e:
        logger.error(f"解析DDL失败: {str(e)}")
        # Full stack trace for diagnosis.
        logger.error(traceback.format_exc())
        return jsonify(failed(str(e)))
@bp.route('/list', methods=['POST'])
def data_resource_list():
    """Return one page of data resources matching the posted filters.

    JSON body: ``current`` (page number, default 1), ``size`` (page size,
    default 10) and the optional filters ``en_name``, ``name``, ``type``
    (default ``'all'``), ``category`` and ``tag``.

    The success response holds ``records``, ``total``, ``size`` and
    ``current``. Any exception is logged and reported as a failure.
    """
    try:
        payload = request.json
        page = int(payload.get('current', 1))
        page_size = int(payload.get('size', 10))

        # The filters are passed to the business layer positionally, in this order.
        resources, total_count = resource_list(
            page,
            page_size,
            payload.get('en_name'),
            payload.get('name'),
            payload.get('type', 'all'),
            payload.get('category'),
            payload.get('tag'),
        )

        result_page = {
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(result_page))
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/search', methods=['POST'])
def id_data_search():
    """Search the metadata associated with one data resource.

    JSON body: ``id`` (required, must convert to an integer), ``current``
    (default 1), ``size`` (default 10) and the optional filters ``en_name``,
    ``name``, ``category`` and ``tag``.

    The success response holds ``records``, ``total``, ``size`` and
    ``current``. Any exception is logged and reported as a failure.
    """
    try:
        payload = request.json
        page = int(payload.get('current', 1))
        page_size = int(payload.get('size', 10))
        raw_id = payload.get('id')

        en_name_filter = payload.get('en_name')
        name_filter = payload.get('name')
        category_filter = payload.get('category')
        tag_filter = payload.get('tag')

        if raw_id is None:
            return jsonify(failed("资源ID不能为空"))

        # The id must be convertible to an integer.
        try:
            resource_id = int(raw_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"资源ID必须为整数, 收到的是: {raw_id}"))

        logger.info(f"获取资源关联元数据请求,ID: {resource_id}")

        metadata_list, total_count = id_data_search_list(
            resource_id,
            page,
            page_size,
            en_name_filter,
            name_filter,
            category_filter,
            tag_filter,
        )

        result_page = {
            "records": metadata_list,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(result_page))
    except Exception as e:
        logger.error(f"数据资源关联元数据搜索失败: {str(e)}")
        return jsonify(failed(str(e)))
def dynamic_type_conversion(value, target_type):
    """Convert ``value`` to the Python type named by ``target_type``.

    Args:
        value: The value to convert. ``None`` is returned unchanged.
        target_type: Type name, matched case-insensitively and ignoring
            surrounding whitespace: ``int``; ``float`` / ``double``;
            ``bool`` / ``boolean``. Any other value, including a non-string,
            selects conversion to ``str``.

    Returns:
        The converted value. For booleans, a string is ``True`` only when its
        lower-cased form is one of ``true``, ``yes``, ``1``, ``t``, ``y``;
        non-string values go through ``bool()``.

    Raises:
        ValueError, TypeError: When ``int()`` or ``float()`` rejects ``value``.
    """
    if value is None:
        return None

    # Normalise the type name once so mixed-case spellings such as "Int" or
    # "Boolean" are recognised instead of silently falling through to str.
    kind = target_type.strip().lower() if isinstance(target_type, str) else ""

    if kind == "int":
        return int(value)
    if kind in ("float", "double"):
        return float(value)
    if kind in ("bool", "boolean"):
        if isinstance(value, str):
            return value.lower() in ('true', 'yes', '1', 't', 'y')
        return bool(value)
    return str(value)
@bp.route('/graph/all', methods=['POST'])
def data_resource_graph_all():
    """Return the complete graph of a data resource.

    JSON body: ``id`` (required, must convert to an integer) and ``meta``
    (default ``True``). Both are passed to ``resource_impact_all_graph`` and
    its result is returned in a success response. Any exception is logged
    and reported as a failure.
    """
    try:
        payload = request.json
        raw_id = payload.get('id')
        meta = payload.get('meta', True)

        if raw_id is None:
            return jsonify(failed("资源ID不能为空"))

        # The id must be convertible to an integer.
        try:
            resource_id = int(raw_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"资源ID必须为整数, 收到的是: {raw_id}"))

        return jsonify(success(resource_impact_all_graph(resource_id, meta)))
    except Exception as e:
        logger.error(f"获取数据资源完整图谱失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/graph', methods=['POST'])
def data_resource_list_graph():
    """Return the kinship graph of a data resource.

    JSON body: ``id`` (required, must convert to an integer) and ``meta``
    (default ``True``). Both are passed to ``resource_kinship_graph`` and its
    result is returned in a success response. Any exception is logged and
    reported as a failure.
    """
    try:
        payload = request.json
        raw_id = payload.get('id')
        meta = payload.get('meta', True)

        if raw_id is None:
            return jsonify(failed("资源ID不能为空"))

        # The id must be convertible to an integer.
        try:
            resource_id = int(raw_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"资源ID必须为整数, 收到的是: {raw_id}"))

        logger.info(f"获取图谱请求,ID: {resource_id}")

        return jsonify(success(resource_kinship_graph(resource_id, meta)))
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/save/metadata', methods=['POST'])
def id_data_save():
    """Replace the metadata linked to a data resource.

    JSON body:
        id: Node id of the ``data_resource`` (integer or numeric string).
        data: Non-empty list of dicts, each with ``name``, ``en_name`` and
            ``data_type``.

    The resource's existing ``contain`` relationships are deleted. Then each
    item is merged into a ``meta_data`` node (keyed by ``name``) and linked
    to the resource with a ``contain`` relationship.

    Returns:
        A JSON response with a confirmation message on success, otherwise a
        failure message.
    """
    try:
        resource_id = request.json.get('id')
        metadata_list = request.json.get('data', [])

        # Only a missing or blank id counts as empty. A truthiness test would
        # also reject the id 0, which the other handlers in this file accept.
        if resource_id is None or resource_id == '':
            return jsonify(failed("资源ID不能为空"))

        if not metadata_list:
            return jsonify(failed("元数据列表不能为空"))

        try:
            if isinstance(resource_id, bool):
                # int(True)/int(False) would silently become 1/0.
                raise TypeError("boolean is not a valid id")
            resource_id = int(resource_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"资源ID必须为整数, 收到的是: {resource_id}"))

        # Validate every item before touching the graph, so a malformed item
        # cannot abort the request after the old relationships are gone.
        required_keys = ("name", "en_name", "data_type")
        for meta in metadata_list:
            if not isinstance(meta, dict) or any(key not in meta for key in required_keys):
                return jsonify(failed("元数据项必须包含name、en_name和data_type"))

        with neo4j_driver.get_session() as session:
            # Look up the resource name (used for logging) and confirm it exists.
            resource_query = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            RETURN n.name as resource_name
            """
            resource_record = session.run(resource_query, resource_id=resource_id).single()

            if not resource_record:
                return jsonify(failed(f"未找到ID为{resource_id}的数据资源"))

            resource_name = resource_record["resource_name"]

            # Drop the existing relationships first.
            cypher_delete = """
            MATCH (n:data_resource)-[r:contain]->()
            WHERE id(n) = $resource_id
            DELETE r
            """
            session.run(cypher_delete, resource_id=resource_id)

            # Create or update each meta_data node; data_type is refreshed on match.
            meta_cypher = """
            MERGE (m:meta_data {name: $name})
            ON CREATE SET m.en_name = $en_name,
                          m.createTime = $create_time,
                          m.data_type = $type
            ON MATCH SET m.data_type = $type
            RETURN m
            """
            # Match the resource by id, exactly like the delete above, so that
            # other resources sharing the same name are never linked.
            rel_cypher = """
            MATCH (a:data_resource), (m:meta_data {name: $m_name})
            WHERE id(a) = $resource_id
            MERGE (a)-[r:contain]->(m)
            RETURN r
            """
            verify_cypher = """
            MATCH (a:data_resource)-[r:contain]->(m:meta_data {name: $m_name})
            WHERE id(a) = $resource_id
            RETURN count(r) as rel_count
            """

            for meta in metadata_list:
                meta_result = session.run(
                    meta_cypher,
                    name=meta["name"],
                    en_name=meta["en_name"],
                    create_time=get_formatted_time(),
                    type=meta["data_type"],
                )
                meta_node = meta_result.single()["m"]
                meta_id = meta_node.id

                # Node ids are logged to help debugging.
                logger.info(f"元数据节点ID: {meta_id}, 类型: {type(meta_id)}")
                logger.info(f"数据资源节点ID: {resource_id}, 类型: {type(resource_id)}")

                rel_result = session.run(
                    rel_cypher,
                    resource_id=resource_id,
                    m_name=meta["name"],
                )

                if rel_result.single():
                    logger.info(f"成功创建关系: {resource_name} -> {meta['name']}")
                else:
                    logger.warning("关系创建结果为空")

                # Extra verification that the relationship now exists.
                verify_result = session.run(
                    verify_cypher,
                    resource_id=resource_id,
                    m_name=meta["name"],
                )
                count = verify_result.single()["rel_count"]
                logger.info(f"验证关系数量: {count}")

        return jsonify(success({"message": "元数据保存成功"}))
    except Exception as e:
        logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/sql/test', methods=['POST'])
def sql_test():
    """Parse the posted SQL query and return the parse result.

    Expects a JSON body with an ``sql`` string. The query is passed to
    ``select_sql``; a non-empty result is returned in a success response.
    An empty query, an empty parse result, or any exception is reported as
    a failure.
    """
    try:
        query_text = request.json.get('sql', '')
        if not query_text:
            return jsonify(failed("SQL查询不能为空"))

        parse_result = select_sql(query_text)
        if parse_result:
            return jsonify(success(parse_result))

        return jsonify(failed("解析SQL失败"))
    except Exception as e:
        logger.error(f"测试SQL查询失败: {str(e)}")
        return jsonify(failed(str(e)))
# Identifies DDL statements with an LLM, replacing the earlier regex approach.
# Used to recognise DDL when a data resource is created: /api/resource/ddl/parse
@bp.route('/ddl/parse', methods=['POST'])
def ddl_identify():
    """Identify the tables defined by the supplied DDL.

    The SQL comes either from an uploaded ``file`` (its name must end in
    ``.sql``; the content is decoded as UTF-8) or, when no ``file`` part is
    present, from the ``sql`` field of a JSON body. It is parsed with
    ``DDLParser.parse_ddl``.

    When the parse result is a dict, every entry gets an ``exist`` flag:
    ``False`` by default, then updated from a lookup of ``data_resource``
    nodes whose ``en_name`` equals the table name. A failed lookup is only
    logged and the defaults are kept.

    Returns:
        A JSON response with the parse result on success, otherwise a
        failure message.
    """
    try:
        sql_content = ''

        if 'file' in request.files:
            upload = request.files['file']
            # Ignore an empty file part or one without a file name.
            if upload and upload.filename:
                is_sql_file = upload.filename.lower().endswith('.sql')
                if not is_sql_file:
                    return jsonify(failed("只接受SQL文件"))

                sql_content = upload.read().decode('utf-8')
                logger.info(f"从上传的文件中读取SQL内容,文件名: {upload.filename}")
        elif request.is_json:
            # No file part at all: fall back to the JSON body.
            sql_content = request.json.get('sql', '')

        if not sql_content:
            return jsonify(failed("SQL内容不能为空,请上传SQL文件或提供SQL内容"))

        ddl_parser = DDLParser()
        ddl_list = ddl_parser.parse_ddl(sql_content)

        if not ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        if isinstance(ddl_list, dict):
            table_names = list(ddl_list.keys())

            # Default every table to "does not exist yet".
            for name in table_names:
                ddl_list[name]["exist"] = False

            if table_names:
                try:
                    with neo4j_driver.get_session() as session:
                        table_query = """
                        UNWIND $names AS name
                        OPTIONAL MATCH (n:data_resource {en_name: name})
                        RETURN name, n IS NOT NULL AS exists
                        """
                        lookup = session.run(table_query, names=table_names)

                        for record in lookup:
                            found_name = record["name"]
                            found_flag = record["exists"]
                            if found_name in ddl_list:
                                ddl_list[found_name]["exist"] = found_flag
                except Exception as e:
                    # Best effort: on failure all tables keep the default False.
                    logger.error(f"检查表存在状态失败: {str(e)}")

        logger.debug(f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}")

        return jsonify(success(ddl_list))
    except Exception as e:
        logger.error(f"识别DDL语句失败: {str(e)}")
        # Full stack trace for diagnosis.
        logger.error(traceback.format_exc())
        return jsonify(failed(str(e)))
-
# Deprecated DDL identification endpoint; similar to the /ddl API but simplified.
@bp.route('/ddl/identify', methods=['POST'])
def sql_ddl_identify():
    """Count the CREATE TABLE statements found in the posted SQL.

    Expects a JSON body with an ``sql`` string. The success response holds
    ``count``, the number of statements returned by ``select_create_ddl``.
    Empty SQL, no statements found, or any exception is reported as a
    failure.
    """
    try:
        raw_sql = request.json.get('sql', '')
        if not raw_sql:
            return jsonify(failed("SQL内容不能为空"))

        statements = select_create_ddl(raw_sql)
        if not statements:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        statement_count = len(statements)
        return jsonify(success({"count": statement_count}))
    except Exception as e:
        logger.error(f"识别DDL语句失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/model/list', methods=['POST'])
def resource_model_list():
    """Return one page of model resources.

    JSON body: ``current`` (page number, default 1), ``size`` (page size,
    default 10) and an optional ``name`` filter.

    The success response holds ``records``, ``total``, ``size`` and
    ``current``. Any exception is logged and reported as a failure.
    """
    try:
        payload = request.json
        page = int(payload.get('current', 1))
        page_size = int(payload.get('size', 10))
        name_filter = payload.get('name')

        resources, total_count = model_resource_list(page, page_size, name_filter)

        result_page = {
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(result_page))
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/detail', methods=['POST'])
def data_resource_detail():
    """Return the details of one data resource.

    JSON body: ``id`` (required, must convert to an integer). The resource
    is loaded with ``handle_id_resource``; an empty result is reported as
    "resource does not exist". Otherwise a fixed set of fields is copied
    into the response, with a default for each field the resource lacks.
    Any exception is logged and reported as a failure.
    """
    try:
        raw_id = request.json.get('id')

        if raw_id is None:
            return jsonify(failed("资源ID不能为空"))

        # The id must be convertible to an integer.
        try:
            resource_id = int(raw_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"资源ID必须为整数, 收到的是: {raw_id}"))

        logger.info(f"获取资源详情请求,ID: {resource_id}")

        resource_data = handle_id_resource(resource_id)

        if not resource_data:
            logger.error(f"资源不存在,ID: {resource_id}")
            return jsonify(failed("资源不存在"))

        # Log the describe field as returned by handle_id_resource.
        logger.info(f"handle_id_resource返回数据,describe字段: {resource_data.get('describe')}")

        # Response fields in output order, each with the default used when
        # the resource does not provide it.
        field_defaults = (
            ("parsed_data", []),
            ("tag", {"name": None, "id": None}),
            ("leader", ""),
            ("organization", ""),
            ("name", ""),
            ("en_name", ""),
            ("data_sensitivity", ""),
            ("location", "/"),
            ("time", ""),
            ("type", ""),
            ("category", ""),
            ("url", ""),
            ("frequency", ""),
            ("status", True),
            ("id", None),
            ("keywords", []),
            ("describe", ""),
        )
        response_data = {
            field: resource_data.get(field, fallback)
            for field, fallback in field_defaults
        }

        # Log the describe field that is finally returned.
        logger.info(f"最终返回的response_data,describe字段: {response_data.get('describe')}")

        return jsonify(success(response_data))
    except Exception as e:
        logger.error(f"获取数据资源详情失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/config', methods=['GET'])
@require_auth
def get_resource_config():
    """Return the data resource storage configuration as JSON.

    The response holds ``allowed_extensions`` (converted to a list),
    ``bucket_name`` and ``prefix``, all taken from ``get_minio_config()``.
    The route is wrapped with ``require_auth``.
    """
    minio_settings = get_minio_config()
    payload = {
        'allowed_extensions': list(minio_settings['allowed_extensions']),
        'bucket_name': minio_settings['bucket_name'],
        'prefix': minio_settings['prefix'],
    }
    return jsonify(payload)
|