def testTypes(self):
families = [socket.AF_INET, socket.AF_INET6]
types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
for f in families:
for t in types:
try:
source = socket.socket(f, t)
except OSError:
continue # This combination is not supported
try:
data = source.share(os.getpid())
shared = socket.fromshare(data)
try:
self.compareSockets(source, shared)
finally:
shared.close()
finally:
source.close()
python类fromshare()的实例源码
def runpool():
parsecli()
try:
httpd = ThreadedTCPServer((LISTEN, PORT), Proxy)
except OSError as e:
print(e)
return
mainsock = httpd.socket
if hasattr(socket, "fromshare"):
workers = MAX_WORKERS
for i in range(workers-1):
(pipeout, pipein) = multiprocessing.Pipe()
p = multiprocessing.Process(target=start_worker, args=(pipeout,))
p.daemon = True
p.start()
while p.pid == None:
time.sleep(1)
pipein.send(mainsock.share(p.pid))
serve_forever(httpd)
def testTypes(self):
families = [socket.AF_INET, socket.AF_INET6]
types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
for f in families:
for t in types:
try:
source = socket.socket(f, t)
except OSError:
continue # This combination is not supported
try:
data = source.share(os.getpid())
shared = socket.fromshare(data)
try:
self.compareSockets(source, shared)
finally:
shared.close()
finally:
source.close()
def testTypes(self):
families = [socket.AF_INET, socket.AF_INET6]
types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
for f in families:
for t in types:
try:
source = socket.socket(f, t)
except OSError:
continue # This combination is not supported
try:
data = source.share(os.getpid())
shared = socket.fromshare(data)
try:
self.compareSockets(source, shared)
finally:
shared.close()
finally:
source.close()
def fromshare(*args, **kwargs):
return from_stdlib_socket(_stdlib_socket.fromshare(*args, **kwargs))
def remoteProcessServer(cls, q):
# Recreate socket from shared data
sdata = q.get()
message = q.get()
s = socket.fromshare(sdata)
s2, c = s.accept()
# Send the message
s2.sendall(message)
s2.close()
s.close()
def testShareLength(self):
data = self.serv.share(os.getpid())
self.assertRaises(ValueError, socket.fromshare, data[:-1])
self.assertRaises(ValueError, socket.fromshare, data+b"foo")
def testShareLocal(self):
data = self.serv.share(os.getpid())
s = socket.fromshare(data)
try:
self.compareSockets(self.serv, s)
finally:
s.close()
def start_worker(pipeout):
parsecli()
httpd = ThreadedTCPServer((LISTEN, PORT), Proxy, bind_and_activate=False)
mainsock = socket.fromshare(pipeout.recv())
httpd.socket = mainsock
serve_forever(httpd)
def detach(self):
'''Get the socket. This should only be called once.'''
with _resource_sharer.get_connection(self._id) as conn:
share = conn.recv_bytes()
return socket.fromshare(share)
def remoteProcessServer(cls, q):
# Recreate socket from shared data
sdata = q.get()
message = q.get()
s = socket.fromshare(sdata)
s2, c = s.accept()
# Send the message
s2.sendall(message)
s2.close()
s.close()
def testShareLength(self):
data = self.serv.share(os.getpid())
self.assertRaises(ValueError, socket.fromshare, data[:-1])
self.assertRaises(ValueError, socket.fromshare, data+b"foo")
def testShareLocal(self):
data = self.serv.share(os.getpid())
s = socket.fromshare(data)
try:
self.compareSockets(self.serv, s)
finally:
s.close()
def detach(self):
'''Get the socket. This should only be called once.'''
with _resource_sharer.get_connection(self._id) as conn:
share = conn.recv_bytes()
return socket.fromshare(share)
def remoteProcessServer(cls, q):
# Recreate socket from shared data
sdata = q.get()
message = q.get()
s = socket.fromshare(sdata)
s2, c = s.accept()
# Send the message
s2.sendall(message)
s2.close()
s.close()
def testShareLength(self):
data = self.serv.share(os.getpid())
self.assertRaises(ValueError, socket.fromshare, data[:-1])
self.assertRaises(ValueError, socket.fromshare, data+b"foo")
def testShareLocal(self):
data = self.serv.share(os.getpid())
s = socket.fromshare(data)
try:
self.compareSockets(self.serv, s)
finally:
s.close()