"""Parse HOPMs DOCX table definitions and prepare DataOps import payloads.

The default mode is dry-run: parse the Word document, generate a JSON report,
and do not call the DataOps platform.
"""

from __future__ import annotations

import argparse
import json
import re
from pathlib import Path
from typing import Any, Iterable

import requests
from docx import Document
from docx.oxml.ns import qn
from docx.table import Table
from docx.text.paragraph import Paragraph

# Header text (after ``_compact_header``) -> internal field key.
# NOTE: ``_compact_header`` strips "/" before the lookup, so the "填报/要求"
# key can never be hit; the "填报要求" entry is the one that matches.  The key
# is kept so external readers of this mapping see an unchanged constant.
FIELD_HEADER_ALIASES = {
    "字段中文名": "name_zh",
    "字段名": "name_en",
    "类型": "type",
    "字节": "length",
    "填报/要求": "requirement",
    "填报要求": "requirement",
    "说明": "comment",
}

DEFAULT_CATEGORY = "HOPMs标准数据集"

# Paragraph text that identifies a table even without an "(english_name)" part.
TABLE_NAME_OVERRIDES = {
    "患者基本信息": "patient_demographics",
}

# How many of the most recent non-empty paragraphs are kept as title candidates.
_TITLE_LOOKBACK = 8

# "<Chinese name>(<english_name>)"; accepts ASCII and full-width parentheses.
_TITLE_PATTERN = re.compile(
    r"(.+?)[(\uff08]\s*([A-Za-z][A-Za-z0-9_]*)\s*[)\uff09]"
)

# Shape of the placeholder names produced by ``parse_hopms_docx``.
_GENERATED_NAME_PATTERN = re.compile(r"HOPMS_TABLE_\d{3}")

# Requirement texts that contain "必填" but actually mean "optional".
_OPTIONAL_MARKERS = ("非必填", "不必填")


def _clean_text(value: Any) -> str:
    """Return *value* as a string with whitespace runs collapsed and trimmed.

    ``None`` becomes the empty string; ideographic spaces (U+3000) are treated
    like ordinary spaces.
    """
    text = "" if value is None else str(value)
    text = text.replace("\u3000", " ")
    return re.sub(r"\s+", " ", text).strip()


def _compact_header(value: str) -> str:
    """Normalize a header cell for alias lookup (no spaces, no slashes)."""
    return _clean_text(value).replace(" ", "").replace("/", "")


def _iter_blocks(document: Document) -> Iterable[tuple[str, Any]]:
    """Yield the body's direct paragraphs and tables in document order.

    Each item is ``("paragraph", Paragraph)`` or ``("table", Table)``; body
    children with any other tag are skipped.
    """
    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")
    for child in document.element.body.iterchildren():
        if child.tag == paragraph_tag:
            yield "paragraph", Paragraph(child, document)
        elif child.tag == table_tag:
            yield "table", Table(child, document)


def _table_rows(table: Table) -> list[list[str]]:
    """Return the table as a list of rows of cleaned cell texts."""
    return [[_clean_text(cell.text) for cell in row.cells] for row in table.rows]


def _header_map(header_row: list[str]) -> dict[str, int] | None:
    """Map internal field keys to column indexes for a header row.

    Returns ``None`` unless the row provides at least the Chinese name,
    English name and type columns.  If a field key appears more than once,
    the right-most column wins.
    """
    mapping: dict[str, int] = {}
    for idx, header in enumerate(header_row):
        field_name = FIELD_HEADER_ALIASES.get(_compact_header(header))
        if field_name:
            mapping[field_name] = idx

    required = {"name_zh", "name_en", "type"}
    if not required.issubset(mapping):
        return None
    return mapping


def _is_field_definition_table(rows: list[list[str]]) -> bool:
    """Return True when the first row is a recognizable field-definition header."""
    if not rows:
        return False
    return _header_map(rows[0]) is not None


