Procházet zdrojové kódy

task_list表插入失败不影响主流程
dataflow查询失败不影响主流程

maxiaolong před 5 dny
rodič
revize
fd94adc424

+ 4 - 2
app/api/data_interface/routes.py

@@ -42,7 +42,8 @@ def data_standard_detail():
                   RETURN properties(n) as property"""
         # Create a session from the driver returned by connect_graph
         with connect_graph().session() as session:
-            property = session.run(cql, nodeId=nodeid).evaluate()
+            result = session.run(cql, nodeId=nodeid).single()
+            property = result["property"] if result else {}
             if "tag" not in property:
                 property["tag"] = None
             else:
@@ -179,7 +180,8 @@ def data_label_detail():
         cql = """MATCH (n:DataLabel) where id(n) = $nodeId 
                   RETURN properties(n) as property"""
         with connect_graph().session() as session:
-            property = session.run(cql, nodeId=nodeid).evaluate()
+            result = session.run(cql, nodeId=nodeid).single()
+            property = result["property"] if result else {}
             if "describe" not in property:
                 property["describe"] = None
             res = success(property, "success")

+ 148 - 7
app/core/data_flow/dataflows.py

@@ -50,10 +50,8 @@ class DataFlowService:
             """
             
             # 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常)
-            driver = None
             try:
-                driver = connect_graph()
-                with driver.session() as session:
+                with connect_graph().session() as session:
                     list_result = session.run(query, **params).data()
                     
                     # 查询总数
@@ -65,10 +63,14 @@ class DataFlowService:
                     count_params = {'search': search} if search else {}
                     count_result = session.run(count_query, **count_params).single()
                     total = count_result['total'] if count_result else 0
-            finally:
-                # 确保 driver 被正确关闭,避免资源泄漏
-                if driver:
-                    driver.close()
+            except Exception as e:
+                # Do not close the driver here: connect_graph() returns a shared
+                # driver instance, and closing it would break subsequent queries.
+                # Log the failure and re-raise so the caller can handle it.
+                logger.error(f"查询数据流失败: {str(e)}")
+                raise e
             
             # 格式化结果
             dataflows = []
@@ -330,6 +332,145 @@ class DataFlowService:
             
             # 执行插入操作
             db.session.execute(insert_sql, params)
