| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- """Parse HOPMs DOCX table definitions and prepare DataOps import payloads.
- The default mode is dry-run: parse the Word document, generate a JSON report,
- and do not call the DataOps platform.
- """
- from __future__ import annotations
- import argparse
- import json
- import re
- from pathlib import Path
- from typing import Any, Iterable
- import requests
- from docx import Document
- from docx.oxml.ns import qn
- from docx.table import Table
- from docx.text.paragraph import Paragraph
# Maps a (compacted) Chinese column header found in a field-definition table
# to the internal attribute name used throughout this module.
# NOTE(review): `_compact_header` strips "/" before the lookup, so the
# "填报/要求" key is only reachable if that normalisation changes; it is kept
# for safety alongside the slash-free spelling.
FIELD_HEADER_ALIASES: dict[str, str] = {
    "字段中文名": "name_zh",
    "字段名": "name_en",
    "类型": "type",
    "字节": "length",
    "填报/要求": "requirement",
    "填报要求": "requirement",
    "说明": "comment",
}

# Category assigned to every business-domain payload built by this module.
DEFAULT_CATEGORY: str = "HOPMs标准数据集"

# Title paragraphs that carry no "(english_name)" suffix but whose English
# table name is known; keyed by the exact cleaned paragraph text.
TABLE_NAME_OVERRIDES: dict[str, str] = {
    "患者基本信息": "patient_demographics",
}
- def _clean_text(value: Any) -> str:
- text = "" if value is None else str(value)
- text = text.replace("\u3000", " ")
- text = re.sub(r"\s+", " ", text)
- return text.strip()
def _compact_header(value: str) -> str:
    """Normalise a table header cell for lookup in ``FIELD_HEADER_ALIASES``.

    The text is cleaned with ``_clean_text`` and then every space and every
    ASCII slash is removed.
    """
    cleaned = _clean_text(value)
    # Drop both characters in a single pass.
    return cleaned.translate({ord(" "): None, ord("/"): None})
def _iter_blocks(document: Document) -> Iterable[tuple[str, Any]]:
    """Yield the document body's paragraphs and tables in document order.

    Each item is a ``(kind, block)`` pair where *kind* is ``"paragraph"`` or
    ``"table"`` and *block* is the matching ``Paragraph`` / ``Table`` wrapper.
    Body children with any other tag are skipped.
    """
    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")
    for element in document.element.body.iterchildren():
        tag = element.tag
        if tag == paragraph_tag:
            yield "paragraph", Paragraph(element, document)
        elif tag == table_tag:
            yield "table", Table(element, document)
def _table_rows(table: Table) -> list[list[str]]:
    """Return the table as a list of rows, each a list of cleaned cell texts."""
    extracted: list[list[str]] = []
    for row in table.rows:
        cleaned_cells = [_clean_text(cell.text) for cell in row.cells]
        extracted.append(cleaned_cells)
    return extracted
def _header_map(header_row: list[str]) -> dict[str, int] | None:
    """Map internal field names to their column index in *header_row*.

    Headers are normalised with ``_compact_header`` and looked up in
    ``FIELD_HEADER_ALIASES``; unknown headers are ignored, and if the same
    field appears twice the right-most column wins.

    Returns ``None`` unless ``name_zh``, ``name_en`` and ``type`` were all
    found, i.e. unless the row looks like a field-definition header.
    """
    positions: dict[str, int] = {}
    for column_index, header in enumerate(header_row):
        target = FIELD_HEADER_ALIASES.get(_compact_header(header))
        if target:
            positions[target] = column_index

    mandatory = ("name_zh", "name_en", "type")
    if all(name in positions for name in mandatory):
        return positions
    return None
def _is_field_definition_table(rows: list[list[str]]) -> bool:
    """Return True when the first row of *rows* is a field-definition header.

    An empty table is never a field-definition table.
    """
    return bool(rows) and _header_map(rows[0]) is not None
