123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- from io import BytesIO, StringIO
- import os
- import pandas as pd
- from flask import request, jsonify, send_file
- from app.api.data_resource import bp
- from app.models.result import success, failed
- import logging
- import json
- import re
- from minio import Minio
- from app.config.config import Config
- from app.services.neo4j_driver import neo4j_driver
- from app.core.data_resource.resource import (
- resource_list,
- handle_node,
- resource_kinship_graph,
- resource_impact_all_graph,
- model_resource_list,
- select_create_ddl,
- data_resource_edit,
- handle_id_resource,
- id_data_search_list,
- table_sql,
- select_sql,
- id_resource_graph
- )
- from app.core.meta_data import (
- translate_and_parse,
- infer_column_type,
- text_resource_solve,
- get_file_content,
- get_formatted_time
- )
# Shared application logger used by every handler in this module.
logger = logging.getLogger("app")

# MinIO client built from the configured host and credentials.
# secure=True is hard-coded here (HTTPS per the MinIO client docs).
_minio_credentials = {
    "access_key": Config.MINIO_USER,
    "secret_key": Config.MINIO_PASSWORD,
}
minio_client = Minio(Config.MINIO_HOST, secure=True, **_minio_credentials)

# File-upload related settings, read once from the project configuration.
UPLOAD_FOLDER = Config.UPLOAD_FOLDER
bucket_name = Config.BUCKET_NAME
prefix = Config.PREFIX
def is_english(text):
    """Return True when *text* looks like an English identifier or phrase.

    The text must be non-empty ASCII consisting solely of letters, digits,
    underscores, whitespace and the punctuation listed in the pattern below.
    """
    # Reject anything outside ASCII first; ``\\s`` alone would also accept
    # non-ASCII whitespace characters.
    if not text.isascii():
        return False
    matched = re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text)
    return matched is not None
