def recvfds(sock, size):
'''Receive an array of fds over an AF_UNIX socket.'''
a = array.array('i')
bytes_size = a.itemsize * size
msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
if not msg and not ancdata:
raise EOFError
try:
if ACKNOWLEDGE:
sock.send(b'A')
if len(ancdata) != 1:
raise RuntimeError('received %d items of ancdata' %
len(ancdata))
cmsg_level, cmsg_type, cmsg_data = ancdata[0]
if (cmsg_level == socket.SOL_SOCKET and
cmsg_type == socket.SCM_RIGHTS):
if len(cmsg_data) % a.itemsize != 0:
raise ValueError
a.frombytes(cmsg_data)
assert len(a) % 256 == msg[0]
return list(a)
except (ValueError, IndexError):
pass
raise RuntimeError('Invalid data received')
python类SCM_RIGHTS的实例源码
def DupFd(fd):
'''Return a wrapper for an fd.'''
popen_obj = context.get_spawning_popen()
if popen_obj is not None:
return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
elif HAVE_SEND_HANDLE:
from . import resource_sharer
return resource_sharer.DupFd(fd)
else:
raise ValueError('SCM_RIGHTS appears not to be available')
#
# Try making some callable types picklable
#
def closeRecvmsgFDs(self, recvmsg_result):
# Close all file descriptors specified in the ancillary data
# of the given return value from recvmsg() or recvmsg_into().
for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
if (cmsg_level == socket.SOL_SOCKET and
cmsg_type == socket.SCM_RIGHTS):
fds = array.array("i")
fds.frombytes(cmsg_data[:
len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
for fd in fds:
os.close(fd)
def createAndSendFDs(self, n):
# Send n new file descriptors created by newFDs() to the
# server, with the constant MSG as the non-ancillary data.
self.assertEqual(
self.sendmsgToServer([MSG],
[(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", self.newFDs(n)))]),
len(MSG))
def _testFDPassSimple(self):
self.assertEqual(
self.sendmsgToServer(
[MSG],
[(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", self.newFDs(1)).tobytes())]),
len(MSG))
def _testFDPassSeparate(self):
fd0, fd1 = self.newFDs(2)
self.assertEqual(
self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", [fd0])),
(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", [fd1]))]),
len(MSG))
def _testFDPassSeparateMinSpace(self):
fd0, fd1 = self.newFDs(2)
self.assertEqual(
self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", [fd0])),
(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", [fd1]))]),
len(MSG))
def _testFDPassEmpty(self):
self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
b"")])
def testFDPassPartialInt(self):
# Try to pass a truncated FD array.
msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
len(MSG), 10240)
self.assertEqual(msg, MSG)
self.checkRecvmsgAddress(addr, self.cli_addr)
self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
self.assertLessEqual(len(ancdata), 1)
for cmsg_level, cmsg_type, cmsg_data in ancdata:
self.assertEqual(cmsg_level, socket.SOL_SOCKET)
self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
self.assertLess(len(cmsg_data), SIZEOF_INT)
def _testFDPassPartialIntInMiddle(self):
fd0, fd1 = self.newFDs(2)
self.sendAncillaryIfPossible(
MSG,
[(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
(socket.SOL_SOCKET,
socket.SCM_RIGHTS,
array.array("i", [fd1]))])
def run_frontend(self, tcp_address, backlog=10, timeout=0.1):
log.info('Running frontend loop')
address, port = tcp_address.split(':')
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.bind((address, int(port)))
sock.listen(backlog)
sock.settimeout(timeout)
while not self.stop_event.is_set():
try:
conn, address = await self.loop.sock_accept(sock)
except socket.timeout:
await asyncio.sleep(timeout)
continue
service = await self.loop.sock_recv(conn, BUFFER_SIZE)
if not service:
break
if service not in self.services:
self.loop.sock_sendall(conn, b'-Service not found\r\n')
continue
# detach and pack FD into a array
fd = conn.detach()
fds = array.array("I", [fd])
try:
# Send FD to server connection
server_conn = self.services[service]
server_conn.sendmsg([b'1'], [(socket.SOL_SOCKET,
socket.SCM_RIGHTS, fds)])
except BrokenPipeError: # NOQA
# If connections is broken, the server is gone
# so we need to remove it from services
del self.services[service]
conn = socket.fromfd(fd, socket.AF_INET,
socket.SOCK_STREAM)
conn.sendall(b'-Service not found\r\n')
conn.close()