123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- #!/usr/bin/env python3
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Tests for psutil.net_connections() and Process.net_connections() APIs."""
- import os
- import socket
- import textwrap
- from contextlib import closing
- from socket import AF_INET
- from socket import AF_INET6
- from socket import SOCK_DGRAM
- from socket import SOCK_STREAM
- import psutil
- from psutil import FREEBSD
- from psutil import LINUX
- from psutil import MACOS
- from psutil import NETBSD
- from psutil import OPENBSD
- from psutil import POSIX
- from psutil import SUNOS
- from psutil import WINDOWS
- from psutil._common import supports_ipv6
- from psutil.tests import AF_UNIX
- from psutil.tests import HAS_NET_CONNECTIONS_UNIX
- from psutil.tests import SKIP_SYSCONS
- from psutil.tests import PsutilTestCase
- from psutil.tests import bind_socket
- from psutil.tests import bind_unix_socket
- from psutil.tests import check_connection_ntuple
- from psutil.tests import create_sockets
- from psutil.tests import filter_proc_net_connections
- from psutil.tests import pytest
- from psutil.tests import reap_children
- from psutil.tests import retry_on_failure
- from psutil.tests import skip_on_access_denied
- from psutil.tests import tcp_socketpair
- from psutil.tests import unix_socketpair
- from psutil.tests import wait_for_file
- SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
- def this_proc_net_connections(kind):
- cons = psutil.Process().net_connections(kind=kind)
- if kind in {"all", "unix"}:
- return filter_proc_net_connections(cons)
- return cons
- @pytest.mark.xdist_group(name="serial")
- class ConnectionTestCase(PsutilTestCase):
- def setUp(self):
- assert this_proc_net_connections(kind='all') == []
- def tearDown(self):
- # Make sure we closed all resources.
- assert this_proc_net_connections(kind='all') == []
- def compare_procsys_connections(self, pid, proc_cons, kind='all'):
- """Given a process PID and its list of connections compare
- those against system-wide connections retrieved via
- psutil.net_connections.
- """
- try:
- sys_cons = psutil.net_connections(kind=kind)
- except psutil.AccessDenied:
- # On MACOS, system-wide connections are retrieved by iterating
- # over all processes
- if MACOS:
- return
- else:
- raise
- # Filter for this proc PID and exlucde PIDs from the tuple.
- sys_cons = [c[:-1] for c in sys_cons if c.pid == pid]
- sys_cons.sort()
- proc_cons.sort()
- assert proc_cons == sys_cons
- class TestBasicOperations(ConnectionTestCase):
- @pytest.mark.skipif(SKIP_SYSCONS, reason="requires root")
- def test_system(self):
- with create_sockets():
- for conn in psutil.net_connections(kind='all'):
- check_connection_ntuple(conn)
- def test_process(self):
- with create_sockets():
- for conn in this_proc_net_connections(kind='all'):
- check_connection_ntuple(conn)
- def test_invalid_kind(self):
- with pytest.raises(ValueError):
- this_proc_net_connections(kind='???')
- with pytest.raises(ValueError):
- psutil.net_connections(kind='???')
- @pytest.mark.xdist_group(name="serial")
- class TestUnconnectedSockets(ConnectionTestCase):
- """Tests sockets which are open but not connected to anything."""
- def get_conn_from_sock(self, sock):
- cons = this_proc_net_connections(kind='all')
- smap = {c.fd: c for c in cons}
- if NETBSD or FREEBSD:
- # NetBSD opens a UNIX socket to /var/log/run
- # so there may be more connections.
- return smap[sock.fileno()]
- else:
- assert len(cons) == 1
- if cons[0].fd != -1:
- assert smap[sock.fileno()].fd == sock.fileno()
- return cons[0]
- def check_socket(self, sock):
- """Given a socket, makes sure it matches the one obtained
- via psutil. It assumes this process created one connection
- only (the one supposed to be checked).
- """
- conn = self.get_conn_from_sock(sock)
- check_connection_ntuple(conn)
- # fd, family, type
- if conn.fd != -1:
- assert conn.fd == sock.fileno()
- assert conn.family == sock.family
- # see: http://bugs.python.org/issue30204
- assert conn.type == sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
- # local address
- laddr = sock.getsockname()
- if not laddr and isinstance(laddr, bytes):
- # See: http://bugs.python.org/issue30205
- laddr = laddr.decode()
- if sock.family == AF_INET6:
- laddr = laddr[:2]
- assert conn.laddr == laddr
- # XXX Solaris can't retrieve system-wide UNIX sockets
- if sock.family == AF_UNIX and HAS_NET_CONNECTIONS_UNIX:
- cons = this_proc_net_connections(kind='all')
- self.compare_procsys_connections(os.getpid(), cons, kind='all')
- return conn
- def test_tcp_v4(self):
- addr = ("127.0.0.1", 0)
- with closing(bind_socket(AF_INET, SOCK_STREAM, addr=addr)) as sock:
- conn = self.check_socket(sock)
- assert conn.raddr == ()
- assert conn.status == psutil.CONN_LISTEN
- @pytest.mark.skipif(not supports_ipv6(), reason="IPv6 not supported")
- def test_tcp_v6(self):
- addr = ("::1", 0)
- with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock:
- conn = self.check_socket(sock)
- assert conn.raddr == ()
- assert conn.status == psutil.CONN_LISTEN
- def test_udp_v4(self):
- addr = ("127.0.0.1", 0)
- with closing(bind_socket(AF_INET, SOCK_DGRAM, addr=addr)) as sock:
- conn = self.check_socket(sock)
- assert conn.raddr == ()
- assert conn.status == psutil.CONN_NONE
- @pytest.mark.skipif(not supports_ipv6(), reason="IPv6 not supported")
- def test_udp_v6(self):
- addr = ("::1", 0)
- with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock:
- conn = self.check_socket(sock)
- assert conn.raddr == ()
- assert conn.status == psutil.CONN_NONE
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_unix_tcp(self):
- testfn = self.get_testfn()
- with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
- conn = self.check_socket(sock)
- assert conn.raddr == ""
- assert conn.status == psutil.CONN_NONE
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_unix_udp(self):
- testfn = self.get_testfn()
- with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
- conn = self.check_socket(sock)
- assert conn.raddr == ""
- assert conn.status == psutil.CONN_NONE
- @pytest.mark.xdist_group(name="serial")
- class TestConnectedSocket(ConnectionTestCase):
- """Test socket pairs which are actually connected to
- each other.
- """
- # On SunOS, even after we close() it, the server socket stays around
- # in TIME_WAIT state.
- @pytest.mark.skipif(SUNOS, reason="unreliable on SUONS")
- def test_tcp(self):
- addr = ("127.0.0.1", 0)
- assert this_proc_net_connections(kind='tcp4') == []
- server, client = tcp_socketpair(AF_INET, addr=addr)
- try:
- cons = this_proc_net_connections(kind='tcp4')
- assert len(cons) == 2
- assert cons[0].status == psutil.CONN_ESTABLISHED
- assert cons[1].status == psutil.CONN_ESTABLISHED
- # May not be fast enough to change state so it stays
- # commenteed.
- # client.close()
- # cons = this_proc_net_connections(kind='all')
- # self.assertEqual(len(cons), 1)
- # self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT)
- finally:
- server.close()
- client.close()
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_unix(self):
- testfn = self.get_testfn()
- server, client = unix_socketpair(testfn)
- try:
- cons = this_proc_net_connections(kind='unix')
- assert not (cons[0].laddr and cons[0].raddr), cons
- assert not (cons[1].laddr and cons[1].raddr), cons
- if NETBSD or FREEBSD:
- # On NetBSD creating a UNIX socket will cause
- # a UNIX connection to /var/run/log.
- cons = [c for c in cons if c.raddr != '/var/run/log']
- assert len(cons) == 2
- if LINUX or FREEBSD or SUNOS or OPENBSD:
- # remote path is never set
- assert cons[0].raddr == ""
- assert cons[1].raddr == ""
- # one local address should though
- assert testfn == (cons[0].laddr or cons[1].laddr)
- else:
- # On other systems either the laddr or raddr
- # of both peers are set.
- assert (cons[0].laddr or cons[1].laddr) == testfn
- finally:
- server.close()
- client.close()
- class TestFilters(ConnectionTestCase):
- def test_filters(self):
- def check(kind, families, types):
- for conn in this_proc_net_connections(kind=kind):
- assert conn.family in families
- assert conn.type in types
- if not SKIP_SYSCONS:
- for conn in psutil.net_connections(kind=kind):
- assert conn.family in families
- assert conn.type in types
- with create_sockets():
- check(
- 'all',
- [AF_INET, AF_INET6, AF_UNIX],
- [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET],
- )
- check('inet', [AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM])
- check('inet4', [AF_INET], [SOCK_STREAM, SOCK_DGRAM])
- check('tcp', [AF_INET, AF_INET6], [SOCK_STREAM])
- check('tcp4', [AF_INET], [SOCK_STREAM])
- check('tcp6', [AF_INET6], [SOCK_STREAM])
- check('udp', [AF_INET, AF_INET6], [SOCK_DGRAM])
- check('udp4', [AF_INET], [SOCK_DGRAM])
- check('udp6', [AF_INET6], [SOCK_DGRAM])
- if HAS_NET_CONNECTIONS_UNIX:
- check(
- 'unix',
- [AF_UNIX],
- [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET],
- )
- @skip_on_access_denied(only_if=MACOS)
- def test_combos(self):
- reap_children()
- def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
- all_kinds = (
- "all",
- "inet",
- "inet4",
- "inet6",
- "tcp",
- "tcp4",
- "tcp6",
- "udp",
- "udp4",
- "udp6",
- )
- check_connection_ntuple(conn)
- assert conn.family == family
- assert conn.type == type
- assert conn.laddr == laddr
- assert conn.raddr == raddr
- assert conn.status == status
- for kind in all_kinds:
- cons = proc.net_connections(kind=kind)
- if kind in kinds:
- assert cons != []
- else:
- assert cons == []
- # compare against system-wide connections
- # XXX Solaris can't retrieve system-wide UNIX
- # sockets.
- if HAS_NET_CONNECTIONS_UNIX:
- self.compare_procsys_connections(proc.pid, [conn])
- tcp_template = textwrap.dedent("""
- import socket, time
- s = socket.socket({family}, socket.SOCK_STREAM)
- s.bind(('{addr}', 0))
- s.listen(5)
- with open('{testfn}', 'w') as f:
- f.write(str(s.getsockname()[:2]))
- [time.sleep(0.1) for x in range(100)]
- """)
- udp_template = textwrap.dedent("""
- import socket, time
- s = socket.socket({family}, socket.SOCK_DGRAM)
- s.bind(('{addr}', 0))
- with open('{testfn}', 'w') as f:
- f.write(str(s.getsockname()[:2]))
- [time.sleep(0.1) for x in range(100)]
- """)
- # must be relative on Windows
- testfile = os.path.basename(self.get_testfn(dir=os.getcwd()))
- tcp4_template = tcp_template.format(
- family=int(AF_INET), addr="127.0.0.1", testfn=testfile
- )
- udp4_template = udp_template.format(
- family=int(AF_INET), addr="127.0.0.1", testfn=testfile
- )
- tcp6_template = tcp_template.format(
- family=int(AF_INET6), addr="::1", testfn=testfile
- )
- udp6_template = udp_template.format(
- family=int(AF_INET6), addr="::1", testfn=testfile
- )
- # launch various subprocess instantiating a socket of various
- # families and types to enrich psutil results
- tcp4_proc = self.pyrun(tcp4_template)
- tcp4_addr = eval(wait_for_file(testfile, delete=True))
- udp4_proc = self.pyrun(udp4_template)
- udp4_addr = eval(wait_for_file(testfile, delete=True))
- if supports_ipv6():
- tcp6_proc = self.pyrun(tcp6_template)
- tcp6_addr = eval(wait_for_file(testfile, delete=True))
- udp6_proc = self.pyrun(udp6_template)
- udp6_addr = eval(wait_for_file(testfile, delete=True))
- else:
- tcp6_proc = None
- udp6_proc = None
- tcp6_addr = None
- udp6_addr = None
- for p in psutil.Process().children():
- cons = p.net_connections()
- assert len(cons) == 1
- for conn in cons:
- # TCP v4
- if p.pid == tcp4_proc.pid:
- check_conn(
- p,
- conn,
- AF_INET,
- SOCK_STREAM,
- tcp4_addr,
- (),
- psutil.CONN_LISTEN,
- ("all", "inet", "inet4", "tcp", "tcp4"),
- )
- # UDP v4
- elif p.pid == udp4_proc.pid:
- check_conn(
- p,
- conn,
- AF_INET,
- SOCK_DGRAM,
- udp4_addr,
- (),
- psutil.CONN_NONE,
- ("all", "inet", "inet4", "udp", "udp4"),
- )
- # TCP v6
- elif p.pid == getattr(tcp6_proc, "pid", None):
- check_conn(
- p,
- conn,
- AF_INET6,
- SOCK_STREAM,
- tcp6_addr,
- (),
- psutil.CONN_LISTEN,
- ("all", "inet", "inet6", "tcp", "tcp6"),
- )
- # UDP v6
- elif p.pid == getattr(udp6_proc, "pid", None):
- check_conn(
- p,
- conn,
- AF_INET6,
- SOCK_DGRAM,
- udp6_addr,
- (),
- psutil.CONN_NONE,
- ("all", "inet", "inet6", "udp", "udp6"),
- )
- def test_count(self):
- with create_sockets():
- # tcp
- cons = this_proc_net_connections(kind='tcp')
- assert len(cons) == (2 if supports_ipv6() else 1)
- for conn in cons:
- assert conn.family in {AF_INET, AF_INET6}
- assert conn.type == SOCK_STREAM
- # tcp4
- cons = this_proc_net_connections(kind='tcp4')
- assert len(cons) == 1
- assert cons[0].family == AF_INET
- assert cons[0].type == SOCK_STREAM
- # tcp6
- if supports_ipv6():
- cons = this_proc_net_connections(kind='tcp6')
- assert len(cons) == 1
- assert cons[0].family == AF_INET6
- assert cons[0].type == SOCK_STREAM
- # udp
- cons = this_proc_net_connections(kind='udp')
- assert len(cons) == (2 if supports_ipv6() else 1)
- for conn in cons:
- assert conn.family in {AF_INET, AF_INET6}
- assert conn.type == SOCK_DGRAM
- # udp4
- cons = this_proc_net_connections(kind='udp4')
- assert len(cons) == 1
- assert cons[0].family == AF_INET
- assert cons[0].type == SOCK_DGRAM
- # udp6
- if supports_ipv6():
- cons = this_proc_net_connections(kind='udp6')
- assert len(cons) == 1
- assert cons[0].family == AF_INET6
- assert cons[0].type == SOCK_DGRAM
- # inet
- cons = this_proc_net_connections(kind='inet')
- assert len(cons) == (4 if supports_ipv6() else 2)
- for conn in cons:
- assert conn.family in {AF_INET, AF_INET6}
- assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
- # inet6
- if supports_ipv6():
- cons = this_proc_net_connections(kind='inet6')
- assert len(cons) == 2
- for conn in cons:
- assert conn.family == AF_INET6
- assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
- # Skipped on BSD becayse by default the Python process
- # creates a UNIX socket to '/var/run/log'.
- if HAS_NET_CONNECTIONS_UNIX and not (FREEBSD or NETBSD):
- cons = this_proc_net_connections(kind='unix')
- assert len(cons) == 3
- for conn in cons:
- assert conn.family == AF_UNIX
- assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
- @pytest.mark.skipif(SKIP_SYSCONS, reason="requires root")
- class TestSystemWideConnections(ConnectionTestCase):
- """Tests for net_connections()."""
- def test_it(self):
- def check(cons, families, types_):
- for conn in cons:
- assert conn.family in families
- if conn.family != AF_UNIX:
- assert conn.type in types_
- check_connection_ntuple(conn)
- with create_sockets():
- from psutil._common import conn_tmap
- for kind, groups in conn_tmap.items():
- # XXX: SunOS does not retrieve UNIX sockets.
- if kind == 'unix' and not HAS_NET_CONNECTIONS_UNIX:
- continue
- families, types_ = groups
- cons = psutil.net_connections(kind)
- assert len(cons) == len(set(cons))
- check(cons, families, types_)
- @retry_on_failure()
- def test_multi_sockets_procs(self):
- # Creates multiple sub processes, each creating different
- # sockets. For each process check that proc.net_connections()
- # and psutil.net_connections() return the same results.
- # This is done mainly to check whether net_connections()'s
- # pid is properly set, see:
- # https://github.com/giampaolo/psutil/issues/1013
- with create_sockets() as socks:
- expected = len(socks)
- pids = []
- times = 10
- fnames = []
- for _ in range(times):
- fname = self.get_testfn()
- fnames.append(fname)
- src = textwrap.dedent(f"""\
- import time, os
- from psutil.tests import create_sockets
- with create_sockets():
- with open(r'{fname}', 'w') as f:
- f.write("hello")
- [time.sleep(0.1) for x in range(100)]
- """)
- sproc = self.pyrun(src)
- pids.append(sproc.pid)
- # sync
- for fname in fnames:
- wait_for_file(fname)
- syscons = [
- x for x in psutil.net_connections(kind='all') if x.pid in pids
- ]
- for pid in pids:
- assert len([x for x in syscons if x.pid == pid]) == expected
- p = psutil.Process(pid)
- assert len(p.net_connections('all')) == expected
- class TestMisc(PsutilTestCase):
- def test_net_connection_constants(self):
- ints = []
- strs = []
- for name in dir(psutil):
- if name.startswith('CONN_'):
- num = getattr(psutil, name)
- str_ = str(num)
- assert str_.isupper(), str_
- assert str not in strs
- assert num not in ints
- ints.append(num)
- strs.append(str_)
- if SUNOS:
- psutil.CONN_IDLE # noqa: B018
- psutil.CONN_BOUND # noqa: B018
- if WINDOWS:
- psutil.CONN_DELETE_TCB # noqa: B018
|