def setUp(self):
from twisted.internet import reactor
self.serverFactory = protocol.ServerFactory()
self.serverFactory.protocol = self.serverProto
self.clientFactory = protocol.ClientFactory()
self.clientFactory.protocol = self.clientProto
self.clientFactory.onMade = defer.Deferred()
self.serverFactory.onMade = defer.Deferred()
self.serverPort = reactor.listenTCP(0, self.serverFactory)
self.clientConn = reactor.connectTCP(
'127.0.0.1', self.serverPort.getHost().port,
self.clientFactory)
def getProtos(rlst):
self.cli = self.clientFactory.theProto
self.svr = self.serverFactory.theProto
dl = defer.DeferredList([self.clientFactory.onMade,
self.serverFactory.onMade])
return dl.addCallback(getProtos)
python类ServerFactory()的实例源码
def testPrivileged(self):
factory = protocol.ServerFactory()
factory.protocol = TestEcho
TestEcho.d = defer.Deferred()
t = internet.TCPServer(0, factory)
t.privileged = 1
t.privilegedStartService()
num = t._port.getHost().port
factory = protocol.ClientFactory()
factory.d = defer.Deferred()
factory.protocol = Foo
factory.line = None
c = internet.TCPClient('127.0.0.1', num, factory)
c.startService()
factory.d.addCallback(self.assertEqual, 'lalala')
factory.d.addCallback(lambda x : c.stopService())
factory.d.addCallback(lambda x : t.stopService())
factory.d.addCallback(lambda x : TestEcho.d)
return factory.d
def testUNIX(self):
# FIXME: This test is far too dense. It needs comments.
# -- spiv, 2004-11-07
if not interfaces.IReactorUNIX(reactor, None):
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
s = service.MultiService()
s.startService()
factory = protocol.ServerFactory()
factory.protocol = TestEcho
TestEcho.d = defer.Deferred()
t = internet.UNIXServer('echo.skt', factory)
t.setServiceParent(s)
factory = protocol.ClientFactory()
factory.protocol = Foo
factory.d = defer.Deferred()
factory.line = None
internet.UNIXClient('echo.skt', factory).setServiceParent(s)
factory.d.addCallback(self.assertEqual, 'lalala')
factory.d.addCallback(lambda x : s.stopService())
factory.d.addCallback(lambda x : TestEcho.d)
factory.d.addCallback(self._cbTestUnix, factory, s)
return factory.d
def testVolatile(self):
if not interfaces.IReactorUNIX(reactor, None):
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
factory = protocol.ServerFactory()
factory.protocol = wire.Echo
t = internet.UNIXServer('echo.skt', factory)
t.startService()
self.failIfIdentical(t._port, None)
t1 = copy.copy(t)
self.assertIdentical(t1._port, None)
t.stopService()
self.assertIdentical(t._port, None)
self.failIf(t.running)
factory = protocol.ClientFactory()
factory.protocol = wire.Echo
t = internet.UNIXClient('echo.skt', factory)
t.startService()
self.failIfIdentical(t._connection, None)
t1 = copy.copy(t)
self.assertIdentical(t1._connection, None)
t.stopService()
self.assertIdentical(t._connection, None)
self.failIf(t.running)
def testOpenSSLBuffering(self):
serverProto = self.serverProto = SingleLineServerProtocol()
clientProto = self.clientProto = RecordingClientProtocol()
server = protocol.ServerFactory()
client = self.client = protocol.ClientFactory()
server.protocol = lambda: serverProto
client.protocol = lambda: clientProto
client.buffer = []
sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
cCTX = ssl.ClientContextFactory()
port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)
i = 0
while i < 5000 and not client.buffer:
i += 1
reactor.iterate()
self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
def testImmediateDisconnect(self):
org = "twisted.test.test_ssl"
self.setupServerAndClient(
(org, org + ", client"), {},
(org, org + ", server"), {})
# Set up a server, connect to it with a client, which should work since our verifiers
# allow anything, then disconnect.
serverProtocolFactory = protocol.ServerFactory()
serverProtocolFactory.protocol = protocol.Protocol
self.serverPort = serverPort = reactor.listenSSL(0,
serverProtocolFactory, self.serverCtxFactory)
clientProtocolFactory = protocol.ClientFactory()
clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
clientProtocolFactory.connectionDisconnected = defer.Deferred()
clientConnector = reactor.connectSSL('127.0.0.1',
serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
return clientProtocolFactory.connectionDisconnected.addCallback(
lambda ignoredResult: self.serverPort.stopListening())
def loopback(self, serverCertOpts, clientCertOpts,
onServerLost=None, onClientLost=None, onData=None):
if onServerLost is None:
self.onServerLost = onServerLost = defer.Deferred()
if onClientLost is None:
self.onClientLost = onClientLost = defer.Deferred()
if onData is None:
onData = defer.Deferred()
serverFactory = protocol.ServerFactory()
serverFactory.protocol = DataCallbackProtocol
serverFactory.onLost = onServerLost
serverFactory.onData = onData
clientFactory = protocol.ClientFactory()
clientFactory.protocol = WritingProtocol
clientFactory.onLost = onClientLost
self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
self.clientConn = reactor.connectSSL('127.0.0.1', self.serverPort.getHost().port,
clientFactory, clientCertOpts)
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True, external_ip=None):
self.best_share_hash_func = best_share_hash_func
self.port = port
self.net = net
self.addr_store = dict(addr_store)
self.connect_addrs = connect_addrs
self.preferred_storage = preferred_storage
self.known_txs_var = known_txs_var
self.mining_txs_var = mining_txs_var
self.advertise_ip = advertise_ip
self.external_ip = external_ip
self.traffic_happened = variable.Event()
self.nonce = random.randrange(2**64)
self.peers = {}
self.bans = {} # address -> end_time
self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
self.serverfactory = ServerFactory(self, max_incoming_conns)
self.running = False
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True):
self.best_share_hash_func = best_share_hash_func
self.port = port
self.net = net
self.addr_store = dict(addr_store)
self.connect_addrs = connect_addrs
self.preferred_storage = preferred_storage
self.known_txs_var = known_txs_var
self.mining_txs_var = mining_txs_var
self.advertise_ip = advertise_ip
self.traffic_happened = variable.Event()
self.nonce = random.randrange(2**64)
self.peers = {}
self.bans = {} # address -> end_time
self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
self.serverfactory = ServerFactory(self, max_incoming_conns)
self.running = False
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.VariableDict({}), mining_txs_var=variable.VariableDict({}), advertise_ip=True, external_ip=None):
self.best_share_hash_func = best_share_hash_func
self.port = port
self.net = net
self.addr_store = dict(addr_store)
self.connect_addrs = connect_addrs
self.preferred_storage = preferred_storage
self.known_txs_var = known_txs_var
self.mining_txs_var = mining_txs_var
self.advertise_ip = advertise_ip
self.external_ip = external_ip
self.traffic_happened = variable.Event()
self.nonce = random.randrange(2**64)
self.peers = {}
self.bans = {} # address -> end_time
self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
self.serverfactory = ServerFactory(self, max_incoming_conns)
self.running = False
def main(): # ???????
factory = protocol.ServerFactory() # ???ServerFactory?,ServerFactory???factory
factory.protocol = EchoServer # ??factory??protocol??,?EchoServer??????protocol
reactor.listenTCP(8000, factory, interface="127.0.0.1")
# print(type(reactor)) # ??type???reactor???
# twisted.internet.selectreactor.SelectReactor
# ??????SelectReactor???twisted.internet.posixbase.PosixReactorBase????
# listenTCP??(port, factory, backlog=50, interface=''),backlog????listen???50
# listenTCP???twisted.internet.tcp.Port?
# PosixReactorBase??????twisted.internet.base._SignalReactorMixin,?????????run??
reactor.run()
# run?????????startRunning??,startRunning???ReactorBase??startRunning??
# run?????????mainLoop??
# mainLoop?????????SelectReactor.doIteration(t)??,???????????select.select????
# ???????,??self._doReadOrWrite??,??????????twisted.internet.tcp.Connection?doRead??,?????
# ??self._dataReceived(data),??????self.protocol.dataReceived(data),??self.protocol????
# ?????protocol.ServerFactory().protocol,????dataReceived(data),????????????,?????listenTCP???factory
# ??factory.protocol.dataReceived(data) ????EchoServer().dataReceived(data)??
def setUp(self):
from twisted.internet import reactor
self.serverFactory = protocol.ServerFactory()
self.serverFactory.protocol = self.serverProto
self.clientFactory = protocol.ClientFactory()
self.clientFactory.protocol = self.clientProto
self.clientFactory.onMade = defer.Deferred()
self.serverFactory.onMade = defer.Deferred()
self.serverPort = reactor.listenTCP(0, self.serverFactory)
self.clientConn = reactor.connectTCP(
'127.0.0.1', self.serverPort.getHost().port,
self.clientFactory)
def getProtos(rlst):
self.cli = self.clientFactory.theProto
self.svr = self.serverFactory.theProto
dl = defer.DeferredList([self.clientFactory.onMade,
self.serverFactory.onMade])
return dl.addCallback(getProtos)
def testPrivileged(self):
factory = protocol.ServerFactory()
factory.protocol = TestEcho
TestEcho.d = defer.Deferred()
t = internet.TCPServer(0, factory)
t.privileged = 1
t.privilegedStartService()
num = t._port.getHost().port
factory = protocol.ClientFactory()
factory.d = defer.Deferred()
factory.protocol = Foo
factory.line = None
c = internet.TCPClient('127.0.0.1', num, factory)
c.startService()
factory.d.addCallback(self.assertEqual, 'lalala')
factory.d.addCallback(lambda x : c.stopService())
factory.d.addCallback(lambda x : t.stopService())
factory.d.addCallback(lambda x : TestEcho.d)
return factory.d
def testUNIX(self):
# FIXME: This test is far too dense. It needs comments.
# -- spiv, 2004-11-07
if not interfaces.IReactorUNIX(reactor, None):
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
s = service.MultiService()
s.startService()
factory = protocol.ServerFactory()
factory.protocol = TestEcho
TestEcho.d = defer.Deferred()
t = internet.UNIXServer('echo.skt', factory)
t.setServiceParent(s)
factory = protocol.ClientFactory()
factory.protocol = Foo
factory.d = defer.Deferred()
factory.line = None
internet.UNIXClient('echo.skt', factory).setServiceParent(s)
factory.d.addCallback(self.assertEqual, 'lalala')
factory.d.addCallback(lambda x : s.stopService())
factory.d.addCallback(lambda x : TestEcho.d)
factory.d.addCallback(self._cbTestUnix, factory, s)
return factory.d
def testVolatile(self):
if not interfaces.IReactorUNIX(reactor, None):
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
factory = protocol.ServerFactory()
factory.protocol = wire.Echo
t = internet.UNIXServer('echo.skt', factory)
t.startService()
self.failIfIdentical(t._port, None)
t1 = copy.copy(t)
self.assertIdentical(t1._port, None)
t.stopService()
self.assertIdentical(t._port, None)
self.failIf(t.running)
factory = protocol.ClientFactory()
factory.protocol = wire.Echo
t = internet.UNIXClient('echo.skt', factory)
t.startService()
self.failIfIdentical(t._connection, None)
t1 = copy.copy(t)
self.assertIdentical(t1._connection, None)
t.stopService()
self.assertIdentical(t._connection, None)
self.failIf(t.running)
def testOpenSSLBuffering(self):
serverProto = self.serverProto = SingleLineServerProtocol()
clientProto = self.clientProto = RecordingClientProtocol()
server = protocol.ServerFactory()
client = self.client = protocol.ClientFactory()
server.protocol = lambda: serverProto
client.protocol = lambda: clientProto
client.buffer = []
sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
cCTX = ssl.ClientContextFactory()
port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)
i = 0
while i < 5000 and not client.buffer:
i += 1
reactor.iterate()
self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
def testImmediateDisconnect(self):
org = "twisted.test.test_ssl"
self.setupServerAndClient(
(org, org + ", client"), {},
(org, org + ", server"), {})
# Set up a server, connect to it with a client, which should work since our verifiers
# allow anything, then disconnect.
serverProtocolFactory = protocol.ServerFactory()
serverProtocolFactory.protocol = protocol.Protocol
self.serverPort = serverPort = reactor.listenSSL(0,
serverProtocolFactory, self.serverCtxFactory)
clientProtocolFactory = protocol.ClientFactory()
clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
clientProtocolFactory.connectionDisconnected = defer.Deferred()
clientConnector = reactor.connectSSL('127.0.0.1',
serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
return clientProtocolFactory.connectionDisconnected.addCallback(
lambda ignoredResult: self.serverPort.stopListening())
def loopback(self, serverCertOpts, clientCertOpts,
onServerLost=None, onClientLost=None, onData=None):
if onServerLost is None:
self.onServerLost = onServerLost = defer.Deferred()
if onClientLost is None:
self.onClientLost = onClientLost = defer.Deferred()
if onData is None:
onData = defer.Deferred()
serverFactory = protocol.ServerFactory()
serverFactory.protocol = DataCallbackProtocol
serverFactory.onLost = onServerLost
serverFactory.onData = onData
clientFactory = protocol.ClientFactory()
clientFactory.protocol = WritingProtocol
clientFactory.onLost = onClientLost
self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
self.clientConn = reactor.connectSSL('127.0.0.1', self.serverPort.getHost().port,
clientFactory, clientCertOpts)
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True, external_ip=None):
self.best_share_hash_func = best_share_hash_func
self.port = port
self.net = net
self.addr_store = dict(addr_store)
self.connect_addrs = connect_addrs
self.preferred_storage = preferred_storage
self.known_txs_var = known_txs_var
self.mining_txs_var = mining_txs_var
self.advertise_ip = advertise_ip
self.external_ip = external_ip
self.traffic_happened = variable.Event()
self.nonce = random.randrange(2**64)
self.peers = {}
self.bans = {} # address -> end_time
self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
self.serverfactory = ServerFactory(self, max_incoming_conns)
self.running = False
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True):
self.best_share_hash_func = best_share_hash_func
self.port = port
self.net = net
self.addr_store = dict(addr_store)
self.connect_addrs = connect_addrs
self.preferred_storage = preferred_storage
self.known_txs_var = known_txs_var
self.mining_txs_var = mining_txs_var
self.advertise_ip = advertise_ip
self.traffic_happened = variable.Event()
self.nonce = random.randrange(2**64)
self.peers = {}
self.bans = {} # address -> end_time
self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
self.serverfactory = ServerFactory(self, max_incoming_conns)
self.running = False
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True):
self.best_share_hash_func = best_share_hash_func
self.port = port
self.net = net
self.addr_store = dict(addr_store)
self.connect_addrs = connect_addrs
self.preferred_storage = preferred_storage
self.known_txs_var = known_txs_var
self.mining_txs_var = mining_txs_var
self.advertise_ip = advertise_ip
self.traffic_happened = variable.Event()
self.nonce = random.randrange(2**64)
self.peers = {}
self.bans = {} # address -> end_time
self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts)
self.serverfactory = ServerFactory(self, max_incoming_conns)
self.running = False
def __init__(self, port=RESOURCE_MANAGER_PORT,
parser=DEFAULT_PARSER, log_to_screen=True):
"""Initialize the resource manager server.
Args:
port (number): client listener port.
parser (object): messages parser of type `AbstractParser`.
log_to_screen (bool): Enable log prints to screen.
"""
self.logger = get_logger(log_to_screen)
self._factory = ServerFactory()
self._factory.protocol = Worker
self._factory.logger = self.logger
self._factory.protocol.parser = parser()
self._port = port
self._reactor = SelectReactor()
self._reactor.listenTCP(port, self._factory)
self._resource_manager = ManagerThread(self._reactor, self.logger)
self._factory.request_queue = self._resource_manager.request_queue
def test_invalidDescriptor(self):
"""
An implementation of L{IReactorSocket.adoptStreamPort} raises
L{socket.error} if passed an integer which is not associated with a
socket.
"""
reactor = self.buildReactor()
probe = socket.socket()
fileno = probe.fileno()
probe.close()
exc = self.assertRaises(
socket.error,
reactor.adoptStreamPort, fileno, socket.AF_INET, ServerFactory())
if platform.isWindows() and _PY3:
self.assertEqual(exc.args[0], errno.WSAENOTSOCK)
else:
self.assertEqual(exc.args[0], errno.EBADF)
def test_invalidAddressFamily(self):
"""
An implementation of L{IReactorSocket.adoptStreamPort} raises
L{UnsupportedAddressFamily} if passed an address family it does not
support.
"""
reactor = self.buildReactor()
port = socket.socket()
port.bind(("127.0.0.1", 0))
port.listen(1)
self.addCleanup(port.close)
arbitrary = 2 ** 16 + 7
self.assertRaises(
UnsupportedAddressFamily,
reactor.adoptStreamPort, port.fileno(), arbitrary, ServerFactory())
def test_invalidAddressFamily(self):
"""
An implementation of L{IReactorSocket.adoptStreamConnection} raises
L{UnsupportedAddressFamily} if passed an address family it does not
support.
"""
reactor = self.buildReactor()
connection = socket.socket()
self.addCleanup(connection.close)
arbitrary = 2 ** 16 + 7
self.assertRaises(
UnsupportedAddressFamily,
reactor.adoptStreamConnection, connection.fileno(), arbitrary,
ServerFactory())
def _makeDataConnection(self, ignored=None):
# Establish an active data connection (i.e. server connecting to
# client).
deferred = defer.Deferred()
class DataFactory(protocol.ServerFactory):
protocol = _BufferingProtocol
def buildProtocol(self, addr):
p = protocol.ServerFactory.buildProtocol(self, addr)
reactor.callLater(0, deferred.callback, p)
return p
dataPort = reactor.listenTCP(0, DataFactory(), interface='127.0.0.1')
self.dataPorts.append(dataPort)
cmd = 'PORT ' + ftp.encodeHostPort('127.0.0.1',
dataPort.getHost().port)
self.client.queueStringCommand(cmd)
return deferred
def testTCP(self):
s = service.MultiService()
s.startService()
factory = protocol.ServerFactory()
factory.protocol = TestEcho
TestEcho.d = defer.Deferred()
t = internet.TCPServer(0, factory)
t.setServiceParent(s)
num = t._port.getHost().port
factory = protocol.ClientFactory()
factory.d = defer.Deferred()
factory.protocol = Foo
factory.line = None
internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
factory.d.addCallback(self.assertEqual, b'lalala')
factory.d.addCallback(lambda x : s.stopService())
factory.d.addCallback(lambda x : TestEcho.d)
return factory.d
def testPrivileged(self):
factory = protocol.ServerFactory()
factory.protocol = TestEcho
TestEcho.d = defer.Deferred()
t = internet.TCPServer(0, factory)
t.privileged = 1
t.privilegedStartService()
num = t._port.getHost().port
factory = protocol.ClientFactory()
factory.d = defer.Deferred()
factory.protocol = Foo
factory.line = None
c = internet.TCPClient('127.0.0.1', num, factory)
c.startService()
factory.d.addCallback(self.assertEqual, b'lalala')
factory.d.addCallback(lambda x : c.stopService())
factory.d.addCallback(lambda x : t.stopService())
factory.d.addCallback(lambda x : TestEcho.d)
return factory.d
def testUNIX(self):
# FIXME: This test is far too dense. It needs comments.
# -- spiv, 2004-11-07
s = service.MultiService()
s.startService()
factory = protocol.ServerFactory()
factory.protocol = TestEcho
TestEcho.d = defer.Deferred()
t = internet.UNIXServer('echo.skt', factory)
t.setServiceParent(s)
factory = protocol.ClientFactory()
factory.protocol = Foo
factory.d = defer.Deferred()
factory.line = None
internet.UNIXClient('echo.skt', factory).setServiceParent(s)
factory.d.addCallback(self.assertEqual, b'lalala')
factory.d.addCallback(lambda x : s.stopService())
factory.d.addCallback(lambda x : TestEcho.d)
factory.d.addCallback(self._cbTestUnix, factory, s)
return factory.d
def testVolatile(self):
factory = protocol.ServerFactory()
factory.protocol = wire.Echo
t = internet.UNIXServer('echo.skt', factory)
t.startService()
self.failIfIdentical(t._port, None)
t1 = copy.copy(t)
self.assertIsNone(t1._port)
t.stopService()
self.assertIsNone(t._port)
self.assertFalse(t.running)
factory = protocol.ClientFactory()
factory.protocol = wire.Echo
t = internet.UNIXClient('echo.skt', factory)
t.startService()
self.failIfIdentical(t._connection, None)
t1 = copy.copy(t)
self.assertIsNone(t1._connection)
t.stopService()
self.assertIsNone(t._connection)
self.assertFalse(t.running)