# test_hopms_import.py (8.5 KB)
  1. import sys
  2. from pathlib import Path
  3. from docx import Document
  4. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  5. from scripts.import_hopms_dataset import (
  6. build_dry_run_report,
  7. import_tables,
  8. parse_hopms_docx,
  9. )
  10. def _write_sample_doc(path: Path) -> None:
  11. doc = Document()
  12. doc.add_paragraph("门急诊业务")
  13. doc.add_paragraph("患者基本信息")
  14. doc.add_paragraph("用于对患者在挂号环节产生的挂号数据进行上传。")
  15. table = doc.add_table(rows=3, cols=6)
  16. headers = ["字段中文名", "字段名", "类型", "字节", "填报 / 要求", "说明"]
  17. for idx, header in enumerate(headers):
  18. table.rows[0].cells[idx].text = header
  19. table.rows[1].cells[0].text = "医疗机构代码"
  20. table.rows[1].cells[1].text = "YLJGDM"
  21. table.rows[1].cells[2].text = "字符串"
  22. table.rows[1].cells[3].text = "22"
  23. table.rows[1].cells[4].text = "必填"
  24. table.rows[1].cells[5].text = "复合主键。医疗机构在HOPMs的唯一识别码"
  25. table.rows[2].cells[0].text = "挂号时间"
  26. table.rows[2].cells[1].text = "GHSJ"
  27. table.rows[2].cells[2].text = "日期时间"
  28. table.rows[2].cells[3].text = ""
  29. table.rows[2].cells[4].text = "应填"
  30. table.rows[2].cells[5].text = "患者挂号时间"
  31. doc.add_paragraph("CV5501.12 入院病情代码")
  32. dict_table = doc.add_table(rows=2, cols=3)
  33. for idx, header in enumerate(["值", "值含义", "说明"]):
  34. dict_table.rows[0].cells[idx].text = header
  35. dict_table.rows[1].cells[0].text = "1"
  36. dict_table.rows[1].cells[1].text = "有"
  37. dict_table.rows[1].cells[2].text = "示例代码"
  38. doc.save(path)
  39. def test_parse_hopms_docx_extracts_field_definition_tables(tmp_path):
  40. docx_path = tmp_path / "sample.docx"
  41. _write_sample_doc(docx_path)
  42. tables = parse_hopms_docx(docx_path)
  43. assert len(tables) == 1
  44. table = tables[0]
  45. assert table["table_info"] == {
  46. "name_zh": "患者基本信息",
  47. "name_en": "patient_demographics",
  48. "description": "用于对患者在挂号环节产生的挂号数据进行上传。",
  49. }
  50. assert table["columns"][0] == {
  51. "name_zh": "医疗机构代码",
  52. "name_en": "YLJGDM",
  53. "data_type": "varchar(22)",
  54. "is_primary": True,
  55. "nullable": False,
  56. "comment": "复合主键。医疗机构在HOPMs的唯一识别码",
  57. "requirement": "必填",
  58. "length": "22",
  59. }
  60. assert table["columns"][1]["data_type"] == "timestamp"
  61. assert table["columns"][1]["nullable"] is True
  62. def test_build_dry_run_report_shapes_backend_payload(tmp_path):
  63. docx_path = tmp_path / "sample.docx"
  64. _write_sample_doc(docx_path)
  65. tables = parse_hopms_docx(docx_path)
  66. report = build_dry_run_report(tables, source_path=docx_path)
  67. assert report["summary"]["table_count"] == 1
  68. assert report["summary"]["column_count"] == 2
  69. assert report["summary"]["tables_missing_name_en"] == 0
  70. payload = report["tables"][0]["backend_payload"]
  71. assert payload["name_zh"] == "患者基本信息"
  72. assert payload["name_en"] == "patient_demographics"
  73. assert payload["type"] == "table"
  74. assert payload["category"] == "HOPMs标准数据集"
  75. assert payload["parsed_data"] == [
  76. {
  77. "name_zh": "医疗机构代码",
  78. "name_en": "YLJGDM",
  79. "data_type": "varchar(22)",
  80. "describe": "复合主键。医疗机构在HOPMs的唯一识别码",
  81. },
  82. {
  83. "name_zh": "挂号时间",
  84. "name_en": "GHSJ",
  85. "data_type": "timestamp",
  86. "describe": "患者挂号时间",
  87. },
  88. ]
  89. class _FakeResponse:
  90. def __init__(self, payload, status_code=200):
  91. self._payload = payload
  92. self.status_code = status_code
  93. self.text = str(payload)
  94. def raise_for_status(self):
  95. if self.status_code >= 400:
  96. raise RuntimeError(f"HTTP {self.status_code}")
  97. def json(self):
  98. return self._payload
  99. class _FakeSession:
  100. def __init__(self):
  101. self.calls = []
  102. def post(self, url, json=None, timeout=None):
  103. self.calls.append({"url": url, "json": json, "timeout": timeout})
  104. if url.endswith("/api/system/auth/login"):
  105. return _FakeResponse({"code": 200, "data": {"username": "testuser"}})
  106. if url.endswith("/api/bd/list"):
  107. return _FakeResponse({"code": 200, "data": {"records": []}})
  108. if url.endswith("/api/bd/save"):
  109. return _FakeResponse(
  110. {
  111. "code": 200,
  112. "data": {
  113. "id": 123,
  114. "name_zh": json["name_zh"],
  115. "name_en": json["name_en"],
  116. },
  117. }
  118. )
  119. if url.endswith("/api/bd/update"):
  120. return _FakeResponse(
  121. {
  122. "code": 200,
  123. "data": {
  124. "id": json["id"],
  125. "name_zh": json["name_zh"],
  126. "name_en": json["name_en"],
  127. },
  128. }
  129. )
  130. raise AssertionError(f"Unexpected URL: {url}")
  131. def test_import_tables_logs_in_and_posts_backend_payload(tmp_path):
  132. docx_path = tmp_path / "sample.docx"
  133. _write_sample_doc(docx_path)
  134. report = build_dry_run_report(parse_hopms_docx(docx_path), source_path=docx_path)
  135. session = _FakeSession()
  136. result = import_tables(
  137. report["tables"],
  138. base_url="https://example.test/",
  139. username="testuser",
  140. password="testpassword123",
  141. session=session,
  142. skip_login=False,
  143. )
  144. assert result["summary"] == {
  145. "attempted": 1,
  146. "succeeded": 1,
  147. "failed": 0,
  148. "skipped": 0,
  149. }
  150. assert session.calls[0]["url"] == "https://example.test/api/system/auth/login"
  151. assert session.calls[0]["json"] == {
  152. "username": "testuser",
  153. "password": "testpassword123",
  154. }
  155. assert session.calls[1]["url"] == "https://example.test/api/bd/list"
  156. assert session.calls[1]["json"] == {
  157. "current": 1,
  158. "size": 10,
  159. "name_en": "patient_demographics",
  160. }
  161. assert session.calls[2]["url"] == "https://example.test/api/bd/save"
  162. assert session.calls[2]["json"]["name_en"] == "patient_demographics"
  163. assert session.calls[2]["json"]["parsed_data"][0]["name_en"] == "YLJGDM"
  164. def test_import_tables_can_skip_login_when_endpoint_is_broken(tmp_path):
  165. docx_path = tmp_path / "sample.docx"
  166. _write_sample_doc(docx_path)
  167. report = build_dry_run_report(parse_hopms_docx(docx_path), source_path=docx_path)
  168. session = _FakeSession()
  169. result = import_tables(
  170. report["tables"],
  171. base_url="https://example.test/",
  172. username="",
  173. password="",
  174. session=session,
  175. skip_login=True,
  176. )
  177. assert result["summary"] == {
  178. "attempted": 1,
  179. "succeeded": 1,
  180. "failed": 0,
  181. "skipped": 0,
  182. }
  183. assert len(session.calls) == 2
  184. assert session.calls[0]["url"] == "https://example.test/api/bd/list"
  185. assert session.calls[1]["url"] == "https://example.test/api/bd/save"
  186. class _ExistingDomainSession(_FakeSession):
  187. def post(self, url, json=None, timeout=None):
  188. if url.endswith("/api/bd/list"):
  189. self.calls.append({"url": url, "json": json, "timeout": timeout})
  190. return _FakeResponse(
  191. {
  192. "code": 200,
  193. "data": {
  194. "records": [
  195. {
  196. "id": 456,
  197. "name_zh": "患者基本信息",
  198. "name_en": "patient_demographics",
  199. }
  200. ]
  201. },
  202. }
  203. )
  204. return super().post(url, json=json, timeout=timeout)
  205. def test_import_tables_skips_existing_domain_by_name_en(tmp_path):
  206. docx_path = tmp_path / "sample.docx"
  207. _write_sample_doc(docx_path)
  208. report = build_dry_run_report(parse_hopms_docx(docx_path), source_path=docx_path)
  209. session = _ExistingDomainSession()
  210. result = import_tables(
  211. report["tables"],
  212. base_url="https://example.test/",
  213. session=session,
  214. )
  215. assert result["summary"] == {
  216. "attempted": 1,
  217. "succeeded": 0,
  218. "failed": 0,
  219. "skipped": 1,
  220. }
  221. assert len(session.calls) == 1
  222. assert result["results"][0]["status"] == "skipped"
  223. assert result["results"][0]["existing_id"] == 456