# "<Chinese name>(<identifier>)" with either ASCII or full-width parentheses.
# The full-width forms are written as escapes so they survive text
# normalisation of this source file.
_TABLE_TITLE_PATTERN = re.compile(
    r"(.+?)[(\uff08]\s*([A-Za-z][A-Za-z0-9_]*)\s*[)\uff09]"
)


def _find_table_title(recent_paragraphs: list[str]) -> tuple[str, str, str]:
    """Return table Chinese name, English/code name, and description.

    *recent_paragraphs* holds the cleaned paragraphs preceding a table,
    oldest first. The lookup proceeds in three stages:

    1. The newest paragraph that is a key of ``TABLE_NAME_OVERRIDES`` wins;
       its English name comes from the override table.
    2. Otherwise the newest paragraph shaped like ``中文名(english_name)``
       (ASCII or full-width parentheses) supplies both names.
    3. Otherwise the last paragraph is used as the Chinese name with an
       empty English name and description.

    In stages 1 and 2 the description is the paragraph immediately after
    the title paragraph, or ``""`` when the title is the last paragraph.
    With no paragraphs at all, three empty strings are returned.
    """

    def description_after(title_index: int) -> str:
        # The paragraph directly following the title, if there is one.
        following = title_index + 1
        if following < len(recent_paragraphs):
            return _clean_text(recent_paragraphs[following])
        return ""

    newest_first = range(len(recent_paragraphs) - 1, -1, -1)

    # Explicit overrides take priority over any pattern match.
    for idx in newest_first:
        paragraph = recent_paragraphs[idx]
        if paragraph in TABLE_NAME_OVERRIDES:
            return paragraph, TABLE_NAME_OVERRIDES[paragraph], description_after(idx)

    for idx in newest_first:
        match = _TABLE_TITLE_PATTERN.search(recent_paragraphs[idx])
        if match:
            return (
                _clean_text(match.group(1)),
                _clean_text(match.group(2)),
                description_after(idx),
            )

    if recent_paragraphs:
        return _clean_text(recent_paragraphs[-1]), "", ""
    return "", "", ""
def _normalize_data_type(raw_type: str, raw_length: str) -> str:
    """Translate a documented field type into a database column type.

    Args:
        raw_type: The type cell of a field row (Chinese or English wording).
        raw_length: The length cell of the same row; only used for
            character types.

    Returns:
        One of ``timestamp``, ``date``, ``time``, ``bigint``, ``integer``,
        ``numeric``, ``boolean`` or ``varchar(N)``. An unrecognised non-empty
        type is returned lower-cased and cleaned; an empty type yields
        ``varchar(255)``.
    """
    data_type = _clean_text(raw_type).lower()
    length = _clean_text(raw_length)

    if "日期时间" in data_type or "datetime" in data_type or "timestamp" in data_type:
        return "timestamp"
    if data_type in {"日期", "date"}:
        return "date"
    if "时间" in data_type and "日期" not in data_type:
        return "time"
    # "长整数" contains "整数", so the bigint test must run before the
    # integer test or it can never match.
    if "长整数" in data_type or data_type == "bigint":
        return "bigint"
    if "整数" in data_type or data_type in {"int", "integer"}:
        return "integer"
    if "数字" in data_type or "数值" in data_type or "金额" in data_type:
        return "numeric"
    if "布尔" in data_type or data_type == "boolean":
        return "boolean"
    if "字符" in data_type or "string" in data_type or "varchar" in data_type:
        # isdecimal() (unlike isdigit()) only accepts characters that int()
        # can parse, so superscripts and similar cannot raise ValueError.
        if length.isdecimal() and int(length) > 0:
            return f"varchar({int(length)})"
        return "varchar(255)"
    if data_type:
        return data_type
    return "varchar(255)"
- def _is_primary_key(requirement: str, comment: str) -> bool:
- joined = f"{requirement} {comment}"
- return "主键" in joined
def _nullable(requirement: str) -> bool:
    """Return True unless the requirement text marks the field as required.

    A field is treated as required (not nullable) when the cleaned text
    contains "必填". The negated wordings "非必填" and "不必填" also contain
    that substring but mean the opposite, so they are removed before the
    check.
    """
    text = _clean_text(requirement)
    for negated in ("非必填", "不必填"):
        text = text.replace(negated, "")
    return "必填" not in text
