Bläddra i källkod

新增数据服务模块。

maxiaolong 3 dagar sedan
förälder
incheckning
58bf4e2065

+ 87 - 92
app/__init__.py

@@ -1,11 +1,12 @@
+import logging
+
 from flask import Flask, jsonify
-from flask_sqlalchemy import SQLAlchemy
 from flask_cors import CORS
-import logging
+from flask_sqlalchemy import SQLAlchemy
+
 from app.config.config import config, current_env
 from app.config.cors import CORS_OPTIONS
 
-
 db = SQLAlchemy()
 
 
@@ -22,23 +23,25 @@ def create_app():
     db.init_app(app)
 
     # 注册蓝图
-    from app.api.meta_data import bp as meta_bp
+    from app.api.business_domain import bp as business_domain_bp
+    from app.api.data_factory import bp as data_factory_bp
+    from app.api.data_flow import bp as data_flow_bp
     from app.api.data_interface import bp as data_interface_bp
+    from app.api.data_service import bp as data_service_bp
+    from app.api.data_source import bp as data_source_bp
     from app.api.graph import bp as graph_bp
+    from app.api.meta_data import bp as meta_bp
     from app.api.system import bp as system_bp
-    from app.api.data_source import bp as data_source_bp
-    from app.api.data_flow import bp as data_flow_bp
-    from app.api.business_domain import bp as business_domain_bp
-    from app.api.data_factory import bp as data_factory_bp
 
-    app.register_blueprint(meta_bp, url_prefix='/api/meta')
-    app.register_blueprint(data_interface_bp, url_prefix='/api/interface')
-    app.register_blueprint(graph_bp, url_prefix='/api/graph')
-    app.register_blueprint(system_bp, url_prefix='/api/system')
-    app.register_blueprint(data_source_bp, url_prefix='/api/datasource')
-    app.register_blueprint(data_flow_bp, url_prefix='/api/dataflow')
-    app.register_blueprint(business_domain_bp, url_prefix='/api/bd')
-    app.register_blueprint(data_factory_bp, url_prefix='/api/datafactory')
+    app.register_blueprint(meta_bp, url_prefix="/api/meta")
+    app.register_blueprint(data_interface_bp, url_prefix="/api/interface")
+    app.register_blueprint(graph_bp, url_prefix="/api/graph")
+    app.register_blueprint(system_bp, url_prefix="/api/system")
+    app.register_blueprint(data_source_bp, url_prefix="/api/datasource")
+    app.register_blueprint(data_flow_bp, url_prefix="/api/dataflow")
+    app.register_blueprint(business_domain_bp, url_prefix="/api/bd")
+    app.register_blueprint(data_factory_bp, url_prefix="/api/datafactory")
+    app.register_blueprint(data_service_bp, url_prefix="/api/dataservice")
 
     # Configure global response headers
     configure_response_headers(app)
@@ -50,7 +53,7 @@ def create_app():
     configure_error_handlers(app)
 
     # 输出启动信息
-    port = app.config['PORT']
+    port = app.config["PORT"]
     app.logger.info(f"Starting server in {current_env} mode on port {port}")
 
     return app
@@ -64,108 +67,104 @@ def configure_response_headers(app):
         from flask import request
 
         # 检查是否是API路径
-        if request.path.startswith('/api/'):
+        if request.path.startswith("/api/"):
             # 排除文件下载和特殊响应类型
             excluded_types = [
-                'application/octet-stream',
-                'application/pdf',
-                'image/',
-                'text/csv',
-                'application/vnd.ms-excel',
-                'application/vnd.openxmlformats-officedocument'
+                "application/octet-stream",
+                "application/pdf",
+                "image/",
+                "text/csv",
+                "application/vnd.ms-excel",
+                "application/vnd.openxmlformats-officedocument",
             ]
-            if (response.content_type and
-                    any(ct in response.content_type for ct in excluded_types)):
+            if response.content_type and any(
+                ct in response.content_type for ct in excluded_types
+            ):
                 # 保持原有的文件类型不变
                 pass
-            elif (response.content_type and
-                    'application/json' in response.content_type):
+            elif response.content_type and "application/json" in response.content_type:
                 # 确保JSON响应设置正确的Content-Type和charset
-                ct = 'application/json; charset=utf-8'
-                response.headers['Content-Type'] = ct
-            elif (not response.content_type or
-                    response.content_type == 'text/html; charset=utf-8' or
-                    response.content_type == 'text/plain'):
+                ct = "application/json; charset=utf-8"
+                response.headers["Content-Type"] = ct
+            elif (
+                not response.content_type
+                or response.content_type == "text/html; charset=utf-8"
+                or response.content_type == "text/plain"
+            ):
                 # 对于API路由,默认设置为JSON
-                ct = 'application/json; charset=utf-8'
-                response.headers['Content-Type'] = ct
+                ct = "application/json; charset=utf-8"
+                response.headers["Content-Type"] = ct
 
             # 确保CORS头部不被覆盖
-            if 'Access-Control-Allow-Origin' not in response.headers:
+            if "Access-Control-Allow-Origin" not in response.headers:
                 # 动态设置Origin,支持任意前端地址
-                origin = request.headers.get('Origin')
+                origin = request.headers.get("Origin")
                 if origin:
                     # 允许任意Origin(最灵活的配置)
-                    response.headers['Access-Control-Allow-Origin'] = origin
+                    response.headers["Access-Control-Allow-Origin"] = origin
                 else:
                     # 如果没有Origin头部,设置为通配符
-                    response.headers['Access-Control-Allow-Origin'] = '*'
+                    response.headers["Access-Control-Allow-Origin"] = "*"
 
             # 专门处理预检请求(OPTIONS方法)
-            if request.method == 'OPTIONS':
-                origin = request.headers.get('Origin', '*')
-                response.headers['Access-Control-Allow-Origin'] = origin
-                methods = 'GET, POST, PUT, DELETE, OPTIONS'
-                response.headers['Access-Control-Allow-Methods'] = methods
+            if request.method == "OPTIONS":
+                origin = request.headers.get("Origin", "*")
+                response.headers["Access-Control-Allow-Origin"] = origin
+                methods = "GET, POST, PUT, DELETE, OPTIONS"
+                response.headers["Access-Control-Allow-Methods"] = methods
                 headers = (
-                    'Content-Type, Authorization, X-Requested-With, '
-                    'Accept, Origin, Cache-Control, X-File-Name'
+                    "Content-Type, Authorization, X-Requested-With, "
+                    "Accept, Origin, Cache-Control, X-File-Name"
                 )
-                response.headers['Access-Control-Allow-Headers'] = headers
-                response.headers['Access-Control-Max-Age'] = '86400'
+                response.headers["Access-Control-Allow-Headers"] = headers
+                response.headers["Access-Control-Max-Age"] = "86400"
                 return response
 
             # 根据配置设置凭据支持
             from app.config.cors import ALLOW_ALL_ORIGINS
-            if 'Access-Control-Allow-Credentials' not in response.headers:
+
+            if "Access-Control-Allow-Credentials" not in response.headers:
                 if ALLOW_ALL_ORIGINS:
                     # 通配符时不支持凭据
-                    response.headers['Access-Control-Allow-Credentials'] = (
-                        'false'
-                    )
+                    response.headers["Access-Control-Allow-Credentials"] = "false"
                 else:
-                    response.headers['Access-Control-Allow-Credentials'] = (
-                        'true'
-                    )
-
-            if 'Access-Control-Allow-Methods' not in response.headers:
-                methods = 'GET, POST, PUT, DELETE, OPTIONS'
-                response.headers['Access-Control-Allow-Methods'] = methods
-            if 'Access-Control-Allow-Headers' not in response.headers:
+                    response.headers["Access-Control-Allow-Credentials"] = "true"
+
+            if "Access-Control-Allow-Methods" not in response.headers:
+                methods = "GET, POST, PUT, DELETE, OPTIONS"
+                response.headers["Access-Control-Allow-Methods"] = methods
+            if "Access-Control-Allow-Headers" not in response.headers:
                 headers = (
-                    'Content-Type, Authorization, X-Requested-With, '
-                    'Accept, Origin'
+                    "Content-Type, Authorization, X-Requested-With, Accept, Origin"
                 )
-                response.headers['Access-Control-Allow-Headers'] = headers
+                response.headers["Access-Control-Allow-Headers"] = headers
 
             # 添加安全头部
-            if 'X-Content-Type-Options' not in response.headers:
-                response.headers['X-Content-Type-Options'] = 'nosniff'
-            if 'X-Frame-Options' not in response.headers:
-                response.headers['X-Frame-Options'] = 'DENY'
-            if 'X-XSS-Protection' not in response.headers:
-                response.headers['X-XSS-Protection'] = '1; mode=block'
+            if "X-Content-Type-Options" not in response.headers:
+                response.headers["X-Content-Type-Options"] = "nosniff"
+            if "X-Frame-Options" not in response.headers:
+                response.headers["X-Frame-Options"] = "DENY"
+            if "X-XSS-Protection" not in response.headers:
+                response.headers["X-XSS-Protection"] = "1; mode=block"
 
         return response
 
 
 def configure_logging(app):
     """Configure logging for the application"""
-    if not app.config.get('LOG_ENABLED', True):
+    if not app.config.get("LOG_ENABLED", True):
         return None
 
-    log_file = app.config.get(
-        'LOG_FILE', f'flask_{app.config["FLASK_ENV"]}.log'
-    )
-    log_level_name = app.config.get('LOG_LEVEL', 'INFO')
+    log_file = app.config.get("LOG_FILE", f"flask_{app.config['FLASK_ENV']}.log")
+    log_level_name = app.config.get("LOG_LEVEL", "INFO")
     log_level = getattr(logging, log_level_name)
     log_format = app.config.get(
-        'LOG_FORMAT',
-        '%(asctime)s - %(levelname)s - %(filename)s - '
-        '%(funcName)s - %(lineno)s - %(message)s'
+        "LOG_FORMAT",
+        "%(asctime)s - %(levelname)s - %(filename)s - "
+        "%(funcName)s - %(lineno)s - %(message)s",
     )
-    log_encoding = app.config.get('LOG_ENCODING', 'UTF-8')
-    log_to_console = app.config.get('LOG_TO_CONSOLE', True)
+    log_encoding = app.config.get("LOG_ENCODING", "UTF-8")
+    log_to_console = app.config.get("LOG_TO_CONSOLE", True)
 
     # 配置根日志器
     root_logger = logging.getLogger()
@@ -212,9 +211,9 @@ def configure_error_handlers(app):
 
         # 返回标准化的错误响应
         error_response = {
-            'success': False,
-            'message': f'服务器内部错误: {str(e)}',
-            'data': None
+            "success": False,
+            "message": f"服务器内部错误: {str(e)}",
+            "data": None,
         }
 
         return jsonify(error_response), 500
@@ -223,18 +222,14 @@ def configure_error_handlers(app):
     def handle_not_found(e):
         """处理404错误"""
         app.logger.warning(f"404错误: {str(e)}")
-        return jsonify({
-            'success': False,
-            'message': '请求的资源不存在',
-            'data': None
-        }), 404
+        return jsonify(
+            {"success": False, "message": "请求的资源不存在", "data": None}
+        ), 404
 
     @app.errorhandler(500)
     def handle_internal_error(e):
         """处理500错误"""
         app.logger.error(f"500错误: {str(e)}", exc_info=True)
-        return jsonify({
-            'success': False,
-            'message': '服务器内部错误',
-            'data': None
-        }), 500
+        return jsonify(
+            {"success": False, "message": "服务器内部错误", "data": None}
+        ), 500

+ 22 - 34
app/api/business_domain/routes.py

@@ -5,29 +5,30 @@ Business Domain API 路由模块
 
 import io
 import json
-import time
 import logging
+import time
 import traceback
 import urllib.parse
-from flask import request, jsonify, current_app, send_file
+
+from flask import current_app, jsonify, request, send_file
 from minio import Minio
 from minio.error import S3Error
 
 from app.api.business_domain import bp
-from app.models.result import success, failed
-from app.services.neo4j_driver import neo4j_driver
-from app.core.llm.ddl_parser import DDLParser
 from app.core.business_domain import (
+    business_domain_compose,
+    business_domain_graph_all,
+    business_domain_label_list,
     business_domain_list,
-    get_business_domain_by_id,
+    business_domain_search_list,
     delete_business_domain,
-    update_business_domain,
+    get_business_domain_by_id,
     save_business_domain,
-    business_domain_graph_all,
-    business_domain_search_list,
-    business_domain_compose,
-    business_domain_label_list,
+    update_business_domain,
 )
+from app.core.llm.ddl_parser import DDLParser
+from app.models.result import failed, success
+from app.services.neo4j_driver import neo4j_driver
 
 logger = logging.getLogger("app")
 
@@ -47,9 +48,7 @@ def get_minio_config():
     """获取 MinIO 配置"""
     return {
         "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
-        "PREFIX": current_app.config.get(
-            "BUSINESS_DOMAIN_PREFIX", "business_domain"
-        ),
+        "PREFIX": current_app.config.get("BUSINESS_DOMAIN_PREFIX", "business_domain"),
         "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
     }
 
@@ -157,9 +156,8 @@ def bd_save():
         if not data:
             return jsonify(failed("请求数据不能为空"))
 
-        if not data.get("id"):
-            if not data.get("name_zh") or not data.get("name_en"):
-                return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))
+        if not data.get("id") and (not data.get("name_zh") or not data.get("name_en")):
+            return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))
 
         saved_data = save_business_domain(data)
         return jsonify(success(saved_data))
