123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- #!/usr/bin/env python3
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Iterate over all process PIDs and for each one of them invoke and
- test all psutil.Process() methods.
- """
- import enum
- import errno
- import multiprocessing
- import os
- import stat
- import time
- import traceback
- import psutil
- from psutil import AIX
- from psutil import BSD
- from psutil import FREEBSD
- from psutil import LINUX
- from psutil import MACOS
- from psutil import NETBSD
- from psutil import OPENBSD
- from psutil import OSX
- from psutil import POSIX
- from psutil import WINDOWS
- from psutil.tests import CI_TESTING
- from psutil.tests import PYTEST_PARALLEL
- from psutil.tests import VALID_PROC_STATUSES
- from psutil.tests import PsutilTestCase
- from psutil.tests import check_connection_ntuple
- from psutil.tests import create_sockets
- from psutil.tests import is_namedtuple
- from psutil.tests import is_win_secure_system_proc
- from psutil.tests import process_namespace
- from psutil.tests import pytest
- # Cuts the time in half, but (e.g.) on macOS the process pool stays
- # alive after join() (multiprocessing bug?), messing up other tests.
- USE_PROC_POOL = LINUX and not CI_TESTING and not PYTEST_PARALLEL
- def proc_info(pid):
- tcase = PsutilTestCase()
- def check_exception(exc, proc, name, ppid):
- tcase.assertEqual(exc.pid, pid)
- if exc.name is not None:
- tcase.assertEqual(exc.name, name)
- if isinstance(exc, psutil.ZombieProcess):
- tcase.assertProcessZombie(proc)
- if exc.ppid is not None:
- tcase.assertGreaterEqual(exc.ppid, 0)
- tcase.assertEqual(exc.ppid, ppid)
- elif isinstance(exc, psutil.NoSuchProcess):
- tcase.assertProcessGone(proc)
- str(exc)
- repr(exc)
- def do_wait():
- if pid != 0:
- try:
- proc.wait(0)
- except psutil.Error as exc:
- check_exception(exc, proc, name, ppid)
- try:
- proc = psutil.Process(pid)
- except psutil.NoSuchProcess:
- tcase.assertPidGone(pid)
- return {}
- try:
- d = proc.as_dict(['ppid', 'name'])
- except psutil.NoSuchProcess:
- tcase.assertProcessGone(proc)
- else:
- name, ppid = d['name'], d['ppid']
- info = {'pid': proc.pid}
- ns = process_namespace(proc)
- # We don't use oneshot() because in order not to fool
- # check_exception() in case of NSP.
- for fun, fun_name in ns.iter(ns.getters, clear_cache=False):
- try:
- info[fun_name] = fun()
- except psutil.Error as exc:
- check_exception(exc, proc, name, ppid)
- continue
- do_wait()
- return info
- class TestFetchAllProcesses(PsutilTestCase):
- """Test which iterates over all running processes and performs
- some sanity checks against Process API's returned values.
- Uses a process pool to get info about all processes.
- """
- def setUp(self):
- psutil._set_debug(False)
- # Using a pool in a CI env may result in deadlock, see:
- # https://github.com/giampaolo/psutil/issues/2104
- if USE_PROC_POOL:
- self.pool = multiprocessing.Pool()
- def tearDown(self):
- psutil._set_debug(True)
- if USE_PROC_POOL:
- self.pool.terminate()
- self.pool.join()
- def iter_proc_info(self):
- # Fixes "can't pickle <function proc_info>: it's not the
- # same object as test_process_all.proc_info".
- from psutil.tests.test_process_all import proc_info
- if USE_PROC_POOL:
- return self.pool.imap_unordered(proc_info, psutil.pids())
- else:
- ls = [proc_info(pid) for pid in psutil.pids()]
- return ls
- def test_all(self):
- failures = []
- for info in self.iter_proc_info():
- for name, value in info.items():
- meth = getattr(self, name)
- try:
- meth(value, info)
- except Exception: # noqa: BLE001
- s = '\n' + '=' * 70 + '\n'
- s += (
- "FAIL: name=test_{}, pid={}, ret={}\ninfo={}\n".format(
- name,
- info['pid'],
- repr(value),
- info,
- )
- )
- s += '-' * 70
- s += f"\n{traceback.format_exc()}"
- s = "\n".join((" " * 4) + i for i in s.splitlines()) + "\n"
- failures.append(s)
- else:
- if value not in (0, 0.0, [], None, '', {}):
- assert value, value
- if failures:
- raise self.fail(''.join(failures))
- def cmdline(self, ret, info):
- assert isinstance(ret, list)
- for part in ret:
- assert isinstance(part, str)
- def exe(self, ret, info):
- assert isinstance(ret, str)
- assert ret.strip() == ret
- if ret:
- if WINDOWS and not ret.endswith('.exe'):
- return # May be "Registry", "MemCompression", ...
- assert os.path.isabs(ret), ret
- # Note: os.stat() may return False even if the file is there
- # hence we skip the test, see:
- # http://stackoverflow.com/questions/3112546/os-path-exists-lies
- if POSIX and os.path.isfile(ret):
- if hasattr(os, 'access') and hasattr(os, "X_OK"):
- # XXX: may fail on MACOS
- try:
- assert os.access(ret, os.X_OK)
- except AssertionError:
- if os.path.exists(ret) and not CI_TESTING:
- raise
- def pid(self, ret, info):
- assert isinstance(ret, int)
- assert ret >= 0
- def ppid(self, ret, info):
- assert isinstance(ret, int)
- assert ret >= 0
- proc_info(ret)
- def name(self, ret, info):
- assert isinstance(ret, str)
- if WINDOWS and not ret and is_win_secure_system_proc(info['pid']):
- # https://github.com/giampaolo/psutil/issues/2338
- return
- # on AIX, "<exiting>" processes don't have names
- if not AIX:
- assert ret, repr(ret)
- def create_time(self, ret, info):
- assert isinstance(ret, float)
- try:
- assert ret >= 0
- except AssertionError:
- # XXX
- if OPENBSD and info['status'] == psutil.STATUS_ZOMBIE:
- pass
- else:
- raise
- # this can't be taken for granted on all platforms
- # self.assertGreaterEqual(ret, psutil.boot_time())
- # make sure returned value can be pretty printed
- # with strftime
- time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
- def uids(self, ret, info):
- assert is_namedtuple(ret)
- for uid in ret:
- assert isinstance(uid, int)
- assert uid >= 0
- def gids(self, ret, info):
- assert is_namedtuple(ret)
- # note: testing all gids as above seems not to be reliable for
- # gid == 30 (nodoby); not sure why.
- for gid in ret:
- assert isinstance(gid, int)
- if not MACOS and not NETBSD:
- assert gid >= 0
- def username(self, ret, info):
- assert isinstance(ret, str)
- assert ret.strip() == ret
- assert ret.strip()
- def status(self, ret, info):
- assert isinstance(ret, str)
- assert ret, ret
- assert ret != '?' # XXX
- assert ret in VALID_PROC_STATUSES
- def io_counters(self, ret, info):
- assert is_namedtuple(ret)
- for field in ret:
- assert isinstance(field, int)
- if field != -1:
- assert field >= 0
- def ionice(self, ret, info):
- if LINUX:
- assert isinstance(ret.ioclass, int)
- assert isinstance(ret.value, int)
- assert ret.ioclass >= 0
- assert ret.value >= 0
- else: # Windows, Cygwin
- choices = [
- psutil.IOPRIO_VERYLOW,
- psutil.IOPRIO_LOW,
- psutil.IOPRIO_NORMAL,
- psutil.IOPRIO_HIGH,
- ]
- assert isinstance(ret, int)
- assert ret >= 0
- assert ret in choices
- def num_threads(self, ret, info):
- assert isinstance(ret, int)
- if WINDOWS and ret == 0 and is_win_secure_system_proc(info['pid']):
- # https://github.com/giampaolo/psutil/issues/2338
- return
- assert ret >= 1
- def threads(self, ret, info):
- assert isinstance(ret, list)
- for t in ret:
- assert is_namedtuple(t)
- assert t.id >= 0
- assert t.user_time >= 0
- assert t.system_time >= 0
- for field in t:
- assert isinstance(field, (int, float))
- def cpu_times(self, ret, info):
- assert is_namedtuple(ret)
- for n in ret:
- assert isinstance(n, float)
- assert n >= 0
- # TODO: check ntuple fields
- def cpu_percent(self, ret, info):
- assert isinstance(ret, float)
- assert 0.0 <= ret <= 100.0, ret
- def cpu_num(self, ret, info):
- assert isinstance(ret, int)
- if FREEBSD and ret == -1:
- return
- assert ret >= 0
- if psutil.cpu_count() == 1:
- assert ret == 0
- assert ret in list(range(psutil.cpu_count()))
- def memory_info(self, ret, info):
- assert is_namedtuple(ret)
- for value in ret:
- assert isinstance(value, int)
- assert value >= 0
- if WINDOWS:
- assert ret.peak_wset >= ret.wset
- assert ret.peak_paged_pool >= ret.paged_pool
- assert ret.peak_nonpaged_pool >= ret.nonpaged_pool
- assert ret.peak_pagefile >= ret.pagefile
- def memory_full_info(self, ret, info):
- assert is_namedtuple(ret)
- total = psutil.virtual_memory().total
- for name in ret._fields:
- value = getattr(ret, name)
- assert isinstance(value, int)
- assert value >= 0
- if LINUX or (OSX and name in {'vms', 'data'}):
- # On Linux there are processes (e.g. 'goa-daemon') whose
- # VMS is incredibly high for some reason.
- continue
- assert value <= total, name
- if LINUX:
- assert ret.pss >= ret.uss
- def open_files(self, ret, info):
- assert isinstance(ret, list)
- for f in ret:
- assert isinstance(f.fd, int)
- assert isinstance(f.path, str)
- assert f.path.strip() == f.path
- if WINDOWS:
- assert f.fd == -1
- elif LINUX:
- assert isinstance(f.position, int)
- assert isinstance(f.mode, str)
- assert isinstance(f.flags, int)
- assert f.position >= 0
- assert f.mode in {'r', 'w', 'a', 'r+', 'a+'}
- assert f.flags > 0
- elif BSD and not f.path:
- # XXX see: https://github.com/giampaolo/psutil/issues/595
- continue
- assert os.path.isabs(f.path), f
- try:
- st = os.stat(f.path)
- except FileNotFoundError:
- pass
- else:
- assert stat.S_ISREG(st.st_mode), f
- def num_fds(self, ret, info):
- assert isinstance(ret, int)
- assert ret >= 0
- def net_connections(self, ret, info):
- with create_sockets():
- assert len(ret) == len(set(ret))
- for conn in ret:
- assert is_namedtuple(conn)
- check_connection_ntuple(conn)
- def cwd(self, ret, info):
- assert isinstance(ret, str)
- assert ret.strip() == ret
- if ret:
- assert os.path.isabs(ret), ret
- try:
- st = os.stat(ret)
- except OSError as err:
- if WINDOWS and psutil._psplatform.is_permission_err(err):
- pass
- # directory has been removed in mean time
- elif err.errno != errno.ENOENT:
- raise
- else:
- assert stat.S_ISDIR(st.st_mode)
- def memory_percent(self, ret, info):
- assert isinstance(ret, float)
- assert 0 <= ret <= 100, ret
- def is_running(self, ret, info):
- assert isinstance(ret, bool)
- def cpu_affinity(self, ret, info):
- assert isinstance(ret, list)
- assert ret != []
- cpus = list(range(psutil.cpu_count()))
- for n in ret:
- assert isinstance(n, int)
- assert n in cpus
- def terminal(self, ret, info):
- assert isinstance(ret, (str, type(None)))
- if ret is not None:
- assert os.path.isabs(ret), ret
- assert os.path.exists(ret), ret
- def memory_maps(self, ret, info):
- for nt in ret:
- assert isinstance(nt.addr, str)
- assert isinstance(nt.perms, str)
- assert isinstance(nt.path, str)
- for fname in nt._fields:
- value = getattr(nt, fname)
- if fname == 'path':
- if value.startswith(("[", "anon_inode:")): # linux
- continue
- if BSD and value == "pvclock": # seen on FreeBSD
- continue
- assert os.path.isabs(nt.path), nt.path
- # commented as on Linux we might get
- # '/foo/bar (deleted)'
- # assert os.path.exists(nt.path), nt.path
- elif fname == 'addr':
- assert value, repr(value)
- elif fname == 'perms':
- if not WINDOWS:
- assert value, repr(value)
- else:
- assert isinstance(value, int)
- assert value >= 0
- def num_handles(self, ret, info):
- assert isinstance(ret, int)
- assert ret >= 0
- def nice(self, ret, info):
- assert isinstance(ret, int)
- if POSIX:
- assert -20 <= ret <= 20, ret
- else:
- priorities = [
- getattr(psutil, x)
- for x in dir(psutil)
- if x.endswith('_PRIORITY_CLASS')
- ]
- assert ret in priorities
- assert isinstance(ret, enum.IntEnum)
- def num_ctx_switches(self, ret, info):
- assert is_namedtuple(ret)
- for value in ret:
- assert isinstance(value, int)
- assert value >= 0
- def rlimit(self, ret, info):
- assert isinstance(ret, tuple)
- assert len(ret) == 2
- assert ret[0] >= -1
- assert ret[1] >= -1
- def environ(self, ret, info):
- assert isinstance(ret, dict)
- for k, v in ret.items():
- assert isinstance(k, str)
- assert isinstance(v, str)
- class TestPidsRange(PsutilTestCase):
- """Given pid_exists() return value for a range of PIDs which may or
- may not exist, make sure that psutil.Process() and psutil.pids()
- agree with pid_exists(). This guarantees that the 3 APIs are all
- consistent with each other. See:
- https://github.com/giampaolo/psutil/issues/2359
- XXX - Note about Windows: it turns out there are some "hidden" PIDs
- which are not returned by psutil.pids() and are also not revealed
- by taskmgr.exe and ProcessHacker, still they can be instantiated by
- psutil.Process() and queried. One of such PIDs is "conhost.exe".
- Running as_dict() for it reveals that some Process() APIs
- erroneously raise NoSuchProcess, so we know we have problem there.
- Let's ignore this for now, since it's quite a corner case (who even
- imagined hidden PIDs existed on Windows?).
- """
- def setUp(self):
- psutil._set_debug(False)
- def tearDown(self):
- psutil._set_debug(True)
- def test_it(self):
- def is_linux_tid(pid):
- try:
- f = open(f"/proc/{pid}/status", "rb") # noqa: SIM115
- except FileNotFoundError:
- return False
- else:
- with f:
- for line in f:
- if line.startswith(b"Tgid:"):
- tgid = int(line.split()[1])
- # If tgid and pid are different then we're
- # dealing with a process TID.
- return tgid != pid
- raise ValueError("'Tgid' line not found")
- def check(pid):
- # In case of failure retry up to 3 times in order to avoid
- # race conditions, especially when running in a CI
- # environment where PIDs may appear and disappear at any
- # time.
- x = 3
- while True:
- exists = psutil.pid_exists(pid)
- try:
- if exists:
- psutil.Process(pid)
- if not WINDOWS: # see docstring
- assert pid in psutil.pids()
- else:
- # On OpenBSD thread IDs can be instantiated,
- # and oneshot() succeeds, but other APIs fail
- # with EINVAL.
- if not OPENBSD:
- with pytest.raises(psutil.NoSuchProcess):
- psutil.Process(pid)
- if not WINDOWS: # see docstring
- assert pid not in psutil.pids()
- except (psutil.Error, AssertionError):
- x -= 1
- if x == 0:
- raise
- else:
- return
- for pid in range(1, 3000):
- if LINUX and is_linux_tid(pid):
- # On Linux a TID (thread ID) can be passed to the
- # Process class and is querable like a PID (process
- # ID). Skip it.
- continue
- with self.subTest(pid=pid):
- check(pid)
|