def __init__(self, core):
    """Prepare the notification socket and register the scheduler tasks.

    The target address is read from the ``NOTIFY_SOCKET`` environment
    variable.  A leading ``"@"`` is replaced by a NUL byte (the
    abstract-socket convention of systemd's notify protocol).

    :param core: object exposing ``scheduler(callback, interval, single=...)``;
        the returned task must offer ``enable(instant=...)`` where used here.
    """
    self._address = os.environ.get('NOTIFY_SOCKET', None)
    # NOTIFY_SOCKET may be unset (None) or empty; indexing it blindly would
    # raise TypeError/IndexError, so only rewrite a non-empty "@" address.
    # NOTE(review): the address stays None/"" in that case -- confirm that
    # self._watchdog and self._start cope with a missing address.
    if self._address and self._address[0] == "@":
        self._address = '\0' + self._address[1:]
    self._socket = socket.socket(socket.AF_UNIX,
                                 socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
    # Recurring task, enabled immediately.
    # NOTE(review): 5000 is presumably milliseconds -- confirm against
    # core.scheduler.
    self._watchdog_task = core.scheduler(self._watchdog, 5000,
                                         single=False)
    self._watchdog_task.enable(instant=True)
    # One-shot task; it is created here but not enabled in this method.
    self._start_task = core.scheduler(self._start, 0, single=True)
    self._core = core
# Example source code using Python's socket.SOCK_CLOEXEC constant
def test_SOCK_CLOEXEC(self):
    """Check that a SOCK_CLOEXEC socket has FD_CLOEXEC set on its descriptor.

    The test is skipped when linux_version() reports a kernel older than
    2.6.28.
    """
    kernel = linux_version()
    if kernel < (2, 6, 28):
        found = ".".join(map(str, kernel))
        self.skipTest("Linux kernel 2.6.28 or higher required, not %s"
                      % found)
    stream_cloexec = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
    with socket.socket(socket.AF_INET, stream_cloexec) as sock:
        # The flag must be visible on the socket object itself ...
        self.assertTrue(sock.type & socket.SOCK_CLOEXEC)
        # ... and on the underlying file descriptor.
        fd_flags = fcntl.fcntl(sock, fcntl.F_GETFD)
        self.assertTrue(fd_flags & fcntl.FD_CLOEXEC)
def test_SOCK_CLOEXEC(self):
    """Check that a SOCK_CLOEXEC socket has FD_CLOEXEC set on its descriptor."""
    stream_cloexec = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
    with socket.socket(socket.AF_INET, stream_cloexec) as sock:
        # The flag must be visible on the socket object itself ...
        self.assertTrue(sock.type & socket.SOCK_CLOEXEC)
        # ... and on the underlying file descriptor.
        fd_flags = fcntl.fcntl(sock, fcntl.F_GETFD)
        self.assertTrue(fd_flags & fcntl.FD_CLOEXEC)
def notify_socket(clean_environment=True):
    """Return an ``(address, socket)`` pair for later notification use.

    The address comes from the ``NOTIFY_SOCKET`` environment variable.  When
    *clean_environment* is true the variable is removed from the environment
    so that child processes do not inherit it.

    ``(None, None)`` is returned when the variable is missing, empty, only
    one character long, or does not start with ``"@"`` or ``"/"``.  A leading
    ``"@"`` is replaced by a NUL byte in the returned address.
    """
    # pop() both reads and removes the variable; get() only reads it.
    lookup = os.environ.pop if clean_environment else os.environ.get
    address = lookup("NOTIFY_SOCKET", None)

    if not address or len(address) == 1 or address[0] not in ("@", "/"):
        return None, None

    if address.startswith("@"):
        address = "\0" + address[1:]

    # SOCK_CLOEXEC marks the descriptor close-on-exec.  The socket module
    # does not define it everywhere, so fall back to a plain datagram type
    # when the attribute is missing.
    try:
        sock_type = socket.SOCK_DGRAM | socket.SOCK_CLOEXEC
    except AttributeError:
        sock_type = socket.SOCK_DGRAM
    return address, socket.socket(socket.AF_UNIX, sock_type)
def testNewAttributes(self):
    """Check the .family, .type and .proto attributes of a fresh socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        self.assertEqual(sock.family, socket.AF_INET)
        if hasattr(socket, 'SOCK_CLOEXEC'):
            # The reported type may or may not include the SOCK_CLOEXEC
            # bit, so both values are accepted.
            self.assertIn(sock.type,
                          (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
                           socket.SOCK_STREAM))
        else:
            self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)
    finally:
        # Close the socket even when an assertion above fails, so a failing
        # test does not leak the descriptor.
        sock.close()
def test_SOCK_CLOEXEC(self):
    """Check that a socket created with SOCK_CLOEXEC is not inheritable."""
    stream_cloexec = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
    with socket.socket(socket.AF_INET, stream_cloexec) as sock:
        self.assertTrue(sock.type & socket.SOCK_CLOEXEC)
        self.assertFalse(sock.get_inheritable())
def test_create_socket(self):
    """Check the family and type of the socket made by dispatcher.create_socket()."""
    dispatcher = asyncore.dispatcher()
    dispatcher.create_socket(self.family)
    created = dispatcher.socket
    self.assertEqual(created.family, self.family)

    # SOCK_NONBLOCK contributes nothing to the expected type when the
    # socket module does not define it.
    expected_type = socket.SOCK_STREAM | getattr(socket, 'SOCK_NONBLOCK', 0)
    if not hasattr(socket, 'SOCK_CLOEXEC'):
        self.assertEqual(created.type, expected_type)
    else:
        # The SOCK_CLOEXEC bit may or may not be reported in the type.
        self.assertIn(created.type,
                      (expected_type | socket.SOCK_CLOEXEC, expected_type))
def test_check_resolved_sock_type(self):
    """Check that _check_resolved_address() ignores extra flags in sock.type."""
    # Each flag is exercised only when the socket module defines it.
    for flag_name in ('SOCK_NONBLOCK', 'SOCK_CLOEXEC'):
        if not hasattr(socket, flag_name):
            continue
        flagged_type = socket.SOCK_STREAM | getattr(socket, flag_name)
        sock = socket.socket(type=flagged_type)
        with sock:
            base_events._check_resolved_address(sock, ('1.2.3.4', 1))
def _check_resolved_address(sock, address):
    """Raise ValueError unless *address* is already a numeric IP address.

    Requiring a resolved address avoids the trap of hanging the entire
    event loop on a DNS lookup.  Only AF_INET and AF_INET6 sockets are
    checked; for any other family the function returns without doing
    anything.
    """
    family = sock.family
    if family not in (socket.AF_INET, socket.AF_INET6):
        return
    # An AF_INET6 address may carry extra items after host and port.
    host, port = address if family == socket.AF_INET else address[:2]

    # Flag bits that may be OR-ed into sock.type and must be stripped before
    # the type is handed to getaddrinfo(); a missing flag contributes 0.
    extra_flags = (getattr(socket, 'SOCK_NONBLOCK', 0)
                   | getattr(socket, 'SOCK_CLOEXEC', 0))
    plain_type = sock.type & ~extra_flags

    # With AI_NUMERICHOST, getaddrinfo() fails for anything that is not
    # already a numeric address.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=plain_type,
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err))
def testNewAttributes(self):
    """Check the .family, .type and .proto attributes of a fresh socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        self.assertEqual(sock.family, socket.AF_INET)
        if hasattr(socket, 'SOCK_CLOEXEC'):
            # The reported type may or may not include the SOCK_CLOEXEC
            # bit, so both values are accepted.
            self.assertIn(sock.type,
                          (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
                           socket.SOCK_STREAM))
        else:
            self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)
    finally:
        # Close the socket even when an assertion above fails, so a failing
        # test does not leak the descriptor.
        sock.close()
