def get_pid_by_connection(src_addr, src_p, dst_addr, dst_p, proto='tcp'):
    """Return the PID of the process that owns the given connection.

    The first matching socket reported by ``psutil.net_connections`` wins;
    two identical connections are assumed not to coexist.

    :param src_addr: Local IP address of the connection.
    :param src_p: Local port (anything ``int()`` accepts).
    :param dst_addr: Remote IP address; not used for UDP kinds.
    :param dst_p: Remote port (anything ``int()`` accepts); not used for
        UDP kinds.
    :param str proto: Connection kind handed to ``psutil.net_connections``
        (``'tcp'``, ``'udp'``, and per the psutil docs also variants such as
        ``'tcp4'`` or ``'udp6'``).
    :returns: ``conn.pid`` of the first matching socket, or ``None`` (after
        logging a warning) when no socket matches.
    """
    local_port = int(src_p)

    if proto.startswith('udp'):
        # UDP gives us a very limited dataset to work with, so only the
        # local port is compared.
        def matches(conn):
            return bool(conn.laddr) and conn.laddr[1] == local_port
    else:
        # Every non-UDP kind must match on both endpoints. Previously any
        # kind other than exactly 'tcp'/'udp' skipped filtering entirely and
        # returned the PID of whichever socket happened to be listed first.
        local_end = (src_addr, local_port)
        remote_end = (dst_addr, int(dst_p))

        def matches(conn):
            return conn.laddr == local_end and conn.raddr == remote_end

    for conn in psutil.net_connections(kind=proto):
        if matches(conn):
            return conn.pid

    logging.warning("Could not find process for %s connection %s:%s -> %s:%s",
                    proto,
                    src_addr,
                    src_p,
                    dst_addr,
                    dst_p)
    return None
# Example source code using Python's psutil net_connections()
def already_listening(port, renewer=False):
    """Report whether some process already holds a TCP listener on *port*.

    When the listening process can be identified, the user is also told
    about it through a display notification.

    .. warning::
        On some operating systems, this function can only usefully be
        run as root.

    :param int port: The TCP port in question.
    :param bool renewer: When true, advice about automated renewal is
        appended to the notification text.

    :returns: True if a listener was found and reported to the user,
        False otherwise (including when the connection list or the
        listening process could not be inspected).
    :rtype: bool

    """
    try:
        all_conns = psutil.net_connections()
    except psutil.AccessDenied as error:
        logger.info("Access denied when trying to list network "
                    "connections: %s. Are you root?", error)
        # This is only a pre-check, and one that often produces false
        # positives and trouble in testing (c.f. #680 on Mac, #255
        # generally); bind() will fail later anyway if the port is taken.
        return False

    # Collect the owning PID of every TCP socket listening on the port.
    listener_pids = []
    for conn in all_conns:
        if conn.status != 'LISTEN':
            continue
        if conn.type != socket.SOCK_STREAM:
            continue
        if conn.laddr[1] != port:
            continue
        listener_pids.append(conn.pid)

    # Several sockets may be bound to the same port on separate interfaces,
    # but the UI can only report one at a time, so only the first is used.
    # Its PID may be None when the current process is not permitted to
    # identify the listening process.
    pid = listener_pids[0] if listener_pids else None
    if pid is None:
        return False

    try:
        name = psutil.Process(pid).name()
        display = zope.component.getUtility(interfaces.IDisplay)
        if renewer:
            extra = (
                " For automated renewal, you may want to use a script that stops"
                " and starts your webserver. You can find an example at"
                " https://letsencrypt.org/howitworks/#writing-your-own-renewal-script"
                ". Alternatively you can use the webroot plugin to renew without"
                " needing to stop and start your webserver.")
        else:
            extra = ""
        display.notification(
            "The program {0} (process ID {1}) is already listening "
            "on TCP port {2}. This will prevent us from binding to "
            "that port. Please stop the {0} program temporarily "
            "and then try again.{3}".format(name, pid, port, extra),
            height=13)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # NoSuchProcess: possibly a race in which the process exited or
        # gave up the port. AccessDenied: an OS policy may forbid looking
        # up the process name.
        return False
    return True
