Browse Source

translate rewrite complete, added llm translation

wangxq 1 month ago
parent
commit
a07f06b993
2 changed files with 203 additions and 61 deletions
  1. 93 33
      app/api/data_resource/routes.py
  2. 110 28
      app/core/meta_data/meta_data.py

+ 93 - 33
app/api/data_resource/routes.py

@@ -9,6 +9,7 @@ import json
 import re
 from minio import Minio
 from app.config.config import Config
+from app.core.graph.graph_operations import MyEncoder
 from app.services.neo4j_driver import neo4j_driver
 from app.core.data_resource.resource import (
     resource_list, 
@@ -51,45 +52,104 @@ def is_english(text):
     """检查文本是否为英文"""
     return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))
 
+
 @bp.route('/translate', methods=['POST'])
 def data_resource_translate():
-    """数据资源翻译"""
+    """Translate a data resource name and its metadata headers into English.
+
+    Reads a multipart form carrying ``data_resource`` (the resource name),
+    ``meta_data`` (a JSON array of header names) and ``file`` (a spreadsheet
+    loaded with ``pd.read_excel``). Responds with one entry per header
+    (original name, English name, inferred column type) plus the translated
+    resource name.
+    """
+    # Read the form fields and the uploaded file
+    data_resource = request.form.get('data_resource')
+    meta_data = request.form.get('meta_data')
+    file = request.files.get('file')
+
+    # Validate before parsing: json.loads(None) raises TypeError, which
+    # would bypass the "missing parameter" response below
+    if not data_resource or not meta_data or not file:
+        return jsonify(failed("缺少必要参数"))
+
+    try:
+        meta_data_list = json.loads(meta_data)
+    except ValueError as e:
+        return jsonify(failed(f"meta_data格式错误: {str(e)}"))
+    if not isinstance(meta_data_list, list):
+        return jsonify(failed("meta_data格式错误: 需要JSON数组"))
+
+    # Translate each header; headers that are already English pass through
+    translated_meta_data_list = []
+    for meta_item in meta_data_list:
+        if is_english(meta_item):
+            translated_meta_data_list.append(meta_item)
+            continue
+        # Keep the original header when the translation comes back empty
+        translated_item = translate_and_parse(meta_item)
+        translated_meta_data_list.append(translated_item[0] if translated_item else meta_item)
+
+    # Translate the resource name, keeping the original value on failure
+    translated_name = translate_and_parse(data_resource)
+    translated_data_resource = translated_name[0] if translated_name else data_resource
+
     try:
-        # 获取表单数据
-        name = request.json.get('name', '')
-        en_name = request.json.get('en_name', '')
-        data_type = request.json.get('data_type', 'table')
-        is_file = request.json.get('is_file', False)
-        
-        # 验证输入
-        if not name:
-            return jsonify(failed("名称不能为空"))
-            
-        # 如果已经提供了英文名,则直接使用
-        if en_name and is_english(en_name):
-            translated = True
-            return jsonify(success({"name": name, "en_name": en_name, "translated": translated}))
-        
-        # 否则进行翻译
+        resource = {"name": data_resource, "en_name": translated_data_resource}
+
+        # Read the upload into memory, then rewind the stream
+        file_content = file.read()
+        file.seek(0)
+
         try:
-            if data_type == 'table':
-                prompt = f"""将以下数据表名(中文)翻译成英文,不需要额外说明:
-                中文:{name}
-                英文(snake_case格式):
-                """
-                result = text_resource_solve(None, name, "")
-                translated = True
-                return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
-            else:
-                result = text_resource_solve(None, name, "")
-                translated = True
-                return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
+            df = pd.read_excel(BytesIO(file_content))
         except Exception as e:
-            logger.error(f"翻译失败: {str(e)}")
-            return jsonify(failed(f"翻译失败: {str(e)}"))
+            return jsonify(failed(f"文件格式错误: {str(e)}"))
+        # Column types are matched to headers by position; headers beyond
+        # the inferred list default to varchar(255)
+        columns_and_types = infer_column_type(df)
+        parsed_data = [
+            {
+                "name": zh,
+                "en_name": en,
+                "data_type": columns_and_types[i] if i < len(columns_and_types) else "varchar(255)",
+            }
+            for i, (zh, en) in enumerate(zip(meta_data_list, translated_meta_data_list))
+        ]
+
+        response_data = {
+            "head_data": parsed_data,
+            "data_resource": resource
+        }
+        return jsonify(success(response_data, "success"))
+
     except Exception as e:
         logger.error(f"处理数据资源翻译请求失败: {str(e)}")
         return jsonify(failed(str(e)))
