def test_pidFile(self):
"""
A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
called and released when the Deferred returned by the L{IListeningPort}
provider's C{stopListening} method is called back.
"""
filename = self.mktemp()
serverFactory = MyServerFactory()
serverConnMade = defer.Deferred()
serverFactory.protocolConnectionMade = serverConnMade
unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
self.assertTrue(lockfile.isLocked(filename + ".lock"))
# XXX This part would test something about the checkPID parameter, but
# it doesn't actually. It should be rewritten to test the several
# different possible behaviors. -exarkun
clientFactory = MyClientFactory()
clientConnMade = defer.Deferred()
clientFactory.protocolConnectionMade = clientConnMade
reactor.connectUNIX(filename, clientFactory, checkPID=1)
d = defer.gatherResults([serverConnMade, clientConnMade])
def _portStuff(args):
serverProtocol, clientProto = args
# Incidental assertion which may or may not be redundant with some
# other test. This probably deserves its own test method.
self.assertEqual(clientFactory.peerAddresses,
[address.UNIXAddress(filename)])
clientProto.transport.loseConnection()
serverProtocol.transport.loseConnection()
return unixPort.stopListening()
d.addCallback(_portStuff)
def _check(ignored):
self.assertFalse(lockfile.isLocked(filename + ".lock"), 'locked')
d.addCallback(_check)
return d
python类listenUNIX()的实例源码
def _uncleanSocketTest(self, callback):
self.filename = self.mktemp()
source = networkString((
"from twisted.internet import protocol, reactor\n"
"reactor.listenUNIX(%r, protocol.ServerFactory(),"
"wantPID=True)\n") % (self.filename,))
env = {b'PYTHONPATH': FilePath(
os.pathsep.join(sys.path)).asBytesMode().path}
pyExe = FilePath(sys.executable).asBytesMode().path
d = utils.getProcessValue(pyExe, (b"-u", b"-c", source), env=env)
d.addCallback(callback)
return d
def autostart(reason, **kwargs):
if reason == 0:
from twisted.internet import reactor
try:
os.remove("/tmp/hotplug.socket")
except OSError:
pass
factory = Factory()
factory.protocol = Hotplug
reactor.listenUNIX("/tmp/hotplug.socket", factory)
def run(self):
"""
Start the server and listen on host:port
"""
f = None
unix_prefix = 'unix://'
if self.http_enabled:
rpc = JsonRpcHttpResource()
rpc.rpcprocessor = self.rpcprocessor
rpc.tls_client_auth_enabled = self.tls_client_auth_enabled
if self.http_basic_auth_enabled:
checker = PasswordChecker(self.passwdCheckFunction)
realm = HttpPasswordRealm(rpc)
p = portal.Portal(realm, [checker])
realm_name = 'Reflect RPC'
if sys.version_info.major == 2:
realm_name = realm_name.encode('utf-8')
credentialFactory = BasicCredentialFactory(realm_name)
rpc = HTTPAuthSessionWrapper(p, [credentialFactory])
root = RootResource(rpc)
f = server.Site(root)
else:
f = JsonRpcProtocolFactory(self.rpcprocessor,
self.tls_client_auth_enabled)
if self.tls_enabled:
if not self.tls_client_auth_enabled:
reactor.listenSSL(self.port, f, self.cert.options(),
interface=self.host)
else:
reactor.listenSSL(self.port, f,
self.cert.options(self.client_auth_ca),
interface=self.host)
else:
if self.host.startswith(unix_prefix):
path = self.host[len(unix_prefix):]
reactor.listenUNIX(path, f, backlog=self.unix_socket_backlog,
mode=self.unix_socket_mode, wantPID=self.unix_socket_want_pid)
else:
reactor.listenTCP(self.port, f, interface=self.host)
if self.host.startswith(unix_prefix):
print("Listening on %s" % (self.host))
else:
print("Listening on %s:%d" % (self.host, self.port))
reactor.run()