def _getDebugData():
    """Assemble a plain-text diagnostic report and return it as a string.

    The report lists, in order: version, OS, Python, CPU and memory figures,
    the network interface addresses, the open network connections, the name
    and executable of every running process, and finally each project known
    to ``Controller.instance()`` together with its links.
    """
    try:
        open_conns = psutil.net_connections()
    except psutil.AccessDenied:
        # Listing connections requires root on OSX.
        open_conns = None

    try:
        iface_lines = []
        for iface, addresses in psutil.net_if_addrs().items():
            iface_lines.append("* {}: {}".format(iface, addresses))
    except UnicodeDecodeError:
        iface_lines = ["INVALID ADDR WITH UNICODE CHARACTERS"]

    template = """Version: {version}
OS: {os}
Python: {python}
CPU: {cpu}
Memory: {memory}
Networks:
{addrs}
Open connections:
{connections}
Processus:
"""
    # Fragments are gathered in a list and joined once at the end.
    fragments = [
        template.format(
            version=__version__,
            os=platform.platform(),
            python=platform.python_version(),
            memory=psutil.virtual_memory(),
            cpu=psutil.cpu_times(),
            connections=open_conns,
            addrs="\n".join(iface_lines)
        )
    ]

    for proc in psutil.process_iter():
        try:
            info = proc.as_dict(attrs=["name", "exe"])
        except psutil.NoSuchProcess:
            # The process went away while being inspected; leave it out.
            continue
        fragments.append("* {} {}\n".format(info["name"], info["exe"]))

    fragments.append("\n\nProjects")
    for project in Controller.instance().projects.values():
        fragments.append(
            "\n\nProject name: {}\nProject ID: {}\n".format(project.name, project.id))
        for link in project.links.values():
            fragments.append("Link {}: {}".format(link.id, link.debug_link_data))

    return "".join(fragments)
def get_connections(self):
    """Summarise established connections made by long-running local processes.

    A connection is reported only when it has a known PID, is in the
    ESTABLISHED state, its local port is not one this host is listening on
    (presumably so that only client-side connections remain), and its owning
    process is at least five minutes old.

    Connections are grouped by (pid, remote ip, remote port); each group
    yields one dict with the keys ``client_pid``,
    ``client_process_create_time``, ``client_pname``, ``client_cwd``,
    ``client_ip``, ``client_port_num`` (number of connections in the group),
    ``server_ip``, ``server_port``, ``server_pid`` and ``server_pname``
    (the last two are always empty strings here).

    :returns: list of those dicts sorted by ``(client_pid, server_port)``.
    """
    min_age_seconds = 60 * 5
    connections = psutil.net_connections()

    # Local ports with a listening socket owned by an identifiable process.
    listening = set()
    for conn in connections:
        if conn.pid and conn.status == psutil.CONN_LISTEN:
            listening.add(conn.laddr[1])

    clients = {}
    for conn in connections:
        if not conn.pid:
            continue
        if conn.status != psutil.CONN_ESTABLISHED:
            continue
        ip, port = conn.laddr
        if port in listening:
            continue
        try:
            proc = psutil.Process(conn.pid)
            create_time = proc.create_time()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited (or became unreadable) after the socket
            # list was taken; skip it instead of aborting the whole scan.
            continue
        if time.time() - create_time < min_age_seconds:
            continue
        rip, rport = conn.raddr
        record_id = '%s-%s:%s' % (conn.pid, rip, rport)

        client = clients.get(record_id)
        if client is not None:
            # Another local port talking to the same remote endpoint.
            client['client_port_num'] += 1
            continue

        try:
            pname = proc.name()
            cwd = proc.cwd()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Same race/permission issue as above; no partial record is
            # stored for this connection.
            continue
        clients[record_id] = {
            'client_pid': conn.pid,
            'client_process_create_time': int(create_time),
            'client_pname': pname,
            'client_cwd': cwd,
            'client_ip': ip,
            'client_port_num': 1,
            'server_ip': rip,
            'server_port': rport,
            'server_pid': '',
            'server_pname': '',
        }

    # sorted() rather than values().sort(): dict views have no sort() on
    # Python 3.
    return sorted(clients.values(),
                  key=lambda x: (x['client_pid'], x['server_port']))