Jelajahi Sumber

task-manager 修复bug,更新功能为自动完成任务。

maxiaolong 2 hari lalu
induk
melakukan
9c1b50b163

+ 7 - 2
app/api/data_model/routes.py

@@ -57,7 +57,8 @@ def data_model_save():
     # 传入请求参数
     receiver = request.get_json()
     data_model = receiver['name_zh']
-    id_list = receiver['id_list']
+    # DDL新增时,id_list(包含resource_id)不是必填项
+    id_list = receiver.get('id_list', [])
     data_source = receiver.get('data_source')  # 获取data_source节点ID
     
     # resource_id和meta_id构成json格式
@@ -66,7 +67,11 @@ def data_model_save():
         # 从DDL中选取保存数据模型(支持data_source参数)
         result_list = [receiver['name_en']]
         id, data_model_node = model_functions.handle_data_model(data_model, result_list, result, receiver)
-        model_functions.handle_no_meta_data_model(id_list, receiver, data_model_node)
+        
+        # 只有在id_list不为空时才处理资源关系
+        if id_list:
+            model_functions.handle_no_meta_data_model(id_list, receiver, data_model_node)
+        
         model_functions.calculate_model_level(id)
 
         # 查询节点的实际属性(data_model_node 可能只是整数ID)

TEMPAT SAMPAH
app/app.rar


+ 29 - 21
app/core/data_flow/dataflows.py

@@ -33,7 +33,7 @@ class DataFlowService:
             
             # 构建搜索条件
             where_clause = ""
-            params = {'skip': skip_count, 'limit': page_size}
+            params: Dict[str, Union[int, str]] = {'skip': skip_count, 'limit': page_size}
             
             if search:
                 where_clause = "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
@@ -52,7 +52,7 @@ class DataFlowService:
             # 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常)
             try:
                 with connect_graph().session() as session:
-                    list_result = session.run(query, **params).data()
+                    list_result = session.run(query, params).data()
                     
                     # 查询总数
                     count_query = f"""
@@ -61,7 +61,7 @@ class DataFlowService:
                     RETURN count(n) as total
                     """
                     count_params = {'search': search} if search else {}
-                    count_result = session.run(count_query, **count_params).single()
+                    count_result = session.run(count_query, count_params).single()
                     total = count_result['total'] if count_result else 0
             except Exception as e:
                 # 确保 driver 被正确关闭,避免资源泄漏 - 这里不再需要手动关闭driver,因为connect_graph返回的可能是单例或新实例,
@@ -284,8 +284,14 @@ class DataFlowService:
             # 提取脚本相关信息
             script_requirement = data.get('script_requirement', '')
             script_content = data.get('script_content', '')
-            source_table = data.get('source_table', '').split(':')[-1] if ':' in data.get('source_table', '') else data.get('source_table', '')
-            target_table = data.get('target_table', '').split(':')[-1] if ':' in data.get('target_table', '') else data.get('target_table', name_en)  # 如果没有指定目标表,使用英文名
+            
+            # 安全处理 source_table 和 target_table(避免 None 值导致的 'in' 操作错误)
+            source_table_raw = data.get('source_table') or ''
+            source_table = source_table_raw.split(':')[-1] if ':' in source_table_raw else source_table_raw
+            
+            target_table_raw = data.get('target_table') or ''
+            target_table = target_table_raw.split(':')[-1] if ':' in target_table_raw else (target_table_raw or name_en)
+            
             script_type = data.get('script_type', 'python')
             user_name = data.get('created_by', 'system')
             target_dt_column = data.get('target_dt_column', '')
@@ -563,7 +569,7 @@ class DataFlowService:
                 
                 # 更新节点属性
                 update_fields = []
-                params = {'dataflow_id': dataflow_id}
+                params: Dict[str, Any] = {'dataflow_id': dataflow_id}
                 
                 for key, value in data.items():
                     if key not in ['id', 'created_at']:  # 保护字段
@@ -582,7 +588,7 @@ class DataFlowService:
                     RETURN n, id(n) as node_id
                     """
                     
-                    result = session.run(update_query, **params).data()
+                    result = session.run(update_query, params).data()
                     
                     if result:
                         node = result[0]['n']
@@ -632,7 +638,7 @@ class DataFlowService:
             raise e
     
     @staticmethod
-    def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]:
+    def execute_dataflow(dataflow_id: int, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
         """
         执行数据流
         
@@ -926,7 +932,7 @@ class DataFlowService:
                 RETURN n, collect(m) as metadata
                 """
                 
-                result = session.run(cypher, name_en=name_en)
+                result = session.run(cypher, {'name_en': name_en})  # type: ignore[arg-type]
                 record = result.single()
                 
                 if not record:
@@ -1028,10 +1034,11 @@ class DataFlowService:
                 """
                 
                 # 执行创建节点的查询
-                result = session.run(create_nodes_query,
-                                   source_table=source_table,
-                                   target_table=target_table,
-                                   created_at=get_formatted_time()).single()
+                result = session.run(create_nodes_query, {  # type: ignore[arg-type]
+                    'source_table': source_table,
+                    'target_table': target_table,
+                    'created_at': get_formatted_time()
+                }).single()
                 
                 if result:
                     source_id = result['source_id']
