test_windows.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Windows specific tests."""
  6. import datetime
  7. import glob
  8. import os
  9. import platform
  10. import re
  11. import shutil
  12. import signal
  13. import subprocess
  14. import sys
  15. import time
  16. import warnings
  17. from unittest import mock
  18. import psutil
  19. from psutil import WINDOWS
  20. from psutil.tests import GITHUB_ACTIONS
  21. from psutil.tests import HAS_BATTERY
  22. from psutil.tests import IS_64BIT
  23. from psutil.tests import PYPY
  24. from psutil.tests import TOLERANCE_DISK_USAGE
  25. from psutil.tests import TOLERANCE_SYS_MEM
  26. from psutil.tests import PsutilTestCase
  27. from psutil.tests import pytest
  28. from psutil.tests import retry_on_failure
  29. from psutil.tests import sh
  30. from psutil.tests import spawn_testproc
  31. from psutil.tests import terminate
  32. if WINDOWS and not PYPY:
  33. with warnings.catch_warnings():
  34. warnings.simplefilter("ignore")
  35. import win32api # requires "pip install pywin32"
  36. import win32con
  37. import win32process
  38. import wmi # requires "pip install wmi" / "make install-pydeps-test"
  39. if WINDOWS:
  40. from psutil._pswindows import convert_oserror
  41. cext = psutil._psplatform.cext
  42. @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  43. @pytest.mark.skipif(PYPY, reason="pywin32 not available on PYPY")
  44. class WindowsTestCase(PsutilTestCase):
  45. pass
  46. def powershell(cmd):
  47. """Currently not used, but available just in case. Usage:
  48. >>> powershell(
  49. "Get-CIMInstance Win32_PageFileUsage | Select AllocatedBaseSize")
  50. """
  51. if not shutil.which("powershell.exe"):
  52. raise pytest.skip("powershell.exe not available")
  53. cmdline = (
  54. "powershell.exe -ExecutionPolicy Bypass -NoLogo -NonInteractive "
  55. f"-NoProfile -WindowStyle Hidden -Command \"{cmd}\"" # noqa: Q003
  56. )
  57. return sh(cmdline)
  58. def wmic(path, what, converter=int):
  59. """Currently not used, but available just in case. Usage:
  60. >>> wmic("Win32_OperatingSystem", "FreePhysicalMemory")
  61. 2134124534
  62. """
  63. out = sh(f"wmic path {path} get {what}").strip()
  64. data = "".join(out.splitlines()[1:]).strip() # get rid of the header
  65. if converter is not None:
  66. if "," in what:
  67. return tuple(converter(x) for x in data.split())
  68. else:
  69. return converter(data)
  70. else:
  71. return data
  72. # ===================================================================
  73. # System APIs
  74. # ===================================================================
  75. class TestCpuAPIs(WindowsTestCase):
  76. @pytest.mark.skipif(
  77. 'NUMBER_OF_PROCESSORS' not in os.environ,
  78. reason="NUMBER_OF_PROCESSORS env var is not available",
  79. )
  80. def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
  81. # Will likely fail on many-cores systems:
  82. # https://stackoverflow.com/questions/31209256
  83. num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
  84. assert num_cpus == psutil.cpu_count()
  85. def test_cpu_count_vs_GetSystemInfo(self):
  86. # Will likely fail on many-cores systems:
  87. # https://stackoverflow.com/questions/31209256
  88. sys_value = win32api.GetSystemInfo()[5]
  89. psutil_value = psutil.cpu_count()
  90. assert sys_value == psutil_value
  91. def test_cpu_count_logical_vs_wmi(self):
  92. w = wmi.WMI()
  93. procs = sum(
  94. proc.NumberOfLogicalProcessors for proc in w.Win32_Processor()
  95. )
  96. assert psutil.cpu_count() == procs
  97. def test_cpu_count_cores_vs_wmi(self):
  98. w = wmi.WMI()
  99. cores = sum(proc.NumberOfCores for proc in w.Win32_Processor())
  100. assert psutil.cpu_count(logical=False) == cores
  101. def test_cpu_count_vs_cpu_times(self):
  102. assert psutil.cpu_count() == len(psutil.cpu_times(percpu=True))
  103. def test_cpu_freq(self):
  104. w = wmi.WMI()
  105. proc = w.Win32_Processor()[0]
  106. assert proc.CurrentClockSpeed == psutil.cpu_freq().current
  107. assert proc.MaxClockSpeed == psutil.cpu_freq().max
  108. class TestSystemAPIs(WindowsTestCase):
  109. def test_nic_names(self):
  110. out = sh('ipconfig /all')
  111. nics = psutil.net_io_counters(pernic=True).keys()
  112. for nic in nics:
  113. if "pseudo-interface" in nic.replace(' ', '-').lower():
  114. continue
  115. if nic not in out:
  116. raise self.fail(
  117. f"{nic!r} nic wasn't found in 'ipconfig /all' output"
  118. )
  119. def test_total_phymem(self):
  120. w = wmi.WMI().Win32_ComputerSystem()[0]
  121. assert int(w.TotalPhysicalMemory) == psutil.virtual_memory().total
  122. def test_free_phymem(self):
  123. w = wmi.WMI().Win32_PerfRawData_PerfOS_Memory()[0]
  124. assert (
  125. abs(int(w.AvailableBytes) - psutil.virtual_memory().free)
  126. < TOLERANCE_SYS_MEM
  127. )
  128. def test_total_swapmem(self):
  129. w = wmi.WMI().Win32_PerfRawData_PerfOS_Memory()[0]
  130. assert (
  131. int(w.CommitLimit) - psutil.virtual_memory().total
  132. == psutil.swap_memory().total
  133. )
  134. if psutil.swap_memory().total == 0:
  135. assert psutil.swap_memory().free == 0
  136. assert psutil.swap_memory().used == 0
  137. def test_percent_swapmem(self):
  138. if psutil.swap_memory().total > 0:
  139. w = wmi.WMI().Win32_PerfRawData_PerfOS_PagingFile(Name="_Total")[0]
  140. # calculate swap usage to percent
  141. percentSwap = int(w.PercentUsage) * 100 / int(w.PercentUsage_Base)
  142. # exact percent may change but should be reasonable
  143. # assert within +/- 5% and between 0 and 100%
  144. assert psutil.swap_memory().percent >= 0
  145. assert abs(psutil.swap_memory().percent - percentSwap) < 5
  146. assert psutil.swap_memory().percent <= 100
  147. # @pytest.mark.skipif(wmi is None, reason="wmi module is not installed")
  148. # def test__UPTIME(self):
  149. # # _UPTIME constant is not public but it is used internally
  150. # # as value to return for pid 0 creation time.
  151. # # WMI behaves the same.
  152. # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  153. # p = psutil.Process(0)
  154. # wmic_create = str(w.CreationDate.split('.')[0])
  155. # psutil_create = time.strftime("%Y%m%d%H%M%S",
  156. # time.localtime(p.create_time()))
  157. # Note: this test is not very reliable
  158. @retry_on_failure()
  159. def test_pids(self):
  160. # Note: this test might fail if the OS is starting/killing
  161. # other processes in the meantime
  162. w = wmi.WMI().Win32_Process()
  163. wmi_pids = {x.ProcessId for x in w}
  164. psutil_pids = set(psutil.pids())
  165. assert wmi_pids == psutil_pids
  166. @retry_on_failure()
  167. def test_disks(self):
  168. ps_parts = psutil.disk_partitions(all=True)
  169. wmi_parts = wmi.WMI().Win32_LogicalDisk()
  170. for ps_part in ps_parts:
  171. for wmi_part in wmi_parts:
  172. if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
  173. if not ps_part.mountpoint:
  174. # this is usually a CD-ROM with no disk inserted
  175. break
  176. if 'cdrom' in ps_part.opts:
  177. break
  178. if ps_part.mountpoint.startswith('A:'):
  179. break # floppy
  180. try:
  181. usage = psutil.disk_usage(ps_part.mountpoint)
  182. except FileNotFoundError:
  183. # usually this is the floppy
  184. break
  185. assert usage.total == int(wmi_part.Size)
  186. wmi_free = int(wmi_part.FreeSpace)
  187. assert usage.free == wmi_free
  188. # 10 MB tolerance
  189. if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
  190. raise self.fail(f"psutil={usage.free}, wmi={wmi_free}")
  191. break
  192. else:
  193. raise self.fail(f"can't find partition {ps_part!r}")
  194. @retry_on_failure()
  195. def test_disk_usage(self):
  196. for disk in psutil.disk_partitions():
  197. if 'cdrom' in disk.opts:
  198. continue
  199. sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
  200. psutil_value = psutil.disk_usage(disk.mountpoint)
  201. assert abs(sys_value[0] - psutil_value.free) < TOLERANCE_DISK_USAGE
  202. assert (
  203. abs(sys_value[1] - psutil_value.total) < TOLERANCE_DISK_USAGE
  204. )
  205. assert psutil_value.used == psutil_value.total - psutil_value.free
  206. def test_disk_partitions(self):
  207. sys_value = [
  208. x + '\\'
  209. for x in win32api.GetLogicalDriveStrings().split("\\\x00")
  210. if x and not x.startswith('A:')
  211. ]
  212. psutil_value = [
  213. x.mountpoint
  214. for x in psutil.disk_partitions(all=True)
  215. if not x.mountpoint.startswith('A:')
  216. ]
  217. assert sys_value == psutil_value
  218. def test_net_if_stats(self):
  219. ps_names = set(cext.net_if_stats())
  220. wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
  221. wmi_names = set()
  222. for wmi_adapter in wmi_adapters:
  223. wmi_names.add(wmi_adapter.Name)
  224. wmi_names.add(wmi_adapter.NetConnectionID)
  225. assert (
  226. ps_names & wmi_names
  227. ), f"no common entries in {ps_names}, {wmi_names}"
  228. def test_boot_time(self):
  229. wmi_os = wmi.WMI().Win32_OperatingSystem()
  230. wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
  231. wmi_btime_dt = datetime.datetime.strptime(
  232. wmi_btime_str, "%Y%m%d%H%M%S"
  233. )
  234. psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
  235. diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
  236. assert diff <= 5
  237. def test_boot_time_fluctuation(self):
  238. # https://github.com/giampaolo/psutil/issues/1007
  239. with mock.patch('psutil._pswindows.cext.boot_time', return_value=5):
  240. assert psutil.boot_time() == 5
  241. with mock.patch('psutil._pswindows.cext.boot_time', return_value=4):
  242. assert psutil.boot_time() == 5
  243. with mock.patch('psutil._pswindows.cext.boot_time', return_value=6):
  244. assert psutil.boot_time() == 5
  245. with mock.patch('psutil._pswindows.cext.boot_time', return_value=333):
  246. assert psutil.boot_time() == 333
  247. # ===================================================================
  248. # sensors_battery()
  249. # ===================================================================
  250. class TestSensorsBattery(WindowsTestCase):
  251. def test_has_battery(self):
  252. if win32api.GetPwrCapabilities()['SystemBatteriesPresent']:
  253. assert psutil.sensors_battery() is not None
  254. else:
  255. assert psutil.sensors_battery() is None
  256. @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
  257. def test_percent(self):
  258. w = wmi.WMI()
  259. battery_wmi = w.query('select * from Win32_Battery')[0]
  260. battery_psutil = psutil.sensors_battery()
  261. assert (
  262. abs(battery_psutil.percent - battery_wmi.EstimatedChargeRemaining)
  263. < 1
  264. )
  265. @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
  266. def test_power_plugged(self):
  267. w = wmi.WMI()
  268. battery_wmi = w.query('select * from Win32_Battery')[0]
  269. battery_psutil = psutil.sensors_battery()
  270. # Status codes:
  271. # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
  272. assert battery_psutil.power_plugged == (battery_wmi.BatteryStatus == 2)
  273. def test_emulate_no_battery(self):
  274. with mock.patch(
  275. "psutil._pswindows.cext.sensors_battery",
  276. return_value=(0, 128, 0, 0),
  277. ) as m:
  278. assert psutil.sensors_battery() is None
  279. assert m.called
  280. def test_emulate_power_connected(self):
  281. with mock.patch(
  282. "psutil._pswindows.cext.sensors_battery", return_value=(1, 0, 0, 0)
  283. ) as m:
  284. assert (
  285. psutil.sensors_battery().secsleft
  286. == psutil.POWER_TIME_UNLIMITED
  287. )
  288. assert m.called
  289. def test_emulate_power_charging(self):
  290. with mock.patch(
  291. "psutil._pswindows.cext.sensors_battery", return_value=(0, 8, 0, 0)
  292. ) as m:
  293. assert (
  294. psutil.sensors_battery().secsleft
  295. == psutil.POWER_TIME_UNLIMITED
  296. )
  297. assert m.called
  298. def test_emulate_secs_left_unknown(self):
  299. with mock.patch(
  300. "psutil._pswindows.cext.sensors_battery",
  301. return_value=(0, 0, 0, -1),
  302. ) as m:
  303. assert (
  304. psutil.sensors_battery().secsleft == psutil.POWER_TIME_UNKNOWN
  305. )
  306. assert m.called
  307. # ===================================================================
  308. # Process APIs
  309. # ===================================================================
  310. class TestProcess(WindowsTestCase):
  311. @classmethod
  312. def setUpClass(cls):
  313. cls.pid = spawn_testproc().pid
  314. @classmethod
  315. def tearDownClass(cls):
  316. terminate(cls.pid)
  317. def test_issue_24(self):
  318. p = psutil.Process(0)
  319. with pytest.raises(psutil.AccessDenied):
  320. p.kill()
  321. def test_special_pid(self):
  322. p = psutil.Process(4)
  323. assert p.name() == 'System'
  324. # use __str__ to access all common Process properties to check
  325. # that nothing strange happens
  326. str(p)
  327. p.username()
  328. assert p.create_time() >= 0.0
  329. try:
  330. rss, _vms = p.memory_info()[:2]
  331. except psutil.AccessDenied:
  332. # expected on Windows Vista and Windows 7
  333. if platform.uname()[1] not in {'vista', 'win-7', 'win7'}:
  334. raise
  335. else:
  336. assert rss > 0
  337. def test_send_signal(self):
  338. p = psutil.Process(self.pid)
  339. with pytest.raises(ValueError):
  340. p.send_signal(signal.SIGINT)
  341. def test_num_handles_increment(self):
  342. p = psutil.Process(os.getpid())
  343. before = p.num_handles()
  344. handle = win32api.OpenProcess(
  345. win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()
  346. )
  347. after = p.num_handles()
  348. assert after == before + 1
  349. win32api.CloseHandle(handle)
  350. assert p.num_handles() == before
  351. def test_ctrl_signals(self):
  352. p = psutil.Process(self.spawn_testproc().pid)
  353. p.send_signal(signal.CTRL_C_EVENT)
  354. p.send_signal(signal.CTRL_BREAK_EVENT)
  355. p.kill()
  356. p.wait()
  357. with pytest.raises(psutil.NoSuchProcess):
  358. p.send_signal(signal.CTRL_C_EVENT)
  359. with pytest.raises(psutil.NoSuchProcess):
  360. p.send_signal(signal.CTRL_BREAK_EVENT)
  361. def test_username(self):
  362. name = win32api.GetUserNameEx(win32con.NameSamCompatible)
  363. if name.endswith('$'):
  364. # When running as a service account (most likely to be
  365. # NetworkService), these user name calculations don't produce the
  366. # same result, causing the test to fail.
  367. raise pytest.skip('running as service account')
  368. assert psutil.Process().username() == name
  369. def test_cmdline(self):
  370. sys_value = re.sub(r"[ ]+", " ", win32api.GetCommandLine()).strip()
  371. psutil_value = ' '.join(psutil.Process().cmdline())
  372. if sys_value[0] == '"' != psutil_value[0]:
  373. # The PyWin32 command line may retain quotes around argv[0] if they
  374. # were used unnecessarily, while psutil will omit them. So remove
  375. # the first 2 quotes from sys_value if not in psutil_value.
  376. # A path to an executable will not contain quotes, so this is safe.
  377. sys_value = sys_value.replace('"', '', 2)
  378. assert sys_value == psutil_value
  379. # XXX - occasional failures
  380. # def test_cpu_times(self):
  381. # handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  382. # win32con.FALSE, os.getpid())
  383. # self.addCleanup(win32api.CloseHandle, handle)
  384. # sys_value = win32process.GetProcessTimes(handle)
  385. # psutil_value = psutil.Process().cpu_times()
  386. # self.assertAlmostEqual(
  387. # psutil_value.user, sys_value['UserTime'] / 10000000.0,
  388. # delta=0.2)
  389. # self.assertAlmostEqual(
  390. # psutil_value.user, sys_value['KernelTime'] / 10000000.0,
  391. # delta=0.2)
  392. def test_nice(self):
  393. handle = win32api.OpenProcess(
  394. win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()
  395. )
  396. self.addCleanup(win32api.CloseHandle, handle)
  397. sys_value = win32process.GetPriorityClass(handle)
  398. psutil_value = psutil.Process().nice()
  399. assert psutil_value == sys_value
  400. def test_memory_info(self):
  401. handle = win32api.OpenProcess(
  402. win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, self.pid
  403. )
  404. self.addCleanup(win32api.CloseHandle, handle)
  405. sys_value = win32process.GetProcessMemoryInfo(handle)
  406. psutil_value = psutil.Process(self.pid).memory_info()
  407. assert sys_value['PeakWorkingSetSize'] == psutil_value.peak_wset
  408. assert sys_value['WorkingSetSize'] == psutil_value.wset
  409. assert (
  410. sys_value['QuotaPeakPagedPoolUsage']
  411. == psutil_value.peak_paged_pool
  412. )
  413. assert sys_value['QuotaPagedPoolUsage'] == psutil_value.paged_pool
  414. assert (
  415. sys_value['QuotaPeakNonPagedPoolUsage']
  416. == psutil_value.peak_nonpaged_pool
  417. )
  418. assert (
  419. sys_value['QuotaNonPagedPoolUsage'] == psutil_value.nonpaged_pool
  420. )
  421. assert sys_value['PagefileUsage'] == psutil_value.pagefile
  422. assert sys_value['PeakPagefileUsage'] == psutil_value.peak_pagefile
  423. assert psutil_value.rss == psutil_value.wset
  424. assert psutil_value.vms == psutil_value.pagefile
  425. def test_wait(self):
  426. handle = win32api.OpenProcess(
  427. win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, self.pid
  428. )
  429. self.addCleanup(win32api.CloseHandle, handle)
  430. p = psutil.Process(self.pid)
  431. p.terminate()
  432. psutil_value = p.wait()
  433. sys_value = win32process.GetExitCodeProcess(handle)
  434. assert psutil_value == sys_value
  435. def test_cpu_affinity(self):
  436. def from_bitmask(x):
  437. return [i for i in range(64) if (1 << i) & x]
  438. handle = win32api.OpenProcess(
  439. win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, self.pid
  440. )
  441. self.addCleanup(win32api.CloseHandle, handle)
  442. sys_value = from_bitmask(
  443. win32process.GetProcessAffinityMask(handle)[0]
  444. )
  445. psutil_value = psutil.Process(self.pid).cpu_affinity()
  446. assert psutil_value == sys_value
  447. def test_io_counters(self):
  448. handle = win32api.OpenProcess(
  449. win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()
  450. )
  451. self.addCleanup(win32api.CloseHandle, handle)
  452. sys_value = win32process.GetProcessIoCounters(handle)
  453. psutil_value = psutil.Process().io_counters()
  454. assert psutil_value.read_count == sys_value['ReadOperationCount']
  455. assert psutil_value.write_count == sys_value['WriteOperationCount']
  456. assert psutil_value.read_bytes == sys_value['ReadTransferCount']
  457. assert psutil_value.write_bytes == sys_value['WriteTransferCount']
  458. assert psutil_value.other_count == sys_value['OtherOperationCount']
  459. assert psutil_value.other_bytes == sys_value['OtherTransferCount']
  460. def test_num_handles(self):
  461. import ctypes
  462. import ctypes.wintypes
  463. PROCESS_QUERY_INFORMATION = 0x400
  464. handle = ctypes.windll.kernel32.OpenProcess(
  465. PROCESS_QUERY_INFORMATION, 0, self.pid
  466. )
  467. self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)
  468. hndcnt = ctypes.wintypes.DWORD()
  469. ctypes.windll.kernel32.GetProcessHandleCount(
  470. handle, ctypes.byref(hndcnt)
  471. )
  472. sys_value = hndcnt.value
  473. psutil_value = psutil.Process(self.pid).num_handles()
  474. assert psutil_value == sys_value
  475. def test_error_partial_copy(self):
  476. # https://github.com/giampaolo/psutil/issues/875
  477. exc = OSError()
  478. exc.winerror = 299
  479. with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
  480. with mock.patch("time.sleep") as m:
  481. p = psutil.Process()
  482. with pytest.raises(psutil.AccessDenied):
  483. p.cwd()
  484. assert m.call_count >= 5
  485. def test_exe(self):
  486. # NtQuerySystemInformation succeeds if process is gone. Make sure
  487. # it raises NSP for a non existent pid.
  488. pid = psutil.pids()[-1] + 99999
  489. proc = psutil._psplatform.Process(pid)
  490. with pytest.raises(psutil.NoSuchProcess):
  491. proc.exe()
  492. class TestProcessWMI(WindowsTestCase):
  493. """Compare Process API results with WMI."""
  494. @classmethod
  495. def setUpClass(cls):
  496. cls.pid = spawn_testproc().pid
  497. @classmethod
  498. def tearDownClass(cls):
  499. terminate(cls.pid)
  500. def test_name(self):
  501. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  502. p = psutil.Process(self.pid)
  503. assert p.name() == w.Caption
  504. # This fail on github because using virtualenv for test environment
  505. @pytest.mark.skipif(
  506. GITHUB_ACTIONS, reason="unreliable path on GITHUB_ACTIONS"
  507. )
  508. def test_exe(self):
  509. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  510. p = psutil.Process(self.pid)
  511. # Note: wmi reports the exe as a lower case string.
  512. # Being Windows paths case-insensitive we ignore that.
  513. assert p.exe().lower() == w.ExecutablePath.lower()
  514. def test_cmdline(self):
  515. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  516. p = psutil.Process(self.pid)
  517. assert ' '.join(p.cmdline()) == w.CommandLine.replace('"', '')
  518. def test_username(self):
  519. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  520. p = psutil.Process(self.pid)
  521. domain, _, username = w.GetOwner()
  522. username = f"{domain}\\{username}"
  523. assert p.username() == username
  524. @retry_on_failure()
  525. def test_memory_rss(self):
  526. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  527. p = psutil.Process(self.pid)
  528. rss = p.memory_info().rss
  529. assert rss == int(w.WorkingSetSize)
  530. @retry_on_failure()
  531. def test_memory_vms(self):
  532. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  533. p = psutil.Process(self.pid)
  534. vms = p.memory_info().vms
  535. # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
  536. # ...claims that PageFileUsage is represented in Kilo
  537. # bytes but funnily enough on certain platforms bytes are
  538. # returned instead.
  539. wmi_usage = int(w.PageFileUsage)
  540. if vms not in {wmi_usage, wmi_usage * 1024}:
  541. raise self.fail(f"wmi={wmi_usage}, psutil={vms}")
  542. def test_create_time(self):
  543. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  544. p = psutil.Process(self.pid)
  545. wmic_create = str(w.CreationDate.split('.')[0])
  546. psutil_create = time.strftime(
  547. "%Y%m%d%H%M%S", time.localtime(p.create_time())
  548. )
  549. assert wmic_create == psutil_create
  550. # ---
  551. @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  552. class TestDualProcessImplementation(PsutilTestCase):
  553. """Certain APIs on Windows have 2 internal implementations, one
  554. based on documented Windows APIs, another one based
  555. NtQuerySystemInformation() which gets called as fallback in
  556. case the first fails because of limited permission error.
  557. Here we test that the two methods return the exact same value,
  558. see:
  559. https://github.com/giampaolo/psutil/issues/304.
  560. """
  561. @classmethod
  562. def setUpClass(cls):
  563. cls.pid = spawn_testproc().pid
  564. @classmethod
  565. def tearDownClass(cls):
  566. terminate(cls.pid)
  567. def test_memory_info(self):
  568. mem_1 = psutil.Process(self.pid).memory_info()
  569. with mock.patch(
  570. "psutil._psplatform.cext.proc_memory_info",
  571. side_effect=PermissionError,
  572. ) as fun:
  573. mem_2 = psutil.Process(self.pid).memory_info()
  574. assert len(mem_1) == len(mem_2)
  575. for i in range(len(mem_1)):
  576. assert mem_1[i] >= 0
  577. assert mem_2[i] >= 0
  578. assert abs(mem_1[i] - mem_2[i]) < 512
  579. assert fun.called
  580. def test_create_time(self):
  581. ctime = psutil.Process(self.pid).create_time()
  582. with mock.patch(
  583. "psutil._psplatform.cext.proc_times",
  584. side_effect=PermissionError,
  585. ) as fun:
  586. assert psutil.Process(self.pid).create_time() == ctime
  587. assert fun.called
  588. def test_cpu_times(self):
  589. cpu_times_1 = psutil.Process(self.pid).cpu_times()
  590. with mock.patch(
  591. "psutil._psplatform.cext.proc_times",
  592. side_effect=PermissionError,
  593. ) as fun:
  594. cpu_times_2 = psutil.Process(self.pid).cpu_times()
  595. assert fun.called
  596. assert abs(cpu_times_1.user - cpu_times_2.user) < 0.01
  597. assert abs(cpu_times_1.system - cpu_times_2.system) < 0.01
  598. def test_io_counters(self):
  599. io_counters_1 = psutil.Process(self.pid).io_counters()
  600. with mock.patch(
  601. "psutil._psplatform.cext.proc_io_counters",
  602. side_effect=PermissionError,
  603. ) as fun:
  604. io_counters_2 = psutil.Process(self.pid).io_counters()
  605. for i in range(len(io_counters_1)):
  606. assert abs(io_counters_1[i] - io_counters_2[i]) < 5
  607. assert fun.called
  608. def test_num_handles(self):
  609. num_handles = psutil.Process(self.pid).num_handles()
  610. with mock.patch(
  611. "psutil._psplatform.cext.proc_num_handles",
  612. side_effect=PermissionError,
  613. ) as fun:
  614. assert psutil.Process(self.pid).num_handles() == num_handles
  615. assert fun.called
  616. def test_cmdline(self):
  617. for pid in psutil.pids():
  618. try:
  619. a = cext.proc_cmdline(pid, use_peb=True)
  620. b = cext.proc_cmdline(pid, use_peb=False)
  621. except OSError as err:
  622. err = convert_oserror(err)
  623. if not isinstance(
  624. err, (psutil.AccessDenied, psutil.NoSuchProcess)
  625. ):
  626. raise
  627. else:
  628. assert a == b
  629. @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  630. class RemoteProcessTestCase(PsutilTestCase):
  631. """Certain functions require calling ReadProcessMemory.
  632. This trivially works when called on the current process.
  633. Check that this works on other processes, especially when they
  634. have a different bitness.
  635. """
  636. @staticmethod
  637. def find_other_interpreter():
  638. # find a python interpreter that is of the opposite bitness from us
  639. code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
  640. # XXX: a different and probably more stable approach might be to access
  641. # the registry but accessing 64 bit paths from a 32 bit process
  642. for filename in glob.glob(r"C:\Python*\python.exe"):
  643. proc = subprocess.Popen(
  644. args=[filename, "-c", code],
  645. stdout=subprocess.PIPE,
  646. stderr=subprocess.STDOUT,
  647. )
  648. output, _ = proc.communicate()
  649. proc.wait()
  650. if output == str(not IS_64BIT):
  651. return filename
  652. test_args = ["-c", "import sys; sys.stdin.read()"]
  653. def setUp(self):
  654. super().setUp()
  655. other_python = self.find_other_interpreter()
  656. if other_python is None:
  657. raise pytest.skip(
  658. "could not find interpreter with opposite bitness"
  659. )
  660. if IS_64BIT:
  661. self.python64 = sys.executable
  662. self.python32 = other_python
  663. else:
  664. self.python64 = other_python
  665. self.python32 = sys.executable
  666. env = os.environ.copy()
  667. env["THINK_OF_A_NUMBER"] = str(os.getpid())
  668. self.proc32 = self.spawn_testproc(
  669. [self.python32] + self.test_args, env=env, stdin=subprocess.PIPE
  670. )
  671. self.proc64 = self.spawn_testproc(
  672. [self.python64] + self.test_args, env=env, stdin=subprocess.PIPE
  673. )
  674. def tearDown(self):
  675. super().tearDown()
  676. self.proc32.communicate()
  677. self.proc64.communicate()
  678. def test_cmdline_32(self):
  679. p = psutil.Process(self.proc32.pid)
  680. assert len(p.cmdline()) == 3
  681. assert p.cmdline()[1:] == self.test_args
  682. def test_cmdline_64(self):
  683. p = psutil.Process(self.proc64.pid)
  684. assert len(p.cmdline()) == 3
  685. assert p.cmdline()[1:] == self.test_args
  686. def test_cwd_32(self):
  687. p = psutil.Process(self.proc32.pid)
  688. assert p.cwd() == os.getcwd()
  689. def test_cwd_64(self):
  690. p = psutil.Process(self.proc64.pid)
  691. assert p.cwd() == os.getcwd()
  692. def test_environ_32(self):
  693. p = psutil.Process(self.proc32.pid)
  694. e = p.environ()
  695. assert "THINK_OF_A_NUMBER" in e
  696. assert e["THINK_OF_A_NUMBER"] == str(os.getpid())
  697. def test_environ_64(self):
  698. p = psutil.Process(self.proc64.pid)
  699. try:
  700. p.environ()
  701. except psutil.AccessDenied:
  702. pass
  703. # ===================================================================
  704. # Windows services
  705. # ===================================================================
  706. @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  707. class TestServices(PsutilTestCase):
  708. def test_win_service_iter(self):
  709. valid_statuses = {
  710. "running",
  711. "paused",
  712. "start",
  713. "pause",
  714. "continue",
  715. "stop",
  716. "stopped",
  717. }
  718. valid_start_types = {"automatic", "manual", "disabled"}
  719. valid_statuses = {
  720. "running",
  721. "paused",
  722. "start_pending",
  723. "pause_pending",
  724. "continue_pending",
  725. "stop_pending",
  726. "stopped",
  727. }
  728. for serv in psutil.win_service_iter():
  729. data = serv.as_dict()
  730. assert isinstance(data['name'], str)
  731. assert data['name'].strip()
  732. assert isinstance(data['display_name'], str)
  733. assert isinstance(data['username'], str)
  734. assert data['status'] in valid_statuses
  735. if data['pid'] is not None:
  736. psutil.Process(data['pid'])
  737. assert isinstance(data['binpath'], str)
  738. assert isinstance(data['username'], str)
  739. assert isinstance(data['start_type'], str)
  740. assert data['start_type'] in valid_start_types
  741. assert data['status'] in valid_statuses
  742. assert isinstance(data['description'], str)
  743. pid = serv.pid()
  744. if pid is not None:
  745. p = psutil.Process(pid)
  746. assert p.is_running()
  747. # win_service_get
  748. s = psutil.win_service_get(serv.name())
  749. # test __eq__
  750. assert serv == s
  751. def test_win_service_get(self):
  752. ERROR_SERVICE_DOES_NOT_EXIST = (
  753. psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
  754. )
  755. ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED
  756. name = next(psutil.win_service_iter()).name()
  757. with pytest.raises(psutil.NoSuchProcess) as cm:
  758. psutil.win_service_get(name + '???')
  759. assert cm.value.name == name + '???'
  760. # test NoSuchProcess
  761. service = psutil.win_service_get(name)
  762. exc = OSError(0, "msg", 0)
  763. exc.winerror = ERROR_SERVICE_DOES_NOT_EXIST
  764. with mock.patch(
  765. "psutil._psplatform.cext.winservice_query_status", side_effect=exc
  766. ):
  767. with pytest.raises(psutil.NoSuchProcess):
  768. service.status()
  769. with mock.patch(
  770. "psutil._psplatform.cext.winservice_query_config", side_effect=exc
  771. ):
  772. with pytest.raises(psutil.NoSuchProcess):
  773. service.username()
  774. # test AccessDenied
  775. exc = OSError(0, "msg", 0)
  776. exc.winerror = ERROR_ACCESS_DENIED
  777. with mock.patch(
  778. "psutil._psplatform.cext.winservice_query_status", side_effect=exc
  779. ):
  780. with pytest.raises(psutil.AccessDenied):
  781. service.status()
  782. with mock.patch(
  783. "psutil._psplatform.cext.winservice_query_config", side_effect=exc
  784. ):
  785. with pytest.raises(psutil.AccessDenied):
  786. service.username()
  787. # test __str__ and __repr__
  788. assert service.name() in str(service)
  789. assert service.display_name() in str(service)
  790. assert service.name() in repr(service)
  791. assert service.display_name() in repr(service)