def _find_table_title(recent_paragraphs: list[str]) -> tuple[str, str, str]:
    """Return table Chinese name, English/code name, and description.

    The paragraphs are searched from the most recent backwards, first for an
    exact ``TABLE_NAME_OVERRIDES`` entry, then for a
    ``"<name>(<english_name>)"`` title (ASCII or full-width parentheses).
    The description is the paragraph that follows the matched title, if one
    was recorded.  When nothing matches, the last paragraph is returned as
    the Chinese name with empty English name and description.
    """

    def following_paragraph(idx: int) -> str:
        # The paragraph right after the title line serves as the description.
        if idx + 1 < len(recent_paragraphs):
            return _clean_text(recent_paragraphs[idx + 1])
        return ""

    newest_first = list(reversed(range(len(recent_paragraphs))))

    for idx in newest_first:
        paragraph = recent_paragraphs[idx]
        if paragraph in TABLE_NAME_OVERRIDES:
            return paragraph, TABLE_NAME_OVERRIDES[paragraph], following_paragraph(idx)

    for idx in newest_first:
        match = _TITLE_PATTERN.search(recent_paragraphs[idx])
        if match:
            return (
                _clean_text(match.group(1)),
                _clean_text(match.group(2)),
                following_paragraph(idx),
            )

    if recent_paragraphs:
        return _clean_text(recent_paragraphs[-1]), "", ""
    return "", "", ""


def _normalize_data_type(raw_type: str, raw_length: str) -> str:
    """Translate a documented type (plus optional length) to a column type.

    Unrecognized non-empty types are returned lower-cased as-is; an empty
    type falls back to ``varchar(255)``.
    """
    data_type = _clean_text(raw_type).lower()
    length = _clean_text(raw_length)

    if "日期时间" in data_type or "datetime" in data_type or "timestamp" in data_type:
        return "timestamp"
    if data_type == "日期" or data_type == "date":
        return "date"
    if "时间" in data_type and "日期" not in data_type:
        return "time"
    # "长整数" contains "整数", so the bigint test must run before the integer
    # test; otherwise long integers would be reported as plain integers.
    if "长整数" in data_type or data_type == "bigint":
        return "bigint"
    if "整数" in data_type or data_type in {"int", "integer"}:
        return "integer"
    if "数字" in data_type or "数值" in data_type or "金额" in data_type:
        return "numeric"
    if "布尔" in data_type or data_type == "boolean":
        return "boolean"
    if "字符" in data_type or "string" in data_type or "varchar" in data_type:
        if length.isdigit() and int(length) > 0:
            return f"varchar({int(length)})"
        return "varchar(255)"
    if data_type:
        return data_type
    return "varchar(255)"


def _is_primary_key(requirement: str, comment: str) -> bool:
    """Return True when the requirement or comment text mentions "主键"."""
    return "主键" in f"{requirement} {comment}"


def _nullable(requirement: str) -> bool:
    """Return False only when the requirement text marks the field as required.

    Negated forms such as "非必填" contain the substring "必填" but mean the
    opposite, so they are checked first and treated as nullable.
    """
    requirement = _clean_text(requirement)
    if any(marker in requirement for marker in _OPTIONAL_MARKERS):
        return True
    return "必填" not in requirement


def _get_mapped_cell(row: list[str], mapping: dict[str, int], field: str) -> str:
    """Return the cleaned cell for *field*, or "" if unmapped or out of range."""
    idx = mapping.get(field)
    if idx is None or idx >= len(row):
        return ""
    return _clean_text(row[idx])


def _parse_columns(
    rows: list[list[str]],
    mapping: dict[str, int],
) -> list[dict[str, Any]]:
    """Convert the data rows (everything after the header) to column dicts.

    Rows that are entirely blank, or that have neither a Chinese nor an
    English name, are skipped.
    """
    columns: list[dict[str, Any]] = []
    for row in rows[1:]:
        if not any(_clean_text(cell) for cell in row):
            continue

        name_zh = _get_mapped_cell(row, mapping, "name_zh")
        name_en = _get_mapped_cell(row, mapping, "name_en")
        if not name_zh and not name_en:
            continue

        raw_type = _get_mapped_cell(row, mapping, "type")
        length = _get_mapped_cell(row, mapping, "length")
        requirement = _get_mapped_cell(row, mapping, "requirement")
        comment = _get_mapped_cell(row, mapping, "comment")

        columns.append(
            {
                "name_zh": name_zh,
                "name_en": name_en,
                "data_type": _normalize_data_type(raw_type, length),
                "is_primary": _is_primary_key(requirement, comment),
                "nullable": _nullable(requirement),
                "comment": comment,
                "requirement": requirement,
                "length": length,
            }
        )
    return columns