+
+    # 废弃的翻译方法
+    # """数据资源翻译"""
+    # try:
+    #     # 获取表单数据
+    #     name = request.json.get('name', '')
+    #     en_name = request.json.get('en_name', '')
+    #     data_type = request.json.get('data_type', 'table')
+    #     is_file = request.json.get('is_file', False)
+        
+    #     # 验证输入
+    #     if not name:
+    #         return jsonify(failed("名称不能为空"))
+            
+    #     # 如果已经提供了英文名,则直接使用
+    #     if en_name and is_english(en_name):
+    #         translated = True
+    #         return jsonify(success({"name": name, "en_name": en_name, "translated": translated}))
+        
+    #     # 否则进行翻译
+    #     try:
+    #         if data_type == 'table':
+    #             prompt = f"""将以下数据表名(中文)翻译成英文,不需要额外说明:
+    #             中文:{name}
+    #             英文(snake_case格式):
+    #             """
+    #             result = text_resource_solve(None, name, "")
+    #             translated = True
+    #             return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
+    #         else:
+    #             result = text_resource_solve(None, name, "")
+    #             translated = True
+    #             return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
+    #     except Exception as e:
+    #         logger.error(f"翻译失败: {str(e)}")
+    #         return jsonify(failed(f"翻译失败: {str(e)}"))
+    # except Exception as e:
+    #     logger.error(f"处理数据资源翻译请求失败: {str(e)}")
+    #     return jsonify(failed(str(e)))
 
 @bp.route('/save', methods=['POST'])
 def data_resource_save():

+ 110 - 28
app/core/meta_data/meta_data.py

@@ -29,53 +29,135 @@ def get_formatted_time():
     """获取格式化的当前时间"""
     return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
 
-def translate_and_parse(data):
-    """转换并解析数据"""
-    if isinstance(data, dict):
-        return data
-    else:
-        return {}
+def translate_and_parse(content):
+    """Translate ``content`` through ``llm_client`` and normalise the reply.
+
+    The reply is expected to contain a list literal such as
+    ``['Apple', 'Banana']``. Each entry is lower-cased, a parenthesised
+    suffix is folded into the name and spaces become underscores, e.g.
+    "Salary (Yuan)" -> "salary_yuan".
+
+    Args:
+        content: Text to translate.
+
+    Returns:
+        list: The normalised names, or ``[content]`` when the LLM call fails
+        or the reply is not a non-empty list of strings.
+    """
+    import ast
+
+    translate = llm_client(content)
+    if translate is None:
+        return [content]
+
+    # Keep only the outermost [...] so code fences or stray text around the
+    # list do not break parsing
+    start, end = translate.find('['), translate.rfind(']')
+    if start == -1 or end <= start:
+        logger.error(f"翻译结果处理失败: {translate}")
+        return [content]
+    payload = translate[start:end + 1]
+
+    try:
+        # Valid JSON is parsed as-is; blindly swapping ' for " would corrupt
+        # entries that contain an apostrophe
+        result_list = json.loads(payload)
+    except json.JSONDecodeError:
+        try:
+            # Python-style single-quoted list, as shown in the prompt example
+            result_list = ast.literal_eval(payload)
+        except (ValueError, TypeError, SyntaxError) as e:
+            logger.error(f"翻译结果处理失败: {str(e)}")
+            return [content]
+
+    # Callers index the result with [0], so never hand back an empty or
+    # non-list value
+    if (not isinstance(result_list, list) or not result_list
+            or not all(isinstance(item, str) for item in result_list)):
+        logger.error(f"翻译结果处理失败: {translate}")
+        return [content]
+
+    processed_list = []
+    for item in result_list:
+        item_lower = item.lower()
+        left_pos = item_lower.find('(')
+        # Only accept a closing bracket that follows the opening one
+        right_pos = item_lower.find(')', left_pos + 1) if left_pos != -1 else -1
+        if right_pos != -1:
+            prefix = item_lower[:left_pos].strip()
+            in_brackets = item_lower[left_pos + 1:right_pos].strip()
+            processed_item = f"{prefix}_{in_brackets}".replace(' ', '_')
+        else:
+            processed_item = item_lower.replace(' ', '_')
+        processed_list.append(processed_item)
+
+    return processed_list
+
+    # """转换并解析数据"""
+    # if isinstance(data, dict):
+    #     return data
+    # else:
+    #     return {}
 
 # LLM服务
 def llm_client(content):
-    """调用LLM服务进行内容生成"""
+    """Ask the LLM to translate ``content`` into English.
+
+    Args:
+        content: Text to translate; it is appended to a prompt that asks
+            for a list-formatted reply.
+
+    Returns:
+        str | None: The stripped reply text, or None when the call fails.
+    """
     client = OpenAI(
         api_key=api_key,
         base_url=base_url
     )
 
     try:
+        logger.debug(f"调用API翻译: {content}")
         completion = client.chat.completions.create(
             model=model_name,
             messages=[
-                {"role": "system", "content": "You are a helpful assistant."},
-                {"role": "user", "content": content}
-            ]
+                {"role": "system", "content": "你是一个翻译助手,根据用户的提示进行翻译"},
+                {"role": "user",
+                 "content": "请将以下内容翻译为英文,并按顺序返回结果。输出是列表格式"
+                            "例如,如果输入是 '苹果, 香蕉, 橙子',输出应该是['Apple', 'Banana', 'Orange'],"
+                            "不需要其他任何多余的字符:" + content},
+            ],
+            max_tokens=1024,
+            temperature=0.1,
+            stream=False
         )