def _get_mapped_cell(row: list[str], mapping: dict[str, int], field: str) -> str:
    """Return the cleaned cell of *row* that holds *field*.

    Yields ``""`` when *field* has no column in *mapping* or when the row is
    too short to contain that column.
    """
    position = mapping.get(field)
    if position is not None and position < len(row):
        return _clean_text(row[position])
    return ""
def _parse_columns(rows: list[list[str]], mapping: dict[str, int]) -> list[dict[str, Any]]:
    """Convert the data rows of a field-definition table into column dicts.

    The first row (the header) is skipped, as are rows that are entirely
    blank and rows with neither a Chinese nor an English field name.
    *mapping* is the header map produced by ``_header_map``.
    """

    def cell(row: list[str], field: str) -> str:
        return _get_mapped_cell(row, mapping, field)

    parsed: list[dict[str, Any]] = []
    for row in rows[1:]:
        if all(not _clean_text(value) for value in row):
            continue

        name_zh = cell(row, "name_zh")
        name_en = cell(row, "name_en")
        if not (name_zh or name_en):
            continue

        raw_type = cell(row, "type")
        length = cell(row, "length")
        requirement = cell(row, "requirement")
        comment = cell(row, "comment")

        entry: dict[str, Any] = {
            "name_zh": name_zh,
            "name_en": name_en,
            "data_type": _normalize_data_type(raw_type, length),
            "is_primary": _is_primary_key(requirement, comment),
            "nullable": _nullable(requirement),
            "comment": comment,
            "requirement": requirement,
            "length": length,
        }
        parsed.append(entry)
    return parsed
def parse_hopms_docx(docx_path: Path) -> list[dict[str, Any]]:
    """Extract every field-definition table from the DOCX at *docx_path*.

    The document body is walked in order. The last eight non-empty
    paragraphs seen are kept as context for naming the next table. Tables
    whose first row is not a field-definition header, or that yield no
    columns, are ignored (but still counted for ``source_table_index``).

    Each result holds ``source_table_index`` (1-based position among all
    tables in the body), ``table_info`` (``name_zh``, ``name_en``,
    ``description``) and ``columns``. When no English name can be found a
    placeholder ``HOPMS_TABLE_NNN`` is generated from the result position.
    """
    document = Document(str(docx_path))
    context_paragraphs: list[str] = []
    results: list[dict[str, Any]] = []
    table_position = 0

    for kind, block in _iter_blocks(document):
        if kind == "paragraph":
            text = _clean_text(block.text)
            if text:
                context_paragraphs.append(text)
                # Keep only the eight most recent paragraphs.
                del context_paragraphs[:-8]
            continue

        table_position += 1
        rows = _table_rows(block)
        mapping = _header_map(rows[0]) if rows else None
        if mapping is None:
            continue

        title_zh, title_en, description = _find_table_title(context_paragraphs)
        columns = _parse_columns(rows, mapping)
        if not columns:
            continue

        table_info = {
            "name_zh": title_zh,
            "name_en": title_en or f"HOPMS_TABLE_{len(results) + 1:03d}",
            "description": description,
        }
        results.append(
            {
                "source_table_index": table_position,
                "table_info": table_info,
                "columns": columns,
            }
        )
    return results
def _column_to_backend_meta(column: dict[str, Any]) -> dict[str, str]:
    """Build the per-column entry of a backend payload from a parsed column.

    Only the names, the data type (defaulting to ``varchar(255)`` when
    blank) and the comment (sent as ``describe``) are included.
    """
    data_type = _clean_text(column.get("data_type"))
    if not data_type:
        data_type = "varchar(255)"
    return {
        "name_zh": _clean_text(column.get("name_zh")),
        "name_en": _clean_text(column.get("name_en")),
        "data_type": data_type,
        "describe": _clean_text(column.get("comment")),
    }