def parse_hopms_docx(docx_path: Path) -> list[dict[str, Any]]:
    """Parse every field-definition table found in the DOCX at *docx_path*.

    Returns one dict per recognized table with ``source_table_index`` (the
    1-based position among all tables in the document body), ``table_info``
    and ``columns``.  Tables without a recognizable header or without any
    usable column are skipped.  A table whose English name cannot be found
    gets a generated ``HOPMS_TABLE_NNN`` name.
    """
    document = Document(str(docx_path))
    recent_paragraphs: list[str] = []
    parsed_tables: list[dict[str, Any]] = []
    physical_table_index = 0

    for block_type, block in _iter_blocks(document):
        if block_type == "paragraph":
            paragraph_text = _clean_text(block.text)
            if paragraph_text:
                recent_paragraphs.append(paragraph_text)
                # Keep only a short sliding window of title candidates.
                del recent_paragraphs[:-_TITLE_LOOKBACK]
            continue

        # Count every table, recognized or not, so indexes match the document.
        physical_table_index += 1
        rows = _table_rows(block)
        if not rows:
            continue
        mapping = _header_map(rows[0])
        if mapping is None:
            continue

        name_zh, name_en, description = _find_table_title(recent_paragraphs)
        columns = _parse_columns(rows, mapping)
        if not columns:
            continue
        if not name_en:
            name_en = f"HOPMS_TABLE_{len(parsed_tables) + 1:03d}"

        parsed_tables.append(
            {
                "source_table_index": physical_table_index,
                "table_info": {
                    "name_zh": name_zh,
                    "name_en": name_en,
                    "description": description,
                },
                "columns": columns,
            }
        )

    return parsed_tables


def _column_to_backend_meta(column: dict[str, Any]) -> dict[str, str]:
    """Reduce a parsed column to the fields sent in the backend payload."""
    return {
        "name_zh": _clean_text(column.get("name_zh")),
        "name_en": _clean_text(column.get("name_en")),
        "data_type": _clean_text(column.get("data_type")) or "varchar(255)",
        "describe": _clean_text(column.get("comment")),
    }


def _table_to_backend_payload(table: dict[str, Any]) -> dict[str, Any]:
    """Build the request payload for one parsed table."""
    info = table["table_info"]
    return {
        "name_zh": info["name_zh"],
        "name_en": info["name_en"],
        "describe": info.get("description", ""),
        "type": "table",
        "category": DEFAULT_CATEGORY,
        "parsed_data": [_column_to_backend_meta(col) for col in table["columns"]],
    }


def build_dry_run_report(
    tables: list[dict[str, Any]],
    source_path: Path,
) -> dict[str, Any]:
    """Assemble the dry-run report for the parsed *tables*.

    The report contains a summary, warnings (tables carrying a generated
    ``HOPMS_TABLE_NNN`` name, columns missing either name) and every table
    together with its ``backend_payload``.
    """
    tables_missing_name_en = [
        table
        for table in tables
        if _GENERATED_NAME_PATTERN.fullmatch(table["table_info"].get("name_en", ""))
    ]
    empty_column_names = [
        {
            "table": table["table_info"]["name_en"],
            "source_table_index": table["source_table_index"],
            "column_index": idx + 1,
            "column": column,
        }
        for table in tables
        for idx, column in enumerate(table["columns"])
        if not column.get("name_zh") or not column.get("name_en")
    ]
    report_tables = [
        {**table, "backend_payload": _table_to_backend_payload(table)}
        for table in tables
    ]

    return {
        "mode": "dry-run",
        "source": str(source_path),
        "summary": {
            "table_count": len(tables),
            "column_count": sum(len(table["columns"]) for table in tables),
            "tables_missing_name_en": len(tables_missing_name_en),
            "columns_missing_name": len(empty_column_names),
        },
        "warnings": {
            "tables_missing_name_en": [
                {
                    "source_table_index": table["source_table_index"],
                    "name_zh": table["table_info"]["name_zh"],
                    "generated_name_en": table["table_info"]["name_en"],
                }
                for table in tables_missing_name_en
            ],
            "columns_missing_name": empty_column_names,
        },
        "tables": report_tables,
    }


def _api_url(base_url: str, path: str) -> str:
    """Join *base_url* and *path* with exactly one slash between them."""
    return f"{base_url.rstrip('/')}/{path.lstrip('/')}"


def _response_json(response: Any) -> dict[str, Any]:
    """Return the response body as a dict.

    Calls ``response.raise_for_status()`` first, so HTTP errors propagate.

    Raises:
        ValueError: if the decoded JSON is not an object.
    """
    response.raise_for_status()
    data = response.json()
    if not isinstance(data, dict):
        raise ValueError(f"API returned non-object JSON: {data!r}")
    return data