def test_SOCK_CLOEXEC(self):
    """Check that a socket created with SOCK_CLOEXEC is not inheritable."""
    stream_cloexec = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
    with socket.socket(socket.AF_INET, stream_cloexec) as sock:
        self.assertTrue(sock.type & socket.SOCK_CLOEXEC)
        self.assertFalse(sock.get_inheritable())
def test_create_socket(self):
    """Check the family and type of the socket made by dispatcher.create_socket()."""
    dispatcher = asyncore.dispatcher()
    dispatcher.create_socket(self.family)
    created = dispatcher.socket
    self.assertEqual(created.family, self.family)

    # SOCK_NONBLOCK contributes nothing to the expected type when the
    # socket module does not define it.
    expected_type = socket.SOCK_STREAM | getattr(socket, 'SOCK_NONBLOCK', 0)
    if not hasattr(socket, 'SOCK_CLOEXEC'):
        self.assertEqual(created.type, expected_type)
    else:
        # The SOCK_CLOEXEC bit may or may not be reported in the type.
        self.assertIn(created.type,
                      (expected_type | socket.SOCK_CLOEXEC, expected_type))
def formatSocketType(argument):
    """Return a textual form of the socket type stored in ``argument.value``.

    The SOCK_CLOEXEC bit (when the socket module defines it) is removed
    before the remaining value is looked up in SOCKET_TYPE; a value with no
    entry there is rendered with ``str()``.  ``'|SOCK_CLOEXEC'`` is appended
    when the bit was set.
    """
    value = argument.value
    # A missing SOCK_CLOEXEC constant behaves like an empty mask: nothing is
    # detected and nothing is stripped from the value.
    cloexec_mask = getattr(socket, 'SOCK_CLOEXEC', 0)
    cloexec = value & cloexec_mask
    value &= ~cloexec_mask
    text = SOCKET_TYPE.get(value, str(value))
    if cloexec:
        text += '|SOCK_CLOEXEC'
    return text