@@ -1052,14 +1059,15 @@ class DataFlowService:
                     RETURN r
                     """
                     
-                    relationship_result = session.run(create_relationship_query,
-                                                   source_id=source_id,
-                                                   target_id=target_id,
-                                                   script_name=script_name,
-                                                   script_type=script_type,
-                                                   schedule_status=schedule_status,
-                                                   update_mode=update_mode,
-                                                   created_at=get_formatted_time()).single()
+                    relationship_result = session.run(create_relationship_query, {  # type: ignore[arg-type]
+                        'source_id': source_id,
+                        'target_id': target_id,
+                        'script_name': script_name,
+                        'script_type': script_type,
+                        'schedule_status': schedule_status,
+                        'update_mode': update_mode,
+                        'created_at': get_formatted_time()
+                    }).single()
                     
                     if relationship_result:
                         logger.info(f"成功创建DERIVED_FROM关系: {target_table} -> {source_table} (script: {script_name})")

+ 29 - 15
app/core/data_model/model.py

@@ -407,23 +407,32 @@ def handle_no_meta_data_model(id_lists, receiver, data_model_node):
     处理从DDL中选取的没有元数据的数据模型
     
     Args:
-        id_lists: ID列表
+        id_lists: ID列表(可以为空)
         receiver: 接收到的请求参数
         data_model_node: 数据模型节点
         
     Returns:
         None
     """
+    # DDL新增时,id_lists可能为空,提前返回
+    if not id_lists:
+        logger.info("id_lists为空,跳过资源关系处理")
+        return
+    
     # 构建meta_id和resouce_id的列表
-    resouce_ids = [record['resource_id'] for record in id_lists]
-    meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]
+    resouce_ids = [record['resource_id'] for record in id_lists if 'resource_id' in record]
+    meta_ids = [record['id'] for id_list in id_lists for record in id_list.get('metaData', []) if 'id' in record]
     
     # 获取数据模型节点ID
     data_model_node_id = None
     if hasattr(data_model_node, 'id'):
+        # data_model_node 是节点对象
         data_model_node_id = data_model_node.id
-    else:
-        # 如果节点没有id属性,尝试通过查询获取
+    elif isinstance(data_model_node, int):
+        # data_model_node 直接就是整数ID
+        data_model_node_id = data_model_node
+    elif isinstance(data_model_node, dict):
+        # data_model_node 是字典,尝试通过name_zh查询
         query = """
         MATCH (n:DataModel {name_zh: $name_zh})
         RETURN id(n) as node_id
@@ -433,6 +442,9 @@ def handle_no_meta_data_model(id_lists, receiver, data_model_node):
             record = result.single()
             if record:
                 data_model_node_id = record["node_id"]
+    else:
+        # 未知类型,记录警告
+        logger.warning(f"data_model_node类型未知: {type(data_model_node)}, 值: {data_model_node}")
     
     if not data_model_node_id:
         return
@@ -457,20 +469,22 @@ def handle_no_meta_data_model(id_lists, receiver, data_model_node):
             """
             with connect_graph().session() as session:
                 result = session.run(query, node_id=id)
-            if result:
-                record = result.data()
-                if record:
-                    meta_node_list.append(record[0]['n'])
+                # 必须在 session 作用域内处理结果
+                if result:
+                    record = result.data()
+                    if record:
+                        meta_node_list.append(record[0]['n'])
         
         # 提取接收到的数据并创建meta_node节点
         meta_node = None
         resource_ids = []
         
         for item in id_lists:
-            resource_id = item['resource_id']
-            resource_ids.append(resource_id)
+            resource_id = item.get('resource_id')
+            if resource_id:
+                resource_ids.append(resource_id)
             
-            for meta_item in item['metaData']:
+            for meta_item in item.get('metaData', []):
                 meta_id = meta_item['id']
                 data_standard = meta_item.get('data_standard', '')
                 name_en = meta_item.get('name_en', '')
@@ -494,7 +508,7 @@ def handle_no_meta_data_model(id_lists, receiver, data_model_node):
                     # 直接使用Cypher查询检查关系是否存在
                     with connect_graph().session() as session:
                         rel_query = """
-                        MATCH (a)-[r:component]->(b)
+                        MATCH (a)-[r:INCLUDES]->(b)
                         WHERE id(a) = $start_id AND id(b) = $end_id
                         RETURN count(r) > 0 as exists
                         """
@@ -502,11 +516,11 @@ def handle_no_meta_data_model(id_lists, receiver, data_model_node):
                                                start_id=int(dm_id), 
                                                end_id=int(meta_node)).single()
                         
-                        # 如果关系不存在,则创建关系
+                        # 如果关系不存在,则创建INCLUDES关系
                         if not (rel_result and rel_result["exists"]):
                             session.execute_write(
                                 lambda tx: tx.run(
-                                    "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:component]->(b)",
+                                    "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:INCLUDES]->(b)",
                                     a_id=int(dm_id), b_id=int(meta_node)
                                 )
                             )

+ 228 - 0
app/core/data_processing/data_cleaner.py

