Parcourir la source

按照metric的操作逻辑,修改数据流API,记录图数据库节点及关系。记录PG数据库的表记录。

maxiaolong il y a 4 semaines
Parent
commit
a01229ab23
2 fichiers modifiés avec 357 ajouts et 225 suppressions
  1. 65 134
      app/api/data_flow/routes.py
  2. 292 91
      app/core/data_flow/dataflows.py

+ 65 - 134
app/api/data_flow/routes.py

@@ -3,6 +3,9 @@ from app.api.data_flow import bp
 from app.core.data_flow.dataflows import DataFlowService
 import logging
 from datetime import datetime
+import json
+from app.models.result import success, failed
+from app.core.graph.graph_operations import MyEncoder
 
 logger = logging.getLogger(__name__)
 
@@ -15,18 +18,12 @@ def get_dataflows():
         search = request.args.get('search', '')
         
         result = DataFlowService.get_dataflows(page=page, page_size=page_size, search=search)
-        return jsonify({
-            'code': 200,
-            'message': 'success',
-            'data': result
-        })
+        res = success(result, "success")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"获取数据流列表失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'获取数据流列表失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'获取数据流列表失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/get-dataflow/<int:dataflow_id>', methods=['GET'])
 def get_dataflow(dataflow_id):
@@ -34,24 +31,15 @@ def get_dataflow(dataflow_id):
     try:
         result = DataFlowService.get_dataflow_by_id(dataflow_id)
         if result:
-            return jsonify({
-                'code': 200,
-                'message': 'success',
-                'data': result
-            })
+            res = success(result, "success")
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
         else:
-            return jsonify({
-                'code': 404,
-                'message': '数据流不存在',
-                'data': None
-            }), 404
+            res = failed("数据流不存在", code=404)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"获取数据流详情失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'获取数据流详情失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'获取数据流详情失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/add-dataflow', methods=['POST'])
 def create_dataflow():
@@ -59,25 +47,20 @@ def create_dataflow():
     try:
         data = request.get_json()
         if not data:
-            return jsonify({
-                'code': 400,
-                'message': '请求数据不能为空',
-                'data': None
-            }), 400
+            res = failed("请求数据不能为空", code=400)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
             
         result = DataFlowService.create_dataflow(data)