def _ensure_success(data: dict[str, Any], operation: str) -> None:
    """Raise ``RuntimeError`` unless the API envelope reports ``code == 200``."""
    if data.get("code") != 200:
        message = data.get("message") or data.get("error") or data
        raise RuntimeError(f"{operation} failed: {message}")


def login_dataops(
    session: Any,
    base_url: str,
    username: str,
    password: str,
    timeout: int = 30,
) -> dict[str, Any]:
    """POST the credentials to the login endpoint and return the response dict.

    Raises:
        RuntimeError: if the API envelope does not report success.
    """
    response = session.post(
        _api_url(base_url, "/api/system/auth/login"),
        json={"username": username, "password": password},
        timeout=timeout,
    )
    data = _response_json(response)
    _ensure_success(data, "login")
    return data


def find_existing_domain_id(
    session: Any,
    base_url: str,
    name_en: str,
    timeout: int = 60,
) -> int | None:
    """Return the id of the listed record whose ``name_en`` equals *name_en*.

    Only the first page (10 records) of the list response is inspected.
    Returns ``None`` when no record on that page matches exactly.
    """
    response = session.post(
        _api_url(base_url, "/api/bd/list"),
        json={"current": 1, "size": 10, "name_en": name_en},
        timeout=timeout,
    )
    data = _response_json(response)
    _ensure_success(data, "find existing business domain")

    records = (data.get("data") or {}).get("records") or []
    for record in records:
        if record.get("name_en") == name_en and record.get("id") is not None:
            return int(record["id"])
    return None


def _import_one_table(
    http: Any,
    base_url: str,
    table: dict[str, Any],
    index: int,
    timeout: int,
    update_existing: bool,
) -> dict[str, Any]:
    """Create, update or skip a single table and return its result record."""
    raw_payload = table["backend_payload"]
    table_info = table["table_info"]
    result_item: dict[str, Any] = {
        "index": index,
        "source_table_index": table.get("source_table_index"),
        "name_zh": table_info.get("name_zh"),
        "name_en": table_info.get("name_en"),
        "column_count": len(table.get("columns") or []),
        "status": "pending",
    }

    try:
        # Copy so that adding "id" below never mutates the caller's report.
        payload = dict(raw_payload)
        existing_id = find_existing_domain_id(
            http,
            base_url,
            str(payload.get("name_en") or ""),
            timeout=timeout,
        )
        if existing_id is not None:
            result_item["existing_id"] = existing_id
            if not update_existing:
                result_item["operation"] = "skip_existing"
                result_item["status"] = "skipped"
                return result_item
            payload["id"] = existing_id
            result_item["operation"] = "update"
            endpoint = "/api/bd/update"
        else:
            result_item["operation"] = "create"
            endpoint = "/api/bd/save"

        response = http.post(
            _api_url(base_url, endpoint),
            json=payload,
            timeout=timeout,
        )
        data = _response_json(response)
        _ensure_success(data, "save business domain")
        result_item["status"] = "success"
        result_item["response"] = data.get("data")
    except Exception as exc:  # noqa: BLE001 - keep batch import moving.
        result_item["status"] = "failed"
        result_item["error"] = str(exc)
    return result_item


def import_tables(
    report_tables: list[dict[str, Any]],
    base_url: str,
    username: str = "",
    password: str = "",
    session: Any | None = None,
    timeout: int = 60,
    skip_login: bool = True,
    update_existing: bool = False,
) -> dict[str, Any]:
    """Submit the tables of a dry-run report to the DataOps platform.

    Args:
        report_tables: The ``"tables"`` list of a dry-run report; each entry
            must carry ``backend_payload`` and ``table_info``.
        base_url: Platform base URL.
        username: Login name, used only when *skip_login* is False.
        password: Login password, used only when *skip_login* is False.
        session: HTTP session to use.  When omitted a ``requests.Session`` is
            created and closed again before returning; a caller-supplied
            session is left open.
        timeout: Per-request timeout passed to every HTTP call.
        skip_login: When False, log in before importing.
        update_existing: When True, update domains that already exist instead
            of skipping them.

    Returns:
        A commit-mode result dict with a summary and one result per table.
        A failure of a single table is recorded in its result and does not
        stop the batch; a login failure propagates to the caller.
    """
    owns_session = session is None
    http = requests.Session() if owns_session else session
    try:
        if not skip_login:
            login_dataops(http, base_url, username, password, timeout=timeout)

        results = [
            _import_one_table(http, base_url, table, index, timeout, update_existing)
            for index, table in enumerate(report_tables, start=1)
        ]
    finally:
        # Release pooled connections only for a session created here.
        if owns_session:
            http.close()

    succeeded = sum(1 for item in results if item["status"] == "success")
    failed = sum(1 for item in results if item["status"] == "failed")
    skipped = sum(1 for item in results if item["status"] == "skipped")
    return {
        "mode": "commit",
        "base_url": base_url.rstrip("/"),
        "summary": {
            "attempted": len(results),
            "succeeded": succeeded,
            "failed": failed,
            "skipped": skipped,
        },
        "results": results,
    }