@@ -0,0 +1,228 @@
+"""
+数据清洗工具模块
+
+提供通用的数据清洗和标准化功能
+"""
+
+import pandas as pd
+import numpy as np
+from typing import Union, List, Any
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class DataCleaner:
+    """
+    数据清洗工具类
+    
+    提供数据清洗、去重、类型转换、异常值检测等功能
+    """
+    
+    def __init__(self):
+        """初始化数据清洗工具"""
+        logger.info("DataCleaner initialized")
+    
+    def remove_nulls(self, df: pd.DataFrame, columns: List[str] = None, how: str = 'any') -> pd.DataFrame:
+        """
+        去除空值
+        
+        Args:
+            df: 输入的DataFrame
+            columns: 需要检查的列名列表,None表示检查所有列
+            how: 'any'表示只要有一个空值就删除行,'all'表示所有值都为空才删除行
+            
+        Returns:
+            清理后的DataFrame
+        """
+        logger.info(f"Removing null values from dataframe, shape before: {df.shape}")
+        
+        if columns:
+            result = df.dropna(subset=columns, how=how)
+        else:
+            result = df.dropna(how=how)
+        
+        logger.info(f"Shape after removing nulls: {result.shape}")
+        return result
+    
+    def remove_duplicates(self, df: pd.DataFrame, columns: List[str] = None, keep: str = 'first') -> pd.DataFrame:
+        """
+        去除重复数据
+        
+        Args:
+            df: 输入的DataFrame
+            columns: 用于判断重复的列名列表,None表示使用所有列
+            keep: 'first'保留第一个,'last'保留最后一个,False删除所有重复
+            
+        Returns:
+            去重后的DataFrame
+        """
+        logger.info(f"Removing duplicates from dataframe, shape before: {df.shape}")
+        
+        result = df.drop_duplicates(subset=columns, keep=keep)
+        
+        logger.info(f"Shape after removing duplicates: {result.shape}")
+        return result
+    
+    def convert_types(self, df: pd.DataFrame, type_mapping: dict) -> pd.DataFrame:
+        """
+        数据类型转换
+        
+        Args:
+            df: 输入的DataFrame
+            type_mapping: 列名到目标类型的映射,例如 {'age': int, 'price': float}
+            
+        Returns:
+            类型转换后的DataFrame
+        """
+        logger.info(f"Converting data types for columns: {list(type_mapping.keys())}")
+        
+        result = df.copy()
+        
+        for col, dtype in type_mapping.items():
+            if col in result.columns:
+                try:
+                    result[col] = result[col].astype(dtype)
+                    logger.info(f"Column '{col}' converted to {dtype}")
+                except Exception as e:
+                    logger.error(f"Failed to convert column '{col}' to {dtype}: {str(e)}")
+                    raise
+            else:
+                logger.warning(f"Column '{col}' not found in dataframe")
+        
+        return result
+    
+    def detect_outliers(self, df: pd.DataFrame, column: str, method: str = 'iqr', 
+                       threshold: float = 1.5) -> pd.Series:
+        """
+        异常值检测
+        
+        Args:
+            df: 输入的DataFrame
+            column: 需要检测的列名
+            method: 检测方法,'iqr'(四分位距)或'zscore'(标准分数)
+            threshold: 阈值,IQR方法默认1.5,Z-score方法默认3
+            
+        Returns:
+            布尔Series,True表示异常值
+        """
+        logger.info(f"Detecting outliers in column '{column}' using {method} method")
+        
+        if column not in df.columns:
+            raise ValueError(f"Column '{column}' not found in dataframe")
+        
+        data = df[column]
+        
+        if method == 'iqr':
+            # 使用四分位距方法
+            Q1 = data.quantile(0.25)
+            Q3 = data.quantile(0.75)
+            IQR = Q3 - Q1
+            
+            lower_bound = Q1 - threshold * IQR
+            upper_bound = Q3 + threshold * IQR
+            
+            outliers = (data < lower_bound) | (data > upper_bound)
+            
+        elif method == 'zscore':
+            # 使用Z-score方法
+            if threshold == 1.5:  # 如果使用默认IQR阈值,改为Z-score默认阈值
+                threshold = 3
+            
+            mean = data.mean()
+            std = data.std()
+            
+            if std == 0:
+                logger.warning(f"Standard deviation is 0 for column '{column}'")
+                return pd.Series([False] * len(data), index=data.index)
+            
+            z_scores = np.abs((data - mean) / std)
+            outliers = z_scores > threshold
+            
+        else:
+            raise ValueError(f"Unknown method: {method}. Use 'iqr' or 'zscore'")
+        
+        outlier_count = outliers.sum()
+        logger.info(f"Found {outlier_count} outliers in column '{column}'")
+        
+        return outliers
+    
+    def remove_outliers(self, df: pd.DataFrame, column: str, method: str = 'iqr', 
+                       threshold: float = 1.5) -> pd.DataFrame:
+        """
+        移除异常值
+        
+        Args:
+            df: 输入的DataFrame
+            column: 需要处理的列名
+            method: 检测方法,'iqr'或'zscore'
+            threshold: 阈值
+            
+        Returns:
+            移除异常值后的DataFrame
+        """
+        outliers = self.detect_outliers(df, column, method, threshold)
+        result = df[~outliers].copy()
+        
+        logger.info(f"Removed {outliers.sum()} outliers from dataframe")
+        return result
+    
+    def clean_data(self, df: pd.DataFrame, 
+                  remove_nulls: bool = True,
+                  remove_duplicates: bool = True,
+                  type_mapping: dict = None,
+                  outlier_columns: List[str] = None) -> pd.DataFrame:
+        """
+        一键数据清洗
+        
+        Args:
+            df: 输入的DataFrame
+            remove_nulls: 是否去除空值
+            remove_duplicates: 是否去重
+            type_mapping: 类型转换映射
+            outlier_columns: 需要检测异常值的列名列表
+            
+        Returns:
+            清洗后的DataFrame
+        """
+        logger.info(f"Starting data cleaning, input shape: {df.shape}")
+        
+        result = df.copy()
+        
+        # 去除空值
+        if remove_nulls:
+            result = self.remove_nulls(result)
+        
+        # 去重
+        if remove_duplicates:
+            result = self.remove_duplicates(result)
+        
+        # 类型转换
+        if type_mapping:
+            result = self.convert_types(result, type_mapping)
+        
+        # 异常值处理
+        if outlier_columns:
+            for col in outlier_columns:
+                if col in result.columns:
+                    result = self.remove_outliers(result, col)
+        
+        logger.info(f"Data cleaning completed, output shape: {result.shape}")
+        return result
+
+
+# 便捷函数
+def clean_data(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
+    """
+    便捷的数据清洗函数
+    
+    Args:
+        df: 输入的DataFrame
+        **kwargs: 传递给DataCleaner.clean_data的参数
+        
+    Returns:
+        清洗后的DataFrame
+    """
+    cleaner = DataCleaner()
+    return cleaner.clean_data(df, **kwargs)
+

+ 431 - 0
app/core/data_processing/data_validator.py

@@ -0,0 +1,431 @@
+"""
+数据验证工具模块
+
+提供数据验证功能,用于验证数据的完整性和格式正确性
+"""
+
+import pandas as pd
+import re
+from typing import List, Dict, Any, Callable, Optional
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class ValidationRule:
+    """验证规则基类"""
+    
+    def __init__(self, column: str, error_message: str = None):
+        """
+        初始化验证规则
+        
+        Args:
+            column: 要验证的列名
+            error_message: 自定义错误消息
+        """
+        self.column = column
+        self.error_message = error_message or f"Validation failed for column '{column}'"
+    
+    def validate(self, df: pd.DataFrame) -> pd.Series:
+        """
+        执行验证
+        
+        Args:
+            df: 输入的DataFrame
+            
+        Returns:
+            布尔Series,True表示验证通过
+        """
+        raise NotImplementedError("Subclasses must implement validate method")
+
+
+class RequiredFieldRule(ValidationRule):
+    """必填字段验证规则"""
+    
+    def validate(self, df: pd.DataFrame) -> pd.Series:
+        """验证字段不能为空"""
+        if self.column not in df.columns:
+            raise ValueError(f"Column '{self.column}' not found in DataFrame")
+        
+        return df[self.column].notna()
+
+
+class DataTypeRule(ValidationRule):
+    """数据类型验证规则"""
+    
+    def __init__(self, column: str, expected_type: type, error_message: str = None):
+        """
+        初始化数据类型验证规则
+        
+        Args:
+            column: 要验证的列名
+            expected_type: 期望的数据类型
+            error_message: 自定义错误消息
+        """
+        super().__init__(column, error_message)
+        self.expected_type = expected_type
+        self.error_message = error_message or f"Column '{column}' must be of type {expected_type.__name__}"
+    
+    def validate(self, df: pd.DataFrame) -> pd.Series:
+        """验证数据类型"""
+        if self.column not in df.columns:
+            raise ValueError(f"Column '{self.column}' not found in DataFrame")
+        
+        # 对于空值,认为验证通过(可以与 RequiredFieldRule 组合使用)
+        result = pd.Series([True] * len(df), index=df.index)
+        non_null_mask = df[self.column].notna()
+        
+        if self.expected_type == int:
+            result[non_null_mask] = df.loc[non_null_mask, self.column].apply(
+                lambda x: isinstance(x, (int, float)) and float(x).is_integer()
+            )
+        elif self.expected_type == float:
+            result[non_null_mask] = df.loc[non_null_mask, self.column].apply(
+                lambda x: isinstance(x, (int, float))
+            )
+        elif self.expected_type == str:
+            result[non_null_mask] = df.loc[non_null_mask, self.column].apply(
+                lambda x: isinstance(x, str)
+            )
+        else:
+            result[non_null_mask] = df.loc[non_null_mask, self.column].apply(
+                lambda x: isinstance(x, self.expected_type)
+            )
+        
+        return result
+
+
+class RegexRule(ValidationRule):
+    """正则表达式验证规则"""
+    
+    def __init__(self, column: str, pattern: str, error_message: str = None):
+        """
+        初始化正则表达式验证规则
+        
+        Args:
+            column: 要验证的列名
+            pattern: 正则表达式模式
+            error_message: 自定义错误消息
+        """
+        super().__init__(column, error_message)
+        self.pattern = re.compile(pattern)
+        self.error_message = error_message or f"Column '{column}' does not match pattern '{pattern}'"
+    
+    def validate(self, df: pd.DataFrame) -> pd.Series:
+        """验证正则表达式"""
+        if self.column not in df.columns:
+            raise ValueError(f"Column '{self.column}' not found in DataFrame")
+        
+        # 对于空值或非字符串,认为验证通过
+        result = pd.Series([True] * len(df), index=df.index)
+        valid_mask = df[self.column].notna() & df[self.column].apply(lambda x: isinstance(x, str))
+        
+        result[valid_mask] = df.loc[valid_mask, self.column].apply(
+            lambda x: bool(self.pattern.match(str(x)))
+        )
+        
+        return result
+
+
+class EmailRule(RegexRule):
+    """邮箱格式验证规则"""
+    
+    def __init__(self, column: str, error_message: str = None):
+        """初始化邮箱验证规则"""
+        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
+        super().__init__(
+            column, 
+            email_pattern,
+            error_message or f"Column '{column}' contains invalid email addresses"
+        )
+
+
+class PhoneRule(RegexRule):
+    """电话号码格式验证规则"""
+    
+    def __init__(self, column: str, country_code: str = 'CN', error_message: str = None):
+        """
+        初始化电话号码验证规则
+        
+        Args:
+            column: 要验证的列名
+            country_code: 国家代码,'CN'表示中国手机号
+            error_message: 自定义错误消息
+        """
+        if country_code == 'CN':
+            # 中国手机号:11位,1开头
+            phone_pattern = r'^1[3-9]\d{9}$'
+        else:
+            # 通用格式:支持国际格式
+            phone_pattern = r'^\+?[1-9]\d{1,14}$'
+        
+        super().__init__(
+            column, 
+            phone_pattern,
+            error_message or f"Column '{column}' contains invalid phone numbers"
+        )
+
+
+class CustomRule(ValidationRule):
+    """自定义验证规则"""
+    
+    def __init__(self, column: str, validator_func: Callable, error_message: str = None):
+        """
+        初始化自定义验证规则
+        
+        Args:
+            column: 要验证的列名
+            validator_func: 自定义验证函数,接收单个值,返回布尔值
+            error_message: 自定义错误消息
+        """
+        super().__init__(column, error_message)
+        self.validator_func = validator_func
+    
+    def validate(self, df: pd.DataFrame) -> pd.Series:
+        """验证自定义规则"""
+        if self.column not in df.columns:
+            raise ValueError(f"Column '{self.column}' not found in DataFrame")
+        
+        return df[self.column].apply(self.validator_func)
+
+
+class DataValidator:
+    """
+    数据验证器类
+    
+    用于验证数据的完整性和格式正确性
+    """
+    
+    def __init__(self):
+        """初始化数据验证器"""
+        self.rules: List[ValidationRule] = []
+        logger.info("DataValidator initialized")
+    
+    def add_rule(self, rule: ValidationRule) -> 'DataValidator':
+        """
+        添加验证规则
+        
+        Args:
+            rule: 验证规则对象
+            
+        Returns:
+            self,支持链式调用
+        """
+        self.rules.append(rule)
+        logger.info(f"Added validation rule for column '{rule.column}'")
+        return self
+    
+    def add_required_field(self, column: str, error_message: str = None) -> 'DataValidator':
+        """
+        添加必填字段验证
+        
+        Args:
+            column: 列名
+            error_message: 自定义错误消息
+            
+        Returns:
+            self,支持链式调用
+        """
+        return self.add_rule(RequiredFieldRule(column, error_message))
+    
+    def add_data_type(self, column: str, expected_type: type, error_message: str = None) -> 'DataValidator':
+        """
+        添加数据类型验证
+        
+        Args:
+            column: 列名
+            expected_type: 期望的数据类型
+            error_message: 自定义错误消息
+            
+        Returns:
+            self,支持链式调用
+        """
+        return self.add_rule(DataTypeRule(column, expected_type, error_message))
+    
+    def add_email_format(self, column: str, error_message: str = None) -> 'DataValidator':
+        """
+        添加邮箱格式验证
+        
+        Args:
+            column: 列名
+            error_message: 自定义错误消息
+            
+        Returns:
+            self,支持链式调用
+        """
+        return self.add_rule(EmailRule(column, error_message))
+    
+    def add_phone_format(self, column: str, country_code: str = 'CN', error_message: str = None) -> 'DataValidator':
+        """
+        添加电话号码格式验证
+        
+        Args:
+            column: 列名
+            country_code: 国家代码
+            error_message: 自定义错误消息
+            
+        Returns:
+            self,支持链式调用
+        """
+        return self.add_rule(PhoneRule(column, country_code, error_message))
+    
+    def add_regex(self, column: str, pattern: str, error_message: str = None) -> 'DataValidator':
+        """
+        添加正则表达式验证
+        
+        Args:
+            column: 列名
+            pattern: 正则表达式模式
+            error_message: 自定义错误消息
+            
+        Returns:
+            self,支持链式调用
+        """
+        return self.add_rule(RegexRule(column, pattern, error_message))
+    
+    def add_custom(self, column: str, validator_func: Callable, error_message: str = None) -> 'DataValidator':
+        """
+        添加自定义验证规则
+        
+        Args:
+            column: 列名
+            validator_func: 自定义验证函数
+            error_message: 自定义错误消息
+            
+        Returns:
+            self,支持链式调用
+        """
+        return self.add_rule(CustomRule(column, validator_func, error_message))
+    
+    def validate(self, df: pd.DataFrame) -> Dict[str, Any]:
+        """
+        执行所有验证规则
+        
+        Args:
+            df: 输入的DataFrame
+            
+        Returns:
+            验证结果字典,包含:
+            - is_valid: 整体是否通过验证
+            - total_rows: 总行数
+            - valid_rows: 有效行数
+            - invalid_rows: 无效行数
+            - errors: 错误详情列表
+            - invalid_indices: 无效行的索引列表
+        """
+        logger.info(f"Starting validation on DataFrame with {len(df)} rows")
+        
+        if not self.rules:
+            logger.warning("No validation rules defined")
+            return {
+                'is_valid': True,
+                'total_rows': len(df),
+                'valid_rows': len(df),
+                'invalid_rows': 0,
+                'errors': [],
+                'invalid_indices': [],
+            }
+        
+        # 初始化所有行为有效
+        valid_mask = pd.Series([True] * len(df), index=df.index)
+        errors = []
+        
+        # 应用所有验证规则
+        for rule in self.rules:
+            try:
+                rule_result = rule.validate(df)
+                failed_mask = ~rule_result
+                
+                if failed_mask.any():
+                    failed_indices = df.index[failed_mask].tolist()
+                    errors.append({
+                        'rule': rule.__class__.__name__,
+                        'column': rule.column,
+                        'message': rule.error_message,
+                        'failed_count': failed_mask.sum(),
+                        'failed_indices': failed_indices[:10],  # 只记录前10个
+                    })
+                    logger.warning(f"Validation failed for column '{rule.column}': {failed_mask.sum()} rows")
+                
+                # 更新整体有效性掩码
+                valid_mask &= rule_result
+                
+            except Exception as e:
+                logger.error(f"Error applying rule {rule.__class__.__name__} on column '{rule.column}': {str(e)}")
+                errors.append({
+                    'rule': rule.__class__.__name__,
+                    'column': rule.column,
+                    'message': f"Validation error: {str(e)}",
+                    'failed_count': len(df),
+                    'failed_indices': [],
+                })
+                valid_mask = pd.Series([False] * len(df), index=df.index)
+        
+        invalid_indices = df.index[~valid_mask].tolist()
+        
+        result = {
+            'is_valid': valid_mask.all(),
+            'total_rows': len(df),
+            'valid_rows': valid_mask.sum(),
+            'invalid_rows': (~valid_mask).sum(),
+            'errors': errors,
+            'invalid_indices': invalid_indices,
+        }
+        
+        logger.info(f"Validation completed: {result['valid_rows']}/{result['total_rows']} rows valid")
+        return result
+    
+    def get_valid_data(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        获取通过验证的数据
+        
+        Args:
+            df: 输入的DataFrame
+            
+        Returns:
+            只包含有效行的DataFrame
+        """
+        validation_result = self.validate(df)
+        invalid_indices = validation_result['invalid_indices']
+        
+        if not invalid_indices:
+            return df.copy()
+        
+        return df.drop(invalid_indices).copy()
+    
+    def get_invalid_data(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        获取未通过验证的数据
+        
+        Args:
+            df: 输入的DataFrame
+            
+        Returns:
+            只包含无效行的DataFrame
+        """
+        validation_result = self.validate(df)
+        invalid_indices = validation_result['invalid_indices']
+        
+        if not invalid_indices:
+            return pd.DataFrame(columns=df.columns)
+        
+        return df.loc[invalid_indices].copy()
+
+
+# 便捷函数
+def validate_data(df: pd.DataFrame, rules: List[ValidationRule]) -> Dict[str, Any]:
+    """
+    便捷的数据验证函数
+    
+    Args:
+        df: 输入的DataFrame
+        rules: 验证规则列表
+        
+    Returns:
+        验证结果字典
+    """
+    validator = DataValidator()
+    for rule in rules:
+        validator.add_rule(rule)
+    return validator.validate(df)
+

+ 43 - 5
mcp-servers/task-manager/README.md

@@ -85,9 +85,22 @@ VALUES (
 
 #### 执行单个任务
 
+**自动完成模式(推荐):**
 ```
 调用工具: execute_task
-参数: { "task_id": 1 }
+参数: { 
+  "task_id": 1,
+  "auto_complete": true  // 默认值,AI 会自动生成代码并更新状态
+}
+```
+
+**手动模式:**
+```
+调用工具: execute_task
+参数: { 
+  "task_id": 1,
+  "auto_complete": false  // 需要手动生成代码并调用 update_task_status
+}
 ```
 
 #### 批量处理所有任务
@@ -179,13 +192,38 @@ POST /api/auth/login
 
 ## 工作流程
 
+### 🚀 自动完成模式(推荐)
+
+1. **创建任务**:在数据库中插入任务记录,状态为`pending`
+2. **执行任务**:调用 `execute_task` (默认 `auto_complete: true`) 或 `process_all_tasks`
+3. **自动生成代码**:AI 根据任务描述自动生成 Python 代码文件
+4. **自动更新状态**:AI 自动调用 `update_task_status` 工具更新任务状态为 `completed`
+5. **轮询检查**:如果启用自动轮询,每5分钟检查一次新任务
+
+**✅ 自动完成优势**:
+- 🤖 完全自动化:无需人工干预
+- 📝 明确指令:AI 收到详细的自动执行步骤
+- ✓ 状态同步:代码生成完成后立即更新状态
+- 🔄 高效流程:适合批量处理任务
+
+### 📋 手动模式
+
 1. **创建任务**:在数据库中插入任务记录,状态为`pending`
-2. **获取任务**:MCP服务器从数据库读取`pending`状态的任务
-3. **执行任务**:通过MCP协议将任务描述发送给Cursor
-4. **开发代码**:Cursor根据任务描述开发代码
-5. **更新状态**:任务完成后,更新状态为`completed`,并记录代码文件名和路径
+2. **获取任务**:调用 `execute_task` 并设置 `auto_complete: false`
+3. **查看任务描述**:系统返回详细的任务描述和执行指令
+4. **开发代码**:根据任务描述手动开发 Python 代码文件
+5. **更新状态**:手动调用 `update_task_status` 工具,提供:
+   - `task_id`: 任务ID
+   - `status`: `"completed"` 或 `"failed"`
+   - `code_name`: 生成的代码文件名(如 `"user_login.py"`)
+   - `code_path`: 代码文件路径(如 `"app/core/auth"`)
 6. **轮询检查**:如果启用自动轮询,每5分钟检查一次新任务
 
+**⚠️ 手动模式注意事项**:
+- 任务不会自动完成,必须手动调用 `update_task_status`
+- 如果只执行任务而不更新状态,任务会一直保持 `processing` 状态
+- 适合需要人工审核或特殊处理的任务
+
 ## 故障处理
 
 ### 任务执行失败

+ 30 - 16
mcp-servers/task-manager/index.js

@@ -75,7 +75,7 @@ class TaskManagerServer {
                 },
                 {
                     name: 'execute_task',
-                    description: '执行单个任务,将任务描述发送给Cursor进行代码开发',
+                    description: '执行单个任务,支持自动完成模式(AI自动生成代码并更新状态)',
                     inputSchema: {
                         type: 'object',
                         properties: {
@@ -83,6 +83,11 @@ class TaskManagerServer {
                                 type: 'number',
                                 description: '要执行的任务ID',
                             },
+                            auto_complete: {
+                                type: 'boolean',
+                                description: '是否启用自动完成模式(AI自动生成代码并更新状态),默认为 true',
+                                default: true,
+                            },
                         },
                         required: ['task_id'],
                     },
@@ -241,20 +246,28 @@ class TaskManagerServer {
                         await updateTaskStatus(args.task_id, 'processing');
                         info(`[Task Manager] Task ${args.task_id} status updated to 'processing'`);
 
-                        // 处理任务
-                        const result = await processTask(task);
+                        // 处理任务(支持自动完成模式)
+                        const autoComplete = args.auto_complete !== false; // 默认启用自动完成
+                        const result = await processTask(task, autoComplete);
                         
                         if (result.success) {
-                            info(`[Task Manager] Task ${args.task_id} executed successfully. Code: ${result.code_name || 'N/A'}, Path: ${result.code_path || 'N/A'}`);
+                            if (autoComplete) {
+                                info(`[Task Manager] Task ${args.task_id} prepared for AUTO code generation.`);
+                                info(`[Task Manager] AI will automatically generate code and update task status.`);
+                            } else {
+                                info(`[Task Manager] Task ${args.task_id} prepared for MANUAL code generation.`);
+                                info(`[Task Manager] Please generate code and manually call update_task_status.`);
+                            }
                         } else {
-                            error(`[Task Manager] Task ${args.task_id} execution failed: ${result.message || 'Unknown error'}`);
+                            error(`[Task Manager] Task ${args.task_id} preparation failed: ${result.message || 'Unknown error'}`);
+                            await updateTaskStatus(args.task_id, 'failed');
                         }
 
                         return {
                             content: [
                                 {
                                     type: 'text',
-                                    text: JSON.stringify(result, null, 2),
+                                    text: result.execution_instructions || JSON.stringify(result, null, 2),
                                 },
                             ],
                         };
@@ -386,23 +399,21 @@ class TaskManagerServer {
                     // 更新状态为处理中
                     await updateTaskStatus(task.task_id, 'processing');
 
-                    // 处理任务
-                    const result = await processTask(task);
+                    // 处理任务(默认启用自动完成模式)
+                    const autoComplete = true; // process_all_tasks 默认自动完成
+                    const result = await processTask(task, autoComplete);
                     
                     results.processed++;
                     if (result.success) {
                         results.succeeded++;
-                        await updateTaskStatus(
-                            task.task_id,
-                            'completed',
-                            result.code_name,
-                            result.code_path
-                        );
-                        info(`[Task Manager] Task ${task.task_id} completed successfully. Code: ${result.code_name || 'N/A'}, Path: ${result.code_path || 'N/A'}`);
+                        // 注意:不立即标记为 completed
+                        // 任务保持 'processing' 状态,等待 AI 完成代码生成后
+                        // AI 会自动调用 update_task_status 工具更新状态
+                        info(`[Task Manager] Task ${task.task_id} prepared for AUTO execution.`);
                     } else {
                         results.failed++;
                         await updateTaskStatus(task.task_id, 'failed');
-                        error(`[Task Manager] Task ${task.task_id} failed: ${result.message || 'Unknown error'}`);
+                        error(`[Task Manager] Task ${task.task_id} preparation failed: ${result.message || 'Unknown error'}`);
                     }
 
                     results.tasks.push({
@@ -410,6 +421,9 @@ class TaskManagerServer {
                         task_name: task.task_name,
                         success: result.success,
                         message: result.message,
+                        auto_complete: result.auto_complete,
+                        execution_instructions: result.execution_instructions,
+                        formatted_description: result.formatted_description,
                     });
                 } catch (err) {
                     results.failed++;

+ 105 - 20
mcp-servers/task-manager/task-processor.js

@@ -15,10 +15,11 @@ import { info, error } from './logger.js';
  * @param {number} task.task_id - 任务ID
  * @param {string} task.task_name - 任务名称
  * @param {string} task.task_description - 任务描述(markdown格式)
+ * @param {boolean} autoComplete - 是否启用自动完成模式
  * @returns {Promise<Object>} 处理结果
  */
-export async function processTask(task) {
-    info(`[Task Processor] Processing task ${task.task_id}: ${task.task_name}`);
+export async function processTask(task, autoComplete = true) {
+    info(`[Task Processor] Processing task ${task.task_id}: ${task.task_name} (autoComplete: ${autoComplete})`);
     const startTime = Date.now();
     
     try {
@@ -30,12 +31,16 @@ export async function processTask(task) {
             task_name: task.task_name,
             description: task.task_description,
             timestamp: new Date().toISOString(),
+            auto_complete: autoComplete,
         };
 
         // 格式化任务描述,确保是有效的markdown
         const formattedDescription = formatTaskDescription(task);
         info(`[Task Processor] Task ${task.task_id} description formatted successfully (${formattedDescription.length} characters)`);
 
+        // 从任务描述中提取关键信息
+        const extractedInfo = extractTaskInfo(task);
+
         // 返回任务信息,供Cursor处理
         // Cursor会接收到这个信息,并根据描述执行代码开发
         const result = {
@@ -44,25 +49,13 @@ export async function processTask(task) {
             task_id: task.task_id,
             task_name: task.task_name,
             formatted_description: formattedDescription,
+            auto_complete: autoComplete,
             // 这些字段将在任务完成后更新到数据库
-            code_name: null,
-            code_path: null,
-            execution_instructions: `
-请根据以下任务描述进行代码开发:
-
-## 任务信息
-- 任务ID: ${task.task_id}
-- 任务名称: ${task.task_name}
-
-## 任务描述
-${formattedDescription}
-
-## 执行要求
-1. 仔细阅读任务描述,理解需求
-2. 根据描述开发相应的代码
-3. 确保代码符合项目规范
-4. 完成后,请提供生成的代码文件名和路径信息
-            `,
+            code_name: extractedInfo.code_name || null,
+            code_path: extractedInfo.code_path || task.code_path || 'app/core/data_flow',
+            execution_instructions: autoComplete ? 
+                buildAutoCompleteInstructions(task, formattedDescription) :
+                buildManualInstructions(task, formattedDescription),
         };
         
         const duration = Date.now() - startTime;
@@ -80,6 +73,98 @@ ${formattedDescription}
     }
 }
 
+/**
+ * 构建自动完成模式的执行指令
+ * 
+ * @param {Object} task - 任务对象
+ * @param {string} formattedDescription - 格式化后的任务描述
+ * @returns {string} 执行指令
+ */
+function buildAutoCompleteInstructions(task, formattedDescription) {
+    return `
+🤖 自动执行模式 - 任务 ${task.task_id}
+
+## 📋 任务信息
+- **任务ID**: ${task.task_id}
+- **任务名称**: ${task.task_name}
+
+## 📝 任务描述
+${formattedDescription}
+
+## 🚀 自动执行步骤
+
+**请按照以下步骤自动完成任务:**
+
+### 1️⃣ 分析需求
+仔细阅读任务描述,理解功能要求和技术细节。
+
+### 2️⃣ 生成代码
+根据任务描述,创建 Python 代码文件:
+- 确保代码结构清晰、注释完整
+- 遵循项目代码规范(PEP 8)
+- 包含必要的错误处理和日志记录
+- 如果任务描述中指定了文件路径,请使用该路径
+- 否则默认保存到 \`app/core/data_flow/\` 目录
+
+### 3️⃣ 自动更新任务状态
+**重要!代码生成完成后,必须调用以下 MCP 工具更新任务状态:**
+
+\`\`\`
+工具: update_task_status
+参数: {
+  "task_id": ${task.task_id},
+  "status": "completed",
+  "code_name": "<生成的代码文件名>.py",
+  "code_path": "<代码文件的目录路径>"
+}
+\`\`\`
+
+### 示例
+如果生成了文件 \`app/core/data_flow/data_processor.py\`,则调用:
+\`\`\`
+{
+  "task_id": ${task.task_id},
+  "status": "completed",
+  "code_name": "data_processor.py",
+  "code_path": "app/core/data_flow"
+}
+\`\`\`
+
+---
+
+**⚠️ 执行提醒**:
+- 不要跳过任何步骤
+- 必须在生成代码后立即更新任务状态
+- 如果遇到问题,将 status 设为 "failed" 并记录原因
+`;
+}
+
+/**
+ * 构建手动模式的执行指令
+ * 
+ * @param {Object} task - 任务对象
+ * @param {string} formattedDescription - 格式化后的任务描述
+ * @returns {string} 执行指令
+ */
+function buildManualInstructions(task, formattedDescription) {
+    return `
+📋 手动执行模式 - 任务 ${task.task_id}
+
+## 任务信息
+- 任务ID: ${task.task_id}
+- 任务名称: ${task.task_name}
+
+## 任务描述
+${formattedDescription}
+
+## 执行要求
+1. 仔细阅读任务描述,理解需求
+2. 根据描述开发相应的代码
+3. 确保代码符合项目规范
+4. 完成后,手动调用 update_task_status 工具更新状态
+`;
+}
+
 /**
  * 格式化任务描述
  *