-        return jsonify({
-            'code': 200,
-            'message': '数据流创建成功',
-            'data': result
-        })
+        res = success(result, "数据流创建成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+    except ValueError as ve:
+        logger.error(f"创建数据流参数错误: {str(ve)}")
+        res = failed(f'参数错误: {str(ve)}', code=400)
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"创建数据流失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'创建数据流失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'创建数据流失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/update-dataflow/<int:dataflow_id>', methods=['PUT'])
 def update_dataflow(dataflow_id):
@@ -85,32 +68,20 @@ def update_dataflow(dataflow_id):
     try:
         data = request.get_json()
         if not data:
-            return jsonify({
-                'code': 400,
-                'message': '请求数据不能为空',
-                'data': None
-            }), 400
+            res = failed("请求数据不能为空", code=400)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
             
         result = DataFlowService.update_dataflow(dataflow_id, data)
         if result:
-            return jsonify({
-                'code': 200,
-                'message': '数据流更新成功',
-                'data': result
-            })
+            res = success(result, "数据流更新成功")
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
         else:
-            return jsonify({
-                'code': 404,
-                'message': '数据流不存在',
-                'data': None
-            }), 404
+            res = failed("数据流不存在", code=404)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"更新数据流失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'更新数据流失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'更新数据流失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/delete-dataflow/<int:dataflow_id>', methods=['DELETE'])
 def delete_dataflow(dataflow_id):
@@ -118,24 +89,15 @@ def delete_dataflow(dataflow_id):
     try:
         result = DataFlowService.delete_dataflow(dataflow_id)
         if result:
-            return jsonify({
-                'code': 200,
-                'message': '数据流删除成功',
-                'data': None
-            })
+            res = success({}, "数据流删除成功")
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
         else:
-            return jsonify({
-                'code': 404,
-                'message': '数据流不存在',
-                'data': None
-            }), 404
+            res = failed("数据流不存在", code=404)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"删除数据流失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'删除数据流失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'删除数据流失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/execute-dataflow/<int:dataflow_id>', methods=['POST'])
 def execute_dataflow(dataflow_id):
@@ -143,36 +105,24 @@ def execute_dataflow(dataflow_id):
     try:
         data = request.get_json() or {}
         result = DataFlowService.execute_dataflow(dataflow_id, data)
-        return jsonify({
-            'code': 200,
-            'message': '数据流执行成功',
-            'data': result
-        })
+        res = success(result, "数据流执行成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"执行数据流失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'执行数据流失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'执行数据流失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/get-dataflow-status/<int:dataflow_id>', methods=['GET'])
 def get_dataflow_status(dataflow_id):
     """获取数据流执行状态"""
     try:
         result = DataFlowService.get_dataflow_status(dataflow_id)
-        return jsonify({
-            'code': 200,
-            'message': 'success',
-            'data': result
-        })
+        res = success(result, "success")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"获取数据流状态失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'获取数据流状态失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'获取数据流状态失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/get-dataflow-logs/<int:dataflow_id>', methods=['GET'])
 def get_dataflow_logs(dataflow_id):
@@ -182,18 +132,12 @@ def get_dataflow_logs(dataflow_id):
         page_size = request.args.get('page_size', 50, type=int)
         
         result = DataFlowService.get_dataflow_logs(dataflow_id, page=page, page_size=page_size)
-        return jsonify({
-            'code': 200,
-            'message': 'success',
-            'data': result
-        })
+        res = success(result, "success")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"获取数据流日志失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'获取数据流日志失败: {str(e)}',
-            'data': None
-        }), 500
+        res = failed(f'获取数据流日志失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
 
 @bp.route('/create-script', methods=['POST'])
 def create_script():
@@ -201,45 +145,32 @@ def create_script():
     try:
         json_data = request.get_json()
         if not json_data:
-            return jsonify({
-                'code': 400,
-                'message': '请求数据不能为空',
-                'data': None
-            }), 400
+            res = failed("请求数据不能为空", code=400)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
         
         # 提取文本描述
         request_data = json_data.get('request_data')
         
         if not request_data or not isinstance(request_data, str):
-            return jsonify({
-                'code': 400,
-                'message': '请求数据必须是文本描述',
-                'data': None
-            }), 400
+            res = failed("请求数据必须是文本描述", code=400)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
         
         # 调用DataFlowService的create_script方法
         script_content = DataFlowService.create_script(request_data)
         
-        return jsonify({
-            'code': 200,
-            'message': '脚本生成成功',
-            'data': {
-                'script_content': script_content,
-                'format': 'txt',
-                'generated_at': datetime.now().isoformat()
-            }
-        })
+        result_data = {
+            'script_content': script_content,
+            'format': 'txt',
+            'generated_at': datetime.now().isoformat()
+        }
+        
+        res = success(result_data, "脚本生成成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except ValueError as ve:
         logger.error(f"脚本生成参数错误: {str(ve)}")
-        return jsonify({
-            'code': 400,
-            'message': f'参数错误: {str(ve)}',
-            'data': None
-        }), 400
+        res = failed(f'参数错误: {str(ve)}', code=400)
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
     except Exception as e:
         logger.error(f"脚本生成失败: {str(e)}")
-        return jsonify({
-            'code': 500,
-            'message': f'脚本生成失败: {str(e)}',
-            'data': None
-        }), 500 
+        res = failed(f'脚本生成失败: {str(e)}')
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder) 

+ 292 - 91
app/core/data_flow/dataflows.py

@@ -3,6 +3,12 @@ from typing import Dict, List, Optional, Any
 from datetime import datetime
 import json
 from app.core.llm.llm_service import llm_client
+from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
+from app.core.meta_data import translate_and_parse
+from app.core.common.functions import get_formatted_time
+from py2neo import Relationship
+from app import db
+from sqlalchemy import text
 
 logger = logging.getLogger(__name__)
 
@@ -23,33 +29,45 @@ class DataFlowService:
             包含数据流列表和分页信息的字典
         """
         try:
-            # TODO: 这里应该从数据库查询数据流列表
-            # 目前返回模拟数据
-            mock_dataflows = [
-                {
-                    'id': i,
-                    'name': f'数据流_{i}',
-                    'description': f'这是第{i}个数据流的描述',
-                    'status': 'active' if i % 2 == 0 else 'inactive',
-                    'created_at': datetime.now().isoformat(),
-                    'updated_at': datetime.now().isoformat(),
-                    'created_by': f'user_{i % 3 + 1}'
-                }
-                for i in range(1, 21)
-            ]
+            # 从图数据库查询数据流列表
+            skip_count = (page - 1) * page_size
             
-            # 简单的搜索过滤
-            if search:
-                mock_dataflows = [
-                    df for df in mock_dataflows 
-                    if search.lower() in df['name'].lower() or search.lower() in df['description'].lower()
-                ]
+            # 构建搜索条件
+            where_clause = ""
+            params = {'skip': skip_count, 'limit': page_size}
             
-            # 分页处理
-            total = len(mock_dataflows)
-            start = (page - 1) * page_size
-            end = start + page_size
-            dataflows = mock_dataflows[start:end]
+            if search:
+                where_clause = "WHERE n.name CONTAINS $search OR n.description CONTAINS $search"
+                params['search'] = search
+            
+            # 查询数据流列表
+            query = f"""
+            MATCH (n:DataFlow)
+            {where_clause}
+            RETURN n
+            ORDER BY n.created_at DESC
+            SKIP $skip
+            LIMIT $limit
+            """
+            
+            list_result = connect_graph.run(query, **params).data()
+            
+            # 查询总数
+            count_query = f"""
+            MATCH (n:DataFlow)
+            {where_clause}
+            RETURN count(n) as total
+            """
+            count_params = {'search': search} if search else {}
+            total = connect_graph.run(count_query, **count_params).evaluate() or 0
+            
+            # 格式化结果
+            dataflows = []
+            for record in list_result:
+                node = record['n']
+                dataflow = dict(node)
+                dataflow['id'] = node.identity
+                dataflows.append(dataflow)
             
             return {
                 'list': dataflows,
@@ -76,34 +94,32 @@ class DataFlowService:
             数据流详情字典,如果不存在则返回None
         """
         try:
-            # TODO: 这里应该从数据库查询指定的数据流
-            # 目前返回模拟数据
-            if dataflow_id <= 0 or dataflow_id > 20:
+            query = """
+            MATCH (n:DataFlow)
+            WHERE id(n) = $dataflow_id
+            OPTIONAL MATCH (n)-[:label]-(la:data_label)
+            OPTIONAL MATCH (n)-[:child]-(child)
+            OPTIONAL MATCH (parent)-[:child]-(n)
+            RETURN n, 
+                   collect(DISTINCT {id: id(la), name: la.name}) as tags,
+                   collect(DISTINCT {id: id(child), name: child.name}) as children,
+                   collect(DISTINCT {id: id(parent), name: parent.name}) as parents
+            """
+            
+            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
+            
+            if not result:
                 return None
-                
-            return {
-                'id': dataflow_id,
-                'name': f'数据流_{dataflow_id}',
-                'description': f'这是第{dataflow_id}个数据流的详细描述',
-                'status': 'active' if dataflow_id % 2 == 0 else 'inactive',
-                'created_at': datetime.now().isoformat(),
-                'updated_at': datetime.now().isoformat(),
-                'created_by': f'user_{dataflow_id % 3 + 1}',
-                'config': {
-                    'source': {
-                        'type': 'database',
-                        'connection': 'mysql://localhost:3306/test'
-                    },
-                    'target': {
-                        'type': 'file',
-                        'path': '/data/output/'
-                    },
-                    'transformations': [
-                        {'type': 'filter', 'condition': 'age > 18'},
-                        {'type': 'aggregate', 'group_by': 'department', 'function': 'count'}
-                    ]
-                }
-            }
+            
+            record = result[0]
+            node = record['n']
+            dataflow = dict(node)
+            dataflow['id'] = node.identity
+            dataflow['tags'] = record['tags']
+            dataflow['children'] = record['children']
+            dataflow['parents'] = record['parents']
+            
+            return dataflow
         except Exception as e:
             logger.error(f"获取数据流详情失败: {str(e)}")
             raise e
@@ -120,30 +136,175 @@ class DataFlowService:
             创建的数据流信息
         """
         try:
-            # TODO: 这里应该验证数据并保存到数据库
-            # 目前返回模拟数据
+            # 验证必填字段
             required_fields = ['name', 'description']
             for field in required_fields:
                 if field not in data:
                     raise ValueError(f"缺少必填字段: {field}")
             
-            new_dataflow = {
-                'id': 21,  # 模拟新生成的ID
-                'name': data['name'],
-                'description': data['description'],
-                'status': 'inactive',
-                'created_at': datetime.now().isoformat(),
-                'updated_at': datetime.now().isoformat(),
-                'created_by': 'current_user',
-                'config': data.get('config', {})
+            dataflow_name = data['name']
+            
+            # 使用LLM翻译名称生成英文名
+            try:
+                result_list = translate_and_parse(dataflow_name)
+                en_name = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
+            except Exception as e:
+                logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
+                en_name = dataflow_name.lower().replace(' ', '_')
+            
+            # 准备节点数据
+            node_data = {
+                'name': dataflow_name,
+                'en_name': en_name,
+                'description': data.get('description', ''),
+                'status': data.get('status', 'inactive'),
+                'created_at': get_formatted_time(),
+                'updated_at': get_formatted_time(),
+                'created_by': data.get('created_by', 'system'),
+                'config': json.dumps(data.get('config', {}), ensure_ascii=False)
             }
             
-            logger.info(f"创建数据流成功: {new_dataflow['name']}")
-            return new_dataflow
+            # 创建或获取数据流节点
+            dataflow_node = get_node('DataFlow', name=dataflow_name)
+            if dataflow_node:
+                raise ValueError(f"数据流 '{dataflow_name}' 已存在")
+            
+            dataflow_node = create_or_get_node('DataFlow', **node_data)
+            
+            # 处理子节点关系
+            if data.get('children_ids'):
+                DataFlowService._handle_children_relationships(dataflow_node, data['children_ids'])
+            
+            # 处理标签关系
+            if data.get('tag_id'):
+                DataFlowService._handle_tag_relationship(dataflow_node, data['tag_id'])
+            
+            # 成功创建图数据库节点后,写入PG数据库
+            try:
+                DataFlowService._save_to_pg_database(data, dataflow_name, en_name)
+                logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")
+            except Exception as pg_error:
+                logger.error(f"写入PG数据库失败: {str(pg_error)}")
+                # 注意:这里可以选择回滚图数据库操作,但目前保持图数据库数据
+                # 在实际应用中,可能需要考虑分布式事务
+                
+            # 返回创建的数据流信息
+            result = dict(dataflow_node)
+            result['id'] = dataflow_node.identity
+            
+            logger.info(f"创建数据流成功: {dataflow_name}")
+            return result
+            
         except Exception as e:
             logger.error(f"创建数据流失败: {str(e)}")
             raise e
     
+    @staticmethod
+    def _save_to_pg_database(data: Dict[str, Any], script_name: str, en_name: str):
+        """
+        将脚本信息保存到PG数据库
+        
+        Args:
+            data: 包含脚本信息的数据
+            script_name: 脚本名称
+            en_name: 英文名称
+        """
+        try:
+            # 提取脚本相关信息
+            script_requirement = data.get('script_requirement', '')
+            script_content = data.get('script_content', '')
+            source_table = data.get('source_table', '')
+            target_table = data.get('target_table', en_name)  # 如果没有指定目标表,使用英文名
+            script_type = data.get('script_type', 'python')
+            user_name = data.get('created_by', 'system')
+            target_dt_column = data.get('target_dt_column', '')
+            
+            # 验证必需字段
+            if not target_table:
+                target_table = en_name
+            if not script_name:
+                raise ValueError("script_name不能为空")
+            
+            # 构建插入SQL
+            insert_sql = text("""
+                INSERT INTO dags.data_transform_scripts 
+                (source_table, target_table, script_name, script_type, script_requirement, 
+                 script_content, user_name, create_time, update_time, target_dt_column)
+                VALUES 
+                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
+                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
+                ON CONFLICT (target_table, script_name) 
+                DO UPDATE SET 
+                    source_table = EXCLUDED.source_table,
+                    script_type = EXCLUDED.script_type,
+                    script_requirement = EXCLUDED.script_requirement,
+                    script_content = EXCLUDED.script_content,
+                    user_name = EXCLUDED.user_name,
+                    update_time = EXCLUDED.update_time,
+                    target_dt_column = EXCLUDED.target_dt_column
+            """)
+            
+            # 准备参数
+            current_time = datetime.now()
+            params = {
+                'source_table': source_table,
+                'target_table': target_table,
+                'script_name': script_name,
+                'script_type': script_type,
+                'script_requirement': script_requirement,
+                'script_content': script_content,
+                'user_name': user_name,
+                'create_time': current_time,
+                'update_time': current_time,
+                'target_dt_column': target_dt_column
+            }
+            
+            # 执行插入操作
+            db.session.execute(insert_sql, params)
+            db.session.commit()
+            
+            logger.info(f"成功将脚本信息写入PG数据库: target_table={target_table}, script_name={script_name}")
+            
+        except Exception as e:
+            db.session.rollback()
+            logger.error(f"写入PG数据库失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def _handle_children_relationships(dataflow_node, children_ids):
+        """处理子节点关系"""
+        for child_id in children_ids:
+            try:
+                # 查找子节点
+                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
+                result = connect_graph.run(query, child_id=child_id).data()
+                
+                if result:
+                    child_node = result[0]['n']
+                    # 创建关系
+                    if not relationship_exists(dataflow_node, 'child', child_node):
+                        connect_graph.create(Relationship(dataflow_node, 'child', child_node))
+                        logger.info(f"创建子节点关系: {dataflow_node.identity} -> {child_id}")
+            except Exception as e:
+                logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
+    
+    @staticmethod
+    def _handle_tag_relationship(dataflow_node, tag_id):
+        """处理标签关系"""
+        try:
+            # 查找标签节点
+            query = "MATCH (n:data_label) WHERE id(n) = $tag_id RETURN n"
+            result = connect_graph.run(query, tag_id=tag_id).data()
+            
+            if result:
+                tag_node = result[0]['n']
+                # 创建关系
+                if not relationship_exists(dataflow_node, 'label', tag_node):
+                    connect_graph.create(Relationship(dataflow_node, 'label', tag_node))
+                    logger.info(f"创建标签关系: {dataflow_node.identity} -> {tag_id}")
+        except Exception as e:
+            logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
+    
     @staticmethod
     def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
         """
@@ -157,24 +318,46 @@ class DataFlowService:
             更新后的数据流信息,如果不存在则返回None
         """
         try:
-            # TODO: 这里应该更新数据库中的数据流
-            # 目前返回模拟数据
-            if dataflow_id <= 0 or dataflow_id > 20:
+            # 查找节点
+            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
+            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
+            
+            if not result:
                 return None
             
-            updated_dataflow = {
-                'id': dataflow_id,
-                'name': data.get('name', f'数据流_{dataflow_id}'),
-                'description': data.get('description', f'这是第{dataflow_id}个数据流的描述'),
-                'status': data.get('status', 'active' if dataflow_id % 2 == 0 else 'inactive'),
-                'created_at': datetime.now().isoformat(),
-                'updated_at': datetime.now().isoformat(),
-                'created_by': f'user_{dataflow_id % 3 + 1}',
-                'config': data.get('config', {})
-            }
+            # 更新节点属性
+            update_fields = []
+            params = {'dataflow_id': dataflow_id}
+            
+            for key, value in data.items():
+                if key not in ['id', 'created_at']:  # 保护字段
+                    if key == 'config' and isinstance(value, dict):
+                        value = json.dumps(value, ensure_ascii=False)
+                    update_fields.append(f"n.{key} = ${key}")
+                    params[key] = value
+            
+            if update_fields:
+                params['updated_at'] = get_formatted_time()
+                update_fields.append("n.updated_at = $updated_at")
+                
+                update_query = f"""
+                MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
+                SET {', '.join(update_fields)}
+                RETURN n
+                """
+                
+                result = connect_graph.run(update_query, **params).data()
+                
+                if result:
+                    node = result[0]['n']
+                    updated_dataflow = dict(node)
+                    updated_dataflow['id'] = node.identity
+                    
+                    logger.info(f"更新数据流成功: ID={dataflow_id}")
+                    return updated_dataflow
+            
+            return None
             
-            logger.info(f"更新数据流成功: ID={dataflow_id}")
-            return updated_dataflow
         except Exception as e:
             logger.error(f"更新数据流失败: {str(e)}")
             raise e
@@ -191,13 +374,21 @@ class DataFlowService:
             删除是否成功
         """
         try:
-            # TODO: 这里应该从数据库删除数据流
-            # 目前返回模拟结果
-            if dataflow_id <= 0 or dataflow_id > 20:
-                return False
+            # 删除节点及其关系
+            query = """
+            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
+            DETACH DELETE n
+            RETURN count(n) as deleted_count
+            """
+            
+            result = connect_graph.run(query, dataflow_id=dataflow_id).evaluate()
+            
+            if result and result > 0:
+                logger.info(f"删除数据流成功: ID={dataflow_id}")
+                return True
+            
+            return False
             
-            logger.info(f"删除数据流成功: ID={dataflow_id}")
-            return True
         except Exception as e:
             logger.error(f"删除数据流失败: {str(e)}")
             raise e
@@ -215,13 +406,17 @@ class DataFlowService:
             执行结果信息
         """
         try:
-            # TODO: 这里应该实际执行数据流
-            # 目前返回模拟结果
-            if dataflow_id <= 0 or dataflow_id > 20:
+            # 检查数据流是否存在
+            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
+            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
+            
+            if not result:
                 raise ValueError(f"数据流不存在: ID={dataflow_id}")
             
             execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
             
+            # TODO: 这里应该实际执行数据流
+            # 目前返回模拟结果
             result = {
                 'execution_id': execution_id,
                 'dataflow_id': dataflow_id,
@@ -251,7 +446,10 @@ class DataFlowService:
         try:
             # TODO: 这里应该查询实际的执行状态
             # 目前返回模拟状态
-            if dataflow_id <= 0 or dataflow_id > 20:
+            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
+            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
+            
+            if not result:
                 raise ValueError(f"数据流不存在: ID={dataflow_id}")
             
             status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
@@ -284,7 +482,10 @@ class DataFlowService:
         try:
             # TODO: 这里应该查询实际的执行日志
             # 目前返回模拟日志
-            if dataflow_id <= 0 or dataflow_id > 20:
+            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
+            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
+            
+            if not result:
                 raise ValueError(f"数据流不存在: ID={dataflow_id}")
             
             mock_logs = [