def _table_to_backend_payload(table: dict[str, Any]) -> dict[str, Any]:
    """Build the request body for one parsed table.

    *table* is an item produced by ``parse_hopms_docx``; its ``table_info``
    supplies the names and description and its ``columns`` are converted
    with ``_column_to_backend_meta``.
    """
    info = table["table_info"]
    column_meta = []
    for column in table["columns"]:
        column_meta.append(_column_to_backend_meta(column))
    return {
        "name_zh": info["name_zh"],
        "name_en": info["name_en"],
        "describe": info.get("description", ""),
        "type": "table",
        "category": DEFAULT_CATEGORY,
        "parsed_data": column_meta,
    }
def build_dry_run_report(
    tables: list[dict[str, Any]],
    source_path: Path,
) -> dict[str, Any]:
    """Assemble the dry-run JSON report for the parsed *tables*.

    The report contains a summary (table/column counts and warning counts),
    the warnings themselves, and every table extended with its
    ``backend_payload``. Two kinds of warnings are collected: tables whose
    English name is a generated ``HOPMS_TABLE_NNN`` placeholder, and columns
    missing either their Chinese or their English name.
    """
    placeholder_name = re.compile(r"HOPMS_TABLE_\d{3}")

    placeholder_warnings = []
    for table in tables:
        info = table["table_info"]
        if placeholder_name.fullmatch(info.get("name_en", "")):
            placeholder_warnings.append(
                {
                    "source_table_index": table["source_table_index"],
                    "name_zh": info["name_zh"],
                    "generated_name_en": info["name_en"],
                }
            )

    unnamed_columns = []
    for table in tables:
        for position, column in enumerate(table["columns"], start=1):
            if not (column.get("name_zh") and column.get("name_en")):
                unnamed_columns.append(
                    {
                        "table": table["table_info"]["name_en"],
                        "source_table_index": table["source_table_index"],
                        "column_index": position,
                        "column": column,
                    }
                )

    tables_with_payload = [
        {**table, "backend_payload": _table_to_backend_payload(table)}
        for table in tables
    ]

    summary = {
        "table_count": len(tables),
        "column_count": sum(len(table["columns"]) for table in tables),
        "tables_missing_name_en": len(placeholder_warnings),
        "columns_missing_name": len(unnamed_columns),
    }
    warnings = {
        "tables_missing_name_en": placeholder_warnings,
        "columns_missing_name": unnamed_columns,
    }
    return {
        "mode": "dry-run",
        "source": str(source_path),
        "summary": summary,
        "warnings": warnings,
        "tables": tables_with_payload,
    }
- def _api_url(base_url: str, path: str) -> str:
- return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
- def _response_json(response: Any) -> dict[str, Any]:
- response.raise_for_status()
- data = response.json()
- if not isinstance(data, dict):
- raise ValueError(f"API returned non-object JSON: {data!r}")
- return data
- def _ensure_success(data: dict[str, Any], operation: str) -> None:
- if data.get("code") != 200:
- message = data.get("message") or data.get("error") or data
- raise RuntimeError(f"{operation} failed: {message}")
def login_dataops(
    session: Any,
    base_url: str,
    username: str,
    password: str,
    timeout: int = 30,
) -> dict[str, Any]:
    """Log in through *session* and return the decoded response body.

    Posts the credentials as JSON to ``/api/system/auth/login``. Errors from
    ``_response_json`` and ``_ensure_success`` (HTTP failure, non-object
    body, non-200 ``code``) propagate to the caller.
    """
    login_url = _api_url(base_url, "/api/system/auth/login")
    credentials = {"username": username, "password": password}
    response = session.post(login_url, json=credentials, timeout=timeout)
    body = _response_json(response)
    _ensure_success(body, "login")
    return body