def _find_default_docx() -> Path:
    """Return the first ``docs/HOPMs*.docx`` file in sorted order.

    Files whose name starts with ``~$`` are ignored.  Sorting makes the
    choice deterministic when several documents match.

    Raises:
        FileNotFoundError: if no matching document exists.
    """
    matches = sorted(
        path
        for path in Path("docs").glob("HOPMs*.docx")
        if not path.name.startswith("~$")
    )
    if not matches:
        raise FileNotFoundError("未在 docs 目录找到 HOPMs*.docx")
    return matches[0]


def _write_report(report: dict[str, Any], output_path: Path) -> None:
    """Write *report* as indented UTF-8 JSON, creating parent directories."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(report, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def main() -> int:
    """Run the command-line interface.

    Always writes the dry-run report; with ``--commit`` it also imports the
    tables and writes the import result.  Returns 1 when at least one table
    failed to import, otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description="Parse HOPMs DOCX definitions and generate a DataOps dry-run report."
    )
    parser.add_argument("--docx", type=Path, default=None, help="HOPMs DOCX path")
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("docs/generated/hopms_dry_run.json"),
        help="Dry-run JSON output path",
    )
    parser.add_argument("--limit", type=int, default=0, help="Only include first N tables")
    parser.add_argument(
        "--commit",
        action="store_true",
        help="Submit parsed table definitions to DataOps.",
    )
    parser.add_argument(
        "--base-url",
        default="https://company.citupro.com:18183",
        help="DataOps platform base URL",
    )
    parser.add_argument("--username", default=None, help="DataOps username")
    parser.add_argument("--password", default=None, help="DataOps password")
    parser.add_argument(
        "--login",
        action="store_true",
        help="Validate username/password before importing. Default is direct API import.",
    )
    parser.add_argument(
        "--update-existing",
        action="store_true",
        help="Call /api/bd/update for existing domains. Default is to skip existing domains.",
    )
    parser.add_argument(
        "--import-output",
        type=Path,
        default=Path("docs/generated/hopms_import_result.json"),
        help="Commit-mode import result JSON output path",
    )
    args = parser.parse_args()

    docx_path = args.docx or _find_default_docx()
    tables = parse_hopms_docx(docx_path)
    if args.limit and args.limit > 0:
        tables = tables[: args.limit]

    report = build_dry_run_report(tables, source_path=docx_path)
    _write_report(report, args.output)

    summary = report["summary"]
    print(f"Dry-run report written: {args.output}")
    print(f"Tables: {summary['table_count']}")
    print(f"Columns: {summary['column_count']}")
    print(f"Generated table names: {summary['tables_missing_name_en']}")
    print(f"Columns missing name: {summary['columns_missing_name']}")

    if args.commit:
        if args.login and (not args.username or not args.password):
            raise SystemExit("--commit --login requires --username and --password")

        import_result = import_tables(
            report["tables"],
            base_url=args.base_url,
            username=args.username or "",
            password=args.password or "",
            skip_login=not args.login,
            update_existing=args.update_existing,
        )
        _write_report(import_result, args.import_output)

        import_summary = import_result["summary"]
        print(f"Import result written: {args.import_output}")
        print(f"Attempted: {import_summary['attempted']}")
        print(f"Succeeded: {import_summary['succeeded']}")
        print(f"Skipped: {import_summary['skipped']}")
        print(f"Failed: {import_summary['failed']}")
        if import_summary["failed"]:
            return 1

    return 0


if __name__ == "__main__":
    raise SystemExit(main())