
Improve the Business Domain ddlparse feature and extend support to SQL, Excel, Word, and PDF files.

maxiaolong 3 days ago
parent
commit
33d147ea03

+ 53 - 0
.cursorignore

@@ -1 +1,54 @@
 # Add directories or file patterns to ignore during indexing (e.g. foo/ or *.csv)
+# DataOps Platform - Cursor context ignore configuration
+
+# ==================== Cache directories ====================
+.mypy_cache/
+__pycache__/
+.pytest_cache/
+.ruff_cache/
+*.pyc
+*.pyo
+
+# ==================== Dependency directories ====================
+node_modules/
+vendor/
+.venv/
+venv/
+env/
+
+# ==================== Build output ====================
+dist/
+build/
+*.min.js
+*.egg-info/
+
+# ==================== Large data files ====================
+*.csv
+*.json.gz
+*.xlsx
+*.xls
+datasets/
+
+# ==================== Log files ====================
+logs/
+*.log
+flask_*.log
+
+# ==================== IDEs and editors ====================
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# ==================== Git ====================
+.git/
+
+# ==================== Generated code ====================
+src/generated/
+*.g.ts
+
+# ==================== Miscellaneous ====================
+.env
+.env.local
+*.bak
+*.tmp

+ 143 - 91
app/api/business_domain/routes.py

@@ -302,108 +302,160 @@ def bd_graph_all():
         return jsonify(failed("获取业务领域图谱失败", error=str(e)))
 
 
+def _get_file_extension(filename: str) -> str:
+    """获取文件扩展名(小写)"""
+    if "." not in filename:
+        return ""
+    return filename.rsplit(".", 1)[1].lower()
+
+
+def _check_table_existence(table_list: list) -> list:
+    """
+    Check whether each table already exists in Neo4j (as a BusinessDomain node).
+
+    Args:
+        table_list: list of table-info dicts
+
+    Returns:
+        The same list with each item's "exist" field updated.
+    """
+    table_names = []
+    for table_item in table_list:
+        if isinstance(table_item, dict) and "table_info" in table_item:
+            table_name = table_item["table_info"].get("name_en")
+            if table_name:
+                table_names.append(table_name)
+
+    # Initialize the exist field to False
+    for table_item in table_list:
+        if isinstance(table_item, dict):
+            table_item["exist"] = False
+
+    if table_names:
+        try:
+            with neo4j_driver.get_session() as session:
+                table_query = """
+                UNWIND $names AS name
+                OPTIONAL MATCH (n:BusinessDomain {name_en: name})
+                RETURN name, n IS NOT NULL AS exists
+                """
+                table_results = session.run(table_query, names=table_names)
+
+                exist_map = {}
+                for record in table_results:
+                    t_name = record["name"]
+                    exists = record["exists"]
+                    exist_map[t_name] = exists
+
+                for table_item in table_list:
+                    if isinstance(table_item, dict) and "table_info" in table_item:
+                        info = table_item["table_info"]
+                        t_name = info.get("name_en")
+                        if t_name and t_name in exist_map:
+                            table_item["exist"] = exist_map[t_name]
+        except Exception as e:
+            logger.error(f"检查业务领域存在状态失败: {str(e)}")
+
+    return table_list
+
+
+# Supported file types
+ALLOWED_DDL_EXTENSIONS = {"sql", "xlsx", "xls", "docx", "doc", "pdf"}
+
+
 @bp.route("/ddlparse", methods=["POST"])
 def bd_ddl_parse():
-    """解析DDL语句,用于业务领域创建"""
+    """
+    Parse an uploaded file and extract table-definition information.
+
+    Supported file types:
+    - SQL (.sql): parse DDL CREATE TABLE statements
+    - Excel (.xlsx, .xls): parse table definitions from spreadsheets
+    - Word (.docx, .doc): parse table definitions from documents
+    - PDF (.pdf): parse table definitions from PDFs
+
+    Returns:
+        Table-structure information as a JSON array.
+    """
     try:
-        sql_content = ""
+        if "file" not in request.files:
+            return jsonify(failed("没有找到上传的文件,请上传一个文件"))
 
-        if "file" in request.files:
-            file = request.files["file"]
-            if file and file.filename:
-                if not file.filename.lower().endswith(".sql"):
-                    return jsonify(failed("只接受SQL文件"))
-                sql_content = file.read().decode("utf-8")
-                logger.info(f"从上传的文件中读取SQL内容,文件名: {file.filename}")
-        elif request.is_json and request.json:
-            sql_content = request.json.get("sql", "")
+        file = request.files["file"]
+        if not file or not file.filename:
+            return jsonify(failed("未选择文件"))
 
-        if not sql_content:
-            return jsonify(failed("SQL内容不能为空,请上传SQL文件或提供SQL内容"))
+        filename = file.filename
+        file_ext = _get_file_extension(filename)
+
+        if file_ext not in ALLOWED_DDL_EXTENSIONS:
+            return jsonify(
+                failed(
+                    f"不支持的文件类型: .{file_ext},"
+                    f"支持的类型: {', '.join('.' + ext for ext in ALLOWED_DDL_EXTENSIONS)}"
+                )
+            )
+
+        file_content = file.read()
+        logger.info(f"接收到文件上传,文件名: {filename}, 类型: {file_ext}")
 
         parser = DDLParser()
-        ddl_list = parser.parse_ddl(sql_content)
+        ddl_list = []
+
+        # Choose a parser based on the file type
+        if file_ext == "sql":
+            # SQL files: parse the DDL directly
+            sql_content = file_content.decode("utf-8")
+            ddl_list = parser.parse_ddl(sql_content)
+
+        elif file_ext in {"xlsx", "xls"}:
+            # Excel file parsing
+            ddl_list = parser.parse_excel_content(file_content)
+
+        elif file_ext in {"docx", "doc"}:
+            # Word file parsing
+            if file_ext == "doc":
+                return jsonify(
+                    failed("暂不支持 .doc 格式,请转换为 .docx 格式后重新上传")
+                )
+            ddl_list = parser.parse_word_content(file_content)
+
+        elif file_ext == "pdf":
+            # PDF file parsing
+            ddl_list = parser.parse_pdf_content(file_content)
+
+        # Validate the parse result
         if not ddl_list:
-            return jsonify(failed("未找到有效的CREATE TABLE语句"))
-
-        if isinstance(ddl_list, list):
-            table_names = []
-            for table_item in ddl_list:
-                if isinstance(table_item, dict) and "table_info" in table_item:
-                    table_name = table_item["table_info"].get("name_en")
-                    if table_name:
-                        table_names.append(table_name)
-
-            for table_item in ddl_list:
-                if isinstance(table_item, dict):
-                    table_item["exist"] = False
-
-            if table_names:
-                try:
-                    with neo4j_driver.get_session() as session:
-                        table_query = """
-                        UNWIND $names AS name
-                        OPTIONAL MATCH (n:BusinessDomain {name_en: name})
-                        RETURN name, n IS NOT NULL AS exists
-                        """
-                        table_results = session.run(table_query, names=table_names)
-
-                        exist_map = {}
-                        for record in table_results:
-                            t_name = record["name"]
-                            exists = record["exists"]
-                            exist_map[t_name] = exists
-
-                        for table_item in ddl_list:
-                            if (
-                                isinstance(table_item, dict)
-                                and "table_info" in table_item
-                            ):
-                                info = table_item["table_info"]
-                                t_name = info.get("name_en")
-                                if t_name and t_name in exist_map:
-                                    table_item["exist"] = exist_map[t_name]
-                except Exception as e:
-                    logger.error(f"检查业务领域存在状态失败: {str(e)}")
-
-        elif isinstance(ddl_list, dict):
-            table_names = list(ddl_list.keys())
-            for table_name in table_names:
-                if isinstance(ddl_list[table_name], dict):
-                    ddl_list[table_name]["exist"] = False
-                else:
-                    logger.warning(
-                        f"表 {table_name} 的值不是字典类型: "
-                        f"{type(ddl_list[table_name])}"
-                    )
-
-            if table_names:
-                try:
-                    with neo4j_driver.get_session() as session:
-                        table_query = """
-                        UNWIND $names AS name
-                        OPTIONAL MATCH (n:BusinessDomain {name_en: name})
-                        RETURN name, n IS NOT NULL AS exists
-                        """
-                        table_results = session.run(table_query, names=table_names)
-
-                        for record in table_results:
-                            t_name = record["name"]
-                            exists = record["exists"]
-                            is_valid = t_name in ddl_list and isinstance(
-                                ddl_list[t_name], dict
-                            )
-                            if is_valid:
-                                ddl_list[t_name]["exist"] = exists
-                except Exception as e:
-                    logger.error(f"检查业务领域存在状态失败: {str(e)}")
-
-        logger.debug(f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}")
+            return jsonify(failed("未找到有效的数据表定义信息"))
+
+        # Normalize the result to a list
+        if isinstance(ddl_list, dict):
+            if "table_info" in ddl_list:
+                ddl_list = [ddl_list]
+            else:
+                # Backward compatibility: dict keyed by table name
+                table_names = list(ddl_list.keys())
+                converted_list = []
+                for table_name in table_names:
+                    table_data = ddl_list[table_name]
+                    if isinstance(table_data, dict):
+                        table_data["exist"] = False
+                        converted_list.append(table_data)
+                ddl_list = converted_list
+
+        # Check whether each table already exists in Neo4j
+        ddl_list = _check_table_existence(ddl_list)
+
+        logger.debug(f"识别到的数据表: {json.dumps(ddl_list, ensure_ascii=False)}")
         return jsonify(success(ddl_list))
+
+    except ValueError as e:
+        logger.error(f"文件解析失败: {str(e)}")
+        return jsonify(failed(str(e)))
     except Exception as e:
-        logger.error(f"解析DDL语句失败: {str(e)}")
+        logger.error(f"解析文件失败: {str(e)}")
         logger.error(traceback.format_exc())
-        return jsonify(failed("解析DDL语句失败", error=str(e)))
+        return jsonify(failed("解析文件失败", error=str(e)))
 
 
 @bp.route("/search", methods=["POST"])

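A minimal client-side sketch of the extended endpoint contract. The host, blueprint URL prefix, and response envelope key are assumptions (the blueprint registration and the success()/failed() wrappers are not shown in this diff); the "file" multipart field name and the accepted extensions come from the route above.

```python
# Hypothetical usage sketch for the extended /ddlparse endpoint.
import requests

with open("schema_design.docx", "rb") as fh:
    resp = requests.post(
        "http://localhost:5000/business_domain/ddlparse",  # prefix assumed
        files={"file": ("schema_design.docx", fh)},
    )

body = resp.json()
# On success, the payload is a JSON array of
# {"table_info": {...}, "columns": [...], "exist": bool} items.
for table in body.get("data", []):  # envelope key assumed from success()
    print(table["table_info"]["name_en"], "->", "exists" if table["exist"] else "new")
```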
+ 2 - 0
app/config/config.py

@@ -48,6 +48,8 @@ class BaseConfig:
         "csv",
         "sql",
         "dll",
+        "docx",
+        "doc",
     }
 
     # 数据抽取配置

+ 247 - 0
app/core/llm/ddl_parser.py

@@ -1,7 +1,11 @@
+from __future__ import annotations
+
+import io
 import json
 import logging
 import re
 import time
+from typing import Any
 
 import requests
 from flask import current_app
@@ -433,3 +437,246 @@ class DDLParser:
         except Exception as e:
             logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}")
             return "failure"
+
+    def parse_excel_content(self, file_content: bytes) -> list[dict[str, Any]]:
+        """
+        Parse Excel file content and extract table-definition information.
+
+        Args:
+            file_content: raw bytes of the Excel file
+
+        Returns:
+            List of parsed table structures.
+        """
+        try:
+            import pandas as pd
+
+            # Read every sheet in the workbook
+            excel_file = io.BytesIO(file_content)
+            xl = pd.ExcelFile(excel_file)
+
+            # Convert each sheet to text
+            all_content = []
+            for sheet_name in xl.sheet_names:
+                df = pd.read_excel(xl, sheet_name=sheet_name)
+                # Render the DataFrame as a markdown table
+                sheet_content = f"## Sheet: {sheet_name}\n"
+                sheet_content += df.to_markdown(index=False)
+                all_content.append(sheet_content)
+
+            combined_content = "\n\n".join(all_content)
+            logger.info(f"Excel 文件解析完成,共 {len(xl.sheet_names)} 个 sheet")
+
+            # Use the LLM to extract table structures
+            return self._parse_document_content(combined_content, "Excel")
+
+        except Exception as e:
+            logger.error(f"Excel 文件解析失败: {str(e)}")
+            raise ValueError(f"Excel 文件解析失败: {str(e)}") from e
+
+    def parse_word_content(self, file_content: bytes) -> list[dict[str, Any]]:
+        """
+        Parse Word file content and extract table-definition information.
+
+        Args:
+            file_content: raw bytes of the Word file
+
+        Returns:
+            List of parsed table structures.
+        """
+        try:
+            from docx import Document
+
+            # Load the Word document
+            doc = Document(io.BytesIO(file_content))
+
+            # Extract all paragraph text
+            paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]
+
+            # Extract all tables
+            tables_content = []
+            for table_idx, table in enumerate(doc.tables):
+                table_text = f"\n### 表格 {table_idx + 1}:\n"
+                for row in table.rows:
+                    row_text = " | ".join(cell.text.strip() for cell in row.cells)
+                    table_text += row_text + "\n"
+                tables_content.append(table_text)
+
+            # Combine the content
+            combined_content = "\n".join(paragraphs)
+            if tables_content:
+                combined_content += "\n\n## 文档中的表格:\n" + "\n".join(tables_content)
+
+            logger.info(
+                f"Word 文件解析完成,共 {len(paragraphs)} 个段落,{len(doc.tables)} 个表格"
+            )
+
+            # Use the LLM to extract table structures
+            return self._parse_document_content(combined_content, "Word")
+
+        except Exception as e:
+            logger.error(f"Word 文件解析失败: {str(e)}")
+            raise ValueError(f"Word 文件解析失败: {str(e)}") from e
+
+    def parse_pdf_content(self, file_content: bytes) -> list[dict[str, Any]]:
+        """
+        Parse PDF file content and extract table-definition information.
+
+        Args:
+            file_content: raw bytes of the PDF file
+
+        Returns:
+            List of parsed table structures.
+        """
+        try:
+            import pdfplumber
+
+            # Open the PDF
+            pdf = pdfplumber.open(io.BytesIO(file_content))
+
+            all_content = []
+            for page_num, page in enumerate(pdf.pages):
+                page_text = f"## 第 {page_num + 1} 页:\n"
+
+                # Extract the page text
+                text = page.extract_text()
+                if text:
+                    page_text += text + "\n"
+
+                # Extract tables on the page
+                tables = page.extract_tables()
+                for table_idx, table in enumerate(tables):
+                    page_text += f"\n### 表格 {table_idx + 1}:\n"
+                    for row in table:
+                        row_text = " | ".join(str(cell) if cell else "" for cell in row)
+                        page_text += row_text + "\n"
+
+                all_content.append(page_text)
+
+            num_pages = len(pdf.pages)  # read the page count before closing the stream
+            pdf.close()
+
+            combined_content = "\n\n".join(all_content)
+            logger.info(f"PDF 文件解析完成,共 {num_pages} 页")
+
+            # Use the LLM to extract table structures
+            return self._parse_document_content(combined_content, "PDF")
+
+        except Exception as e:
+            logger.error(f"PDF 文件解析失败: {str(e)}")
+            raise ValueError(f"PDF 文件解析失败: {str(e)}") from e
+
+    def _parse_document_content(
+        self, content: str, file_type: str
+    ) -> list[dict[str, Any]]:
+        """
+        Use the LLM to parse document content and extract table definitions.
+
+        Args:
+            content: text content of the document
+            file_type: file type (used for logging)
+
+        Returns:
+            List of parsed table structures.
+        """
+        prompt = self._get_document_parse_prompt()
+        payload = {
+            "model": self.model_name,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": "你是一个专业的数据表结构解析专家,擅长从各种文档中识别和提取数据表定义信息并转换为结构化的JSON格式。",
+                },
+                {"role": "user", "content": f"{prompt}\n\n{content}"},
+            ],
+        }
+
+        try:
+            result = self._make_llm_request(payload, f"{file_type}文档解析")
+
+            if not result:
+                raise ValueError(f"API请求失败: 在{self.max_retries}次尝试后仍然失败")
+
+            if "choices" in result and len(result["choices"]) > 0:
+                response_content = result["choices"][0]["message"]["content"]
+
+                try:
+                    json_match = re.search(
+                        r"```json\s*([\s\S]*?)\s*```", response_content
+                    )
+                    if json_match:
+                        json_content = json_match.group(1)
+                    else:
+                        json_content = response_content
+
+                    parsed_result = json.loads(json_content)
+
+                    # Ensure the result is a list
+                    if isinstance(parsed_result, dict):
+                        parsed_result = [parsed_result]
+
+                    return parsed_result
+
+                except json.JSONDecodeError as e:
+                    raise ValueError(f"无法解析返回的JSON: {str(e)}") from e
+
+            raise ValueError("无法获取有效响应")
+
+        except Exception as e:
+            logger.error(f"{file_type}文档解析异常: {str(e)}")
+            raise
+
+    def _get_document_parse_prompt(self) -> str:
+        """返回文档解析的提示词模板"""
+        return """
+请从以下文档内容中识别并提取所有数据表的定义信息,按照指定的JSON格式返回结果。
+
+规则说明:
+1. 仔细阅读文档内容,识别所有描述数据表结构的部分。
+2. 一个文档可能包含一个或多个数据表的定义,请将所有表放在一个JSON数组中返回。
+3. 表的英文名称(name_en):
+   - 如果文档中有英文表名,使用原始大小写
+   - 如果没有英文名,尝试根据中文名翻译或生成合适的英文名
+4. 表的中文名称(name_zh):
+   - 从文档中提取表的中文名称或描述
+   - 如果没有明确的中文名,根据内容推断
+5. 对于每个表,提取所有字段信息到columns数组中,每个字段包含:
+   - name_zh: 字段中文名称
+   - name_en: 字段英文名称(如果没有,根据中文名翻译)
+   - data_type: 数据类型(如VARCHAR(255)、INTEGER、DATE等,如果文档未指定则根据字段用途推断)
+   - is_primary: 是否主键("是"或"否")
+   - comment: 字段说明或注释
+   - nullable: 是否可为空("是"或"否",如果文档未指定默认为"是")
+
+6. 返回格式(必须是JSON数组):
+[
+    {
+        "table_info": {
+            "name_zh": "用户信息表",
+            "name_en": "user_info"
+        },
+        "columns": [
+            {
+                "name_zh": "用户ID",
+                "name_en": "user_id",
+                "data_type": "INTEGER",
+                "is_primary": "是",
+                "comment": "用户唯一标识",
+                "nullable": "否"
+            },
+            {
+                "name_zh": "用户名",
+                "name_en": "username",
+                "data_type": "VARCHAR(50)",
+                "is_primary": "否",
+                "comment": "用户登录名",
+                "nullable": "否"
+            }
+        ]
+    }
+]
+
+注意:
+- 即使只识别到一个表,也必须返回数组格式:[{table_info: {...}, columns: [...]}]
+- 如果文档中没有找到任何数据表定义,返回空数组:[]
+- 请仅返回JSON格式结果,不要包含任何其他解释文字。
+"""

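The response-normalization step in `_parse_document_content` (strip an optional fenced-JSON block, parse, coerce a single-table dict into a one-element list) can be exercised in isolation. A standalone sketch of that same logic, with a made-up LLM response for illustration:

```python
# Standalone sketch of the normalization used by _parse_document_content.
import json
import re

def normalize_llm_tables(response_content: str) -> list:
    match = re.search(r"```json\s*([\s\S]*?)\s*```", response_content)
    json_content = match.group(1) if match else response_content
    parsed = json.loads(json_content)
    # Coerce a single-table dict into a one-element list
    return [parsed] if isinstance(parsed, dict) else parsed

# Made-up response for illustration only
sample = '```json\n{"table_info": {"name_en": "user_info"}, "columns": []}\n```'
print(normalize_llm_tables(sample))
# -> [{'table_info': {'name_en': 'user_info'}, 'columns': []}]
```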
+ 7 - 0
cursorssh-143

@@ -0,0 +1,7 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
+QyNTUxOQAAACBdKkJA6GRwrZ3t4GnWe+NKh3+k77JbCMMDgWtnDcqi+gAAAJD6RTuQ+kU7
+kAAAAAtzc2gtZWQyNTUxOQAAACBdKkJA6GRwrZ3t4GnWe+NKh3+k77JbCMMDgWtnDcqi+g
+AAAEC+8ITt6SnbSzJOryOnHB9GJEC9sTPWMNl69xWFBHX3f10qQkDoZHCtne3gadZ740qH
+f6TvslsIwwOBa2cNyqL6AAAABmN1cnNvcgECAwQFBgc=
+-----END OPENSSH PRIVATE KEY-----

+ 1 - 0
cursorssh-143.pub

@@ -0,0 +1 @@
+ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIF0qQkDoZHCtne3gadZ740qHf6TvslsIwwOBa2cNyqL6 cursor

+ 6 - 0
requirements.txt

@@ -22,6 +22,12 @@ python-dotenv==1.0.0
 requests==2.31.0
 pandas==2.0.3
 numpy==1.24.3
+openpyxl==3.1.5
+tabulate==0.9.0
+
+# Document parsing
+python-docx==1.1.2
+pdfplumber==0.11.4
 
 # 系统监控
 psutil==5.9.8

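Two notes on the new pins: `tabulate` is what backs `pandas.DataFrame.to_markdown()` in `parse_excel_content`, and several import names differ from the PyPI package names. Also, legacy `.xls` files (which the route accepts) typically require the `xlrd` engine under pandas 2.x, which is not pinned here; that may be worth verifying. A quick import smoke test:

```python
# Import names differ from the PyPI package names (python-docx -> docx).
import docx        # python-docx: Word parsing
import openpyxl    # default pandas engine for .xlsx
import pdfplumber  # PDF text/table extraction
import tabulate    # required by pandas.DataFrame.to_markdown()

print("document-parsing dependencies import cleanly")
```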
+ 28 - 0
scripts/check_columns.py

@@ -0,0 +1,28 @@
+"""Check table columns"""
+
+import psycopg2
+
+conn = psycopg2.connect(
+    host="192.168.3.143",
+    port=5432,
+    database="dataops",
+    user="postgres",
+    password="dataOps",
+)
+cur = conn.cursor()
+
+tables = ["test_sales_data", "test_user_statistics", "test_product_inventory"]
+
+for table in tables:
+    cur.execute(
+        """
+        SELECT column_name FROM information_schema.columns
+        WHERE table_name = %s ORDER BY ordinal_position
+        """,
+        (table,),
+    )
+    columns = [r[0] for r in cur.fetchall()]
+    print(f"{table}: {columns}")
+
+cur.close()
+conn.close()

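One hedged refinement: the query above has no `table_schema` filter, so same-named tables in other schemas (which `cleanup_and_create_test_data.py` in this commit deliberately hunts for) would be mixed into the output. A schema-pinned sketch of the same check, reusing the script's connection settings:

```python
# Variant of the column check that pins the schema to public.
import psycopg2

conn = psycopg2.connect(
    host="192.168.3.143", port=5432, database="dataops",
    user="postgres", password="dataOps",
)
cur = conn.cursor()
for table in ["test_sales_data", "test_user_statistics", "test_product_inventory"]:
    cur.execute(
        """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = %s
        ORDER BY ordinal_position
        """,
        (table,),
    )
    print(table, [r[0] for r in cur.fetchall()])
cur.close()
conn.close()
```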
+ 471 - 0
scripts/cleanup_and_create_test_data.py

@@ -0,0 +1,471 @@
+"""
+Cleanup-and-create test data script.
+Drops all old test tables, then recreates them from scratch.
+"""
+
+from __future__ import annotations
+
+import random
+from datetime import datetime, timedelta
+from typing import Any
+
+import psycopg2
+from loguru import logger
+
+# Production database configuration
+DB_CONFIG = {
+    "host": "192.168.3.143",
+    "port": 5432,
+    "database": "dataops",
+    "user": "postgres",
+    "password": "dataOps",
+}
+
+
+def get_connection():
+    """获取数据库连接"""
+    return psycopg2.connect(**DB_CONFIG)
+
+
+def cleanup_all_test_tables(conn) -> None:
+    """清理所有测试表(所有 schema)"""
+    logger.info("Cleaning up all test tables...")
+
+    with conn.cursor() as cur:
+        # Find the test tables in every schema
+        cur.execute("""
+            SELECT table_schema, table_name
+            FROM information_schema.tables
+            WHERE table_name IN ('test_sales_data', 'test_user_statistics', 'test_product_inventory')
+        """)
+        tables = cur.fetchall()
+
+        for schema, table in tables:
+            logger.info(f"Dropping {schema}.{table}")
+            cur.execute(f'DROP TABLE IF EXISTS "{schema}"."{table}" CASCADE')
+
+        conn.commit()
+        logger.info("Cleanup complete")
+
+
+def create_test_sales_data_table(conn) -> None:
+    """创建销售数据分析表"""
+    logger.info("Creating test_sales_data table...")
+
+    with conn.cursor() as cur:
+        cur.execute("""
+            CREATE TABLE public.test_sales_data (
+                id SERIAL PRIMARY KEY,
+                order_id VARCHAR(50) NOT NULL,
+                order_date DATE NOT NULL,
+                customer_id VARCHAR(50) NOT NULL,
+                customer_name VARCHAR(100),
+                product_id VARCHAR(50) NOT NULL,
+                product_name VARCHAR(200),
+                category VARCHAR(100),
+                quantity INTEGER NOT NULL,
+                unit_price DECIMAL(10, 2) NOT NULL,
+                total_amount DECIMAL(12, 2) NOT NULL,
+                discount_rate DECIMAL(5, 2) DEFAULT 0,
+                payment_method VARCHAR(50),
+                region VARCHAR(100),
+                city VARCHAR(100),
+                status VARCHAR(50) DEFAULT 'completed',
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+        """)
+        cur.execute(
+            "COMMENT ON TABLE public.test_sales_data IS 'Sales data table - test data'"
+        )
+        conn.commit()
+        logger.info("test_sales_data table created")
+
+
+def insert_test_sales_data(conn, num_records: int = 500) -> None:
+    """插入销售测试数据"""
+    logger.info(f"Inserting {num_records} sales records...")
+
+    customers = [
+        ("C001", "Zhang San"),
+        ("C002", "Li Si"),
+        ("C003", "Wang Wu"),
+        ("C004", "Zhao Liu"),
+        ("C005", "Qian Qi"),
+    ]
+
+    products = [
+        ("P001", "iPhone 15 Pro", "Electronics", 7999.00),
+        ("P002", "MacBook Pro", "Computers", 14999.00),
+        ("P003", "AirPods Pro", "Accessories", 1899.00),
+        ("P004", "iPad Pro", "Tablets", 8999.00),
+        ("P005", "Apple Watch", "Wearables", 3299.00),
+    ]
+
+    regions = [
+        ("East", ["Shanghai", "Hangzhou", "Nanjing"]),
+        ("North", ["Beijing", "Tianjin", "Shijiazhuang"]),
+        ("South", ["Guangzhou", "Shenzhen", "Dongguan"]),
+    ]
+
+    payment_methods = ["Alipay", "WeChat Pay", "Bank Card", "Credit Card"]
+    statuses = ["completed", "completed", "completed", "pending", "cancelled"]
+
+    with conn.cursor() as cur:
+        records: list[tuple[Any, ...]] = []
+        base_date = datetime.now() - timedelta(days=180)
+
+        for i in range(num_records):
+            order_id = f"ORD{datetime.now().strftime('%Y%m%d')}{i + 1:05d}"
+            order_date = base_date + timedelta(days=random.randint(0, 180))
+            customer = random.choice(customers)
+            product = random.choice(products)
+            region_data = random.choice(regions)
+            quantity = random.randint(1, 5)
+            unit_price = product[3]
+            discount_rate = random.choice([0, 0, 0, 0.05, 0.10, 0.15])
+            total_amount = round(quantity * unit_price * (1 - discount_rate), 2)
+
+            records.append(
+                (
+                    order_id,
+                    order_date.date(),
+                    customer[0],
+                    customer[1],
+                    product[0],
+                    product[1],
+                    product[2],
+                    quantity,
+                    unit_price,
+                    total_amount,
+                    discount_rate,
+                    random.choice(payment_methods),
+                    region_data[0],
+                    random.choice(region_data[1]),
+                    random.choice(statuses),
+                )
+            )
+
+        cur.executemany(
+            """
+            INSERT INTO public.test_sales_data (
+                order_id, order_date, customer_id, customer_name,
+                product_id, product_name, category, quantity,
+                unit_price, total_amount, discount_rate,
+                payment_method, region, city, status
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            """,
+            records,
+        )
+        conn.commit()
+        logger.info(f"Inserted {num_records} sales records")
+
+
+def create_test_user_statistics_table(conn) -> None:
+    """创建用户行为统计表"""
+    logger.info("Creating test_user_statistics table...")
+
+    with conn.cursor() as cur:
+        cur.execute("""
+            CREATE TABLE public.test_user_statistics (
+                id SERIAL PRIMARY KEY,
+                user_id VARCHAR(50) NOT NULL,
+                username VARCHAR(100),
+                email VARCHAR(200),
+                register_date DATE,
+                last_login_date TIMESTAMP,
+                login_count INTEGER DEFAULT 0,
+                total_orders INTEGER DEFAULT 0,
+                total_amount DECIMAL(12, 2) DEFAULT 0,
+                avg_order_amount DECIMAL(10, 2) DEFAULT 0,
+                favorite_category VARCHAR(100),
+                user_level VARCHAR(50),
+                points INTEGER DEFAULT 0,
+                is_vip BOOLEAN DEFAULT FALSE,
+                device_type VARCHAR(50),
+                platform VARCHAR(50),
+                province VARCHAR(100),
+                city VARCHAR(100),
+                age_group VARCHAR(50),
+                gender VARCHAR(20),
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+        """)
+        cur.execute(
+            "COMMENT ON TABLE public.test_user_statistics IS 'User statistics table - test data'"
+        )
+        conn.commit()
+        logger.info("test_user_statistics table created")
+
+
+def insert_test_user_statistics(conn, num_records: int = 300) -> None:
+    """插入用户统计测试数据"""
+    logger.info(f"Inserting {num_records} user statistics records...")
+
+    names = ["Alice", "Bob", "Charlie", "David", "Eva", "Frank", "Grace", "Henry"]
+    categories = ["Electronics", "Computers", "Home", "Fashion", "Beauty", "Food"]
+    levels = ["Regular", "Silver", "Gold", "Diamond", "VIP"]
+    devices = ["iOS", "Android", "Windows", "macOS", "Web"]
+    platforms = ["App", "Mini Program", "PC Web", "H5"]
+    provinces = ["Beijing", "Shanghai", "Guangdong", "Zhejiang", "Jiangsu"]
+    age_groups = ["18-25", "26-35", "36-45", "46-55", "55+"]
+    genders = ["Male", "Female"]
+
+    with conn.cursor() as cur:
+        records: list[tuple[Any, ...]] = []
+        base_date = datetime.now() - timedelta(days=365)
+
+        for i in range(num_records):
+            user_id = f"U{100000 + i}"
+            name = f"{random.choice(names)}{i}"
+            register_date = base_date + timedelta(days=random.randint(0, 365))
+            last_login = register_date + timedelta(
+                days=random.randint(0, (datetime.now() - register_date).days)
+            )
+            login_count = random.randint(1, 500)
+            total_orders = random.randint(0, 100)
+            total_amount = round(random.uniform(0, 50000), 2) if total_orders > 0 else 0
+            avg_amount = (
+                round(total_amount / total_orders, 2) if total_orders > 0 else 0
+            )
+            points = random.randint(0, 10000)
+            is_vip = points > 5000
+
+            records.append(
+                (
+                    user_id,
+                    name,
+                    f"{user_id.lower()}@example.com",
+                    register_date.date(),
+                    last_login,
+                    login_count,
+                    total_orders,
+                    total_amount,
+                    avg_amount,
+                    random.choice(categories),
+                    random.choice(levels),
+                    points,
+                    is_vip,
+                    random.choice(devices),
+                    random.choice(platforms),
+                    random.choice(provinces),
+                    f"{random.choice(provinces)} City",
+                    random.choice(age_groups),
+                    random.choice(genders),
+                )
+            )
+
+        cur.executemany(
+            """
+            INSERT INTO public.test_user_statistics (
+                user_id, username, email, register_date, last_login_date,
+                login_count, total_orders, total_amount, avg_order_amount,
+                favorite_category, user_level, points, is_vip,
+                device_type, platform, province, city, age_group, gender
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            """,
+            records,
+        )
+        conn.commit()
+        logger.info(f"Inserted {num_records} user statistics records")
+
+
+def create_test_product_inventory_table(conn) -> None:
+    """创建商品库存表"""
+    logger.info("Creating test_product_inventory table...")
+
+    with conn.cursor() as cur:
+        cur.execute("""
+            CREATE TABLE public.test_product_inventory (
+                id SERIAL PRIMARY KEY,
+                sku VARCHAR(50) NOT NULL,
+                product_name VARCHAR(200) NOT NULL,
+                category VARCHAR(100),
+                brand VARCHAR(100),
+                supplier VARCHAR(200),
+                warehouse VARCHAR(100),
+                current_stock INTEGER DEFAULT 0,
+                safety_stock INTEGER DEFAULT 0,
+                max_stock INTEGER DEFAULT 0,
+                unit_cost DECIMAL(10, 2),
+                selling_price DECIMAL(10, 2),
+                stock_status VARCHAR(50),
+                last_inbound_date DATE,
+                last_outbound_date DATE,
+                inbound_quantity_30d INTEGER DEFAULT 0,
+                outbound_quantity_30d INTEGER DEFAULT 0,
+                turnover_rate DECIMAL(5, 2),
+                is_active BOOLEAN DEFAULT TRUE,
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+        """)
+        cur.execute(
+            "COMMENT ON TABLE public.test_product_inventory IS 'Product inventory table - test data'"
+        )
+        conn.commit()
+        logger.info("test_product_inventory table created")
+
+
+def insert_test_product_inventory(conn, num_records: int = 200) -> None:
+    """插入商品库存测试数据"""
+    logger.info(f"Inserting {num_records} product inventory records...")
+
+    products = [
+        ("iPhone 15 Pro", "Electronics", "Apple"),
+        ("MacBook Pro", "Computers", "Apple"),
+        ("AirPods Pro", "Accessories", "Apple"),
+        ("Huawei Mate 60", "Electronics", "Huawei"),
+        ("Xiaomi 14 Pro", "Electronics", "Xiaomi"),
+        ("Dyson Vacuum", "Home", "Dyson"),
+        ("Sony TV", "Home", "Sony"),
+        ("ThinkPad X1", "Computers", "Lenovo"),
+    ]
+
+    suppliers = ["Tech Co.", "Trade Inc.", "Electronics Ltd.", "Digital Corp."]
+    warehouses = ["Beijing WH", "Shanghai WH", "Guangzhou WH", "Chengdu WH"]
+
+    with conn.cursor() as cur:
+        records: list[tuple[Any, ...]] = []
+
+        for i in range(num_records):
+            product = random.choice(products)
+            sku = f"SKU{100000 + i}"
+            current_stock = random.randint(0, 1000)
+            safety_stock = random.randint(50, 200)
+            max_stock = random.randint(800, 2000)
+            unit_cost = round(random.uniform(10, 5000), 2)
+            selling_price = round(unit_cost * random.uniform(1.2, 2.0), 2)
+
+            if current_stock == 0:
+                stock_status = "Out of Stock"
+            elif current_stock < safety_stock:
+                stock_status = "Low Stock"
+            elif current_stock > max_stock * 0.9:
+                stock_status = "Overstocked"
+            else:
+                stock_status = "Normal"
+
+            last_inbound = datetime.now() - timedelta(days=random.randint(1, 60))
+            last_outbound = datetime.now() - timedelta(days=random.randint(1, 30))
+            inbound_30d = random.randint(0, 500)
+            outbound_30d = random.randint(0, 400)
+            turnover = min(round(outbound_30d / max(current_stock, 1) * 30, 2), 999.99)
+
+            records.append(
+                (
+                    sku,
+                    f"{product[0]} - Model {chr(65 + i % 26)}",
+                    product[1],
+                    product[2],
+                    random.choice(suppliers),
+                    random.choice(warehouses),
+                    current_stock,
+                    safety_stock,
+                    max_stock,
+                    unit_cost,
+                    selling_price,
+                    stock_status,
+                    last_inbound.date(),
+                    last_outbound.date(),
+                    inbound_30d,
+                    outbound_30d,
+                    turnover,
+                    random.choice([True, True, True, False]),
+                )
+            )
+
+        cur.executemany(
+            """
+            INSERT INTO public.test_product_inventory (
+                sku, product_name, category, brand, supplier, warehouse,
+                current_stock, safety_stock, max_stock, unit_cost, selling_price,
+                stock_status, last_inbound_date, last_outbound_date,
+                inbound_quantity_30d, outbound_quantity_30d, turnover_rate, is_active
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            """,
+            records,
+        )
+        conn.commit()
+        logger.info(f"Inserted {num_records} product inventory records")
+
+
+def update_data_products_stats(conn) -> None:
+    """更新 data_products 表中的统计信息"""
+    logger.info("Updating data_products statistics...")
+
+    tables_info = [
+        ("test_sales_data", 17),
+        ("test_user_statistics", 22),
+        ("test_product_inventory", 21),
+    ]
+
+    with conn.cursor() as cur:
+        for table_name, column_count in tables_info:
+            cur.execute(f"SELECT COUNT(*) FROM public.{table_name}")
+            record_count = cur.fetchone()[0]
+
+            cur.execute(
+                """
+                UPDATE public.data_products
+                SET record_count = %s,
+                    column_count = %s,
+                    last_updated_at = CURRENT_TIMESTAMP,
+                    updated_at = CURRENT_TIMESTAMP,
+                    status = 'active'
+                WHERE target_table = %s AND target_schema = 'public'
+                """,
+                (record_count, column_count, table_name),
+            )
+            logger.info(
+                f"Updated {table_name}: records={record_count}, columns={column_count}"
+            )
+
+        conn.commit()
+        logger.info("Statistics update complete")
+
+
+def main() -> None:
+    """主函数"""
+    logger.info("=" * 60)
+    logger.info("Starting test data creation...")
+    logger.info("=" * 60)
+
+    try:
+        conn = get_connection()
+        logger.info("Database connected")
+
+        # Drop all old tables
+        cleanup_all_test_tables(conn)
+
+        # Create tables and insert data
+        create_test_sales_data_table(conn)
+        insert_test_sales_data(conn, num_records=500)
+
+        create_test_user_statistics_table(conn)
+        insert_test_user_statistics(conn, num_records=300)
+
+        create_test_product_inventory_table(conn)
+        insert_test_product_inventory(conn, num_records=200)
+
+        # Update statistics
+        update_data_products_stats(conn)
+
+        conn.close()
+
+        logger.info("=" * 60)
+        logger.info("All test data created successfully!")
+        logger.info("=" * 60)
+        logger.info("Created tables:")
+        logger.info("  1. test_sales_data (500 records)")
+        logger.info("  2. test_user_statistics (300 records)")
+        logger.info("  3. test_product_inventory (200 records)")
+        logger.info("=" * 60)
+
+    except Exception as e:
+        logger.error(f"Failed to create test data: {e}")
+        raise
+
+
+if __name__ == "__main__":
+    main()

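`cleanup_all_test_tables` interpolates the schema and table names into the DROP statement with an f-string. The names come straight from `information_schema`, so this is safe here, but `psycopg2.sql` composes dynamic identifiers without string formatting; a sketch of that alternative:

```python
# Identifier-safe variant of the DROP in cleanup_all_test_tables.
from psycopg2 import sql

def drop_table(cur, schema: str, table: str) -> None:
    # sql.Identifier handles quoting and escaping of the dynamic names
    cur.execute(
        sql.SQL("DROP TABLE IF EXISTS {}.{} CASCADE").format(
            sql.Identifier(schema), sql.Identifier(table)
        )
    )
```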
+ 555 - 0
scripts/create_test_data_tables.py

@@ -0,0 +1,555 @@
+"""
+Test data table creation script.
+Creates the data tables for the data products registered in data_products and fills them with test data.
+"""
+
+from __future__ import annotations
+
+import random
+from datetime import datetime, timedelta
+from typing import Any
+
+import psycopg2
+from loguru import logger
+
+# Production database configuration
+DB_CONFIG = {
+    "host": "192.168.3.143",
+    "port": 5432,
+    "database": "dataops",
+    "user": "postgres",
+    "password": "dataOps",
+}
+
+
+def get_connection():
+    """获取数据库连接"""
+    return psycopg2.connect(**DB_CONFIG)
+
+
+def create_test_sales_data_table(conn) -> None:
+    """
+    Create the sales data analysis table test_sales_data.
+    Simulates e-commerce sales data.
+    """
+    logger.info("创建 test_sales_data 表...")
+
+    with conn.cursor() as cur:
+        # Drop the table if it already exists
+        cur.execute("DROP TABLE IF EXISTS public.test_sales_data CASCADE")
+
+        # Create the table
+        cur.execute("""
+            CREATE TABLE public.test_sales_data (
+                id SERIAL PRIMARY KEY,
+                order_id VARCHAR(50) NOT NULL,
+                order_date DATE NOT NULL,
+                customer_id VARCHAR(50) NOT NULL,
+                customer_name VARCHAR(100),
+                product_id VARCHAR(50) NOT NULL,
+                product_name VARCHAR(200),
+                category VARCHAR(100),
+                quantity INTEGER NOT NULL,
+                unit_price DECIMAL(10, 2) NOT NULL,
+                total_amount DECIMAL(12, 2) NOT NULL,
+                discount_rate DECIMAL(5, 2) DEFAULT 0,
+                payment_method VARCHAR(50),
+                region VARCHAR(100),
+                city VARCHAR(100),
+                status VARCHAR(50) DEFAULT 'completed',
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+        """)
+
+        # Add a table comment
+        cur.execute(
+            "COMMENT ON TABLE public.test_sales_data IS '销售数据分析表 - 测试数据'"
+        )
+
+        conn.commit()
+        logger.info("test_sales_data 表创建成功")
+
+
+def insert_test_sales_data(conn, num_records: int = 500) -> None:
+    """
+    Insert sales test data.
+
+    Args:
+        conn: database connection
+        num_records: number of records to insert
+    """
+    logger.info(f"插入 {num_records} 条销售测试数据...")
+
+    # Mock data
+    customers = [
+        ("C001", "张三"),
+        ("C002", "李四"),
+        ("C003", "王五"),
+        ("C004", "赵六"),
+        ("C005", "钱七"),
+        ("C006", "孙八"),
+        ("C007", "周九"),
+        ("C008", "吴十"),
+        ("C009", "郑明"),
+        ("C010", "陈华"),
+    ]
+
+    products = [
+        ("P001", "iPhone 15 Pro", "手机数码", 7999.00),
+        ("P002", "MacBook Pro 14", "电脑办公", 14999.00),
+        ("P003", "AirPods Pro 2", "手机配件", 1899.00),
+        ("P004", "iPad Pro 12.9", "平板电脑", 8999.00),
+        ("P005", "Apple Watch S9", "智能穿戴", 3299.00),
+        ("P006", "戴森吸尘器 V15", "家用电器", 4990.00),
+        ("P007", "索尼降噪耳机", "音频设备", 2499.00),
+        ("P008", "小米电视 75寸", "家用电器", 5999.00),
+        ("P009", "华为 Mate 60 Pro", "手机数码", 6999.00),
+        ("P010", "联想ThinkPad X1", "电脑办公", 12999.00),
+    ]
+
+    regions = [
+        ("华东", ["上海", "杭州", "南京", "苏州", "无锡"]),
+        ("华北", ["北京", "天津", "石家庄", "太原", "济南"]),
+        ("华南", ["广州", "深圳", "东莞", "佛山", "珠海"]),
+        ("西南", ["成都", "重庆", "昆明", "贵阳", "西安"]),
+        ("华中", ["武汉", "长沙", "郑州", "合肥", "南昌"]),
+    ]
+
+    payment_methods = ["支付宝", "微信支付", "银行卡", "信用卡", "货到付款"]
+    statuses = ["completed", "completed", "completed", "pending", "cancelled"]
+
+    with conn.cursor() as cur:
+        records: list[tuple[Any, ...]] = []
+        base_date = datetime.now() - timedelta(days=180)
+
+        for i in range(num_records):
+            order_id = f"ORD{datetime.now().strftime('%Y%m%d')}{i + 1:05d}"
+            order_date = base_date + timedelta(days=random.randint(0, 180))
+            customer = random.choice(customers)
+            product = random.choice(products)
+            region_data = random.choice(regions)
+            quantity = random.randint(1, 5)
+            unit_price = product[3]
+            discount_rate = random.choice([0, 0, 0, 0.05, 0.10, 0.15, 0.20])
+            total_amount = round(quantity * unit_price * (1 - discount_rate), 2)
+
+            records.append(
+                (
+                    order_id,
+                    order_date.date(),
+                    customer[0],
+                    customer[1],
+                    product[0],
+                    product[1],
+                    product[2],
+                    quantity,
+                    unit_price,
+                    total_amount,
+                    discount_rate,
+                    random.choice(payment_methods),
+                    region_data[0],
+                    random.choice(region_data[1]),
+                    random.choice(statuses),
+                )
+            )
+
+        cur.executemany(
+            """
+            INSERT INTO public.test_sales_data (
+                order_id, order_date, customer_id, customer_name,
+                product_id, product_name, category, quantity,
+                unit_price, total_amount, discount_rate,
+                payment_method, region, city, status
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            """,
+            records,
+        )
+
+        conn.commit()
+        logger.info(f"成功插入 {num_records} 条销售数据")
+
+
+def create_test_user_statistics_table(conn) -> None:
+    """
+    Create the user behavior statistics table test_user_statistics.
+    Simulates user activity and behavior data.
+    """
+    logger.info("创建 test_user_statistics 表...")
+
+    with conn.cursor() as cur:
+        cur.execute("DROP TABLE IF EXISTS public.test_user_statistics CASCADE")
+
+        cur.execute("""
+            CREATE TABLE public.test_user_statistics (
+                id SERIAL PRIMARY KEY,
+                user_id VARCHAR(50) NOT NULL,
+                username VARCHAR(100),
+                email VARCHAR(200),
+                register_date DATE,
+                last_login_date TIMESTAMP,
+                login_count INTEGER DEFAULT 0,
+                total_orders INTEGER DEFAULT 0,
+                total_amount DECIMAL(12, 2) DEFAULT 0,
+                avg_order_amount DECIMAL(10, 2) DEFAULT 0,
+                favorite_category VARCHAR(100),
+                user_level VARCHAR(50),
+                points INTEGER DEFAULT 0,
+                is_vip BOOLEAN DEFAULT FALSE,
+                device_type VARCHAR(50),
+                platform VARCHAR(50),
+                province VARCHAR(100),
+                city VARCHAR(100),
+                age_group VARCHAR(50),
+                gender VARCHAR(20),
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+        """)
+
+        cur.execute(
+            "COMMENT ON TABLE public.test_user_statistics IS '用户行为统计表 - 测试数据'"
+        )
+
+        conn.commit()
+        logger.info("test_user_statistics 表创建成功")
+
+
+def insert_test_user_statistics(conn, num_records: int = 300) -> None:
+    """
+    Insert user statistics test data.
+
+    Args:
+        conn: database connection
+        num_records: number of records to insert
+    """
+    logger.info(f"插入 {num_records} 条用户统计测试数据...")
+
+    names = [
+        "张伟",
+        "王芳",
+        "李娜",
+        "刘洋",
+        "陈明",
+        "杨静",
+        "赵强",
+        "黄丽",
+        "周杰",
+        "吴敏",
+        "徐涛",
+        "孙燕",
+        "马超",
+        "朱婷",
+        "胡磊",
+        "郭琳",
+        "林峰",
+        "何雪",
+        "高飞",
+        "梁慧",
+        "郑鹏",
+        "谢雨",
+        "韩冰",
+        "唐昊",
+    ]
+
+    categories = [
+        "手机数码",
+        "电脑办公",
+        "家用电器",
+        "服装鞋帽",
+        "美妆护肤",
+        "食品生鲜",
+    ]
+    levels = ["普通用户", "银牌会员", "金牌会员", "钻石会员", "至尊会员"]
+    devices = ["iOS", "Android", "Windows", "macOS", "Web"]
+    platforms = ["App", "小程序", "PC网页", "H5"]
+    provinces = ["北京", "上海", "广东", "浙江", "江苏", "四川", "湖北", "山东"]
+    age_groups = ["18-25", "26-35", "36-45", "46-55", "55+"]
+    genders = ["男", "女"]
+
+    with conn.cursor() as cur:
+        records: list[tuple[Any, ...]] = []
+        base_date = datetime.now() - timedelta(days=365)
+
+        for i in range(num_records):
+            user_id = f"U{100000 + i}"
+            name = random.choice(names)
+            register_date = base_date + timedelta(days=random.randint(0, 365))
+            last_login = register_date + timedelta(
+                days=random.randint(0, (datetime.now() - register_date).days)
+            )
+            login_count = random.randint(1, 500)
+            total_orders = random.randint(0, 100)
+            total_amount = round(random.uniform(0, 50000), 2) if total_orders > 0 else 0
+            avg_amount = (
+                round(total_amount / total_orders, 2) if total_orders > 0 else 0
+            )
+            points = random.randint(0, 10000)
+            is_vip = points > 5000
+
+            records.append(
+                (
+                    user_id,
+                    name,
+                    f"{user_id.lower()}@example.com",
+                    register_date.date(),
+                    last_login,
+                    login_count,
+                    total_orders,
+                    total_amount,
+                    avg_amount,
+                    random.choice(categories),
+                    random.choice(levels),
+                    points,
+                    is_vip,
+                    random.choice(devices),
+                    random.choice(platforms),
+                    random.choice(provinces),
+                    f"{random.choice(provinces)}市",
+                    random.choice(age_groups),
+                    random.choice(genders),
+                )
+            )
+
+        cur.executemany(
+            """
+            INSERT INTO public.test_user_statistics (
+                user_id, username, email, register_date, last_login_date,
+                login_count, total_orders, total_amount, avg_order_amount,
+                favorite_category, user_level, points, is_vip,
+                device_type, platform, province, city, age_group, gender
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            """,
+            records,
+        )
+
+        conn.commit()
+        logger.info(f"成功插入 {num_records} 条用户统计数据")
+
+
+def create_test_product_inventory_table(conn) -> None:
+    """
+    Create the product inventory table test_product_inventory.
+    Simulates stock levels and inbound/outbound movements.
+    """
+    logger.info("创建 test_product_inventory 表...")
+
+    with conn.cursor() as cur:
+        cur.execute("DROP TABLE IF EXISTS public.test_product_inventory CASCADE")
+
+        cur.execute("""
+            CREATE TABLE public.test_product_inventory (
+                id SERIAL PRIMARY KEY,
+                sku VARCHAR(50) NOT NULL,
+                product_name VARCHAR(200) NOT NULL,
+                category VARCHAR(100),
+                brand VARCHAR(100),
+                supplier VARCHAR(200),
+                warehouse VARCHAR(100),
+                current_stock INTEGER DEFAULT 0,
+                safety_stock INTEGER DEFAULT 0,
+                max_stock INTEGER DEFAULT 0,
+                unit_cost DECIMAL(10, 2),
+                selling_price DECIMAL(10, 2),
+                stock_status VARCHAR(50),
+                last_inbound_date DATE,
+                last_outbound_date DATE,
+                inbound_quantity_30d INTEGER DEFAULT 0,
+                outbound_quantity_30d INTEGER DEFAULT 0,
+                turnover_rate DECIMAL(5, 2),
+                is_active BOOLEAN DEFAULT TRUE,
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+        """)
+
+        cur.execute(
+            "COMMENT ON TABLE public.test_product_inventory IS '商品库存表 - 测试数据'"
+        )
+
+        conn.commit()
+        logger.info("test_product_inventory 表创建成功")
+
+
+def insert_test_product_inventory(conn, num_records: int = 200) -> None:
+    """
+    Insert product inventory test data.
+
+    Args:
+        conn: database connection
+        num_records: number of records to insert
+    """
+    logger.info(f"插入 {num_records} 条商品库存测试数据...")
+
+    products = [
+        ("iPhone 15 Pro", "手机数码", "Apple"),
+        ("MacBook Pro", "电脑办公", "Apple"),
+        ("AirPods Pro", "手机配件", "Apple"),
+        ("华为Mate 60", "手机数码", "华为"),
+        ("小米14 Pro", "手机数码", "小米"),
+        ("戴森吸尘器", "家用电器", "戴森"),
+        ("索尼电视", "家用电器", "索尼"),
+        ("联想ThinkPad", "电脑办公", "联想"),
+        ("Nike运动鞋", "服装鞋帽", "Nike"),
+        ("Adidas外套", "服装鞋帽", "Adidas"),
+        ("雅诗兰黛精华", "美妆护肤", "雅诗兰黛"),
+        ("SK-II神仙水", "美妆护肤", "SK-II"),
+        ("海蓝之谜面霜", "美妆护肤", "海蓝之谜"),
+        ("飞利浦剃须刀", "个人护理", "飞利浦"),
+        ("松下电饭煲", "家用电器", "松下"),
+    ]
+
+    suppliers = [
+        "北京科技有限公司",
+        "上海贸易有限公司",
+        "广州电子有限公司",
+        "深圳数码有限公司",
+        "杭州商贸有限公司",
+    ]
+
+    warehouses = ["北京仓", "上海仓", "广州仓", "成都仓", "武汉仓"]
+
+    with conn.cursor() as cur:
+        records: list[tuple[Any, ...]] = []
+
+        for i in range(num_records):
+            product = random.choice(products)
+            sku = f"SKU{100000 + i}"
+            current_stock = random.randint(0, 1000)
+            safety_stock = random.randint(50, 200)
+            max_stock = random.randint(800, 2000)
+            unit_cost = round(random.uniform(10, 5000), 2)
+            selling_price = round(unit_cost * random.uniform(1.2, 2.0), 2)
+
+            if current_stock == 0:
+                stock_status = "缺货"
+            elif current_stock < safety_stock:
+                stock_status = "库存不足"
+            elif current_stock > max_stock * 0.9:
+                stock_status = "库存过剩"
+            else:
+                stock_status = "正常"
+
+            last_inbound = datetime.now() - timedelta(days=random.randint(1, 60))
+            last_outbound = datetime.now() - timedelta(days=random.randint(1, 30))
+            inbound_30d = random.randint(0, 500)
+            outbound_30d = random.randint(0, 400)
+            turnover = min(round(outbound_30d / max(current_stock, 1) * 30, 2), 999.99)
+
+            records.append(
+                (
+                    sku,
+                    f"{product[0]} - 型号{chr(65 + i % 26)}",
+                    product[1],
+                    product[2],
+                    random.choice(suppliers),
+                    random.choice(warehouses),
+                    current_stock,
+                    safety_stock,
+                    max_stock,
+                    unit_cost,
+                    selling_price,
+                    stock_status,
+                    last_inbound.date(),
+                    last_outbound.date(),
+                    inbound_30d,
+                    outbound_30d,
+                    turnover,
+                    random.choice([True, True, True, False]),
+                )
+            )
+
+        cur.executemany(
+            """
+            INSERT INTO public.test_product_inventory (
+                sku, product_name, category, brand, supplier, warehouse,
+                current_stock, safety_stock, max_stock, unit_cost, selling_price,
+                stock_status, last_inbound_date, last_outbound_date,
+                inbound_quantity_30d, outbound_quantity_30d, turnover_rate, is_active
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            """,
+            records,
+        )
+
+        conn.commit()
+        logger.info(f"成功插入 {num_records} 条商品库存数据")
+
+
+def update_data_products_stats(conn) -> None:
+    """更新 data_products 表中的统计信息"""
+    logger.info("更新 data_products 表统计信息...")
+
+    tables_info = [
+        ("test_sales_data", 17),
+        ("test_user_statistics", 22),
+        ("test_product_inventory", 21),
+    ]
+
+    with conn.cursor() as cur:
+        for table_name, column_count in tables_info:
+            # Get the record count
+            cur.execute(f"SELECT COUNT(*) FROM public.{table_name}")
+            record_count = cur.fetchone()[0]
+
+            # Update the data_products table
+            cur.execute(
+                """
+                UPDATE public.data_products
+                SET record_count = %s,
+                    column_count = %s,
+                    last_updated_at = CURRENT_TIMESTAMP,
+                    updated_at = CURRENT_TIMESTAMP,
+                    status = 'active'
+                WHERE target_table = %s AND target_schema = 'public'
+                """,
+                (record_count, column_count, table_name),
+            )
+
+            logger.info(
+                f"更新 {table_name}: record_count={record_count}, column_count={column_count}"
+            )
+
+        conn.commit()
+        logger.info("data_products 统计信息更新完成")
+
+
+def main() -> None:
+    """主函数"""
+    logger.info("=" * 60)
+    logger.info("开始创建测试数据表和数据...")
+    logger.info("=" * 60)
+
+    try:
+        conn = get_connection()
+        logger.info("数据库连接成功")
+
+        # Create tables and insert data
+        create_test_sales_data_table(conn)
+        insert_test_sales_data(conn, num_records=500)
+
+        create_test_user_statistics_table(conn)
+        insert_test_user_statistics(conn, num_records=300)
+
+        create_test_product_inventory_table(conn)
+        insert_test_product_inventory(conn, num_records=200)
+
+        # Update data_products statistics
+        update_data_products_stats(conn)
+
+        conn.close()
+
+        logger.info("=" * 60)
+        logger.info("所有测试数据创建完成!")
+        logger.info("=" * 60)
+        logger.info("已创建以下测试表:")
+        logger.info("  1. test_sales_data (500条) - 销售数据分析")
+        logger.info("  2. test_user_statistics (300条) - 用户行为统计")
+        logger.info("  3. test_product_inventory (200条) - 商品库存")
+        logger.info("=" * 60)
+
+    except Exception as e:
+        logger.error(f"创建测试数据失败: {e}")
+        raise
+
+
+if __name__ == "__main__":
+    main()

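Both generator scripts draw from the module-level `random`, so two runs produce different fixtures. If reproducible data is ever wanted (e.g. to diff the output of two runs), seeding before `main()` stabilizes everything random-derived; note that `order_id` and the timestamp-based columns still depend on `datetime.now()`:

```python
# Optional: make the generated records reproducible across runs.
import random

random.seed(42)  # any fixed seed; 42 is arbitrary
# main()  # then generate as usual; only datetime.now()-based fields vary
```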
+ 1 - 0
scripts/create_test_tables_direct.sql

@@ -59,3 +59,4 @@ CREATE TABLE test_product_inventory (
 
 COMMENT ON TABLE test_product_inventory IS '测试产品库存表';
 
+

+ 79 - 0
scripts/verify_test_data.py

@@ -0,0 +1,79 @@
+"""Verify test data script"""
+
+import psycopg2
+
+DB_CONFIG = {
+    "host": "192.168.3.143",
+    "port": 5432,
+    "database": "dataops",
+    "user": "postgres",
+    "password": "dataOps",
+}
+
+
+def main():
+    conn = psycopg2.connect(**DB_CONFIG)
+    cur = conn.cursor()
+
+    print("=" * 70)
+    print("Database Test Data Verification")
+    print("=" * 70)
+
+    # data_products
+    print("\n[data_products]")
+    cur.execute(
+        "SELECT id, product_name, target_table, record_count, column_count, status "
+        "FROM public.data_products ORDER BY id"
+    )
+    for row in cur.fetchall():
+        print(
+            f"  ID={row[0]}, name={row[1]}, table={row[2]}, "
+            f"records={row[3]}, columns={row[4]}, status={row[5]}"
+        )
+
+    # test_sales_data
+    print("\n[test_sales_data - first 5 rows]")
+    cur.execute(
+        "SELECT order_id, order_date, customer_name, product_name, quantity, total_amount "
+        "FROM public.test_sales_data LIMIT 5"
+    )
+    for row in cur.fetchall():
+        print(
+            f"  order={row[0]}, date={row[1]}, customer={row[2]}, "
+            f"product={row[3]}, qty={row[4]}, amount={row[5]}"
+        )
+
+    # test_user_statistics
+    print("\n[test_user_statistics - first 5 rows]")
+    cur.execute(
+        "SELECT user_id, username, login_count, total_orders, total_amount, user_level "
+        "FROM public.test_user_statistics LIMIT 5"
+    )
+    for row in cur.fetchall():
+        print(
+            f"  user={row[0]}, name={row[1]}, logins={row[2]}, "
+            f"orders={row[3]}, amount={row[4]}, level={row[5]}"
+        )
+
+    # test_product_inventory
+    print("\n[test_product_inventory - first 5 rows]")
+    cur.execute(
+        "SELECT sku, product_name, brand, current_stock, stock_status, selling_price "
+        "FROM public.test_product_inventory LIMIT 5"
+    )
+    for row in cur.fetchall():
+        print(
+            f"  sku={row[0]}, product={row[1]}, brand={row[2]}, "
+            f"stock={row[3]}, status={row[4]}, price={row[5]}"
+        )
+
+    print("\n" + "=" * 70)
+    print("Verification complete!")
+    print("=" * 70)
+
+    cur.close()
+    conn.close()
+
+
+if __name__ == "__main__":
+    main()