def find_existing_domain_id(
    session: Any,
    base_url: str,
    name_en: str,
    timeout: int = 60,
) -> int | None:
    """Return the id of the business domain named exactly *name_en*, if any.

    Queries ``/api/bd/list`` for the first page (10 records) filtered by
    *name_en* and returns the id of the first record whose ``name_en``
    matches exactly and whose ``id`` is present; ``None`` otherwise.
    """
    query = {"current": 1, "size": 10, "name_en": name_en}
    response = session.post(
        _api_url(base_url, "/api/bd/list"),
        json=query,
        timeout=timeout,
    )
    body = _response_json(response)
    _ensure_success(body, "find existing business domain")

    page = body.get("data") or {}
    records = page.get("records") or []
    matching_ids = (
        int(record["id"])
        for record in records
        if record.get("name_en") == name_en and record.get("id") is not None
    )
    return next(matching_ids, None)
def _import_single_table(
    http: Any,
    base_url: str,
    table: dict[str, Any],
    index: int,
    timeout: int,
    update_existing: bool,
) -> dict[str, Any]:
    """Create, update or skip one table and return its result record."""
    # Looked up outside the try block so a malformed report entry still
    # raises instead of being recorded as an import failure.
    original_payload = table["backend_payload"]
    table_info = table["table_info"]
    result_item: dict[str, Any] = {
        "index": index,
        "source_table_index": table.get("source_table_index"),
        "name_zh": table_info.get("name_zh"),
        "name_en": table_info.get("name_en"),
        "column_count": len(table.get("columns") or []),
        "status": "pending",
    }
    try:
        # Copy so adding "id" for updates never mutates the report.
        payload = dict(original_payload)
        existing_id = find_existing_domain_id(
            http,
            base_url,
            str(payload.get("name_en") or ""),
            timeout=timeout,
        )
        if existing_id is not None:
            result_item["existing_id"] = existing_id
            if not update_existing:
                result_item["operation"] = "skip_existing"
                result_item["status"] = "skipped"
                return result_item
            payload["id"] = existing_id
            result_item["operation"] = "update"
            endpoint = "/api/bd/update"
        else:
            result_item["operation"] = "create"
            endpoint = "/api/bd/save"

        response = http.post(
            _api_url(base_url, endpoint),
            json=payload,
            timeout=timeout,
        )
        data = _response_json(response)
        _ensure_success(data, "save business domain")
        result_item["status"] = "success"
        result_item["response"] = data.get("data")
    except Exception as exc:  # noqa: BLE001 - keep batch import moving.
        result_item["status"] = "failed"
        result_item["error"] = str(exc)
    return result_item


def import_tables(
    report_tables: list[dict[str, Any]],
    base_url: str,
    username: str = "",
    password: str = "",
    session: Any | None = None,
    timeout: int = 60,
    skip_login: bool = True,
    update_existing: bool = False,
) -> dict[str, Any]:
    """Submit the tables of a dry-run report to the DataOps platform.

    Args:
        report_tables: The ``tables`` list of a dry-run report; each item
            must carry ``backend_payload`` and ``table_info``.
        base_url: Platform base URL.
        username: Login name, used only when *skip_login* is false.
        password: Login password, used only when *skip_login* is false.
        session: HTTP session to use. When ``None`` a ``requests.Session``
            is created for the call and closed before returning; a
            caller-supplied session is never closed here.
        timeout: Per-request timeout in seconds.
        skip_login: When false, ``login_dataops`` is called first and its
            errors propagate.
        update_existing: When true, domains that already exist are updated
            via ``/api/bd/update``; otherwise they are skipped.

    Returns:
        A commit-mode result with a ``summary`` (attempted, succeeded,
        failed, skipped) and one ``results`` record per table. A failure on
        one table is recorded on its record and does not stop the batch.
    """
    owns_session = session is None
    http = requests.Session() if owns_session else session
    try:
        if not skip_login:
            login_dataops(http, base_url, username, password, timeout=timeout)
        results = [
            _import_single_table(http, base_url, table, index, timeout, update_existing)
            for index, table in enumerate(report_tables, start=1)
        ]
    finally:
        # Release pooled connections only for a session created here.
        if owns_session:
            http.close()

    succeeded = sum(1 for item in results if item["status"] == "success")
    failed = sum(1 for item in results if item["status"] == "failed")
    skipped = sum(1 for item in results if item["status"] == "skipped")
    return {
        "mode": "commit",
        "base_url": base_url.rstrip("/"),
        "summary": {
            "attempted": len(results),
            "succeeded": succeeded,
            "failed": failed,
            "skipped": skipped,
        },
        "results": results,
    }
