import_hopms_dataset.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. """Parse HOPMs DOCX table definitions and prepare DataOps import payloads.
  2. The default mode is dry-run: parse the Word document, generate a JSON report,
  3. and do not call the DataOps platform.
  4. """
  5. from __future__ import annotations
  6. import argparse
  7. import json
  8. import re
  9. from pathlib import Path
  10. from typing import Any, Iterable
  11. import requests
  12. from docx import Document
  13. from docx.oxml.ns import qn
  14. from docx.table import Table
  15. from docx.text.paragraph import Paragraph
# Maps a header cell of a field-definition table to the internal key used for
# that column. Lookups are made with the output of `_compact_header`, which
# removes spaces and "/" first, so a "填报/要求" header in the document is
# resolved through the "填报要求" entry (the "填报/要求" key itself can never
# be hit by that lookup).
FIELD_HEADER_ALIASES = {
    "字段中文名": "name_zh",
    "字段名": "name_en",
    "类型": "type",
    "字节": "length",
    "填报/要求": "requirement",
    "填报要求": "requirement",
    "说明": "comment",
}

# Value written to the "category" key of every backend payload.
DEFAULT_CATEGORY = "HOPMs标准数据集"

# Exact title paragraphs that `_find_table_title` maps straight to an
# English/code table name, without needing a "(code)" suffix in the title.
TABLE_NAME_OVERRIDES = {
    "患者基本信息": "patient_demographics",
}
  29. def _clean_text(value: Any) -> str:
  30. text = "" if value is None else str(value)
  31. text = text.replace("\u3000", " ")
  32. text = re.sub(r"\s+", " ", text)
  33. return text.strip()
  34. def _compact_header(value: str) -> str:
  35. return _clean_text(value).replace(" ", "").replace("/", "")
  36. def _iter_blocks(document: Document) -> Iterable[tuple[str, Any]]:
  37. for child in document.element.body.iterchildren():
  38. if child.tag == qn("w:p"):
  39. yield "paragraph", Paragraph(child, document)
  40. elif child.tag == qn("w:tbl"):
  41. yield "table", Table(child, document)
  42. def _table_rows(table: Table) -> list[list[str]]:
  43. return [
  44. [_clean_text(cell.text) for cell in row.cells]
  45. for row in table.rows
  46. ]
  47. def _header_map(header_row: list[str]) -> dict[str, int] | None:
  48. mapping: dict[str, int] = {}
  49. for idx, header in enumerate(header_row):
  50. normalized = _compact_header(header)
  51. field_name = FIELD_HEADER_ALIASES.get(normalized)
  52. if field_name:
  53. mapping[field_name] = idx
  54. required = {"name_zh", "name_en", "type"}
  55. if not required.issubset(mapping):
  56. return None
  57. return mapping
  58. def _is_field_definition_table(rows: list[list[str]]) -> bool:
  59. if not rows:
  60. return False
  61. return _header_map(rows[0]) is not None
  62. def _find_table_title(recent_paragraphs: list[str]) -> tuple[str, str, str]:
  63. """Return table Chinese name, English/code name, and description."""
  64. title_pattern = re.compile(r"(.+?)[((]\s*([A-Za-z][A-Za-z0-9_]*)\s*[))]")
  65. for idx in range(len(recent_paragraphs) - 1, -1, -1):
  66. paragraph = recent_paragraphs[idx]
  67. if paragraph in TABLE_NAME_OVERRIDES:
  68. description = ""
  69. if idx + 1 < len(recent_paragraphs):
  70. description = _clean_text(recent_paragraphs[idx + 1])
  71. return paragraph, TABLE_NAME_OVERRIDES[paragraph], description
  72. for idx in range(len(recent_paragraphs) - 1, -1, -1):
  73. paragraph = recent_paragraphs[idx]
  74. match = title_pattern.search(paragraph)
  75. if match:
  76. name_zh = _clean_text(match.group(1))
  77. name_en = _clean_text(match.group(2))
  78. description = ""
  79. if idx + 1 < len(recent_paragraphs):
  80. description = _clean_text(recent_paragraphs[idx + 1])
  81. return name_zh, name_en, description
  82. if recent_paragraphs:
  83. return _clean_text(recent_paragraphs[-1]), "", ""
  84. return "", "", ""
  85. def _normalize_data_type(raw_type: str, raw_length: str) -> str:
  86. data_type = _clean_text(raw_type).lower()
  87. length = _clean_text(raw_length)
  88. if "日期时间" in data_type or "datetime" in data_type or "timestamp" in data_type:
  89. return "timestamp"
  90. if data_type == "日期" or data_type == "date":
  91. return "date"
  92. if "时间" in data_type and "日期" not in data_type:
  93. return "time"
  94. if "整数" in data_type or data_type in {"int", "integer"}:
  95. return "integer"
  96. if "长整数" in data_type or data_type == "bigint":
  97. return "bigint"
  98. if "数字" in data_type or "数值" in data_type or "金额" in data_type:
  99. return "numeric"
  100. if "布尔" in data_type or data_type == "boolean":
  101. return "boolean"
  102. if "字符" in data_type or "string" in data_type or "varchar" in data_type:
  103. if length.isdigit() and int(length) > 0:
  104. return f"varchar({int(length)})"
  105. return "varchar(255)"
  106. if data_type:
  107. return data_type
  108. return "varchar(255)"
  109. def _is_primary_key(requirement: str, comment: str) -> bool:
  110. joined = f"{requirement} {comment}"
  111. return "主键" in joined
  112. def _nullable(requirement: str) -> bool:
  113. requirement = _clean_text(requirement)
  114. return "必填" not in requirement
  115. def _get_mapped_cell(row: list[str], mapping: dict[str, int], field: str) -> str:
  116. idx = mapping.get(field)
  117. if idx is None or idx >= len(row):
  118. return ""
  119. return _clean_text(row[idx])
  120. def _parse_columns(rows: list[list[str]], mapping: dict[str, int]) -> list[dict[str, Any]]:
  121. columns: list[dict[str, Any]] = []
  122. for row in rows[1:]:
  123. if not any(_clean_text(cell) for cell in row):
  124. continue
  125. name_zh = _get_mapped_cell(row, mapping, "name_zh")
  126. name_en = _get_mapped_cell(row, mapping, "name_en")
  127. raw_type = _get_mapped_cell(row, mapping, "type")
  128. length = _get_mapped_cell(row, mapping, "length")
  129. requirement = _get_mapped_cell(row, mapping, "requirement")
  130. comment = _get_mapped_cell(row, mapping, "comment")
  131. if not name_zh and not name_en:
  132. continue
  133. columns.append(
  134. {
  135. "name_zh": name_zh,
  136. "name_en": name_en,
  137. "data_type": _normalize_data_type(raw_type, length),
  138. "is_primary": _is_primary_key(requirement, comment),
  139. "nullable": _nullable(requirement),
  140. "comment": comment,
  141. "requirement": requirement,
  142. "length": length,
  143. }
  144. )
  145. return columns
  146. def parse_hopms_docx(docx_path: Path) -> list[dict[str, Any]]:
  147. document = Document(str(docx_path))
  148. recent_paragraphs: list[str] = []
  149. parsed_tables: list[dict[str, Any]] = []
  150. physical_table_index = 0
  151. for block_type, block in _iter_blocks(document):
  152. if block_type == "paragraph":
  153. paragraph_text = _clean_text(block.text)
  154. if paragraph_text:
  155. recent_paragraphs.append(paragraph_text)
  156. recent_paragraphs = recent_paragraphs[-8:]
  157. continue
  158. physical_table_index += 1
  159. rows = _table_rows(block)
  160. if not _is_field_definition_table(rows):
  161. continue
  162. mapping = _header_map(rows[0])
  163. if mapping is None:
  164. continue
  165. name_zh, name_en, description = _find_table_title(recent_paragraphs)
  166. columns = _parse_columns(rows, mapping)
  167. if not columns:
  168. continue
  169. if not name_en:
  170. name_en = f"HOPMS_TABLE_{len(parsed_tables) + 1:03d}"
  171. parsed_tables.append(
  172. {
  173. "source_table_index": physical_table_index,
  174. "table_info": {
  175. "name_zh": name_zh,
  176. "name_en": name_en,
  177. "description": description,
  178. },
  179. "columns": columns,
  180. }
  181. )
  182. return parsed_tables
  183. def _column_to_backend_meta(column: dict[str, Any]) -> dict[str, str]:
  184. return {
  185. "name_zh": _clean_text(column.get("name_zh")),
  186. "name_en": _clean_text(column.get("name_en")),
  187. "data_type": _clean_text(column.get("data_type")) or "varchar(255)",
  188. "describe": _clean_text(column.get("comment")),
  189. }
  190. def _table_to_backend_payload(table: dict[str, Any]) -> dict[str, Any]:
  191. info = table["table_info"]
  192. return {
  193. "name_zh": info["name_zh"],
  194. "name_en": info["name_en"],
  195. "describe": info.get("description", ""),
  196. "type": "table",
  197. "category": DEFAULT_CATEGORY,
  198. "parsed_data": [_column_to_backend_meta(col) for col in table["columns"]],
  199. }
  200. def build_dry_run_report(
  201. tables: list[dict[str, Any]],
  202. source_path: Path,
  203. ) -> dict[str, Any]:
  204. tables_missing_name_en = [
  205. table for table in tables
  206. if re.fullmatch(r"HOPMS_TABLE_\d{3}", table["table_info"].get("name_en", ""))
  207. ]
  208. empty_column_names = [
  209. {
  210. "table": table["table_info"]["name_en"],
  211. "source_table_index": table["source_table_index"],
  212. "column_index": idx + 1,
  213. "column": column,
  214. }
  215. for table in tables
  216. for idx, column in enumerate(table["columns"])
  217. if not column.get("name_zh") or not column.get("name_en")
  218. ]
  219. report_tables = []
  220. for table in tables:
  221. report_tables.append(
  222. {
  223. **table,
  224. "backend_payload": _table_to_backend_payload(table),
  225. }
  226. )
  227. return {
  228. "mode": "dry-run",
  229. "source": str(source_path),
  230. "summary": {
  231. "table_count": len(tables),
  232. "column_count": sum(len(table["columns"]) for table in tables),
  233. "tables_missing_name_en": len(tables_missing_name_en),
  234. "columns_missing_name": len(empty_column_names),
  235. },
  236. "warnings": {
  237. "tables_missing_name_en": [
  238. {
  239. "source_table_index": table["source_table_index"],
  240. "name_zh": table["table_info"]["name_zh"],
  241. "generated_name_en": table["table_info"]["name_en"],
  242. }
  243. for table in tables_missing_name_en
  244. ],
  245. "columns_missing_name": empty_column_names,
  246. },
  247. "tables": report_tables,
  248. }
  249. def _api_url(base_url: str, path: str) -> str:
  250. return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
  251. def _response_json(response: Any) -> dict[str, Any]:
  252. response.raise_for_status()
  253. data = response.json()
  254. if not isinstance(data, dict):
  255. raise ValueError(f"API returned non-object JSON: {data!r}")
  256. return data
  257. def _ensure_success(data: dict[str, Any], operation: str) -> None:
  258. if data.get("code") != 200:
  259. message = data.get("message") or data.get("error") or data
  260. raise RuntimeError(f"{operation} failed: {message}")
  261. def login_dataops(
  262. session: Any,
  263. base_url: str,
  264. username: str,
  265. password: str,
  266. timeout: int = 30,
  267. ) -> dict[str, Any]:
  268. response = session.post(
  269. _api_url(base_url, "/api/system/auth/login"),
  270. json={"username": username, "password": password},
  271. timeout=timeout,
  272. )
  273. data = _response_json(response)
  274. _ensure_success(data, "login")
  275. return data
  276. def find_existing_domain_id(
  277. session: Any,
  278. base_url: str,
  279. name_en: str,
  280. timeout: int = 60,
  281. ) -> int | None:
  282. response = session.post(
  283. _api_url(base_url, "/api/bd/list"),
  284. json={"current": 1, "size": 10, "name_en": name_en},
  285. timeout=timeout,
  286. )
  287. data = _response_json(response)
  288. _ensure_success(data, "find existing business domain")
  289. records = ((data.get("data") or {}).get("records") or [])
  290. for record in records:
  291. if record.get("name_en") == name_en and record.get("id") is not None:
  292. return int(record["id"])
  293. return None
  294. def import_tables(
  295. report_tables: list[dict[str, Any]],
  296. base_url: str,
  297. username: str = "",
  298. password: str = "",
  299. session: Any | None = None,
  300. timeout: int = 60,
  301. skip_login: bool = True,
  302. update_existing: bool = False,
  303. ) -> dict[str, Any]:
  304. http = session or requests.Session()
  305. if not skip_login:
  306. login_dataops(http, base_url, username, password, timeout=timeout)
  307. results: list[dict[str, Any]] = []
  308. for index, table in enumerate(report_tables, start=1):
  309. payload = table["backend_payload"]
  310. table_info = table["table_info"]
  311. result_item = {
  312. "index": index,
  313. "source_table_index": table.get("source_table_index"),
  314. "name_zh": table_info.get("name_zh"),
  315. "name_en": table_info.get("name_en"),
  316. "column_count": len(table.get("columns") or []),
  317. "status": "pending",
  318. }
  319. try:
  320. payload = dict(payload)
  321. existing_id = find_existing_domain_id(
  322. http,
  323. base_url,
  324. str(payload.get("name_en") or ""),
  325. timeout=timeout,
  326. )
  327. if existing_id is not None:
  328. result_item["existing_id"] = existing_id
  329. if not update_existing:
  330. result_item["operation"] = "skip_existing"
  331. result_item["status"] = "skipped"
  332. results.append(result_item)
  333. continue
  334. payload["id"] = existing_id
  335. result_item["operation"] = "update"
  336. endpoint = "/api/bd/update"
  337. else:
  338. result_item["operation"] = "create"
  339. endpoint = "/api/bd/save"
  340. response = http.post(
  341. _api_url(base_url, endpoint),
  342. json=payload,
  343. timeout=timeout,
  344. )
  345. data = _response_json(response)
  346. _ensure_success(data, "save business domain")
  347. result_item["status"] = "success"
  348. result_item["response"] = data.get("data")
  349. except Exception as exc: # noqa: BLE001 - keep batch import moving.
  350. result_item["status"] = "failed"
  351. result_item["error"] = str(exc)
  352. results.append(result_item)
  353. succeeded = sum(1 for item in results if item["status"] == "success")
  354. failed = sum(1 for item in results if item["status"] == "failed")
  355. skipped = sum(1 for item in results if item["status"] == "skipped")
  356. return {
  357. "mode": "commit",
  358. "base_url": base_url.rstrip("/"),
  359. "summary": {
  360. "attempted": len(results),
  361. "succeeded": succeeded,
  362. "failed": failed,
  363. "skipped": skipped,
  364. },
  365. "results": results,
  366. }
  367. def _find_default_docx() -> Path:
  368. matches = [
  369. path for path in Path("docs").glob("HOPMs*.docx")
  370. if not path.name.startswith("~$")
  371. ]
  372. if not matches:
  373. raise FileNotFoundError("未在 docs 目录找到 HOPMs*.docx")
  374. return matches[0]
  375. def _write_report(report: dict[str, Any], output_path: Path) -> None:
  376. output_path.parent.mkdir(parents=True, exist_ok=True)
  377. output_path.write_text(
  378. json.dumps(report, ensure_ascii=False, indent=2),
  379. encoding="utf-8",
  380. )
  381. def main() -> int:
  382. parser = argparse.ArgumentParser(
  383. description="Parse HOPMs DOCX definitions and generate a DataOps dry-run report."
  384. )
  385. parser.add_argument("--docx", type=Path, default=None, help="HOPMs DOCX path")
  386. parser.add_argument(
  387. "--output",
  388. type=Path,
  389. default=Path("docs/generated/hopms_dry_run.json"),
  390. help="Dry-run JSON output path",
  391. )
  392. parser.add_argument("--limit", type=int, default=0, help="Only include first N tables")
  393. parser.add_argument(
  394. "--commit",
  395. action="store_true",
  396. help="Submit parsed table definitions to DataOps.",
  397. )
  398. parser.add_argument(
  399. "--base-url",
  400. default="https://company.citupro.com:18183",
  401. help="DataOps platform base URL",
  402. )
  403. parser.add_argument("--username", default=None, help="DataOps username")
  404. parser.add_argument("--password", default=None, help="DataOps password")
  405. parser.add_argument(
  406. "--login",
  407. action="store_true",
  408. help="Validate username/password before importing. Default is direct API import.",
  409. )
  410. parser.add_argument(
  411. "--update-existing",
  412. action="store_true",
  413. help="Call /api/bd/update for existing domains. Default is to skip existing domains.",
  414. )
  415. parser.add_argument(
  416. "--import-output",
  417. type=Path,
  418. default=Path("docs/generated/hopms_import_result.json"),
  419. help="Commit-mode import result JSON output path",
  420. )
  421. args = parser.parse_args()
  422. docx_path = args.docx or _find_default_docx()
  423. tables = parse_hopms_docx(docx_path)
  424. if args.limit and args.limit > 0:
  425. tables = tables[: args.limit]
  426. report = build_dry_run_report(tables, source_path=docx_path)
  427. _write_report(report, args.output)
  428. summary = report["summary"]
  429. print(f"Dry-run report written: {args.output}")
  430. print(f"Tables: {summary['table_count']}")
  431. print(f"Columns: {summary['column_count']}")
  432. print(f"Generated table names: {summary['tables_missing_name_en']}")
  433. print(f"Columns missing name: {summary['columns_missing_name']}")
  434. if args.commit:
  435. if args.login and (not args.username or not args.password):
  436. raise SystemExit("--commit --login requires --username and --password")
  437. import_result = import_tables(
  438. report["tables"],
  439. base_url=args.base_url,
  440. username=args.username or "",
  441. password=args.password or "",
  442. skip_login=not args.login,
  443. update_existing=args.update_existing,
  444. )
  445. _write_report(import_result, args.import_output)
  446. import_summary = import_result["summary"]
  447. print(f"Import result written: {args.import_output}")
  448. print(f"Attempted: {import_summary['attempted']}")
  449. print(f"Succeeded: {import_summary['succeeded']}")
  450. print(f"Skipped: {import_summary['skipped']}")
  451. print(f"Failed: {import_summary['failed']}")
  452. if import_summary["failed"]:
  453. return 1
  454. return 0
# Run the CLI only when executed as a script; the value returned by main()
# is handed to SystemExit and so becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())