+            
+            # 新增:保存到task_list表
+            try:
+                # 1. 解析script_requirement并构建详细的任务描述
+                task_description_md = script_requirement
+                
+                try:
+                    # 尝试解析JSON
+                    import json
+                    try:
+                        req_json = json.loads(script_requirement)
+                    except (json.JSONDecodeError, TypeError):
+                        req_json = None
+                        
+                    if isinstance(req_json, dict):
+                        # 提取字段
+                        business_domains = []
+                        bd_str = req_json.get('business_domain', '')
+                        if bd_str:
+                            business_domains = [d.strip() for d in bd_str.split(',') if d.strip()]
+                            
+                        data_source = req_json.get('data_source', '')
+                        request_content_str = req_json.get('request_content', '')
+                        
+                        # 生成Business Domain DDLs
+                        domain_ddls = []
+                        if business_domains:
+                            try:
+                                with connect_graph().session() as session:
+                                    for domain in business_domains:
+                                        # 查询BusinessDomain节点及元数据
+                                        # 尝试匹配name, name_zh, name_en
+                                        cypher = """
+                                        MATCH (n:BusinessDomain)
+                                        WHERE n.name = $name OR n.name_zh = $name OR n.name_en = $name
+                                        OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
+                                        RETURN n, collect(m) as metadata
+                                        """
+                                        result = session.run(cypher, name=domain).single()
+                                        
+                                        if result:
+                                            node = result['n']
+                                            metadata = result['metadata']
+                                            
+                                            # 生成DDL
+                                            node_props = dict(node)
+                                            # 优先使用英文名作为表名,如果没有则使用拼音或原始名称
+                                            table_name = node_props.get('name_en', domain)
+                                            
+                                            ddl_lines = []
+                                            ddl_lines.append(f"CREATE TABLE {table_name} (")
+                                            
+                                            if metadata:
+                                                column_definitions = []
+                                                for meta in metadata:
+                                                    if meta:
+                                                        meta_props = dict(meta)
+                                                        column_name = meta_props.get('name_en', meta_props.get('name_zh', 'unknown_column'))
+                                                        data_type = meta_props.get('data_type', 'VARCHAR(255)')
+                                                        comment = meta_props.get('name_zh', '')
+                                                        
+                                                        column_def = f"    {column_name} {data_type}"
+                                                        if comment:
+                                                            column_def += f" COMMENT '{comment}'"
+                                                        column_definitions.append(column_def)
+                                                
+                                                if column_definitions:
+                                                    ddl_lines.append(",\n".join(column_definitions))
+                                                else:
+                                                    ddl_lines.append("    id BIGINT PRIMARY KEY COMMENT '主键ID'")
+                                            else:
+                                                ddl_lines.append("    id BIGINT PRIMARY KEY COMMENT '主键ID'")
+                                                
+                                            ddl_lines.append(");")
+                                            
+                                            table_comment = node_props.get('name_zh', node_props.get('describe', table_name))
+                                            if table_comment and table_comment != table_name:
+                                                ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")
+                                                
+                                            domain_ddls.append("\n".join(ddl_lines))
+                            except Exception as neo_e:
+                                logger.error(f"获取BusinessDomain DDL失败: {str(neo_e)}")
+                        
+                        # 构建Markdown格式
+                        task_desc_parts = [f"# Task: {script_name}\n"]
+                        
+                        if data_source:
+                            task_desc_parts.append(f"## Data Source\n{data_source}\n")
+                        
+                        if domain_ddls:
+                            task_desc_parts.append("## Business Domain Structures (DDL)")
+                            for ddl in domain_ddls:
+                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")
+                        
+                        task_desc_parts.append(f"## Request Content\n{request_content_str}\n")
+                        
+                        task_desc_parts.append("## Implementation Steps")
+                        task_desc_parts.append("1. Generate a Python program to implement the logic.")
+                        task_desc_parts.append("2. Generate an n8n workflow to schedule and execute the Python program.")
+                        
+                        task_description_md = "\n".join(task_desc_parts)
+                        
+                except Exception as parse_e:
+                    logger.warning(f"解析任务描述详情失败,使用原始描述: {str(parse_e)}")
+                    task_description_md = script_requirement
+
+                # 假设运行根目录为项目根目录,dataflows.py在app/core/data_flow/
+                code_path = 'app/core/data_flow'
+                
+                task_insert_sql = text("""
+                    INSERT INTO public.task_list 
+                    (task_name, task_description, status, code_name, code_path, create_by, create_time, update_time)
+                    VALUES 
+                    (:task_name, :task_description, :status, :code_name, :code_path, :create_by, :create_time, :update_time)
+                """)
+                
+                task_params = {
+                    'task_name': script_name,
+                    'task_description': task_description_md,
+                    'status': 'pending',
+                    'code_name': script_name,
+                    'code_path': code_path,
+                    'create_by': 'cursor',
+                    'create_time': current_time,
+                    'update_time': current_time
+                }
+                
+                # 使用嵌套事务,确保task_list插入失败不影响主流程
+                with db.session.begin_nested():
+                    db.session.execute(task_insert_sql, task_params)
+                
+                logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
+                
+            except Exception as task_error:
+                # 记录错误但不中断主流程
+                logger.error(f"写入task_list表失败: {str(task_error)}")
+                # NOTE: re-raise task_error here if writing to task_list must succeed.
+            
             db.session.commit()
             
             logger.info(f"成功将脚本信息写入PG数据库: target_table={target_table}, script_name={script_name}")

+ 7 - 7
app/core/data_interface/interface.py

@@ -362,7 +362,7 @@ def id_label_graph(id):
     WITH 
        collect({from: toString(id(a)), to: toString(id(n)), text: "标签"}) AS line1,
        collect({id: toString(id(n)), text: n.name_zh, type:"label"}) AS node1,
-       collect({id: toString(id(a)), text: a.name, type: split(labels(a)[0], '_')[1]}) AS node2, n
+       collect({id: toString(id(a)), text: a.name_zh, type: split(labels(a)[0], '_')[1]}) AS node2, n
     WITH apoc.coll.toSet(line1) AS lines,
                  apoc.coll.toSet(node1 + node2) AS nodes,
                  toString(id(n)) AS res
@@ -410,12 +410,12 @@ def label_kinship_graph(nodeid):
     OPTIONAL MATCH(d:data_standard)-[:LABEL]-(la)
           OPTIONAL MATCH(e:DataMetric)-[:LABEL]-(la)
     WITH 
-        collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
-        collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
-        collect({id:toString(id(d)),text:d.name,type:split(labels(d)[0],'_')[1]})+
-        collect({id:toString(id(e)),text:e.name,type:split(labels(e)[0],'_')[1]})+
-        collect({id:toString(id(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
-        collect({id:toString(id(meta)),text:meta.name}) as nodes,la,
+        collect({id:toString(id(a)),text:a.name_zh,type:split(labels(a)[0],'_')[1]})+
+        collect({id:toString(id(b)),text:b.name_zh,type:split(labels(b)[0],'_')[1]})+
+        collect({id:toString(id(d)),text:d.name_zh,type:split(labels(d)[0],'_')[1]})+
+        collect({id:toString(id(e)),text:e.name_zh,type:split(labels(e)[0],'_')[1]})+
+        collect({id:toString(id(la)),text:la.name_zh,type:split(labels(la)[0],'_')[1]})+
+        collect({id:toString(id(meta)),text:meta.name_zh}) as nodes,la,
         collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
         collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
         collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+