@bp.route('/translate', methods=['POST'])
def data_resource_translate():
    """Translate a data resource name into an English name.

    Reads ``name`` and ``en_name`` from the JSON request body. Any other keys
    (for example ``data_type`` or ``is_file``) do not influence the result.

    Returns:
        A JSON response wrapping ``success({"name", "en_name", "translated"})``
        on success, or ``failed(message)`` when ``name`` is empty, the
        translation raises, or the request itself cannot be processed.
    """
    try:
        payload = request.json
        name = payload.get('name', '')
        en_name = payload.get('en_name', '')

        # Validate input
        if not name:
            return jsonify(failed("名称不能为空"))

        # A caller-supplied English name that passes the check is used as-is.
        if en_name and is_english(en_name):
            return jsonify(success({"name": name, "en_name": en_name, "translated": True}))

        # Otherwise translate. The same call is made for every data type, so
        # there is a single code path rather than one branch per type.
        try:
            result = text_resource_solve(None, name, "")
            return jsonify(success({"name": name, "en_name": result["en_name"], "translated": True}))
        except Exception as e:
            logger.error(f"翻译失败: {str(e)}")
            return jsonify(failed(f"翻译失败: {str(e)}"))
    except Exception as e:
        logger.error(f"处理数据资源翻译请求失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/save', methods=['POST'])
def data_resource_save():
    """Create a data resource from the posted JSON body.

    Reads ``receiver``, ``head_data`` and ``data_resource`` from the request,
    hands them to ``handle_node`` and returns the id it produces. Responds
    with ``failed`` when ``receiver`` or ``data_resource`` is empty, or when
    any exception is raised.
    """
    try:
        body = request.json
        receiver = body.get('receiver', {})
        head_data = body.get('head_data', [])
        data_resource = body.get('data_resource', {})

        # Both the receiver and the resource description are mandatory.
        if receiver and data_resource:
            created_id = handle_node(receiver, head_data, data_resource)
            return jsonify(success({"id": created_id}))

        return jsonify(failed("参数不完整"))
    except Exception as err:
        logger.error(f"保存数据资源失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/delete', methods=['POST'])
def data_resource_delete():
    """Delete a ``data_resource`` node and all of its relationships.

    Reads ``id`` from the JSON request body and runs a ``DETACH DELETE`` for
    the ``data_resource`` node whose internal id matches.

    Returns:
        A JSON response wrapping ``success`` with a confirmation message, or
        ``failed(message)`` when the id is missing/empty, cannot be converted
        to an integer, or the query raises.
    """
    try:
        resource_id = request.json.get('id')
        # Only a missing or empty id is rejected: 0 is falsy but is a legal
        # internal node id, so a plain truthiness test would wrongly refuse it.
        if resource_id is None or resource_id == "":
            return jsonify(failed("资源ID不能为空"))

        with neo4j_driver.get_session() as session:
            # Remove the data resource node together with its relationships.
            cypher = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            DETACH DELETE n
            """

            session.run(cypher, resource_id=int(resource_id))

        return jsonify(success({"message": "数据资源删除成功"}))
    except Exception as e:
        logger.error(f"删除数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/update', methods=['POST'])
def data_resource_update():
    """Update a data resource with the posted JSON body.

    The body must be non-empty and contain an ``id`` key; it is passed
    unchanged to ``data_resource_edit`` and the value that returns is sent
    back wrapped in ``success``. Any exception yields a ``failed`` response.
    """
    try:
        payload = request.json

        # Guard: an empty body or one without "id" cannot be applied.
        if payload and "id" in payload:
            return jsonify(success(data_resource_edit(payload)))

        return jsonify(failed("参数不完整"))
    except Exception as err:
        logger.error(f"更新数据资源失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/ddl', methods=['POST'])
def id_data_ddl():
    """Parse the CREATE TABLE statements found in the posted SQL text.

    Reads ``sql`` from the JSON body, extracts DDL statements with
    ``select_create_ddl``, parses each one with ``table_sql`` and returns a
    list of ``{"name", "en_name", "fields"}`` dictionaries. Responds with
    ``failed`` when the SQL is empty, no statement is found, nothing parses,
    or an exception is raised.
    """

    def _to_field_entry(column):
        # Map one parsed column onto the shape returned to the client; the
        # English name starts out equal to the original column name.
        entry = {
            "name": column["name"],
            "en_name": column["name"],
            "type": column["type"],
            "is_primary": column["is_primary"],
            "nullable": not column.get("not_null", False),
        }
        if "default" in column:
            entry["default"] = column["default"]
        return entry

    try:
        sql_content = request.json.get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        # Pull out the CREATE TABLE statements.
        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        # Parse every statement, keeping only the ones that yield a result.
        parsed_tables = [info for info in map(table_sql, create_ddl_list) if info]
        if not parsed_tables:
            return jsonify(failed("解析表结构失败"))

        # Reshape for the client; the English name starts as the table name.
        result = [
            {
                "name": parsed["table_name"],
                "en_name": parsed["table_name"],
                "fields": [_to_field_entry(col) for col in parsed["fields"]],
            }
            for parsed in parsed_tables
        ]

        return jsonify(success(result))
    except Exception as err:
        logger.error(f"解析DDL失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/list', methods=['POST'])
def data_resource_list():
    """Return one page of data resources matching the posted filters.

    Reads ``current`` (default 1) and ``size`` (default 10) as integers plus
    the optional ``en_name``, ``name``, ``type`` (default ``'all'``),
    ``category`` and ``tag`` filters, and forwards them to ``resource_list``.
    The response carries ``records``, ``total``, ``size`` and ``current``.
    Any exception yields a ``failed`` response.
    """
    try:
        params = request.json

        # Paging values are converted first, then the filters are collected
        # in the positional order ``resource_list`` receives them.
        page = int(params.get('current', 1))
        page_size = int(params.get('size', 10))
        filters = (
            params.get('en_name'),
            params.get('name'),
            params.get('type', 'all'),
            params.get('category'),
            params.get('tag'),
        )

        resources, total_count = resource_list(page, page_size, *filters)

        page_payload = {
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(page_payload))
    except Exception as err:
        logger.error(f"获取数据资源列表失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/search', methods=['POST'])
def id_data_search():
    """Return one page of metadata associated with a data resource.

    Requires ``id`` in the JSON body. Reads ``current`` (default 1) and
    ``size`` (default 10) as integers plus the optional ``en_name``, ``name``,
    ``category`` and ``tag`` filters, and forwards them to
    ``id_data_search_list``. The response carries ``records``, ``total``,
    ``size`` and ``current``. Any exception yields a ``failed`` response.
    """
    try:
        params = request.json

        resource_id = params.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        page = int(params.get('current', 1))
        page_size = int(params.get('size', 10))
        # Filters in the positional order the lookup helper receives them.
        filters = (
            params.get('en_name'),
            params.get('name'),
            params.get('category'),
            params.get('tag'),
        )

        metadata_list, total_count = id_data_search_list(resource_id, page, page_size, *filters)

        page_payload = {
            "records": metadata_list,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(page_payload))
    except Exception as err:
        logger.error(f"搜索数据资源关联的元数据失败: {str(err)}")
        return jsonify(failed(str(err)))
def dynamic_type_conversion(value, target_type):
    """Convert *value* to the Python type named by *target_type*.

    Type names are matched case-insensitively, so ``"int"``, ``"INT"`` and
    ``"Int"`` all behave the same.

    Args:
        value: The value to convert. ``None`` is returned unchanged.
        target_type: One of ``int``, ``float``/``double`` or
            ``bool``/``boolean``. Anything else, including a non-string,
            selects conversion to ``str``.

    Returns:
        The converted value, or ``None`` when *value* is ``None``.

    Raises:
        ValueError, TypeError: Propagated from ``int()`` / ``float()`` when
            *value* cannot be converted.
    """
    if value is None:
        return None

    # Normalise once so mixed-case names such as "Int" or "Boolean" are
    # recognised instead of silently falling through to str().
    kind = target_type.lower() if isinstance(target_type, str) else None

    if kind == "int":
        return int(value)
    if kind in ("float", "double"):
        return float(value)
    if kind in ("bool", "boolean"):
        # bool("false") would be True, so strings are compared against an
        # explicit list of truthy spellings.
        if isinstance(value, str):
            return value.lower() in ('true', 'yes', '1', 't', 'y')
        return bool(value)
    return str(value)
@bp.route('/graph/all', methods=['POST'])
def data_resource_graph_all():
    """Return the full graph for a data resource.

    Reads ``id`` (required) and ``meta`` (default ``True``) from the JSON
    body and returns whatever ``resource_impact_all_graph`` produces, wrapped
    in ``success``. Responds with ``failed`` when the id is empty or an
    exception is raised.
    """
    try:
        params = request.json
        resource_id = params.get('id')
        meta = params.get('meta', True)

        if resource_id:
            # Delegate graph construction to the business layer.
            return jsonify(success(resource_impact_all_graph(resource_id, meta)))

        return jsonify(failed("资源ID不能为空"))
    except Exception as err:
        logger.error(f"获取数据资源完整图谱失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/graph', methods=['POST'])
def data_resource_list_graph():
    """Return the kinship graph for a data resource.

    Reads ``id`` (required) and ``meta`` (default ``True``) from the JSON
    body and returns whatever ``resource_kinship_graph`` produces, wrapped in
    ``success``. Responds with ``failed`` when the id is empty or an
    exception is raised.
    """
    try:
        params = request.json
        resource_id = params.get('id')
        meta = params.get('meta', True)

        if resource_id:
            # Delegate graph construction to the business layer.
            return jsonify(success(resource_kinship_graph(resource_id, meta)))

        return jsonify(failed("资源ID不能为空"))
    except Exception as err:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/save/metadata', methods=['POST'])
def id_data_save():
    """Replace the metadata linked to a data resource.

    Reads ``id`` and ``data`` (a list of metadata dictionaries) from the JSON
    body. All existing ``contain`` relationships of the resource are removed,
    then each entry is merged as a ``Metadata`` node (keyed by ``name``) and
    linked to the resource.

    Each entry must provide ``name`` and ``en_name``; ``createTime`` is
    optional and defaults to ``get_formatted_time()``.

    Returns:
        A JSON response wrapping ``success`` with a confirmation message, or
        ``failed(message)`` when the id is missing/empty, the list is empty,
        an entry lacks a required field, or any exception is raised.
    """
    try:
        payload = request.json
        resource_id = payload.get('id')
        metadata_list = payload.get('data', [])

        # Only a missing or empty id is rejected: 0 is falsy but is a legal
        # internal node id for the id(n) match below.
        if resource_id is None or resource_id == "":
            return jsonify(failed("资源ID不能为空"))

        if not metadata_list:
            return jsonify(failed("元数据列表不能为空"))

        # Validate every entry BEFORE touching the graph, so a malformed
        # entry is rejected before the delete below has run.
        node_id = int(resource_id)
        for meta in metadata_list:
            if not isinstance(meta, dict) or "name" not in meta or "en_name" not in meta:
                return jsonify(failed("元数据缺少必要字段: name 或 en_name"))

        with neo4j_driver.get_session() as session:
            # Drop the existing relationships first.
            cypher_delete = """
            MATCH (n:data_resource)-[r:contain]->()
            WHERE id(n) = $resource_id
            DELETE r
            """
            session.run(cypher_delete, resource_id=node_id)

            # Create or reuse the metadata node for an entry.
            meta_cypher = """
            MERGE (m:Metadata {name: $name})
            ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
            RETURN m
            """

            # MERGE rather than CREATE so a name repeated in the posted list
            # does not produce duplicate relationships.
            rel_cypher = """
            MATCH (n:data_resource), (m:Metadata)
            WHERE id(n) = $resource_id AND id(m) = $meta_id
            MERGE (n)-[r:contain]->(m)
            RETURN r
            """

            for meta in metadata_list:
                meta_result = session.run(
                    meta_cypher,
                    name=meta["name"],
                    en_name=meta["en_name"],
                    create_time=meta.get("createTime", get_formatted_time())
                )
                meta_node = meta_result.single()["m"]

                session.run(
                    rel_cypher,
                    resource_id=node_id,
                    meta_id=meta_node.id
                )

        return jsonify(success({"message": "元数据保存成功"}))
    except Exception as e:
        logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route('/sql/test', methods=['POST'])
def sql_test():
    """Parse the posted SQL query and return the parse result.

    Reads ``sql`` from the JSON body and passes it to ``select_sql``.
    Responds with ``failed`` when the query is empty, the parser returns a
    falsy result, or an exception is raised.
    """
    try:
        sql_query = request.json.get('sql', '')
        if not sql_query:
            return jsonify(failed("SQL查询不能为空"))

        parsed_sql = select_sql(sql_query)
        if parsed_sql:
            # Hand the parser output back to the caller unchanged.
            return jsonify(success(parsed_sql))

        return jsonify(failed("解析SQL失败"))
    except Exception as err:
        logger.error(f"测试SQL查询失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/ddl/identify', methods=['POST'])
def sql_ddl_identify():
    """Report how many CREATE TABLE statements the posted SQL contains.

    Reads ``sql`` from the JSON body, extracts statements with
    ``select_create_ddl`` and returns ``{"count": <number found>}``. Responds
    with ``failed`` when the SQL is empty, nothing is found, or an exception
    is raised.
    """
    try:
        sql_content = request.json.get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        found = select_create_ddl(sql_content)
        if found:
            return jsonify(success({"count": len(found)}))

        return jsonify(failed("未找到有效的CREATE TABLE语句"))
    except Exception as err:
        logger.error(f"识别DDL语句失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/model/list', methods=['POST'])
def resource_model_list():
    """Return one page of model resources.

    Reads ``current`` (default 1) and ``size`` (default 10) as integers plus
    the optional ``name`` filter, and forwards them to
    ``model_resource_list``. The response carries ``records``, ``total``,
    ``size`` and ``current``. Any exception yields a ``failed`` response.
    """
    try:
        params = request.json
        page = int(params.get('current', 1))
        page_size = int(params.get('size', 10))
        name_filter = params.get('name')

        resources, total_count = model_resource_list(page, page_size, name_filter)

        page_payload = {
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(page_payload))
    except Exception as err:
        logger.error(f"获取模型资源列表失败: {str(err)}")
        return jsonify(failed(str(err)))
@bp.route('/detail', methods=['POST'])
def data_resource_detail():
    """Return the details of a single data resource.

    Reads ``id`` from the JSON body and looks the resource up with
    ``handle_id_resource``. Responds with ``failed`` when the id is empty,
    the lookup returns a falsy result, or an exception is raised.
    """
    try:
        resource_id = request.json.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        resource_data = handle_id_resource(resource_id)
        if resource_data:
            return jsonify(success(resource_data))

        # A falsy lookup result is reported as "resource does not exist".
        return jsonify(failed("资源不存在"))
    except Exception as err:
        logger.error(f"获取数据资源详情失败: {str(err)}")
        return jsonify(failed(str(err)))
|