-        return completion.choices[0].message.content.strip()
+        # Bind the reply once so the logged text and the returned text match
+        reply = completion.choices[0].message.content.strip()
+        logger.debug(f"翻译结果: {reply}")
+        return reply
     except Exception as e:
         logger.error(f"LLM调用失败: {str(e)}")
         return None
 
 def infer_column_type(df):
-    """推断DataFrame的列类型"""
-    column_types = {}
-    for column in df.columns:
-        if df[column].dtype == 'object':
-            # 如果列是对象类型,尝试判断是否为日期或字符串
-            if pd.to_datetime(df[column], errors='coerce').notna().all():
-                column_types[column] = 'datetime'
-            else:
-                column_types[column] = 'varchar(255)'
-        elif pd.api.types.is_integer_dtype(df[column]):
-            column_types[column] = 'int'
-        elif pd.api.types.is_float_dtype(df[column]):
-            column_types[column] = 'float'
-        elif pd.api.types.is_bool_dtype(df[column]):
-            column_types[column] = 'boolean'
-        else:
-            column_types[column] = 'varchar(255)'
-    return column_types
+    """Ask the LLM for a PostgreSQL data type for every column of ``df``.
+
+    Args:
+        df: DataFrame whose first six rows and column names are sent to
+            the model.
+
+    Returns:
+        list: Type names in column order, or ``'varchar(255)'`` for every
+        column when the call fails or the reply cannot be parsed.
+    """
+    # Positional fallback used whenever the model reply cannot be used
+    fallback = ['varchar(255)'] * len(df.columns)
+    try:
+        # Column labels may be non-strings (ints, timestamps); stringify
+        # them so the join cannot raise
+        columns = ','.join(str(col) for col in df.columns)
+        client = OpenAI(api_key=api_key, base_url=base_url, )
+        response = client.chat.completions.create(
+            model=model_name,
+            messages=[
+                {"role": "system", "content": "你是一个PostgreSQL数据库专家,精通PostgreSQL所有数据类型和最佳实践"},
+                {"role": "user",
+                 "content": "请根据以下数据表内容:" + str(df.head(n=6)) + "其列名为" + columns +
+                            ",帮我判断每个列最合适的PostgreSQL数据类型。请注意以下要求:" +
+                            "1. 对于文本数据,使用varchar并给出合适长度,如varchar(50)、varchar(255)等" +
+                            "2. 对于整数,根据数值范围选择smallint、integer或bigint" +
+                            "3. 对于小数,如果是金额相关字段使用numeric(15,2),其他小数使用numeric(18,6)" +
+                            "4. 对于日期时间,根据实际情况选择date、time或timestamp" +
+                            "5. 对于布尔值,使用boolean类型" +
+                            "6. 如果是JSON数据,使用jsonb类型" +
+                            "请以列表格式返回,列表中的元素顺序要与输入的列名顺序一致,如:" +
+                            "['varchar(255)', 'integer', 'numeric(15,2)', 'timestamp']" +
+                            "只返回列表,不要有任何其他说明文字"},
+            ],
+            max_tokens=1024,
+            temperature=0.1,
+            stream=False
+        )
+        result = response.choices[0].message.content
+
+        # str.strip('python') removes a character set, not a prefix, so it
+        # cannot reliably remove a code fence; slice out the outermost
+        # [...] literal instead
+        start, end = result.find('['), result.rfind(']')
+        if start == -1 or end <= start:
+            logger.error(f"列类型推断失败: {result}")
+            return fallback
+
+        # literal_eval only evaluates Python literals, never arbitrary code
+        result_list = ast.literal_eval(result[start:end + 1])
+        if not isinstance(result_list, list):
+            logger.error(f"列类型推断失败: {result}")
+            return fallback
+        return result_list
+    except Exception as e:
+        logger.error(f"列类型推断失败: {str(e)}")
+        return fallback
+
+
+    # 废弃的推断列类型方法
+    # """推断DataFrame的列类型"""
+    # column_types = {}
+    # for column in df.columns:
+    #     if df[column].dtype == 'object':
+    #         # 如果列是对象类型,尝试判断是否为日期或字符串
+    #         if pd.to_datetime(df[column], errors='coerce').notna().all():
+    #             column_types[column] = 'datetime'
+    #         else:
+    #             column_types[column] = 'varchar(255)'
+    #     elif pd.api.types.is_integer_dtype(df[column]):
+    #         column_types[column] = 'int'
+    #     elif pd.api.types.is_float_dtype(df[column]):
+    #         column_types[column] = 'float'
+    #     elif pd.api.types.is_bool_dtype(df[column]):
+    #         column_types[column] = 'boolean'
+    #     else:
+    #         column_types[column] = 'varchar(255)'
+    # return column_types
 
 def meta_list(page, page_size, search="", en_name_filter=None, 
              name_filter=None, category_filter=None, time_filter=None, tag_filter=None):