- def _find_default_docx() -> Path:
- matches = [
- path for path in Path("docs").glob("HOPMs*.docx")
- if not path.name.startswith("~$")
- ]
- if not matches:
- raise FileNotFoundError("未在 docs 目录找到 HOPMs*.docx")
- return matches[0]
- def _write_report(report: dict[str, Any], output_path: Path) -> None:
- output_path.parent.mkdir(parents=True, exist_ok=True)
- output_path.write_text(
- json.dumps(report, ensure_ascii=False, indent=2),
- encoding="utf-8",
- )
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description="Parse HOPMs DOCX definitions and generate a DataOps dry-run report."
    )
    parser.add_argument("--docx", type=Path, default=None, help="HOPMs DOCX path")
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("docs/generated/hopms_dry_run.json"),
        help="Dry-run JSON output path",
    )
    parser.add_argument("--limit", type=int, default=0, help="Only include first N tables")
    parser.add_argument(
        "--commit",
        action="store_true",
        help="Submit parsed table definitions to DataOps.",
    )
    parser.add_argument(
        "--base-url",
        default="https://company.citupro.com:18183",
        help="DataOps platform base URL",
    )
    parser.add_argument("--username", default=None, help="DataOps username")
    parser.add_argument("--password", default=None, help="DataOps password")
    parser.add_argument(
        "--login",
        action="store_true",
        help="Validate username/password before importing. Default is direct API import.",
    )
    parser.add_argument(
        "--update-existing",
        action="store_true",
        help="Call /api/bd/update for existing domains. Default is to skip existing domains.",
    )
    parser.add_argument(
        "--import-output",
        type=Path,
        default=Path("docs/generated/hopms_import_result.json"),
        help="Commit-mode import result JSON output path",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """Run the command-line tool and return its exit status.

    The DOCX is always parsed and a dry-run report is always written. Only
    with ``--commit`` are the tables submitted to the platform, and a second
    result file is then written.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; ``None``
            (the default) keeps the normal command-line behaviour.

    Returns:
        ``1`` when a commit run recorded at least one failed table,
        otherwise ``0``.

    Raises:
        SystemExit: When ``--commit --login`` is used without both
            ``--username`` and ``--password`` (and on argparse errors).
    """
    args = _build_arg_parser().parse_args(argv)

    docx_path = args.docx or _find_default_docx()
    tables = parse_hopms_docx(docx_path)
    if args.limit and args.limit > 0:
        tables = tables[: args.limit]

    report = build_dry_run_report(tables, source_path=docx_path)
    _write_report(report, args.output)
    summary = report["summary"]
    print(f"Dry-run report written: {args.output}")
    print(f"Tables: {summary['table_count']}")
    print(f"Columns: {summary['column_count']}")
    print(f"Generated table names: {summary['tables_missing_name_en']}")
    print(f"Columns missing name: {summary['columns_missing_name']}")

    if not args.commit:
        return 0

    if args.login and (not args.username or not args.password):
        raise SystemExit("--commit --login requires --username and --password")

    import_result = import_tables(
        report["tables"],
        base_url=args.base_url,
        username=args.username or "",
        password=args.password or "",
        skip_login=not args.login,
        update_existing=args.update_existing,
    )
    _write_report(import_result, args.import_output)
    import_summary = import_result["summary"]
    print(f"Import result written: {args.import_output}")
    print(f"Attempted: {import_summary['attempted']}")
    print(f"Succeeded: {import_summary['succeeded']}")
    print(f"Skipped: {import_summary['skipped']}")
    print(f"Failed: {import_summary['failed']}")
    # A non-zero status lets callers detect partial failures.
    return 1 if import_summary["failed"] else 0
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|