@@ -208,8 +206,7 @@ def bd_upload():
         timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
 
         object_name = (
-            f"{config['PREFIX']}/"
-            f"{filename_without_ext}_{timestamp}.{file_type}"
+            f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
         )
 
         minio_client.put_object(
@@ -257,9 +254,7 @@ def bd_download():
         config = get_minio_config()
 
         try:
-            response = minio_client.get_object(
-                config["MINIO_BUCKET"], object_name
-            )
+            response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
             file_data = response.read()
         except S3Error as e:
             logger.error(f"MinIO获取文件失败: {str(e)}")
@@ -351,9 +346,7 @@ def bd_ddl_parse():
                         OPTIONAL MATCH (n:BusinessDomain {name_en: name})
                         RETURN name, n IS NOT NULL AS exists
                         """
-                        table_results = session.run(
-                            table_query, names=table_names
-                        )
+                        table_results = session.run(table_query, names=table_names)
 
                         exist_map = {}
                         for record in table_results:
@@ -392,25 +385,20 @@ def bd_ddl_parse():
                         OPTIONAL MATCH (n:BusinessDomain {name_en: name})
                         RETURN name, n IS NOT NULL AS exists
                         """
-                        table_results = session.run(
-                            table_query, names=table_names
-                        )
+                        table_results = session.run(table_query, names=table_names)
 
                         for record in table_results:
                             t_name = record["name"]
                             exists = record["exists"]
-                            is_valid = (
-                                t_name in ddl_list
-                                and isinstance(ddl_list[t_name], dict)
+                            is_valid = t_name in ddl_list and isinstance(
+                                ddl_list[t_name], dict
                             )
                             if is_valid:
                                 ddl_list[t_name]["exist"] = exists
                 except Exception as e:
                     logger.error(f"检查业务领域存在状态失败: {str(e)}")
 
-        logger.debug(
-            f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}"
-        )
+        logger.debug(f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}")
         return jsonify(success(ddl_list))
     except Exception as e:
         logger.error(f"解析DDL语句失败: {str(e)}")

+ 6 - 0
app/api/data_service/__init__.py

@@ -0,0 +1,6 @@
+from flask import Blueprint
+
+bp = Blueprint("data_service", __name__)
+
+# 导入 routes 模块以注册路由(副作用导入)
+from app.api.data_service import routes  # noqa: E402, F401, I001  # pyright: ignore[reportUnusedImport]

+ 289 - 0
app/api/data_service/routes.py

@@ -0,0 +1,289 @@
+"""
+数据服务 API 路由
+提供数据产品列表、数据预览、Excel下载等接口
+"""
+
+import json
+import logging
+
+from flask import request, send_file
+
+from app.api.data_service import bp
+from app.core.data_service.data_product_service import DataProductService
+from app.core.graph.graph_operations import MyEncoder
+from app.models.result import failed, success
+
+logger = logging.getLogger(__name__)
+
+
+# ==================== 数据产品列表接口 ====================
+
+
+@bp.route("/products", methods=["GET"])
+def get_products():
+    """
+    Return a paginated list of data products.
+
+    Query Parameters:
+        page: page number, default 1 (values below 1 are treated as 1)
+        page_size: items per page, default 20 (values below 1 are treated as 1)
+        search: search keyword
+        status: status filter (active/inactive/error)
+    """
+    try:
+        page = max(request.args.get("page", 1, type=int), 1)
+        page_size = max(request.args.get("page_size", 20, type=int), 1)
+        search = request.args.get("search", "")
+        status = request.args.get("status")
+
+        result = DataProductService.get_data_products(
+            page=page,
+            page_size=page_size,
+            search=search,
+            status=status,
+        )
+
+        res = success(result, "获取数据产品列表成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+    except Exception as e:
+        logger.exception(f"获取数据产品列表失败: {str(e)}")
+        res = failed(f"获取数据产品列表失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+@bp.route("/products/<int:product_id>", methods=["GET"])
+def get_product(product_id: int):
+    """
+    Return the details of a single data product.
+
+    Path Parameters:
+        product_id: data product ID
+    """
+    try:
+        product = DataProductService.get_product_by_id(product_id)
+
+        if not product:
+            res = failed("数据产品不存在", code=404)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+        res = success(product.to_dict(), "获取数据产品详情成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+    except Exception as e:
+        logger.exception(f"获取数据产品详情失败: {str(e)}")
+        res = failed(f"获取数据产品详情失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+# ==================== 数据预览接口 ====================
+
+
+@bp.route("/products/<int:product_id>/preview", methods=["GET"])
+def get_product_preview(product_id: int):
+    """
+    Return a preview of the data product's records (200 by default).
+
+    Path Parameters:
+        product_id: data product ID
+
+    Query Parameters:
+        limit: number of preview records, default 200, clamped to 1..1000
+    """
+    try:
+        limit = request.args.get("limit", 200, type=int)
+        # Clamp both ends: cap the preview size and reject zero/negative limits
+        limit = max(1, min(limit, 1000))
+
+        result = DataProductService.get_product_preview(
+            product_id=product_id,
+            limit=limit,
+        )
+
+        # Automatically mark the product as viewed
+        DataProductService.mark_as_viewed(product_id)
+
+        res = success(result, "获取数据预览成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+    except ValueError as ve:
+        logger.warning(f"获取数据预览参数错误: {str(ve)}")
+        res = failed(str(ve), code=404)
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+    except Exception as e:
+        logger.exception(f"获取数据预览失败: {str(e)}")
+        res = failed(f"获取数据预览失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+# ==================== Excel下载接口 ====================
+
+
+@bp.route("/products/<int:product_id>/download", methods=["GET"])
+def download_product_excel(product_id: int):
+    """
+    Download the data product's records as an Excel file.
+
+    Path Parameters:
+        product_id: data product ID
+
+    Query Parameters:
+        limit: number of exported records, default 200, clamped to 1..10000
+    """
+    try:
+        limit = request.args.get("limit", 200, type=int)
+        # Clamp both ends: cap the export size and reject zero/negative limits
+        limit = max(1, min(limit, 10000))
+
+        excel_file, filename = DataProductService.export_to_excel(
+            product_id=product_id,
+            limit=limit,
+        )
+
+        # Mark the product as viewed
+        DataProductService.mark_as_viewed(product_id)
+
+        return send_file(
+            excel_file,
+            mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+            as_attachment=True,
+            download_name=filename,
+        )
+
+    except ValueError as ve:
+        logger.warning(f"下载Excel参数错误: {str(ve)}")
+        res = failed(str(ve), code=404)
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+    except Exception as e:
+        logger.exception(f"下载Excel失败: {str(e)}")
+        res = failed(f"下载Excel失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+# ==================== 标记已查看接口 ====================
+
+
+@bp.route("/products/<int:product_id>/viewed", methods=["POST"])
+def mark_product_viewed(product_id: int):
+    """
+    Mark a data product as viewed (clears its update notice).
+
+    Path Parameters:
+        product_id: data product ID
+    """
+    try:
+        product = DataProductService.mark_as_viewed(product_id)
+
+        if not product:
+            res = failed("数据产品不存在", code=404)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+        res = success(product.to_dict(), "标记已查看成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+    except Exception as e:
+        logger.exception(f"标记已查看失败: {str(e)}")
+        res = failed(f"标记已查看失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+# ==================== 刷新统计信息接口 ====================
+
+
+@bp.route("/products/<int:product_id>/refresh", methods=["POST"])
+def refresh_product_stats(product_id: int):
+    """
+    Refresh the statistics of a data product.
+
+    Path Parameters:
+        product_id: data product ID
+    """
+    try:
+        product = DataProductService.refresh_product_stats(product_id)
+
+        if not product:
+            res = failed("数据产品不存在", code=404)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+        res = success(product.to_dict(), "刷新统计信息成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+    except Exception as e:
+        logger.exception(f"刷新统计信息失败: {str(e)}")
+        res = failed(f"刷新统计信息失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+# ==================== 删除数据产品接口 ====================
+
+
+@bp.route("/products/<int:product_id>", methods=["DELETE"])
+def delete_product(product_id: int):
+    """
+    Delete a data product.
+
+    Path Parameters:
+        product_id: data product ID
+    """
+    try:
+        result = DataProductService.delete_product(product_id)
+
+        if not result:
+            res = failed("数据产品不存在", code=404)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+        res = success({}, "删除数据产品成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+    except Exception as e:
+        logger.exception(f"删除数据产品失败: {str(e)}")
+        res = failed(f"删除数据产品失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+
+# ==================== 手动注册数据产品接口 ====================
+
+
+@bp.route("/products", methods=["POST"])
+def register_product():
+    """Manually register a data product.
+
+    Request Body (JSON object):
+        product_name: data product name (required)
+        product_name_en: data product English name (required)
+        target_table: target table name (required)
+        target_schema: target schema (optional, default "public")
+        description: description (optional)
+        source_dataflow_id, source_dataflow_name: optional; created_by: default "manual"
+    """
+    try:
+        data = request.get_json(silent=True)
+        if not isinstance(data, dict) or not data:
+            res = failed("请求数据不能为空", code=400)
+            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+        # Validate required fields (missing or non-object bodies were rejected above)
+        required_fields = ["product_name", "product_name_en", "target_table"]
+        for field in required_fields:
+            if not data.get(field):
+                res = failed(f"缺少必填字段: {field}", code=400)
+                return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+        product = DataProductService.register_data_product(
+            product_name=data["product_name"],
+            product_name_en=data["product_name_en"],
+            target_table=data["target_table"],
+            target_schema=data.get("target_schema", "public"),
+            description=data.get("description"),
+            source_dataflow_id=data.get("source_dataflow_id"),
+            source_dataflow_name=data.get("source_dataflow_name"),
+            created_by=data.get("created_by", "manual"),
+        )
+
+        res = success(product.to_dict(), "注册数据产品成功")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
+
+    except Exception as e:
+        logger.exception(f"注册数据产品失败: {str(e)}")
+        res = failed(f"注册数据产品失败: {str(e)}")
+        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

+ 6 - 6
app/core/data_factory/n8n_client.py

@@ -134,15 +134,15 @@ class N8nClient:
                 return response.json()
             return {}
 
-        except requests.exceptions.Timeout:
+        except requests.exceptions.Timeout as e:
             logger.error(f"n8n API 请求超时: {url}")
-            raise N8nClientError("n8n API 请求超时")
-        except requests.exceptions.ConnectionError:
+            raise N8nClientError("n8n API 请求超时") from e
+        except requests.exceptions.ConnectionError as e:
             logger.error(f"n8n API 连接失败: {url}")
-            raise N8nClientError("无法连接到 n8n 服务")
+            raise N8nClientError("无法连接到 n8n 服务") from e
         except requests.exceptions.RequestException as e:
             logger.error(f"n8n API 请求异常: {str(e)}")
-            raise N8nClientError(f"n8n API 请求失败: {str(e)}")
+            raise N8nClientError(f"n8n API 请求失败: {str(e)}") from e
 
     # ==================== 工作流相关 API ====================
 
@@ -313,7 +313,7 @@ class N8nClient:
 
         except requests.exceptions.RequestException as e:
             logger.error(f"Webhook 触发失败: {str(e)}")
-            raise N8nClientError(f"Webhook 触发失败: {str(e)}")
+            raise N8nClientError(f"Webhook 触发失败: {str(e)}") from e
 
     # ==================== 健康检查 ====================
 

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 246 - 257
app/core/data_flow/dataflows.py


+ 7 - 0
app/core/data_service/__init__.py

@@ -0,0 +1,7 @@
+# Data Service package initialization
+
+from app.core.data_service.data_product_service import DataProductService
+
+__all__ = [
+    "DataProductService",
+]

+ 601 - 0
app/core/data_service/data_product_service.py

@@ -0,0 +1,601 @@
+"""
+数据产品服务
+提供数据产品的列表查询、数据预览、Excel导出、注册等功能
+"""
+
+from __future__ import annotations
+
+import io
+import logging
+from datetime import datetime
+from typing import Any
+
+from sqlalchemy import text
+
+from app import db
+from app.models.data_product import DataProduct
+
+logger = logging.getLogger(__name__)
+
+
+class DataProductService:
+    """数据产品服务类"""
+
+    @staticmethod
+    def get_data_products(
+        page: int = 1,
+        page_size: int = 20,
+        search: str = "",
+        status: str | None = None,
+    ) -> dict[str, Any]:
+        """
+        获取数据产品列表
+
+        Args:
+            page: 页码
+            page_size: 每页大小
+            search: 搜索关键词
+            status: 状态过滤
+
+        Returns:
+            包含数据产品列表和分页信息的字典
+        """
+        try:
+            query = DataProduct.query
+
+            # 搜索过滤
+            if search:
+                search_pattern = f"%{search}%"
+                query = query.filter(
+                    db.or_(
+                        DataProduct.product_name.ilike(search_pattern),
+                        DataProduct.product_name_en.ilike(search_pattern),
+                        DataProduct.description.ilike(search_pattern),
+                        DataProduct.target_table.ilike(search_pattern),
+                    )
+                )
+
+            # 状态过滤
+            if status:
+                query = query.filter(DataProduct.status == status)
+
+            # 计算总数
+            total = query.count()
+
+            # 分页查询
+            products = (
+                query.order_by(DataProduct.created_at.desc())
+                .offset((page - 1) * page_size)
+                .limit(page_size)
+                .all()
+            )
+
+            # 转换为字典列表
+            product_list = [product.to_dict() for product in products]
+
+            return {
+                "list": product_list,
+                "pagination": {
+                    "page": page,
+                    "page_size": page_size,
+                    "total": total,
+                    "total_pages": (total + page_size - 1) // page_size,
+                },
+            }
+
+        except Exception as e:
+            logger.error(f"获取数据产品列表失败: {str(e)}")
+            raise
+
+    @staticmethod
+    def get_product_by_id(product_id: int) -> DataProduct | None:
+        """
+        根据ID获取数据产品
+
+        Args:
+            product_id: 数据产品ID
+
+        Returns:
+            数据产品对象,不存在则返回None
+        """
+        return DataProduct.query.get(product_id)
+
+    @staticmethod
+    def get_product_preview(
+        product_id: int,
+        limit: int = 200,
+    ) -> dict[str, Any]:
+        """
+        获取数据产品的数据预览
+
+        Args:
+            product_id: 数据产品ID
+            limit: 预览数据条数,默认200
+
+        Returns:
+            包含列信息和数据的字典
+        """
+        try:
+            product = DataProduct.query.get(product_id)
+            if not product:
+                raise ValueError(f"数据产品不存在: ID={product_id}")
+
+            # 构建查询SQL
+            schema = product.target_schema or "public"
+            table = product.target_table
+            full_table_name = f"{schema}.{table}"
+
+            # 先检查表是否存在
+            check_sql = text(
+                """
+                SELECT EXISTS (
+                    SELECT FROM information_schema.tables
+                    WHERE table_schema = :schema
+                    AND table_name = :table
+                )
+                """
+            )
+            result = db.session.execute(
+                check_sql, {"schema": schema, "table": table}
+            ).scalar()
+
+            if not result:
+                return {
+                    "product": product.to_dict(),
+                    "columns": [],
+                    "data": [],
+                    "total_count": 0,
+                    "preview_count": 0,
+                    "error": f"目标表 {full_table_name} 不存在",
+                }
+
+            # 获取列信息
+            columns_sql = text(
+                """
+                SELECT column_name, data_type, is_nullable
+                FROM information_schema.columns
+                WHERE table_schema = :schema AND table_name = :table
+                ORDER BY ordinal_position
+                """
+            )
+            columns_result = db.session.execute(
+                columns_sql, {"schema": schema, "table": table}
+            ).fetchall()
+
+            columns = [
+                {
+                    "name": row[0],
+                    "type": row[1],
+                    "nullable": row[2] == "YES",
+                }
+                for row in columns_result
+            ]
+
+            # 获取总记录数
+            # 使用带引号的表名以避免大小写问题
+            if schema == "public":
+                count_sql = text(f'SELECT COUNT(*) FROM "{table}"')
+            else:
+                count_sql = text(f'SELECT COUNT(*) FROM "{schema}"."{table}"')
+            try:
+                total_count = db.session.execute(count_sql).scalar() or 0
+            except Exception as e:
+                logger.error(f"查询总记录数失败: {e}, SQL: {count_sql}")
+                total_count = 0
+
+            # 获取预览数据
+            # 使用带引号的表名以避免大小写问题
+            if schema == "public":
+                preview_sql = text(f'SELECT * FROM "{table}" LIMIT :limit')
+            else:
+                preview_sql = text(f'SELECT * FROM "{schema}"."{table}" LIMIT :limit')
+            try:
+                preview_result = db.session.execute(
+                    preview_sql, {"limit": limit}
+                ).fetchall()
+                logger.debug(f"查询预览数据成功,返回 {len(preview_result)} 行")
+            except Exception as e:
+                logger.error(f"查询预览数据失败: {e}, SQL: {preview_sql}")
+                preview_result = []
+
+            # 转换数据为字典列表
+            # 如果从information_schema获取的列信息为空,从查询结果中获取列名
+            if columns:
+                column_names = [col["name"] for col in columns]
+            elif preview_result:
+                # 从查询结果的第一行获取列名
+                column_names = list(preview_result[0].keys())
+                # 同步更新columns列表
+                columns = [
+                    {"name": name, "type": "unknown", "nullable": True}
+                    for name in column_names
+                ]
+            else:
+                column_names = []
+
+            data = []
+            for row in preview_result:
+                # row可能是Row对象或元组
+                if hasattr(row, "_mapping"):
+                    # SQLAlchemy Row对象(支持列名访问)
+                    row_dict = dict(row._mapping)
+                elif hasattr(row, "_asdict"):
+                    # namedtuple或类似对象
+                    row_dict = row._asdict()
+                elif isinstance(row, (list, tuple)):
+                    # 元组或列表,使用列名索引
+                    row_dict = {}
+                    for i, value in enumerate(row):
+                        if i < len(column_names):
+                            col_name = column_names[i]
+                            # 处理特殊类型
+                            if isinstance(value, datetime):
+                                row_dict[col_name] = value.isoformat()
+                            elif value is None:
+                                row_dict[col_name] = None
+                            else:
+                                row_dict[col_name] = str(value)
+                else:
+                    # 尝试直接转换为字典
+                    try:
+                        row_dict = dict(row)
+                    except (TypeError, ValueError):
+                        row_dict = {}
+
+                # 统一处理日期时间类型
+                for key, value in row_dict.items():
+                    if isinstance(value, datetime):
+                        row_dict[key] = value.isoformat()
+
+                data.append(row_dict)
+
+            # 更新产品的列数信息
+            if product.column_count != len(columns):
+                product.column_count = len(columns)
+                db.session.commit()
+
+            return {
+                "product": product.to_dict(),
+                "columns": columns,
+                "data": data,
+                "total_count": total_count,
+                "preview_count": len(data),
+            }
+
+        except Exception as e:
+            logger.error(f"获取数据预览失败: {str(e)}")
+            raise
+
+    @staticmethod
+    def export_to_excel(
+        product_id: int,
+        limit: int = 200,
+    ) -> tuple[io.BytesIO, str]:
+        """
+        导出数据产品数据为Excel文件
+
+        Args:
+            product_id: 数据产品ID
+            limit: 导出数据条数,默认200
+
+        Returns:
+            (Excel文件字节流, 文件名)
+        """
+        try:
+            # 延迟导入,避免启动时加载
+            import pandas as pd
+
+            product = DataProduct.query.get(product_id)
+            if not product:
+                raise ValueError(f"数据产品不存在: ID={product_id}")
+
+            schema = product.target_schema or "public"
+            table = product.target_table
+            full_table_name = f"{schema}.{table}"
+
+            # 检查表是否存在
+            check_sql = text(
+                """
+                SELECT EXISTS (
+                    SELECT FROM information_schema.tables
+                    WHERE table_schema = :schema
+                    AND table_name = :table
+                )
+                """
+            )
+            result = db.session.execute(
+                check_sql, {"schema": schema, "table": table}
+            ).scalar()
+
+            if not result:
+                raise ValueError(f"目标表 {full_table_name} 不存在")
+
+            # 查询数据
+            query_sql = text(f'SELECT * FROM "{schema}"."{table}" LIMIT :limit')
+            result = db.session.execute(query_sql, {"limit": limit})
+
+            # 获取列名
+            column_names = list(result.keys())
+
+            # 获取数据
+            rows = result.fetchall()
+
+            # 将 Row 对象转换为元组列表,以便 pandas 正确处理
+            rows_data = [tuple(row) for row in rows]
+
+            # 创建DataFrame
+            # pandas DataFrame 构造函数接受列表和列名,类型检查器可能无法正确推断
+            df = pd.DataFrame(rows_data, columns=column_names)  # type: ignore[arg-type]
+
+            # 创建Excel文件
+            output = io.BytesIO()
+            # ExcelWriter 支持 BytesIO,类型检查器可能无法正确推断
+            with pd.ExcelWriter(output, engine="openpyxl") as writer:  # type: ignore[arg-type]
+                df.to_excel(writer, index=False, sheet_name="数据预览")
+
+            output.seek(0)
+
+            # 生成文件名
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            filename = f"{product.product_name_en}_{timestamp}.xlsx"
+
+            logger.info(f"导出Excel成功: product_id={product_id}, rows={len(rows)}")
+
+            return output, filename
+
+        except Exception as e:
+            logger.error(f"导出Excel失败: {str(e)}")
+            raise
+
+    @staticmethod
+    def mark_as_viewed(product_id: int) -> DataProduct | None:
+        """
+        标记数据产品为已查看
+
+        Args:
+            product_id: 数据产品ID
+
+        Returns:
+            更新后的数据产品对象
+        """
+        try:
+            product = DataProduct.query.get(product_id)
+            if not product:
+                return None
+
+            product.mark_as_viewed()
+            db.session.commit()
+
+            logger.info(f"标记数据产品为已查看: product_id={product_id}")
+            return product
+
+        except Exception as e:
+            db.session.rollback()
+            logger.error(f"标记已查看失败: {str(e)}")
+            raise
+
+    @staticmethod
+    def register_data_product(
+        product_name: str,
+        product_name_en: str,
+        target_table: str,
+        target_schema: str = "public",
+        description: str | None = None,
+        source_dataflow_id: int | None = None,
+        source_dataflow_name: str | None = None,
+        created_by: str = "system",
+    ) -> DataProduct:
+        """
+        注册新的数据产品
+
+        Args:
+            product_name: 数据产品名称(中文)
+            product_name_en: 数据产品英文名
+            target_table: 目标表名
+            target_schema: 目标schema
+            description: 描述
+            source_dataflow_id: 关联的数据流ID
+            source_dataflow_name: 数据流名称
+            created_by: 创建人
+
+        Returns:
+            创建的数据产品对象
+        """
+        try:
+            # 检查是否已存在
+            existing = DataProduct.query.filter_by(
+                target_schema=target_schema,
+                target_table=target_table,
+            ).first()
+
+            if existing:
+                # 更新现有记录
+                existing.product_name = product_name
+                existing.product_name_en = product_name_en
+                existing.description = description
+                existing.source_dataflow_id = source_dataflow_id
+                existing.source_dataflow_name = source_dataflow_name
+                existing.updated_at = datetime.utcnow()
+                existing.last_updated_at = datetime.utcnow()
+                db.session.commit()
+
+                logger.info(
+                    f"更新数据产品: {product_name} -> {target_schema}.{target_table}"
+                )
+                return existing
+
+            # 创建新记录
+            # SQLAlchemy 模型支持关键字参数初始化,类型检查器可能无法正确推断
+            # pyright: ignore[reportCallIssue]
+            product = DataProduct(
+                product_name=product_name,  # type: ignore[arg-type]
+                product_name_en=product_name_en,  # type: ignore[arg-type]
+                target_table=target_table,  # type: ignore[arg-type]
+                target_schema=target_schema,  # type: ignore[arg-type]
+                description=description,  # type: ignore[arg-type]
+                source_dataflow_id=source_dataflow_id,  # type: ignore[arg-type]
+                source_dataflow_name=source_dataflow_name,  # type: ignore[arg-type]
+                created_by=created_by,  # type: ignore[arg-type]
+                last_updated_at=datetime.utcnow(),  # type: ignore[arg-type]
+            )
+
+            db.session.add(product)
+            db.session.commit()
+
+            logger.info(
+                f"注册数据产品成功: {product_name} -> {target_schema}.{target_table}"
+            )
+            return product
+
+        except Exception as e:
+            db.session.rollback()
+            logger.error(f"注册数据产品失败: {str(e)}")
+            raise
+
+    @staticmethod
+    def update_data_stats(
+        product_id: int,
+        record_count: int | None = None,
+        column_count: int | None = None,
+    ) -> DataProduct | None:
+        """
+        更新数据产品的统计信息
+
+        Args:
+            product_id: 数据产品ID
+            record_count: 记录数
+            column_count: 列数
+
+        Returns:
+            更新后的数据产品对象
+        """
+        try:
+            product = DataProduct.query.get(product_id)
+            if not product:
+                return None
+
+            if record_count is not None:
+                product.record_count = record_count
+            if column_count is not None:
+                product.column_count = column_count
+
+            product.last_updated_at = datetime.utcnow()
+            product.updated_at = datetime.utcnow()
+
+            db.session.commit()
+
+            logger.info(
+                f"更新数据产品统计: product_id={product_id}, "
+                f"record_count={record_count}, column_count={column_count}"
+            )
+            return product
+
+        except Exception as e:
+            db.session.rollback()
+            logger.error(f"更新数据统计失败: {str(e)}")
+            raise
+
+    @staticmethod
+    def refresh_product_stats(product_id: int) -> DataProduct | None:
+        """
+        刷新数据产品的统计信息(从目标表重新统计)
+
+        Args:
+            product_id: 数据产品ID
+
+        Returns:
+            更新后的数据产品对象
+        """
+        try:
+            product = DataProduct.query.get(product_id)
+            if not product:
+                return None
+
+            schema = product.target_schema or "public"
+            table = product.target_table
+
+            # 检查表是否存在
+            check_sql = text(
+                """
+                SELECT EXISTS (
+                    SELECT FROM information_schema.tables
+                    WHERE table_schema = :schema
+                    AND table_name = :table
+                )
+                """
+            )
+            exists = db.session.execute(
+                check_sql, {"schema": schema, "table": table}
+            ).scalar()
+
+            if not exists:
+                product.status = "error"
+                product.updated_at = datetime.utcnow()
+                db.session.commit()
+                return product
+
+            # 获取记录数
+            count_sql = text(f'SELECT COUNT(*) FROM "{schema}"."{table}"')
+            record_count = db.session.execute(count_sql).scalar() or 0
+
+            # 获取列数
+            columns_sql = text(
+                """
+                SELECT COUNT(*)
+                FROM information_schema.columns
+                WHERE table_schema = :schema AND table_name = :table
+                """
+            )
+            column_count = (
+                db.session.execute(
+                    columns_sql, {"schema": schema, "table": table}
+                ).scalar()
+                or 0
+            )
+
+            # 更新统计信息
+            product.record_count = record_count
+            product.column_count = column_count
+            product.last_updated_at = datetime.utcnow()
+            product.updated_at = datetime.utcnow()
+            product.status = "active"
+
+            db.session.commit()
+
+            logger.info(
+                f"刷新数据产品统计: product_id={product_id}, "
+                f"record_count={record_count}, column_count={column_count}"
+            )
+            return product
+
+        except Exception as e:
+            db.session.rollback()
+            logger.error(f"刷新数据统计失败: {str(e)}")
+            raise
+
+    @staticmethod
+    def delete_product(product_id: int) -> bool:
+        """
+        删除数据产品
+
+        Args:
+            product_id: 数据产品ID
+
+        Returns:
+            是否删除成功
+        """
+        try:
+            product = DataProduct.query.get(product_id)
+            if not product:
+                return False
+
+            db.session.delete(product)
+            db.session.commit()
+
+            logger.info(f"删除数据产品成功: product_id={product_id}")
+            return True
+
+        except Exception as e:
+            db.session.rollback()
+            logger.error(f"删除数据产品失败: {str(e)}")
+            raise

+ 2 - 0
app/models/__init__.py

@@ -1,8 +1,10 @@
 # Models package initialization
 
+from app.models.data_product import DataProduct
 from app.models.metadata_review import MetadataReviewRecord, MetadataVersionHistory
 
 __all__ = [
+    "DataProduct",
     "MetadataReviewRecord",
     "MetadataVersionHistory",
 ]

+ 120 - 0
app/models/data_product.py

@@ -0,0 +1,120 @@
+"""
+数据产品模型
+用于记录数据工厂加工完成后的数据产品信息
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any
+
+from app import db
+
+
+class DataProduct(db.Model):
+    """数据产品模型,记录数据工厂加工后的数据产品信息"""
+
+    __tablename__ = "data_products"
+    __table_args__ = {"schema": "public"}
+
+    id = db.Column(db.Integer, primary_key=True)
+
+    # 数据产品基本信息
+    product_name = db.Column(db.String(200), nullable=False)
+    product_name_en = db.Column(db.String(200), nullable=False)
+    description = db.Column(db.Text, nullable=True)
+
+    # 关联信息
+    source_dataflow_id = db.Column(db.Integer, nullable=True)
+    source_dataflow_name = db.Column(db.String(200), nullable=True)
+
+    # 目标表信息
+    target_table = db.Column(db.String(200), nullable=False)
+    target_schema = db.Column(db.String(100), nullable=False, default="public")
+
+    # 数据统计信息
+    record_count = db.Column(db.BigInteger, nullable=False, default=0)
+    column_count = db.Column(db.Integer, nullable=False, default=0)
+
+    # 更新提示相关
+    last_updated_at = db.Column(db.DateTime, nullable=True)
+    last_viewed_at = db.Column(db.DateTime, nullable=True)
+
+    # 状态信息
+    status = db.Column(db.String(50), nullable=False, default="active")
+
+    # 审计字段
+    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
+    created_by = db.Column(db.String(100), nullable=False, default="system")
+    updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
+
+    def to_dict(self) -> dict[str, Any]:
+        """
+        将模型转换为字典
+
+        Returns:
+            包含所有字段的字典
+        """
+        return {
+            "id": self.id,
+            "product_name": self.product_name,
+            "product_name_en": self.product_name_en,
+            "description": self.description,
+            "source_dataflow_id": self.source_dataflow_id,
+            "source_dataflow_name": self.source_dataflow_name,
+            "target_table": self.target_table,
+            "target_schema": self.target_schema,
+            "record_count": self.record_count,
+            "column_count": self.column_count,
+            "last_updated_at": (
+                self.last_updated_at.isoformat() if self.last_updated_at else None
+            ),
+            "last_viewed_at": (
+                self.last_viewed_at.isoformat() if self.last_viewed_at else None
+            ),
+            "status": self.status,
+            "created_at": self.created_at.isoformat() if self.created_at else None,
+            "created_by": self.created_by,
+            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
+            "has_new_data": self._has_new_data(),
+        }
+
+    def _has_new_data(self) -> bool:
+        """
+        判断是否有新数据(用于更新提示)
+
+        Returns:
+            如果 last_updated_at > last_viewed_at 则返回 True
+        """
+        if self.last_updated_at is None:
+            return False
+        if self.last_viewed_at is None:
+            return True
+        return self.last_updated_at > self.last_viewed_at
+
+    def mark_as_viewed(self) -> None:
+        """标记为已查看,更新 last_viewed_at 时间"""
+        self.last_viewed_at = datetime.utcnow()
+        self.updated_at = datetime.utcnow()
+
+    def update_data_stats(
+        self,
+        record_count: int,
+        column_count: int | None = None,
+    ) -> None:
+        """
+        更新数据统计信息
+
+        Args:
+            record_count: 记录数
+            column_count: 列数(可选)
+        """
+        self.record_count = record_count
+        if column_count is not None:
+            self.column_count = column_count
+        self.last_updated_at = datetime.utcnow()
+        self.updated_at = datetime.utcnow()
+
+    def __repr__(self) -> str:
+        return f"<DataProduct {self.product_name} ({self.target_table})>"
+

+ 77 - 0
database/create_data_products_table.sql

@@ -0,0 +1,77 @@
+-- =============================================
+-- Data product registry table
+-- Records the data products produced by the data factory
+-- Created: 2025-12-25
+-- =============================================
+
+-- Create the data_products table.
+-- Columns that always carry a value are declared NOT NULL in addition to
+-- their DEFAULT, matching the ORM model's nullable=False declarations.
+CREATE TABLE IF NOT EXISTS public.data_products (
+    id SERIAL PRIMARY KEY,
+    -- Basic product information
+    product_name VARCHAR(200) NOT NULL,                       -- product name (Chinese)
+    product_name_en VARCHAR(200) NOT NULL,                    -- product English name (corresponds to the target table name)
+    description TEXT,                                         -- description
+    
+    -- Link to the source data flow
+    source_dataflow_id INTEGER,                               -- associated data flow ID (Neo4j node ID)
+    source_dataflow_name VARCHAR(200),                        -- data flow name (stored redundantly for easier querying)
+    
+    -- Target table location
+    target_table VARCHAR(200) NOT NULL,                       -- target table name
+    target_schema VARCHAR(100) NOT NULL DEFAULT 'public',     -- target schema
+    
+    -- Data statistics
+    record_count BIGINT NOT NULL DEFAULT 0,                   -- number of records
+    column_count INTEGER NOT NULL DEFAULT 0,                  -- number of columns
+    
+    -- Timestamps used for the "new data" hint
+    last_updated_at TIMESTAMP,                                -- when the data was last updated
+    last_viewed_at TIMESTAMP,                                 -- when a user last viewed the data
+    
+    -- Status
+    status VARCHAR(50) NOT NULL DEFAULT 'active',             -- status: active, inactive, error
+    
+    -- Audit fields
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,  -- creation time
+    created_by VARCHAR(100) NOT NULL DEFAULT 'system',        -- creator
+    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP   -- last modification time
+);
+
+-- Table comment
+COMMENT ON TABLE public.data_products IS '数据产品注册表,记录数据工厂加工完成后的数据产品信息';
+
+-- Column comments
+COMMENT ON COLUMN public.data_products.id IS '主键ID';
+COMMENT ON COLUMN public.data_products.product_name IS '数据产品名称(中文)';
+COMMENT ON COLUMN public.data_products.product_name_en IS '数据产品英文名,对应目标表名';
+COMMENT ON COLUMN public.data_products.description IS '数据产品描述';
+COMMENT ON COLUMN public.data_products.source_dataflow_id IS '关联的数据流ID(Neo4j节点ID)';
+COMMENT ON COLUMN public.data_products.source_dataflow_name IS '数据流名称';
+COMMENT ON COLUMN public.data_products.target_table IS '目标数据表名';
+COMMENT ON COLUMN public.data_products.target_schema IS '目标表所在schema';
+COMMENT ON COLUMN public.data_products.record_count IS '数据记录数';
+COMMENT ON COLUMN public.data_products.column_count IS '数据列数';
+COMMENT ON COLUMN public.data_products.last_updated_at IS '数据最后更新时间';
+COMMENT ON COLUMN public.data_products.last_viewed_at IS '用户最后查看时间';
+COMMENT ON COLUMN public.data_products.status IS '状态: active-正常, inactive-停用, error-异常';
+COMMENT ON COLUMN public.data_products.created_at IS '创建时间';
+COMMENT ON COLUMN public.data_products.created_by IS '创建人';
+COMMENT ON COLUMN public.data_products.updated_at IS '更新时间';
+
+-- Indexes
+CREATE INDEX IF NOT EXISTS idx_data_products_target_table 
+    ON public.data_products(target_table);
+
+CREATE INDEX IF NOT EXISTS idx_data_products_source_dataflow_id 
+    ON public.data_products(source_dataflow_id);
+
+CREATE INDEX IF NOT EXISTS idx_data_products_status 
+    ON public.data_products(status);
+
+CREATE INDEX IF NOT EXISTS idx_data_products_created_at 
+    ON public.data_products(created_at DESC);
+
+-- Unique index: at most one data product record per (schema, table) target
+CREATE UNIQUE INDEX IF NOT EXISTS idx_data_products_unique_target 
+    ON public.data_products(target_schema, target_table);
+

+ 1120 - 0
docs/api_data_service_guide.md

@@ -0,0 +1,1120 @@
+# 数据服务 API 前端开发指南
+
+> **模块说明**: 数据服务 API 提供数据产品的列表查询、详情获取、数据预览、Excel 导出、状态管理等功能。
+>
+> **基础路径**: `/api/dataservice`
+
+---
+
+## 目录
+
+- [通用说明](#通用说明)
+  - [响应格式](#响应格式)
+  - [错误码说明](#错误码说明)
+  - [Axios 配置](#axios-配置)
+- [接口列表](#接口列表)
+  1. [获取数据产品列表](#1-获取数据产品列表)
+  2. [获取数据产品详情](#2-获取数据产品详情)
+  3. [获取数据预览](#3-获取数据预览)
+  4. [下载 Excel 文件](#4-下载-excel-文件)
+  5. [标记已查看](#5-标记已查看)
+  6. [刷新统计信息](#6-刷新统计信息)
+  7. [删除数据产品](#7-删除数据产品)
+  8. [注册数据产品](#8-注册数据产品)
+
+---
+
+## 通用说明
+
+### 响应格式
+
+所有接口返回统一的 JSON 格式:
+
+```json
+{
+  "code": 200,
+  "message": "操作成功",
+  "data": { ... }
+}
+```
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `code` | number | 状态码,200 表示成功,其他表示失败 |
+| `message` | string | 操作结果描述信息 |
+| `data` | object \| array \| null | 返回的数据内容 |
+
+### 错误码说明
+
+| 状态码 | 说明 | 常见场景 |
+|--------|------|----------|
+| 200 | 成功 | 操作成功完成 |
+| 400 | 请求参数错误 | 缺少必填字段、参数格式错误 |
+| 404 | 资源不存在 | 数据产品 ID 不存在 |
+| 500 | 服务器内部错误 | 数据库连接失败、SQL 执行异常 |
+
+### Axios 配置
+
+建议的 Axios 全局配置:
+
+```javascript
+// src/utils/request.js
+import axios from 'axios'
+import { ElMessage } from 'element-plus'
+
+const request = axios.create({
+  baseURL: process.env.VUE_APP_API_BASE_URL || 'http://localhost:5050',
+  timeout: 30000,
+  headers: {
+    'Content-Type': 'application/json'
+  }
+})
+
+// 响应拦截器
+request.interceptors.response.use(
+  response => {
+    const res = response.data
+    if (res.code !== 200) {
+      ElMessage.error(res.message || '请求失败')
+      return Promise.reject(new Error(res.message || 'Error'))
+    }
+    return res
+  },
+  error => {
+    ElMessage.error(error.message || '网络错误')
+    return Promise.reject(error)
+  }
+)
+
+export default request
+```
+
+---
+
+## 接口列表
+
+---
+
+### 1. 获取数据产品列表
+
+分页获取数据产品列表,支持搜索和状态过滤。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `GET /api/dataservice/products` |
+| **Method** | GET |
+| **Content-Type** | - |
+
+#### 请求参数 (Query String)
+
+| 参数名 | 类型 | 必填 | 默认值 | 说明 |
+|--------|------|------|--------|------|
+| `page` | integer | 否 | 1 | 页码 |
+| `page_size` | integer | 否 | 20 | 每页数量 |
+| `search` | string | 否 | "" | 搜索关键词(不区分大小写的模糊匹配,范围:名称、英文名、描述、表名) |
+| `status` | string | 否 | - | 状态过滤:`active`、`inactive`、`error` |
+
+#### 响应数据
+
+```json
+{
+  "code": 200,
+  "message": "获取数据产品列表成功",
+  "data": {
+    "list": [
+      {
+        "id": 1,
+        "product_name": "人才数据产品",
+        "product_name_en": "talent_data",
+        "description": "包含所有人才的基本信息",
+        "source_dataflow_id": 10,
+        "source_dataflow_name": "人才数据清洗流程",
+        "target_table": "dwd_talent_info",
+        "target_schema": "public",
+        "record_count": 15890,
+        "column_count": 25,
+        "last_updated_at": "2024-12-26T10:30:00",
+        "last_viewed_at": "2024-12-26T09:00:00",
+        "status": "active",
+        "created_at": "2024-12-01T08:00:00",
+        "created_by": "system",
+        "updated_at": "2024-12-26T10:30:00",
+        "has_new_data": true
+      }
+    ],
+    "pagination": {
+      "page": 1,
+      "page_size": 20,
+      "total": 45,
+      "total_pages": 3
+    }
+  }
+}
+```
+
+#### 数据产品字段说明
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `id` | integer | 数据产品唯一 ID |
+| `product_name` | string | 产品中文名称 |
+| `product_name_en` | string | 产品英文名称 |
+| `description` | string \| null | 产品描述 |
+| `source_dataflow_id` | integer \| null | 关联的数据流 ID |
+| `source_dataflow_name` | string \| null | 关联的数据流名称 |
+| `target_table` | string | 目标数据表名 |
+| `target_schema` | string | 目标 schema,默认 "public" |
+| `record_count` | integer | 数据记录数 |
+| `column_count` | integer | 数据列数 |
+| `last_updated_at` | string \| null | 数据最后更新时间(ISO 8601 格式,UTC 时间,不带时区后缀) |
+| `last_viewed_at` | string \| null | 最后查看时间 |
+| `status` | string | 状态:`active`、`inactive`、`error` |
+| `created_at` | string | 创建时间 |
+| `created_by` | string | 创建人 |
+| `updated_at` | string | 更新时间 |
+| `has_new_data` | boolean | 是否有新数据未查看(用于更新提示) |
+
+#### Vue 接入示例
+
+```vue
+<template>
+  <div class="data-product-list">
+    <!-- 搜索栏 -->
+    <div class="search-bar">
+      <el-input
+        v-model="searchParams.search"
+        placeholder="搜索产品名称..."
+        @keyup.enter="fetchProducts"
+        clearable
+      />
+      <el-select v-model="searchParams.status" placeholder="状态" clearable>
+        <el-option label="活跃" value="active" />
+        <el-option label="非活跃" value="inactive" />
+        <el-option label="错误" value="error" />
+      </el-select>
+      <el-button type="primary" @click="fetchProducts">查询</el-button>
+    </div>
+
+    <!-- 数据表格 -->
+    <el-table :data="productList" v-loading="loading">
+      <el-table-column prop="product_name" label="产品名称">
+        <template #default="{ row }">
+          <span>{{ row.product_name }}</span>
+          <el-tag v-if="row.has_new_data" type="danger" size="small">新</el-tag>
+        </template>
+      </el-table-column>
+      <el-table-column prop="target_table" label="目标表" />
+      <el-table-column prop="record_count" label="记录数" />
+      <el-table-column prop="status" label="状态">
+        <template #default="{ row }">
+          <el-tag :type="getStatusType(row.status)">{{ row.status }}</el-tag>
+        </template>
+      </el-table-column>
+      <el-table-column label="操作" width="200">
+        <template #default="{ row }">
+          <el-button size="small" @click="handlePreview(row.id)">预览</el-button>
+          <el-button size="small" type="success" @click="handleDownload(row.id)">
+            下载
+          </el-button>
+        </template>
+      </el-table-column>
+    </el-table>
+
+    <!-- 分页 -->
+    <el-pagination
+      v-model:current-page="searchParams.page"
+      v-model:page-size="searchParams.page_size"
+      :total="pagination.total"
+      @current-change="fetchProducts"
+    />
+  </div>
+</template>
+
+<script setup>
+import { ref, reactive, onMounted } from 'vue'
+import request from '@/utils/request'
+
+const loading = ref(false)
+const productList = ref([])
+const pagination = ref({})
+const searchParams = reactive({
+  page: 1,
+  page_size: 20,
+  search: '',
+  status: ''
+})
+
+const fetchProducts = async () => {
+  loading.value = true
+  try {
+    const res = await request.get('/api/dataservice/products', {
+      params: searchParams
+    })
+    productList.value = res.data.list
+    pagination.value = res.data.pagination
+  } finally {
+    loading.value = false
+  }
+}
+
+const getStatusType = (status) => {
+  const types = { active: 'success', inactive: 'info', error: 'danger' }
+  return types[status] || 'info'
+}
+
+onMounted(() => {
+  fetchProducts()
+})
+</script>
+```
+
+---
+
+### 2. 获取数据产品详情
+
+根据 ID 获取单个数据产品的详细信息。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `GET /api/dataservice/products/{product_id}` |
+| **Method** | GET |
+| **Content-Type** | - |
+
+#### 路径参数
+
+| 参数名 | 类型 | 必填 | 说明 |
+|--------|------|------|------|
+| `product_id` | integer | 是 | 数据产品 ID |
+
+#### 响应数据
+
+```json
+{
+  "code": 200,
+  "message": "获取数据产品详情成功",
+  "data": {
+    "id": 1,
+    "product_name": "人才数据产品",
+    "product_name_en": "talent_data",
+    "description": "包含所有人才的基本信息",
+    "source_dataflow_id": 10,
+    "source_dataflow_name": "人才数据清洗流程",
+    "target_table": "dwd_talent_info",
+    "target_schema": "public",
+    "record_count": 15890,
+    "column_count": 25,
+    "last_updated_at": "2024-12-26T10:30:00",
+    "last_viewed_at": "2024-12-26T09:00:00",
+    "status": "active",
+    "created_at": "2024-12-01T08:00:00",
+    "created_by": "system",
+    "updated_at": "2024-12-26T10:30:00",
+    "has_new_data": true
+  }
+}
+```
+
+#### 错误响应
+
+**产品不存在 (404):**
+```json
+{
+  "code": 404,
+  "message": "数据产品不存在",
+  "data": null
+}
+```
+
+#### Vue 接入示例
+
+```javascript
+// 在 API 模块中定义
+export const dataProductApi = {
+  // 获取产品详情
+  getDetail(productId) {
+    return request.get(`/api/dataservice/products/${productId}`)
+  }
+}
+
+// 在组件中使用
+const fetchProductDetail = async (productId) => {
+  try {
+    const res = await dataProductApi.getDetail(productId)
+    productDetail.value = res.data
+  } catch (error) {
+    console.error('获取产品详情失败:', error)
+  }
+}
+```
+
+---
+
+### 3. 获取数据预览
+
+获取数据产品的数据预览,支持自定义预览条数(默认 200 条,最大 1000 条)。
+
+> **注意**: 调用此接口会自动将产品标记为已查看。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `GET /api/dataservice/products/{product_id}/preview` |
+| **Method** | GET |
+| **Content-Type** | - |
+
+#### 路径参数
+
+| 参数名 | 类型 | 必填 | 说明 |
+|--------|------|------|------|
+| `product_id` | integer | 是 | 数据产品 ID |
+
+#### 请求参数 (Query String)
+
+| 参数名 | 类型 | 必填 | 默认值 | 说明 |
+|--------|------|------|--------|------|
+| `limit` | integer | 否 | 200 | 预览数据条数,最大 1000 |
+
+#### 响应数据
+
+```json
+{
+  "code": 200,
+  "message": "获取数据预览成功",
+  "data": {
+    "product": {
+      "id": 1,
+      "product_name": "人才数据产品",
+      "product_name_en": "talent_data",
+      "target_table": "dwd_talent_info",
+      "target_schema": "public",
+      "record_count": 15890,
+      "column_count": 25,
+      "status": "active",
+      "has_new_data": false
+    },
+    "columns": [
+      { "name": "id", "type": "integer", "nullable": false },
+      { "name": "name", "type": "character varying", "nullable": false },
+      { "name": "email", "type": "character varying", "nullable": true },
+      { "name": "created_at", "type": "timestamp without time zone", "nullable": false }
+    ],
+    "data": [
+      { "id": 1, "name": "张三", "email": "zhangsan@example.com", "created_at": "2024-01-15T08:30:00" },
+      { "id": 2, "name": "李四", "email": "lisi@example.com", "created_at": "2024-01-16T09:45:00" }
+    ],
+    "total_count": 15890,
+    "preview_count": 200
+  }
+}
+```
+
+#### 响应字段说明
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `product` | object | 数据产品基本信息 |
+| `columns` | array | 列定义数组 |
+| `columns[].name` | string | 列名 |
+| `columns[].type` | string | 数据类型 |
+| `columns[].nullable` | boolean | 是否允许为空 |
+| `data` | array | 数据行数组 |
+| `total_count` | integer | 目标表总记录数 |
+| `preview_count` | integer | 本次预览返回的记录数 |
+
+#### 错误响应
+
+**目标表不存在时:**
+```json
+{
+  "code": 200,
+  "message": "获取数据预览成功",
+  "data": {
+    "product": { ... },
+    "columns": [],
+    "data": [],
+    "total_count": 0,
+    "preview_count": 0,
+    "error": "目标表 public.dwd_talent_info 不存在"
+  }
+}
+```
+
+#### Vue 接入示例
+
+```vue
+<template>
+  <div class="data-preview">
+    <!-- 产品信息卡片 -->
+    <el-card class="product-info">
+      <template #header>
+        <span>{{ previewData.product?.product_name }}</span>
+        <el-tag>{{ previewData.total_count }} 条记录</el-tag>
+      </template>
+      <p>目标表: {{ previewData.product?.target_schema }}.{{ previewData.product?.target_table }}</p>
+    </el-card>
+
+    <!-- 数据预览表格 -->
+    <el-table :data="previewData.data" v-loading="loading" border>
+      <el-table-column
+        v-for="col in previewData.columns"
+        :key="col.name"
+        :prop="col.name"
+        :label="col.name"
+      >
+        <template #header>
+          <div>
+            <span>{{ col.name }}</span>
+            <br />
+            <small style="color: #909399">{{ col.type }}</small>
+          </div>
+        </template>
+      </el-table-column>
+    </el-table>
+
+    <!-- 预览条数控制 -->
+    <div class="preview-controls">
+      <span>已加载 {{ previewData.preview_count }} / {{ previewData.total_count }} 条</span>
+      <el-button
+        v-if="previewData.preview_count < previewData.total_count"
+        @click="loadMore"
+      >
+        加载更多
+      </el-button>
+    </div>
+  </div>
+</template>
+
+<script setup>
+import { ref, onMounted } from 'vue'
+import request from '@/utils/request'
+
+const props = defineProps({
+  productId: { type: Number, required: true }
+})
+
+const loading = ref(false)
+const previewData = ref({
+  product: null,
+  columns: [],
+  data: [],
+  total_count: 0,
+  preview_count: 0
+})
+const limit = ref(200)
+
+const fetchPreview = async () => {
+  loading.value = true
+  try {
+    const res = await request.get(
+      `/api/dataservice/products/${props.productId}/preview`,
+      { params: { limit: limit.value } }
+    )
+    previewData.value = res.data
+  } finally {
+    loading.value = false
+  }
+}
+
+const loadMore = () => {
+  limit.value = Math.min(limit.value + 200, 1000)
+  fetchPreview()
+}
+
+onMounted(() => {
+  fetchPreview()
+})
+</script>
+```
+
+---
+
+### 4. 下载 Excel 文件
+
+下载数据产品数据为 Excel 文件。
+
+> **注意**: 调用此接口会自动将产品标记为已查看。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `GET /api/dataservice/products/{product_id}/download` |
+| **Method** | GET |
+| **Content-Type** | - |
+
+#### 路径参数
+
+| 参数名 | 类型 | 必填 | 说明 |
+|--------|------|------|------|
+| `product_id` | integer | 是 | 数据产品 ID |
+
+#### 请求参数 (Query String)
+
+| 参数名 | 类型 | 必填 | 默认值 | 说明 |
+|--------|------|------|--------|------|
+| `limit` | integer | 否 | 200 | 导出数据条数,最大 10000 |
+
+#### 响应数据
+
+**成功**: 返回 Excel 文件(MIME 类型: `application/vnd.openxmlformats-officedocument.spreadsheetml.sheet`)
+
+**文件名格式**: `{product_name_en}_{YYYYMMDD_HHMMSS}.xlsx`
+
+例如: `talent_data_20241226_103000.xlsx`
+
+#### 错误响应
+
+**产品或表不存在 (JSON 格式):**
+```json
+{
+  "code": 404,
+  "message": "数据产品不存在: ID=999",
+  "data": null
+}
+```
+
+#### Vue 接入示例
+
+```javascript
+// 方式 1: 使用 window.open 打开下载链接(推荐简单场景)
+const downloadExcel = (productId, limit = 200) => {
+  const baseUrl = process.env.VUE_APP_API_BASE_URL || 'http://localhost:5050'
+  const url = `${baseUrl}/api/dataservice/products/${productId}/download?limit=${limit}`
+  window.open(url, '_blank')
+}
+
+// 方式 2: 使用 axios 下载(支持进度、Token 验证)
+const downloadExcelWithAxios = async (productId, limit = 200) => {
+  try {
+    const response = await request.get(
+      `/api/dataservice/products/${productId}/download`,
+      {
+        params: { limit },
+        responseType: 'blob'
+      }
+    )
+
+    // 从响应头获取文件名
+    const contentDisposition = response.headers['content-disposition']
+    let filename = 'download.xlsx'
+    if (contentDisposition) {
+      const filenameMatch = contentDisposition.match(/filename[^;=\n]*=((['"]).*?\2|[^;\n]*)/)
+      if (filenameMatch) {
+        filename = filenameMatch[1].replace(/['"]/g, '')
+      }
+    }
+
+    // 创建下载链接
+    const blob = new Blob([response.data], {
+      type: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
+    })
+    const url = window.URL.createObjectURL(blob)
+    const link = document.createElement('a')
+    link.href = url
+    link.download = filename
+    link.click()
+    window.URL.revokeObjectURL(url)
+  } catch (error) {
+    ElMessage.error('下载失败')
+  }
+}
+```
+
+```vue
+<template>
+  <div class="download-section">
+    <el-input-number v-model="downloadLimit" :min="1" :max="10000" />
+    <el-button type="primary" @click="handleDownload" :loading="downloading">
+      下载 Excel
+    </el-button>
+  </div>
+</template>
+
+<script setup>
+import { ref } from 'vue'
+
+const props = defineProps({
+  productId: { type: Number, required: true }
+})
+
+const downloadLimit = ref(200)
+const downloading = ref(false)
+
+const handleDownload = async () => {
+  downloading.value = true
+  try {
+    await downloadExcelWithAxios(props.productId, downloadLimit.value)
+  } finally {
+    downloading.value = false
+  }
+}
+</script>
+```
+
+---
+
+### 5. 标记已查看
+
+手动标记数据产品为已查看,消除更新提示(红点)。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `POST /api/dataservice/products/{product_id}/viewed` |
+| **Method** | POST |
+| **Content-Type** | application/json |
+
+#### 路径参数
+
+| 参数名 | 类型 | 必填 | 说明 |
+|--------|------|------|------|
+| `product_id` | integer | 是 | 数据产品 ID |
+
+#### 请求体
+
+无需请求体。
+
+#### 响应数据
+
+```json
+{
+  "code": 200,
+  "message": "标记已查看成功",
+  "data": {
+    "id": 1,
+    "product_name": "人才数据产品",
+    "has_new_data": false,
+    "last_viewed_at": "2024-12-26T14:30:00"
+  }
+}
+```
+
+#### Vue 接入示例
+
+```javascript
+const markAsViewed = async (productId) => {
+  try {
+    await request.post(`/api/dataservice/products/${productId}/viewed`)
+    // 更新本地数据状态
+    const product = productList.value.find(p => p.id === productId)
+    if (product) {
+      product.has_new_data = false
+    }
+  } catch (error) {
+    console.error('标记失败:', error)
+  }
+}
+```
+
+---
+
+### 6. 刷新统计信息
+
+刷新数据产品的统计信息(从目标表重新统计记录数和列数)。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `POST /api/dataservice/products/{product_id}/refresh` |
+| **Method** | POST |
+| **Content-Type** | application/json |
+
+#### 路径参数
+
+| 参数名 | 类型 | 必填 | 说明 |
+|--------|------|------|------|
+| `product_id` | integer | 是 | 数据产品 ID |
+
+#### 请求体
+
+无需请求体。
+
+#### 响应数据
+
+```json
+{
+  "code": 200,
+  "message": "刷新统计信息成功",
+  "data": {
+    "id": 1,
+    "product_name": "人才数据产品",
+    "record_count": 16500,
+    "column_count": 25,
+    "status": "active",
+    "last_updated_at": "2024-12-26T14:35:00"
+  }
+}
+```
+
+> **说明**: 如果目标表不存在,`status` 会更新为 `error`。
+
+#### Vue 接入示例
+
+```javascript
+const refreshStats = async (productId) => {
+  try {
+    const res = await request.post(`/api/dataservice/products/${productId}/refresh`)
+    ElMessage.success(`刷新成功,共 ${res.data.record_count} 条记录`)
+    // 更新本地数据
+    const index = productList.value.findIndex(p => p.id === productId)
+    if (index !== -1) {
+      productList.value[index] = res.data
+    }
+  } catch (error) {
+    ElMessage.error('刷新失败')
+  }
+}
+```
+
+---
+
+### 7. 删除数据产品
+
+删除数据产品记录(仅删除注册信息,不删除实际数据表)。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `DELETE /api/dataservice/products/{product_id}` |
+| **Method** | DELETE |
+| **Content-Type** | - |
+
+#### 路径参数
+
+| 参数名 | 类型 | 必填 | 说明 |
+|--------|------|------|------|
+| `product_id` | integer | 是 | 数据产品 ID |
+
+#### 响应数据
+
+```json
+{
+  "code": 200,
+  "message": "删除数据产品成功",
+  "data": {}
+}
+```
+
+#### 错误响应
+
+```json
+{
+  "code": 404,
+  "message": "数据产品不存在",
+  "data": null
+}
+```
+
+#### Vue 接入示例
+
+```javascript
+const deleteProduct = async (productId) => {
+  try {
+    await ElMessageBox.confirm('确定要删除该数据产品吗?此操作不可恢复。', '删除确认', {
+      type: 'warning'
+    })
+
+    await request.delete(`/api/dataservice/products/${productId}`)
+    ElMessage.success('删除成功')
+
+    // 从本地列表移除
+    productList.value = productList.value.filter(p => p.id !== productId)
+  } catch (error) {
+    if (error !== 'cancel') {
+      ElMessage.error('删除失败')
+    }
+  }
+}
+```
+
+---
+
+### 8. 注册数据产品
+
+手动注册新的数据产品。如果目标表已存在对应产品,则更新现有记录。
+
+#### 请求信息
+
+| 项目 | 说明 |
+|------|------|
+| **URL** | `POST /api/dataservice/products` |
+| **Method** | POST |
+| **Content-Type** | application/json |
+
+#### 请求体参数
+
+| 参数名 | 类型 | 必填 | 默认值 | 说明 |
+|--------|------|------|--------|------|
+| `product_name` | string | 是 | - | 产品中文名称 |
+| `product_name_en` | string | 是 | - | 产品英文名称 |
+| `target_table` | string | 是 | - | 目标数据表名 |
+| `target_schema` | string | 否 | "public" | 目标 schema |
+| `description` | string | 否 | null | 产品描述 |
+| `source_dataflow_id` | integer | 否 | null | 关联的数据流 ID |
+| `source_dataflow_name` | string | 否 | null | 关联的数据流名称 |
+| `created_by` | string | 否 | "manual" | 创建人标识 |
+
+#### 请求示例
+
+```json
+{
+  "product_name": "人才简历数据",
+  "product_name_en": "talent_resume_data",
+  "target_table": "dwd_talent_resume",
+  "target_schema": "public",
+  "description": "经过清洗处理的人才简历数据",
+  "source_dataflow_id": 15,
+  "source_dataflow_name": "简历数据清洗流程"
+}
+```
+
+#### 响应数据
+
+```json
+{
+  "code": 200,
+  "message": "注册数据产品成功",
+  "data": {
+    "id": 5,
+    "product_name": "人才简历数据",
+    "product_name_en": "talent_resume_data",
+    "target_table": "dwd_talent_resume",
+    "target_schema": "public",
+    "description": "经过清洗处理的人才简历数据",
+    "source_dataflow_id": 15,
+    "source_dataflow_name": "简历数据清洗流程",
+    "record_count": 0,
+    "column_count": 0,
+    "status": "active",
+    "created_at": "2024-12-26T15:00:00",
+    "created_by": "manual",
+    "has_new_data": false
+  }
+}
+```
+
+#### 错误响应
+
+**缺少必填字段 (400):**
+```json
+{
+  "code": 400,
+  "message": "缺少必填字段: product_name",
+  "data": null
+}
+```
+
+**请求体为空 (400):**
+```json
+{
+  "code": 400,
+  "message": "请求数据不能为空",
+  "data": null
+}
+```
+
+#### Vue 接入示例
+
+```vue
+<template>
+  <el-dialog v-model="dialogVisible" title="注册数据产品">
+    <el-form :model="form" :rules="rules" ref="formRef" label-width="120px">
+      <el-form-item label="产品名称" prop="product_name">
+        <el-input v-model="form.product_name" placeholder="请输入中文名称" />
+      </el-form-item>
+      <el-form-item label="英文名称" prop="product_name_en">
+        <el-input v-model="form.product_name_en" placeholder="请输入英文名称" />
+      </el-form-item>
+      <el-form-item label="目标表名" prop="target_table">
+        <el-input v-model="form.target_table" placeholder="例如: dwd_talent_info" />
+      </el-form-item>
+      <el-form-item label="Schema">
+        <el-input v-model="form.target_schema" placeholder="默认 public" />
+      </el-form-item>
+      <el-form-item label="描述">
+        <el-input v-model="form.description" type="textarea" :rows="3" />
+      </el-form-item>
+    </el-form>
+    <template #footer>
+      <el-button @click="dialogVisible = false">取消</el-button>
+      <el-button type="primary" @click="handleSubmit" :loading="submitting">
+        确定
+      </el-button>
+    </template>
+  </el-dialog>
+</template>
+
+<script setup>
+import { ref, reactive } from 'vue'
+import request from '@/utils/request'
+
+const dialogVisible = ref(false)
+const formRef = ref(null)
+const submitting = ref(false)
+
+const form = reactive({
+  product_name: '',
+  product_name_en: '',
+  target_table: '',
+  target_schema: 'public',
+  description: ''
+})
+
+const rules = {
+  product_name: [{ required: true, message: '请输入产品名称', trigger: 'blur' }],
+  product_name_en: [{ required: true, message: '请输入英文名称', trigger: 'blur' }],
+  target_table: [{ required: true, message: '请输入目标表名', trigger: 'blur' }]
+}
+
+const handleSubmit = async () => {
+  // Element Plus 的 validate() 在校验失败时返回被拒绝的 Promise,需先捕获再判断
+  const valid = await formRef.value.validate().catch(() => false)
+  if (!valid) return
+
+  submitting.value = true
+  try {
+    const res = await request.post('/api/dataservice/products', form)
+    ElMessage.success('注册成功')
+    dialogVisible.value = false
+    emit('success', res.data)
+  } catch (error) {
+    ElMessage.error(error.message || '注册失败')
+  } finally {
+    submitting.value = false
+  }
+}
+
+const emit = defineEmits(['success'])
+</script>
+```
+
+---
+
+## API 模块封装示例
+
+建议将所有数据服务 API 封装到独立模块:
+
+```javascript
+// src/api/dataService.js
+import request from '@/utils/request'
+
+const BASE_URL = '/api/dataservice'
+
+export const dataServiceApi = {
+  /**
+   * 获取数据产品列表
+   */
+  getProducts(params) {
+    return request.get(`${BASE_URL}/products`, { params })
+  },
+
+  /**
+   * 获取数据产品详情
+   */
+  getProductDetail(productId) {
+    return request.get(`${BASE_URL}/products/${productId}`)
+  },
+
+  /**
+   * 获取数据预览
+   */
+  getProductPreview(productId, limit = 200) {
+    return request.get(`${BASE_URL}/products/${productId}/preview`, {
+      params: { limit }
+    })
+  },
+
+  /**
+   * 下载 Excel (返回 Blob)
+   */
+  downloadExcel(productId, limit = 200) {
+    return request.get(`${BASE_URL}/products/${productId}/download`, {
+      params: { limit },
+      responseType: 'blob'
+    })
+  },
+
+  /**
+   * 获取下载链接
+   */
+  getDownloadUrl(productId, limit = 200) {
+    const baseUrl = process.env.VUE_APP_API_BASE_URL || ''
+    return `${baseUrl}${BASE_URL}/products/${productId}/download?limit=${limit}`
+  },
+
+  /**
+   * 标记为已查看
+   */
+  markAsViewed(productId) {
+    return request.post(`${BASE_URL}/products/${productId}/viewed`)
+  },
+
+  /**
+   * 刷新统计信息
+   */
+  refreshStats(productId) {
+    return request.post(`${BASE_URL}/products/${productId}/refresh`)
+  },
+
+  /**
+   * 删除数据产品
+   */
+  deleteProduct(productId) {
+    return request.delete(`${BASE_URL}/products/${productId}`)
+  },
+
+  /**
+   * 注册数据产品
+   */
+  registerProduct(data) {
+    return request.post(`${BASE_URL}/products`, data)
+  }
+}
+
+export default dataServiceApi
+```
+
+---
+
+## 常见问题
+
+### Q1: 数据预览加载很慢怎么办?
+
+**A**: 减小 `limit` 参数值,默认的 200 条通常已足够预览。对于大表,避免将 `limit` 设为上限 1000。
+
+### Q2: Excel 下载失败,返回 JSON 错误信息?
+
+**A**: 检查响应的 Content-Type,如果是 `application/json`,说明发生了错误。需要解析 JSON 获取错误信息:
+
+```javascript
+const response = await request.get(url, { responseType: 'blob' })
+if (response.data.type === 'application/json') {
+  const text = await response.data.text()
+  const error = JSON.parse(text)
+  ElMessage.error(error.message)
+}
+```
+
+### Q3: `has_new_data` 什么时候会变为 true?
+
+**A**: 当数据产品的 `last_updated_at` 大于 `last_viewed_at` 时,表示有新数据更新但用户尚未查看。调用预览、下载或标记已查看接口后会自动重置。
+
+### Q4: 如何处理跨域问题?
+
+**A**: 后端已配置 CORS,支持任意 Origin。如遇问题,检查请求头是否正确设置 `Content-Type`。
+
+---
+
+## 更新日志
+
+| 版本 | 日期 | 说明 |
+|------|------|------|
+| 1.0.0 | 2024-12-26 | 初始版本,包含完整的 API 文档 |
+

+ 173 - 0
scripts/create_data_products_table.py

@@ -0,0 +1,173 @@
+"""
+创建 data_products 数据表脚本
+用于在生产环境 PostgreSQL 数据库中创建数据产品注册表
+"""
+
+import os
+import sys
+from pathlib import Path
+
+# On Windows, re-wrap stdout/stderr as UTF-8 text streams so the non-ASCII
+# status messages printed by this script are encoded as UTF-8.
+if sys.platform == "win32":
+    import io
+
+    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
+    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+
+# Put the project root (the parent of this script's directory) at the front
+# of sys.path; this must run before the `app.*` import below.
+project_root = Path(__file__).parent.parent
+sys.path.insert(0, str(project_root))
+
+import psycopg2
+
+from app.config.config import ProductionConfig
+
+
+def create_data_products_table():
+    """在生产环境数据库中创建 data_products 表"""
+    try:
+        # 从生产环境配置获取数据库连接信息
+        config = ProductionConfig()
+        db_uri = config.SQLALCHEMY_DATABASE_URI
+
+        # 解析数据库连接URI
+        # 格式: postgresql://user:password@host:port/database
+        uri_parts = db_uri.replace("postgresql://", "").split("@")
+        if len(uri_parts) != 2:
+            raise ValueError(f"无效的数据库URI格式: {db_uri}")
+
+        user_pass = uri_parts[0].split(":")
+        username = user_pass[0]
+        password = user_pass[1] if len(user_pass) > 1 else ""
+
+        host_db = uri_parts[1].split("/")
+        if len(host_db) != 2:
+            raise ValueError(f"无效的数据库URI格式: {db_uri}")
+
+        host_port = host_db[0].split(":")
+        hostname = host_port[0]
+        port = int(host_port[1]) if len(host_port) > 1 else 5432
+        database = host_db[1]
+
+        print("正在连接数据库...")
+        print(f"  主机: {hostname}")
+        print(f"  端口: {port}")
+        print(f"  数据库: {database}")
+        print(f"  用户: {username}")
+
+        # 连接数据库
+        conn = psycopg2.connect(
+            host=hostname,
+            port=port,
+            database=database,
+            user=username,
+            password=password,
+        )
+
+        # 设置自动提交
+        conn.autocommit = True
+        cursor = conn.cursor()
+
+        print("\n连接成功!")
+
+        # 读取SQL脚本
+        sql_file = project_root / "database" / "create_data_products_table.sql"
+        if not sql_file.exists():
+            raise FileNotFoundError(f"SQL文件不存在: {sql_file}")
+
+        print(f"\n读取SQL脚本: {sql_file}")
+
+        with open(sql_file, encoding="utf-8") as f:
+            sql_content = f.read()
+
+        # 执行SQL脚本
+        print("\n开始执行SQL脚本...")
+        cursor.execute(sql_content)
+
+        print("\n[成功] SQL脚本执行成功!")
+
+        # 验证表是否创建成功
+        print("\n验证表是否创建成功...")
+        cursor.execute(
+            """
+            SELECT EXISTS (
+                SELECT FROM information_schema.tables 
+                WHERE table_schema = 'public' 
+                AND table_name = 'data_products'
+            )
+            """
+        )
+        table_exists = cursor.fetchone()[0]
+
+        if table_exists:
+            print("[成功] 表 data_products 已成功创建!")
+
+            # 查询表结构
+            cursor.execute(
+                """
+                SELECT column_name, data_type, is_nullable
+                FROM information_schema.columns
+                WHERE table_schema = 'public' 
+                AND table_name = 'data_products'
+                ORDER BY ordinal_position
+                """
+            )
+            columns = cursor.fetchall()
+
+            print(f"\n表结构 ({len(columns)} 列):")
+            for col in columns:
+                nullable = "NULL" if col[2] == "YES" else "NOT NULL"
+                print(f"  - {col[0]:30} {col[1]:20} {nullable}")
+
+            # 查询索引
+            cursor.execute(
+                """
+                SELECT indexname, indexdef
+                FROM pg_indexes
+                WHERE schemaname = 'public' 
+                AND tablename = 'data_products'
+                """
+            )
+            indexes = cursor.fetchall()
+
+            if indexes:
+                print(f"\n索引 ({len(indexes)} 个):")
+                for idx in indexes:
+                    print(f"  - {idx[0]}")
+
+        else:
+            print("[警告] 表 data_products 未找到!")
+
+        cursor.close()
+        conn.close()
+
+        print("\n[完成] 操作成功完成!")
+
+    except psycopg2.Error as e:
+        print(f"\n[错误] 数据库错误: {e}")
+        sys.exit(1)
+    except Exception as e:
+        print(f"\n[错误] 错误: {e}")
+        import traceback
+
+        traceback.print_exc()
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    print("=" * 60)
+    print("创建 data_products 数据表")
+    print("=" * 60)
+    print()
+
+    # 确认环境
+    env = os.environ.get("FLASK_ENV", "production")
+    print(f"当前环境: {env}")
+
+    if env != "production":
+        response = input("\n警告: 当前不是生产环境,是否继续?(yes/no): ")
+        if response.lower() != "yes":
+            print("已取消操作")
+            sys.exit(0)
+
+    create_data_products_table()

+ 61 - 0
scripts/create_test_tables_direct.sql

@@ -0,0 +1,61 @@
+-- SQL script that directly creates the test data tables.
+-- Intended to be run against the production database to create the test tables.
+-- Each table is dropped first (DROP TABLE IF EXISTS ... CASCADE), so any
+-- existing table of the same name and its rows are removed before re-creation.
+
+-- Table 1: sales data
+DROP TABLE IF EXISTS test_sales_data CASCADE;
+
+CREATE TABLE test_sales_data (
+    id SERIAL PRIMARY KEY,
+    order_id VARCHAR(50) NOT NULL,
+    customer_name VARCHAR(100),
+    product_name VARCHAR(200),
+    quantity INTEGER,
+    unit_price DECIMAL(10, 2),
+    total_amount DECIMAL(10, 2),
+    order_date DATE,
+    region VARCHAR(50),
+    status VARCHAR(20),
+    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+);
+
+COMMENT ON TABLE test_sales_data IS '测试销售数据表';
+
+-- Table 2: user statistics
+DROP TABLE IF EXISTS test_user_statistics CASCADE;
+
+CREATE TABLE test_user_statistics (
+    id SERIAL PRIMARY KEY,
+    user_id VARCHAR(50) NOT NULL,
+    username VARCHAR(100),
+    email VARCHAR(200),
+    registration_date DATE,
+    last_login_date DATE,
+    total_orders INTEGER DEFAULT 0,
+    total_amount DECIMAL(10, 2) DEFAULT 0,
+    user_level VARCHAR(20),
+    is_active BOOLEAN DEFAULT TRUE,
+    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+);
+
+COMMENT ON TABLE test_user_statistics IS '测试用户统计表';
+
+-- Table 3: product inventory (product_code is unique)
+DROP TABLE IF EXISTS test_product_inventory CASCADE;
+
+CREATE TABLE test_product_inventory (
+    id SERIAL PRIMARY KEY,
+    product_code VARCHAR(50) UNIQUE NOT NULL,
+    product_name VARCHAR(200),
+    category VARCHAR(100),
+    current_stock INTEGER,
+    min_stock INTEGER,
+    max_stock INTEGER,
+    unit_price DECIMAL(10, 2),
+    supplier VARCHAR(200),
+    last_restock_date DATE,
+    status VARCHAR(20),
+    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+);
+
+COMMENT ON TABLE test_product_inventory IS '测试产品库存表';
+

+ 518 - 0
scripts/prepare_data_service_test_data.py

@@ -0,0 +1,518 @@
+"""
+准备数据服务功能的测试数据
+包括创建测试数据表和注册数据产品
+"""
+
+import os
+import sys
+from datetime import datetime
+from pathlib import Path
+
+# Set the console encoding to UTF-8 (Windows only) by re-wrapping the
+# binary buffers of stdout and stderr in UTF-8 text wrappers.
+if sys.platform == "win32":
+    import io
+
+    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
+    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+
+# Put the project root (the parent of this script's directory) at the front
+# of the import path; the `app` import below follows this line.
+project_root = Path(__file__).parent.parent
+sys.path.insert(0, str(project_root))
+
+import psycopg2
+from app.config.config import ProductionConfig
+
+
def get_db_connection():
    """Open a psycopg2 connection using the production database URI.

    The URI is read from ``ProductionConfig().SQLALCHEMY_DATABASE_URI`` and
    split with ``urllib.parse.urlsplit`` so that passwords containing ``:``
    or ``@``, driver-qualified schemes such as ``postgresql+psycopg2://``
    and trailing query strings are handled correctly.  Percent-encoded
    characters in the user name and password are decoded.

    Returns:
        A new psycopg2 connection.

    Raises:
        ValueError: If the URI carries no credentials section, no host or no
            single-segment database name, or if its port is not a valid
            port number.
    """
    # Local import keeps this function self-contained (no new file-level import).
    from urllib.parse import unquote, urlsplit

    config = ProductionConfig()
    db_uri = config.SQLALCHEMY_DATABASE_URI

    parts = urlsplit(db_uri)
    database = parts.path.lstrip("/")

    # Same requirements as the previous hand-written parser: a "user@" part,
    # a host, and exactly one path segment naming the database.
    if (
        parts.username is None
        or not parts.hostname
        or not database
        or "/" in database
    ):
        raise ValueError(f"无效的数据库URI格式: {db_uri}")

    # ``parts.port`` raises ValueError itself when the port is not numeric.
    port = parts.port if parts.port is not None else 5432

    return psycopg2.connect(
        host=parts.hostname,
        port=port,
        database=database,
        user=unquote(parts.username),
        password=unquote(parts.password or ""),
    )
+
+
def create_test_tables(conn):
    """Drop and recreate the three test tables.

    Each table is handled independently: the DROP is committed first, then
    the CREATE and the table comment are committed together.  If any
    statement for a table fails, the error is printed, the transaction is
    rolled back and processing continues with the next table.  Every table
    is dropped and created exactly once.

    Args:
        conn: Open database connection providing ``cursor()``, ``commit()``
            and ``rollback()``.

    Returns:
        bool: ``True`` if at least one of the test tables exists in the
        ``public`` schema afterwards, ``False`` if none does.
    """
    # (table name, table comment, CREATE statement) for every test table.
    table_specs = (
        (
            "test_sales_data",
            "测试销售数据表",
            """
            CREATE TABLE test_sales_data (
                id SERIAL PRIMARY KEY,
                order_id VARCHAR(50) NOT NULL,
                customer_name VARCHAR(100),
                product_name VARCHAR(200),
                quantity INTEGER,
                unit_price DECIMAL(10, 2),
                total_amount DECIMAL(10, 2),
                order_date DATE,
                region VARCHAR(50),
                status VARCHAR(20),
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
        ),
        (
            "test_user_statistics",
            "测试用户统计表",
            """
            CREATE TABLE test_user_statistics (
                id SERIAL PRIMARY KEY,
                user_id VARCHAR(50) NOT NULL,
                username VARCHAR(100),
                email VARCHAR(200),
                registration_date DATE,
                last_login_date DATE,
                total_orders INTEGER DEFAULT 0,
                total_amount DECIMAL(10, 2) DEFAULT 0,
                user_level VARCHAR(20),
                is_active BOOLEAN DEFAULT TRUE,
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
        ),
        (
            "test_product_inventory",
            "测试产品库存表",
            """
            CREATE TABLE test_product_inventory (
                id SERIAL PRIMARY KEY,
                product_code VARCHAR(50) UNIQUE NOT NULL,
                product_name VARCHAR(200),
                category VARCHAR(100),
                current_stock INTEGER,
                min_stock INTEGER,
                max_stock INTEGER,
                unit_price DECIMAL(10, 2),
                supplier VARCHAR(200),
                last_restock_date DATE,
                status VARCHAR(20),
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
        ),
    )

    cursor = conn.cursor()

    print("\n[1/3] 创建测试数据表...")

    for table_name, table_comment, create_sql in table_specs:
        # Deliberately broad: this step is best-effort per table, the error
        # is reported and the remaining tables are still attempted.
        try:
            cursor.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE")
            conn.commit()

            cursor.execute(create_sql)
            cursor.execute(f"COMMENT ON TABLE {table_name} IS '{table_comment}'")
            conn.commit()
            print(f"  ✓ {table_name} 表创建成功")
        except Exception as e:
            print(f"  ✗ {table_name} 表创建失败: {e}")
            conn.rollback()

    # Verify against the catalog which of the tables actually exist now.
    cursor.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public'
        AND table_name IN ('test_sales_data', 'test_user_statistics', 'test_product_inventory')
        ORDER BY table_name
    """)
    created_tables = [row[0] for row in cursor.fetchall()]

    if not created_tables:
        print("  已创建表: 无")
        print("  [错误] 没有成功创建任何表,请检查错误信息")
        return False

    print("\n[成功] 测试数据表创建完成")
    print(f"  已创建表: {', '.join(created_tables)}")
    return True
+
+
def insert_test_data(conn):
    """Fill the three test tables with deterministic sample rows.

    Inserts 250 rows into ``test_sales_data``, 200 rows into
    ``test_user_statistics`` and 150 rows into ``test_product_inventory``
    with ``executemany`` and commits once at the end.  All values are
    derived from the row index, so repeated runs produce the same data.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
    """
    # Local import keeps this function self-contained (no new file-level import).
    from datetime import timedelta

    cursor = conn.cursor()

    print("\n[2/3] 插入测试数据...")

    # Sales rows (250)
    regions = ["华东", "华南", "华北", "西南", "西北"]
    statuses = ["已完成", "处理中", "已取消"]
    products = ["笔记本电脑", "台式机", "显示器", "键盘", "鼠标", "耳机", "音响", "摄像头"]

    sales_data = []
    for i in range(250):
        # Day cycles through 1..28 so the date is valid in every month.
        order_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        quantity = (i % 10) + 1
        unit_price = round(100.0 + (i % 5000), 2)

        sales_data.append((
            f"ORD{10000 + i}",
            f"客户{chr(65 + (i % 26))}{i}",
            products[i % len(products)],
            quantity,
            unit_price,
            quantity * unit_price,
            order_date,
            regions[i % len(regions)],
            statuses[i % len(statuses)],
        ))

    cursor.executemany("""
        INSERT INTO test_sales_data 
        (order_id, customer_name, product_name, quantity, unit_price, 
         total_amount, order_date, region, status)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, sales_data)

    # User statistics rows (200)
    levels = ["普通", "银卡", "金卡", "钻石"]

    user_data = []
    for i in range(200):
        reg_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        # Last login is 0-9 days after registration, so it can never fall
        # before the registration date.
        login_date = reg_date + timedelta(days=i % 10)

        user_data.append((
            f"USER{1000 + i}",
            f"user{i}",
            f"user{i}@example.com",
            reg_date,
            login_date,
            (i % 50) + 1,
            round(1000.0 + (i % 50000), 2),
            levels[i % len(levels)],
            (i % 10) != 0,  # every tenth user is inactive
        ))

    cursor.executemany("""
        INSERT INTO test_user_statistics 
        (user_id, username, email, registration_date, last_login_date,
         total_orders, total_amount, user_level, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, user_data)

    # Product inventory rows (150)
    categories = ["电子产品", "办公用品", "家具", "服装", "食品"]
    suppliers = ["供应商A", "供应商B", "供应商C", "供应商D"]
    min_stock = 50
    max_stock = 1000

    inventory_data = []
    for i in range(150):
        current_stock = (i % 500) + 10

        inventory_data.append((
            f"PROD{10000 + i}",
            f"产品{i}",
            categories[i % len(categories)],
            current_stock,
            min_stock,
            max_stock,
            round(50.0 + (i % 500), 2),
            suppliers[i % len(suppliers)],
            datetime(2024, 1, (i % 28) + 1),
            "正常" if current_stock > min_stock else "缺货",
        ))

    cursor.executemany("""
        INSERT INTO test_product_inventory 
        (product_code, product_name, category, current_stock, min_stock, 
         max_stock, unit_price, supplier, last_restock_date, status)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, inventory_data)

    conn.commit()
    print("[成功] 测试数据插入完成:")
    print(f"  - test_sales_data: {len(sales_data)} 条")
    print(f"  - test_user_statistics: {len(user_data)} 条")
    print(f"  - test_product_inventory: {len(inventory_data)} 条")
+
+
def register_data_products(conn):
    """Register the three test tables as rows of the ``data_products`` table.

    For every product the target table is looked up in
    ``information_schema.tables``; missing tables are skipped.  Otherwise
    the row count and column count are collected and the matching
    ``data_products`` row (same ``target_schema`` and ``target_table``) is
    updated, or a new row is inserted when none exists.  All changes are
    committed once at the end.

    The row count is taken from the schema-qualified table so that it is
    read from the same schema the existence and column checks use.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
    """
    cursor = conn.cursor()

    print("\n[3/3] 注册数据产品...")

    products = [
        {
            "product_name": "销售数据分析",
            "product_name_en": "test_sales_data",
            "target_table": "test_sales_data",
            "target_schema": "public",
            "description": "销售订单数据分析,包含订单详情、客户信息、产品信息等",
            "source_dataflow_id": 1001,
            "source_dataflow_name": "销售数据加工流程",
        },
        {
            "product_name": "用户行为统计",
            "product_name_en": "test_user_statistics",
            "target_table": "test_user_statistics",
            "target_schema": "public",
            "description": "用户注册、登录、订单统计等行为数据分析",
            "source_dataflow_id": 1002,
            "source_dataflow_name": "用户数据加工流程",
        },
        {
            "product_name": "产品库存管理",
            "product_name_en": "test_product_inventory",
            "target_table": "test_product_inventory",
            "target_schema": "public",
            "description": "产品库存信息,包括库存数量、价格、供应商等信息",
            "source_dataflow_id": 1003,
            "source_dataflow_name": "库存数据加工流程",
        },
    ]

    for product in products:
        schema = product["target_schema"]
        table = product["target_table"]

        # Skip products whose target table does not exist.
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            )
        """, (schema, table))
        if not cursor.fetchone()[0]:
            print(f"  [跳过] 表 {table} 不存在,跳过注册")
            continue

        # Identifiers cannot be bound as parameters, so quote them by hand
        # (embedded double quotes are doubled).
        qualified_name = ".".join(
            '"' + part.replace('"', '""') + '"' for part in (schema, table)
        )
        cursor.execute(f"SELECT COUNT(*) FROM {qualified_name}")
        record_count = cursor.fetchone()[0]

        cursor.execute("""
            SELECT COUNT(*)
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
        """, (schema, table))
        column_count = cursor.fetchone()[0]

        # Is this table already registered?
        cursor.execute("""
            SELECT id FROM data_products
            WHERE target_schema = %s AND target_table = %s
        """, (schema, table))
        existing = cursor.fetchone()

        if existing:
            cursor.execute("""
                UPDATE data_products SET
                    product_name = %s,
                    product_name_en = %s,
                    description = %s,
                    source_dataflow_id = %s,
                    source_dataflow_name = %s,
                    record_count = %s,
                    column_count = %s,
                    last_updated_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (
                product["product_name"],
                product["product_name_en"],
                product["description"],
                product["source_dataflow_id"],
                product["source_dataflow_name"],
                record_count,
                column_count,
                existing[0],
            ))
            print(f"  [更新] {product['product_name']} (ID: {existing[0]})")
        else:
            cursor.execute("""
                INSERT INTO data_products 
                (product_name, product_name_en, description, source_dataflow_id,
                 source_dataflow_name, target_table, target_schema, record_count,
                 column_count, last_updated_at, created_by, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, 'test_script', 'active')
                RETURNING id
            """, (
                product["product_name"],
                product["product_name_en"],
                product["description"],
                product["source_dataflow_id"],
                product["source_dataflow_name"],
                table,
                schema,
                record_count,
                column_count,
            ))
            product_id = cursor.fetchone()[0]
            print(f"  [创建] {product['product_name']} (ID: {product_id}, 记录数: {record_count})")

    conn.commit()
    print("[成功] 数据产品注册完成")
+
+
def main():
    """Run the full test-data preparation: create tables, insert rows, register products.

    Reads ``FLASK_ENV`` (default ``"production"``) and asks for interactive
    confirmation when it is anything other than ``"production"``.  The
    three steps then run on a single connection, which is always closed.
    When no test table could be created the run is aborted before any data
    is inserted.  Any failure prints the error and a traceback and exits
    with status 1.
    """
    print("=" * 60)
    print("准备数据服务功能测试数据")
    print("=" * 60)

    env = os.environ.get("FLASK_ENV", "production")
    print(f"\n当前环境: {env}")

    # NOTE(review): confirmation is only requested when FLASK_ENV is NOT
    # "production", while get_db_connection() always uses ProductionConfig -
    # confirm this is the intended safeguard.
    if env != "production":
        response = input("\n警告: 当前不是生产环境,是否继续?(yes/no): ")
        if response.lower() != "yes":
            print("已取消操作")
            return

    try:
        conn = get_db_connection()
        conn.autocommit = False

        try:
            # create_test_tables() reports False when none of the tables
            # exists; inserting would only fail later with a less clear error.
            if not create_test_tables(conn):
                raise RuntimeError("测试数据表创建失败,已终止后续步骤")

            insert_test_data(conn)
            register_data_products(conn)

            print("\n" + "=" * 60)
            print("[完成] 测试数据准备完成!")
            print("=" * 60)
            print("\n可以开始测试以下 API 接口:")
            print("  1. GET  /api/dataservice/products - 获取数据产品列表")
            print("  2. GET  /api/dataservice/products/{id} - 获取产品详情")
            print("  3. GET  /api/dataservice/products/{id}/preview - 获取数据预览")
            print("  4. GET  /api/dataservice/products/{id}/download - 下载Excel")
            print("  5. POST /api/dataservice/products/{id}/viewed - 标记已查看")
            print("  6. POST /api/dataservice/products/{id}/refresh - 刷新统计信息")

        finally:
            conn.close()

    except Exception as e:
        # Top-level boundary of the script: report everything, then fail.
        print(f"\n[错误] 操作失败: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
+

+ 427 - 0
scripts/prepare_data_service_test_data_fixed.py

@@ -0,0 +1,427 @@
+"""
+准备数据服务功能的测试数据(修复版)
+包括创建测试数据表和注册数据产品
+"""
+
+import os
+import sys
+from datetime import datetime
+from pathlib import Path
+
+# Set the console encoding to UTF-8 (Windows only) by re-wrapping the
+# binary buffers of stdout and stderr in UTF-8 text wrappers.
+if sys.platform == "win32":
+    import io
+
+    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
+    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+
+# Put the project root (the parent of this script's directory) at the front
+# of the import path; the `app` import below follows this line.
+project_root = Path(__file__).parent.parent
+sys.path.insert(0, str(project_root))
+
+import psycopg2
+from app.config.config import ProductionConfig
+
+
def get_db_connection():
    """Open a psycopg2 connection using the production database URI.

    The URI is read from ``ProductionConfig().SQLALCHEMY_DATABASE_URI`` and
    split with ``urllib.parse.urlsplit`` so that passwords containing ``:``
    or ``@``, driver-qualified schemes such as ``postgresql+psycopg2://``
    and trailing query strings are handled correctly.  Percent-encoded
    characters in the user name and password are decoded.

    Returns:
        A new psycopg2 connection.

    Raises:
        ValueError: If the URI carries no credentials section, no host or no
            single-segment database name, or if its port is not a valid
            port number.
    """
    # Local import keeps this function self-contained (no new file-level import).
    from urllib.parse import unquote, urlsplit

    config = ProductionConfig()
    db_uri = config.SQLALCHEMY_DATABASE_URI

    parts = urlsplit(db_uri)
    database = parts.path.lstrip("/")

    # Reject malformed URIs explicitly instead of failing with an IndexError
    # somewhere in the middle of the parsing.
    if (
        parts.username is None
        or not parts.hostname
        or not database
        or "/" in database
    ):
        raise ValueError(f"无效的数据库URI格式: {db_uri}")

    # ``parts.port`` raises ValueError itself when the port is not numeric.
    port = parts.port if parts.port is not None else 5432

    return psycopg2.connect(
        host=parts.hostname,
        port=port,
        database=database,
        user=unquote(parts.username),
        password=unquote(parts.password or ""),
    )
+
+
def create_test_tables(conn):
    """Drop and recreate the three test tables.

    Each table is handled independently: the DROP is committed first, then
    the CREATE and the table comment are committed together.  If any
    statement for a table fails, the error is printed, the transaction is
    rolled back and processing continues with the next table.

    Args:
        conn: Open database connection providing ``cursor()``, ``commit()``
            and ``rollback()``.

    Returns:
        bool: ``True`` if at least one of the test tables exists in the
        ``public`` schema afterwards, ``False`` if none does.
    """
    # (table name, table comment, CREATE statement) for every test table.
    table_specs = (
        (
            "test_sales_data",
            "测试销售数据表",
            """
            CREATE TABLE test_sales_data (
                id SERIAL PRIMARY KEY,
                order_id VARCHAR(50) NOT NULL,
                customer_name VARCHAR(100),
                product_name VARCHAR(200),
                quantity INTEGER,
                unit_price DECIMAL(10, 2),
                total_amount DECIMAL(10, 2),
                order_date DATE,
                region VARCHAR(50),
                status VARCHAR(20),
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
        ),
        (
            "test_user_statistics",
            "测试用户统计表",
            """
            CREATE TABLE test_user_statistics (
                id SERIAL PRIMARY KEY,
                user_id VARCHAR(50) NOT NULL,
                username VARCHAR(100),
                email VARCHAR(200),
                registration_date DATE,
                last_login_date DATE,
                total_orders INTEGER DEFAULT 0,
                total_amount DECIMAL(10, 2) DEFAULT 0,
                user_level VARCHAR(20),
                is_active BOOLEAN DEFAULT TRUE,
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
        ),
        (
            "test_product_inventory",
            "测试产品库存表",
            """
            CREATE TABLE test_product_inventory (
                id SERIAL PRIMARY KEY,
                product_code VARCHAR(50) UNIQUE NOT NULL,
                product_name VARCHAR(200),
                category VARCHAR(100),
                current_stock INTEGER,
                min_stock INTEGER,
                max_stock INTEGER,
                unit_price DECIMAL(10, 2),
                supplier VARCHAR(200),
                last_restock_date DATE,
                status VARCHAR(20),
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
        ),
    )

    cursor = conn.cursor()
    print("\n[1/3] 创建测试数据表...")

    for table_name, table_comment, create_sql in table_specs:
        # Deliberately broad: this step is best-effort per table, the error
        # is reported and the remaining tables are still attempted.
        try:
            cursor.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE")
            conn.commit()

            cursor.execute(create_sql)
            cursor.execute(f"COMMENT ON TABLE {table_name} IS '{table_comment}'")
            conn.commit()
            print(f"  ✓ {table_name} 表创建成功")
        except Exception as e:
            print(f"  ✗ {table_name} 表创建失败: {e}")
            conn.rollback()

    # Discard any pending transaction before checking the catalog.
    conn.rollback()
    cursor.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public'
        AND table_name IN ('test_sales_data', 'test_user_statistics', 'test_product_inventory')
        ORDER BY table_name
    """)
    created_tables = [row[0] for row in cursor.fetchall()]

    # Only announce success when at least one table is really there.
    if created_tables:
        print("\n[成功] 测试数据表创建完成")
    print(f"  已创建表: {', '.join(created_tables) if created_tables else '无'}")

    return bool(created_tables)
+
+
def insert_test_data(conn):
    """Fill the three test tables with deterministic sample rows.

    Inserts 250 rows into ``test_sales_data``, 200 rows into
    ``test_user_statistics`` and 150 rows into ``test_product_inventory``
    with ``executemany`` and commits once at the end.  All values are
    derived from the row index, so repeated runs produce the same data.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
    """
    # Local import keeps this function self-contained (no new file-level import).
    from datetime import timedelta

    cursor = conn.cursor()
    print("\n[2/3] 插入测试数据...")

    # Sales rows (250)
    regions = ["华东", "华南", "华北", "西南", "西北"]
    statuses = ["已完成", "处理中", "已取消"]
    products = ["笔记本电脑", "台式机", "显示器", "键盘", "鼠标", "耳机", "音响", "摄像头"]

    sales_data = []
    for i in range(250):
        # Day cycles through 1..28 so the date is valid in every month.
        order_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        quantity = (i % 10) + 1
        unit_price = round(100.0 + (i % 5000), 2)

        sales_data.append((
            f"ORD{10000 + i}",
            f"客户{chr(65 + (i % 26))}{i}",
            products[i % len(products)],
            quantity,
            unit_price,
            quantity * unit_price,
            order_date,
            regions[i % len(regions)],
            statuses[i % len(statuses)],
        ))

    cursor.executemany("""
        INSERT INTO test_sales_data 
        (order_id, customer_name, product_name, quantity, unit_price, 
         total_amount, order_date, region, status)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, sales_data)

    # User statistics rows (200)
    levels = ["普通", "银卡", "金卡", "钻石"]

    user_data = []
    for i in range(200):
        reg_date = datetime(2024, (i // 28) % 12 + 1, (i % 28) + 1)
        # Last login is 0-9 days after registration, so it can never fall
        # before the registration date.
        login_date = reg_date + timedelta(days=i % 10)

        user_data.append((
            f"USER{1000 + i}",
            f"user{i}",
            f"user{i}@example.com",
            reg_date,
            login_date,
            (i % 50) + 1,
            round(1000.0 + (i % 50000), 2),
            levels[i % len(levels)],
            (i % 10) != 0,  # every tenth user is inactive
        ))

    cursor.executemany("""
        INSERT INTO test_user_statistics 
        (user_id, username, email, registration_date, last_login_date,
         total_orders, total_amount, user_level, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, user_data)

    # Product inventory rows (150)
    categories = ["电子产品", "办公用品", "家具", "服装", "食品"]
    suppliers = ["供应商A", "供应商B", "供应商C", "供应商D"]
    min_stock = 50
    max_stock = 1000

    inventory_data = []
    for i in range(150):
        current_stock = (i % 500) + 10

        inventory_data.append((
            f"PROD{10000 + i}",
            f"产品{i}",
            categories[i % len(categories)],
            current_stock,
            min_stock,
            max_stock,
            round(50.0 + (i % 500), 2),
            suppliers[i % len(suppliers)],
            datetime(2024, 1, (i % 28) + 1),
            "正常" if current_stock > min_stock else "缺货",
        ))

    cursor.executemany("""
        INSERT INTO test_product_inventory 
        (product_code, product_name, category, current_stock, min_stock, 
         max_stock, unit_price, supplier, last_restock_date, status)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, inventory_data)

    conn.commit()
    print("[成功] 测试数据插入完成:")
    print(f"  - test_sales_data: {len(sales_data)} 条")
    print(f"  - test_user_statistics: {len(user_data)} 条")
    print(f"  - test_product_inventory: {len(inventory_data)} 条")
+
+
def register_data_products(conn):
    """Register the three test tables as rows of the ``data_products`` table.

    For every product the row count of its target table is queried.  If
    that query fails the product is skipped: the failed transaction is
    rolled back so that the following statements can run again (PostgreSQL
    rejects every statement of a transaction after an error until it is
    rolled back).  Otherwise the column count is collected and the matching
    ``data_products`` row (same ``target_schema`` and ``target_table``) is
    updated, or a new row is inserted when none exists.

    Each product is committed on its own, so rolling back after a failed
    count never discards a registration made earlier in the loop.

    Args:
        conn: Open database connection providing ``cursor()``, ``commit()``
            and ``rollback()``.
    """
    cursor = conn.cursor()
    print("\n[3/3] 注册数据产品...")

    products = [
        {
            "product_name": "销售数据分析",
            "product_name_en": "test_sales_data",
            "target_table": "test_sales_data",
            "target_schema": "public",
            "description": "销售订单数据分析,包含订单详情、客户信息、产品信息等",
            "source_dataflow_id": 1001,
            "source_dataflow_name": "销售数据加工流程",
        },
        {
            "product_name": "用户行为统计",
            "product_name_en": "test_user_statistics",
            "target_table": "test_user_statistics",
            "target_schema": "public",
            "description": "用户注册、登录、订单统计等行为数据分析",
            "source_dataflow_id": 1002,
            "source_dataflow_name": "用户数据加工流程",
        },
        {
            "product_name": "产品库存管理",
            "product_name_en": "test_product_inventory",
            "target_table": "test_product_inventory",
            "target_schema": "public",
            "description": "产品库存信息,包括库存数量、价格、供应商等信息",
            "source_dataflow_id": 1003,
            "source_dataflow_name": "库存数据加工流程",
        },
    ]

    for product in products:
        schema = product["target_schema"]
        table_name = product["target_table"]

        # Identifiers cannot be bound as parameters, so quote them by hand
        # (embedded double quotes are doubled).
        qualified_name = ".".join(
            '"' + part.replace('"', '""') + '"' for part in (schema, table_name)
        )

        # Probe the table directly; a failure means it is missing or unreadable.
        try:
            cursor.execute(f"SELECT COUNT(*) FROM {qualified_name}")
        except Exception as e:
            # Without this rollback every later statement would fail because
            # the transaction is left in the aborted state.
            conn.rollback()
            print(f"  [跳过] 表 {table_name} 不存在或无法访问: {e}")
            continue

        record_count = cursor.fetchone()[0]

        cursor.execute("""
            SELECT COUNT(*)
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
        """, (schema, table_name))
        column_count = cursor.fetchone()[0]

        # Is this table already registered?
        cursor.execute("""
            SELECT id FROM data_products
            WHERE target_schema = %s AND target_table = %s
        """, (schema, table_name))
        existing = cursor.fetchone()

        if existing:
            cursor.execute("""
                UPDATE data_products SET
                    product_name = %s,
                    product_name_en = %s,
                    description = %s,
                    source_dataflow_id = %s,
                    source_dataflow_name = %s,
                    record_count = %s,
                    column_count = %s,
                    last_updated_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (
                product["product_name"],
                product["product_name_en"],
                product["description"],
                product["source_dataflow_id"],
                product["source_dataflow_name"],
                record_count,
                column_count,
                existing[0],
            ))
            print(f"  [更新] {product['product_name']} (ID: {existing[0]})")
        else:
            cursor.execute("""
                INSERT INTO data_products 
                (product_name, product_name_en, description, source_dataflow_id,
                 source_dataflow_name, target_table, target_schema, record_count,
                 column_count, last_updated_at, created_by, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, 'test_script', 'active')
                RETURNING id
            """, (
                product["product_name"],
                product["product_name_en"],
                product["description"],
                product["source_dataflow_id"],
                product["source_dataflow_name"],
                table_name,
                schema,
                record_count,
                column_count,
            ))
            product_id = cursor.fetchone()[0]
            print(f"  [创建] {product['product_name']} (ID: {product_id}, 记录数: {record_count})")

        # Commit per product so a later rollback cannot undo this registration.
        conn.commit()

    print("[成功] 数据产品注册完成")
+
+
def main():
    """Run the full test-data preparation: create tables, insert rows, register products.

    Reads ``FLASK_ENV`` (default ``"production"``) and asks for interactive
    confirmation when it is anything other than ``"production"``.  The
    three steps then run on a single connection, which is always closed.
    If table creation reports failure a warning is printed and the insert
    and registration steps are still attempted.  The completion banner is
    printed only after both of those steps succeeded.  Any failure prints
    the error and a traceback and exits with status 1.
    """
    print("=" * 60)
    print("准备数据服务功能测试数据")
    print("=" * 60)

    env = os.environ.get("FLASK_ENV", "production")
    print(f"\n当前环境: {env}")

    if env != "production":
        response = input("\n警告: 当前不是生产环境,是否继续?(yes/no): ")
        if response.lower() != "yes":
            print("已取消操作")
            return

    try:
        conn = get_db_connection()
        conn.autocommit = False

        try:
            # Try to create the tables; warn (but carry on) when none exists,
            # because the tables may be usable from an earlier run.
            if not create_test_tables(conn):
                print("\n[警告] 表创建可能失败,但将继续尝试插入数据和注册产品")

            try:
                insert_test_data(conn)
                register_data_products(conn)
            except Exception as e:
                print(f"\n[错误] 插入数据或注册产品失败: {e}")
                print("  可能原因: 表不存在或数据已存在")
                raise

            # Reached only when both steps above completed without error.
            print("\n" + "=" * 60)
            print("[完成] 测试数据准备完成!")
            print("=" * 60)
            print("\n可以开始测试以下 API 接口:")
            print("  1. GET  /api/dataservice/products - 获取数据产品列表")
            print("  2. GET  /api/dataservice/products/{id} - 获取产品详情")
            print("  3. GET  /api/dataservice/products/{id}/preview - 获取数据预览")
            print("  4. GET  /api/dataservice/products/{id}/download - 下载Excel")
            print("  5. POST /api/dataservice/products/{id}/viewed - 标记已查看")
            print("  6. POST /api/dataservice/products/{id}/refresh - 刷新统计信息")

        finally:
            conn.close()

    except Exception as e:
        # Top-level boundary of the script: report everything, then fail.
        print(f"\n[错误] 操作失败: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
+

+ 275 - 0
scripts/test_data_service_api.py

@@ -0,0 +1,275 @@
+"""
+测试数据服务 API 接口
+用于验证 data_service 功能是否正常工作
+"""
+
+import json
+import sys
+from pathlib import Path
+
+# Set the console encoding to UTF-8 (Windows): re-wrap the underlying byte
+# streams of stdout and stderr in UTF-8 text wrappers.
+if sys.platform == "win32":
+    import io
+
+    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
+    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+
+import requests
+
+# Add the project root (the parent of this script's directory) to the front
+# of the import path.
+project_root = Path(__file__).parent.parent
+sys.path.insert(0, str(project_root))
+
+# Production API base URL; every request in this script is built from it.
+BASE_URL = "https://company.citupro.com:18183/api/dataservice"
+
+
+def test_get_products():
+    """Request the first page of data products and print the response.
+
+    Sends GET {BASE_URL}/products with page=1 and page_size=20.
+
+    Returns:
+        The "list" entry of the response "data" object when the response
+        "code" is 200; an empty list when the code differs or when the
+        request raises (the error is printed, not propagated).
+    """
+    print("\n[测试 1] 获取数据产品列表")
+    print("-" * 60)
+
+    endpoint = f"{BASE_URL}/products"
+    query = {"page": 1, "page_size": 20}
+
+    try:
+        resp = requests.get(endpoint, params=query, timeout=10)
+        resp.raise_for_status()
+        payload = resp.json()
+
+        print(f"状态码: {resp.status_code}")
+        print(f"响应: {json.dumps(payload, ensure_ascii=False, indent=2)}")
+
+        # Guard clause: anything but an application-level 200 is a failure.
+        if payload.get("code") != 200:
+            print(f"[失败] {payload.get('message')}")
+            return []
+
+        items = payload.get("data", {}).get("list", [])
+        print(f"\n[成功] 成功获取 {len(items)} 个数据产品")
+        return items
+    except Exception as exc:
+        print(f"[错误] 请求失败: {exc}")
+        return []
+
+
+def test_get_product_detail(product_id: int):
+    """Fetch a single data product by ID and print the response.
+
+    Sends GET {BASE_URL}/products/{product_id}.
+
+    Args:
+        product_id: ID of the data product to look up.
+
+    Returns:
+        The "data" object of the response when the response "code" is 200;
+        None when the code differs or when the request raises (the error is
+        printed, not propagated).
+    """
+    print(f"\n[测试 2] 获取数据产品详情 (ID: {product_id})")
+    print("-" * 60)
+
+    endpoint = f"{BASE_URL}/products/{product_id}"
+
+    try:
+        resp = requests.get(endpoint, timeout=10)
+        resp.raise_for_status()
+        payload = resp.json()
+
+        print(f"状态码: {resp.status_code}")
+        print(f"响应: {json.dumps(payload, ensure_ascii=False, indent=2)}")
+
+        # Guard clause: anything but an application-level 200 is a failure.
+        if payload.get("code") != 200:
+            print(f"[失败] {payload.get('message')}")
+            return None
+
+        detail = payload.get("data", {})
+        print(f"\n[成功] 成功获取产品: {detail.get('product_name')}")
+        return detail
+    except Exception as exc:
+        print(f"[错误] 请求失败: {exc}")
+        return None
+
+
+def test_get_product_preview(product_id: int, limit: int = 10):
+    """Fetch a data preview for a product and print a short summary.
+
+    Sends GET {BASE_URL}/products/{product_id}/preview with the given limit,
+    then prints the total count, the number of preview rows and columns, up
+    to five columns and up to three rows (first three fields of each).
+
+    Args:
+        product_id: ID of the data product to preview.
+        limit: Value sent as the "limit" query parameter.
+
+    Returns:
+        The "data" object of the response when the response "code" is 200;
+        None when the code differs or when anything in the request or the
+        summary printing raises (the error is printed, not propagated).
+    """
+    print(f"\n[测试 3] 获取数据预览 (ID: {product_id}, 限制: {limit} 条)")
+    print("-" * 60)
+
+    endpoint = f"{BASE_URL}/products/{product_id}/preview"
+    query = {"limit": limit}
+
+    try:
+        resp = requests.get(endpoint, params=query, timeout=30)
+        resp.raise_for_status()
+        payload = resp.json()
+
+        print(f"状态码: {resp.status_code}")
+
+        # Guard clause: anything but an application-level 200 is a failure.
+        if payload.get("code") != 200:
+            print(f"[失败] {payload.get('message')}")
+            return None
+
+        preview = payload.get("data", {})
+        column_specs = preview.get("columns", [])
+        records = preview.get("data", [])
+        total = preview.get("total_count", 0)
+
+        print("\n[成功] 成功获取数据预览")
+        print(f"  总记录数: {total}")
+        print(f"  预览条数: {len(records)}")
+        print(f"  列数: {len(column_specs)}")
+        print("\n  列信息:")
+        # Only the first five columns are shown.
+        for spec in column_specs[:5]:
+            print(f"    - {spec['name']} ({spec['type']})")
+
+        if records:
+            print("\n  前3条数据示例:")
+            for position, record in enumerate(records[:3], start=1):
+                leading_fields = list(record.items())[:3]
+                summary = ", ".join(f"{key}={val}" for key, val in leading_fields)
+                print(f"    {position}. {summary}")
+
+        return preview
+    except Exception as exc:
+        print(f"[错误] 请求失败: {exc}")
+        return None
+
+
+def test_download_excel(product_id: int):
+    """Download the Excel export of a data product and save it locally.
+
+    Sends GET {BASE_URL}/products/{product_id}/download with limit=50. When
+    the response Content-Type starts with "application/vnd.openxmlformats",
+    the body is streamed to <project_root>/test_output/<filename>.
+
+    The file name comes from the Content-Disposition header but is reduced to
+    its final path component, so a server-supplied name cannot point outside
+    the output directory. When the header carries no usable plain
+    ``filename=`` value, a default name derived from ``product_id`` is used.
+
+    Args:
+        product_id: ID of the data product to download.
+
+    Returns:
+        True if an Excel file was saved; False when the response is not an
+        Excel file or when the request or the file write raises (the error is
+        printed, not propagated).
+    """
+    print(f"\n[测试 4] 下载 Excel 文件 (ID: {product_id})")
+    print("-" * 60)
+
+    url = f"{BASE_URL}/products/{product_id}/download"
+    params = {"limit": 50}
+
+    try:
+        # The context manager releases the streamed connection on every path.
+        with requests.get(url, params=params, timeout=60, stream=True) as response:
+            response.raise_for_status()
+
+            content_type = response.headers.get("content-type", "")
+            if not content_type.startswith("application/vnd.openxmlformats"):
+                print("[失败] 响应不是 Excel 文件")
+                print(f"  Content-Type: {response.headers.get('content-type')}")
+                return False
+
+            disposition = response.headers.get("content-disposition", "")
+            _, marker, raw_name = disposition.rpartition("filename=")
+            if marker:
+                # Drop any following header parameter and surrounding quotes.
+                raw_name = raw_name.split(";", 1)[0].strip().strip('"')
+            else:
+                raw_name = ""
+
+            # Keep only the last path component; treat backslashes as
+            # separators too so the result is the same on every platform.
+            filename = Path(raw_name.replace("\\", "/")).name
+            if filename in ("", ".", ".."):
+                filename = f"product_{product_id}.xlsx"
+
+            # Save the file
+            output_dir = project_root / "test_output"
+            output_dir.mkdir(exist_ok=True)
+            filepath = output_dir / filename
+
+            with open(filepath, "wb") as f:
+                for chunk in response.iter_content(chunk_size=8192):
+                    f.write(chunk)
+
+        print("\n[成功] 成功下载 Excel 文件")
+        print(f"  文件名: {filename}")
+        print(f"  保存路径: {filepath}")
+        print(f"  文件大小: {filepath.stat().st_size} 字节")
+        return True
+
+    except Exception as e:
+        print(f"[错误] 请求失败: {e}")
+        return False
+
+
+def test_mark_as_viewed(product_id: int):
+    """Mark a data product as viewed and print the response.
+
+    Sends POST {BASE_URL}/products/{product_id}/viewed.
+
+    Args:
+        product_id: ID of the data product to mark.
+
+    Returns:
+        True when the response "code" is 200; False when the code differs or
+        when the request raises (the error is printed, not propagated).
+    """
+    print(f"\n[测试 5] 标记为已查看 (ID: {product_id})")
+    print("-" * 60)
+
+    endpoint = f"{BASE_URL}/products/{product_id}/viewed"
+
+    try:
+        resp = requests.post(endpoint, timeout=10)
+        resp.raise_for_status()
+        payload = resp.json()
+
+        print(f"状态码: {resp.status_code}")
+        print(f"响应: {json.dumps(payload, ensure_ascii=False, indent=2)}")
+
+        # Guard clause: anything but an application-level 200 is a failure.
+        if payload.get("code") != 200:
+            print(f"[失败] {payload.get('message')}")
+            return False
+
+        # Read the flag before announcing success, as the lookup may raise.
+        new_data_flag = payload.get("data", {}).get("has_new_data", False)
+        print("\n[成功] 标记成功")
+        print(f"  是否有新数据: {new_data_flag}")
+        return True
+    except Exception as exc:
+        print(f"[错误] 请求失败: {exc}")
+        return False
+
+
+def test_refresh_stats(product_id: int):
+    """Ask the service to refresh a product's statistics and print them.
+
+    Sends POST {BASE_URL}/products/{product_id}/refresh and prints the
+    "record_count" and "column_count" fields of the returned "data" object.
+
+    Args:
+        product_id: ID of the data product to refresh.
+
+    Returns:
+        True when the response "code" is 200; False when the code differs or
+        when the request raises (the error is printed, not propagated).
+    """
+    print(f"\n[测试 6] 刷新统计信息 (ID: {product_id})")
+    print("-" * 60)
+
+    endpoint = f"{BASE_URL}/products/{product_id}/refresh"
+
+    try:
+        resp = requests.post(endpoint, timeout=30)
+        resp.raise_for_status()
+        payload = resp.json()
+
+        print(f"状态码: {resp.status_code}")
+
+        # Guard clause: anything but an application-level 200 is a failure.
+        if payload.get("code") != 200:
+            print(f"[失败] {payload.get('message')}")
+            return False
+
+        stats = payload.get("data", {})
+        print("\n[成功] 刷新成功")
+        print(f"  记录数: {stats.get('record_count')}")
+        print(f"  列数: {stats.get('column_count')}")
+        return True
+    except Exception as exc:
+        print(f"[错误] 请求失败: {exc}")
+        return False
+
+
+def main():
+    """Run the data-service API checks in sequence against BASE_URL.
+
+    Fetches the product list first and stops early when it is empty. When the
+    first product has a truthy "id", that ID is used for the detail, preview,
+    mark-as-viewed and refresh checks. The Excel download check is not run
+    from here.
+    """
+    separator = "=" * 60
+
+    print(separator)
+    print("数据服务 API 接口测试")
+    print(separator)
+    print(f"\nAPI 地址: {BASE_URL}")
+
+    # Test 1: fetch the product list; nothing else can run without it.
+    products = test_get_products()
+    if not products:
+        print("\n[错误] 无法获取数据产品列表,测试终止")
+        return
+
+    # The first product drives the remaining checks.
+    first_id = products[0].get("id")
+    if first_id:
+        # Test 2: product detail
+        test_get_product_detail(first_id)
+        # Test 3: data preview
+        test_get_product_preview(first_id, limit=10)
+        # Test 4 (test_download_excel) is optional and may be slow, so it is
+        # deliberately not called here.
+        # Test 5: mark as viewed
+        test_mark_as_viewed(first_id)
+        # Test 6: refresh statistics
+        test_refresh_stats(first_id)
+
+    print("\n" + separator)
+    print("测试完成")
+    print(separator)
+
+
+if __name__ == "__main__":
+    main()
+

Vissa filer visades inte eftersom för många filer har ändrats