def connect_ip(factory, config, ip_local_default=True, ip_version_default=4):
    '''
    Open TCP connections for ``factory`` as described by ``config``.

    Internal helper; see connect() for details.

    ``config`` and ``ip_version_default`` may each be a single item or a
    list; both are normalised with _listify(). Every entry that passes
    validate_connection_config() and carries a 'port' produces one
    reactor.connectTCP() call to its 'ip' or, when it has no 'ip' and
    ``ip_local_default`` is true, one call per default IP version looked
    up in IP_CONNECT_DEFAULT. The collected connectors are handed back
    through _unlistify().
    '''
    # NOTE(review): the source indentation was lost; the 'ip' handling is
    # assumed to be nested under the 'port' check -- confirm.
    entries = _listify(config)
    default_versions = _listify(ip_version_default)
    require_ip = not ip_local_default

    opened = []
    for entry in entries:
        if not validate_connection_config(entry, must_have_ip=require_ip):
            # invalid entries are skipped
            continue
        if 'port' not in entry:
            continue
        port = int(entry['port'])
        if 'ip' in entry:
            targets = (entry['ip'],)
        elif ip_local_default:
            # lazy, so each lookup happens right before its connect call
            targets = (IP_CONNECT_DEFAULT[version]
                       for version in default_versions)
        else:
            targets = ()
        for ip in targets:
            opened.append(reactor.connectTCP(ip, port, factory))
    return _unlistify(opened)
# Example source code for Python's connectTCP()
def downloadPage(url, file, contextFactory=None, *args, **kwargs):
    """Fetch the page at C{url} and store it in C{file}.

    @param file: path to file on filesystem, or file-like object.
    @param contextFactory: SSL context factory used for C{https} URLs; a
        default C{ssl.ClientContextFactory} is created when it is C{None}.
    @return: the C{deferred} attribute of the HTTPDownloader factory.

    See HTTPDownloader to see what extra args can be passed.
    """
    scheme, host, port, _path = _parse(url)
    downloader = HTTPDownloader(url, file, *args, **kwargs)
    if scheme != 'https':
        reactor.connectTCP(host, port, downloader)
        return downloader.deferred
    # ssl is imported only when an https URL is actually requested
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, downloader, contextFactory)
    return downloader.deferred
def render(self, request):
    """Render this request, from my server.

    Always returns NOT_DONE_YET. When a publisher is already available the
    request is forwarded to it right away through callRemote('request').
    Otherwise the request is queued on self.pending and, unless a
    connection attempt is already in flight (self.waiting), a PB client
    connection is started over a UNIX socket (when self.host == "unix")
    or over TCP.
    """
    if self.publisher:
        issue = Issue(request)
        remote_call = self.publisher.callRemote('request', request)
        remote_call.addCallbacks(issue.finished, issue.failed)
        return NOT_DONE_YET

    self.pending.append(request)
    if self.waiting:
        # a connection attempt is already under way
        return NOT_DONE_YET

    self.waiting = 1
    client_factory = pb.PBClientFactory()
    connect_timeout = 10
    if self.host == "unix":
        # for UNIX sockets self.port is passed as the socket address
        reactor.connectUNIX(self.port, client_factory, connect_timeout)
    else:
        reactor.connectTCP(self.host, self.port, client_factory,
                           connect_timeout)
    root = client_factory.getRootObject()
    root.addCallbacks(self.connected, self.notConnected)
    return NOT_DONE_YET
def process(self):
    """Forward this request to the server named in self.uri.

    The URI is split into protocol, host and the remaining path/query.
    The port defaults to self.ports[protocol] unless the host part
    carries an explicit ``host:port``. A 'host' header is added when the
    incoming request has none, the request body is read in full, and a
    client factory from self.protocols[protocol] is connected over TCP.
    """
    parsed = urlparse.urlparse(self.uri)
    protocol = parsed[0]
    host = parsed[1]
    port = self.ports[protocol]
    if ':' in host:
        host, port = host.split(':')
        port = int(port)
    # Rebuild the URI without scheme and netloc; an empty result means "/".
    rest = urlparse.urlunparse(('', '') + parsed[2:])
    if not rest:
        rest = rest + '/'
    class_ = self.protocols[protocol]
    headers = self.getAllHeaders().copy()
    # `in` works on Python 2 and 3; dict.has_key() was removed in Python 3.
    if 'host' not in headers:
        headers['host'] = host
    self.content.seek(0, 0)
    s = self.content.read()
    clientFactory = class_(self.method, rest, self.clientproto, headers,
                           s, self)
    reactor.connectTCP(host, port, clientFactory)
def testTcpNoDelay(self):
    """Check that TCP_NODELAY can be toggled on both ends of a connection.

    A server is started on an ephemeral 127.0.0.1 port and a client
    connects to it. Once both protocols exist, each transport must report
    0, then 1 after setTcpNoDelay(1), then 0 again after setTcpNoDelay(0).
    """
    f = MyServerFactory()
    port = reactor.listenTCP(0, f, interface="127.0.0.1")
    self.n = port.getHost().port
    self.ports.append(port)
    clientF = MyClientFactory()
    reactor.connectTCP("127.0.0.1", self.n, clientF)

    # Wait until the server was called and the client protocol exists.
    d = loopUntil(lambda: (f.called > 0 and
                           getattr(clientF, 'protocol', None) is not None))

    def check(x):
        for p in clientF.protocol, f.protocol:
            transport = p.transport
            # assertEqual replaces the deprecated assertEquals alias
            self.assertEqual(transport.getTcpNoDelay(), 0)
            transport.setTcpNoDelay(1)
            self.assertEqual(transport.getTcpNoDelay(), 1)
            transport.setTcpNoDelay(0)
            self.assertEqual(transport.getTcpNoDelay(), 0)

    d.addCallback(check)
    # Clean up whether or not the checks passed.
    d.addBoth(lambda _: self.cleanPorts(clientF.protocol.transport, port))
    return d
def testClientStartStop(self):
    """Check that a client factory is started on connect and later stopped.

    A ClosingFactory listens on an ephemeral 127.0.0.1 port. Once the
    port reports connected, a ClientStartStopFactory is connected to it;
    it must be marked started immediately, and the test then waits for
    it to be marked stopped.
    """
    f = ClosingFactory()
    p = reactor.listenTCP(0, f, interface="127.0.0.1")
    self.n = p.getHost().port
    self.ports.append(p)
    f.port = p

    d = loopUntil(lambda: p.connected)

    def check(ignored):
        factory = ClientStartStopFactory()
        reactor.connectTCP("127.0.0.1", self.n, factory)
        # assertTrue replaces the deprecated assert_ alias
        self.assertTrue(factory.started)
        return loopUntil(lambda: factory.stopped)

    d.addCallback(check)
    d.addBoth(lambda _: self.cleanPorts(*self.ports))
    return d
def testReconnect(self):
    """Check that reconnecting from clientConnectionLost ends in a refusal.

    The client factory's clientConnectionLost hook calls connect() on the
    connector again. The test waits for factory.failed, then expects one
    made/closed protocol, a ConnectionRefusedError as the failure reason,
    and a single factory stop.
    """
    f = ClosingFactory()
    p = reactor.listenTCP(0, f, interface="127.0.0.1")
    n = p.getHost().port
    self.ports.append(p)
    f.port = p
    factory = MyClientFactory()

    d = loopUntil(lambda: p.connected)

    def step1(ignored):
        def clientConnectionLost(c, reason):
            c.connect()
        factory.clientConnectionLost = clientConnectionLost
        reactor.connectTCP("127.0.0.1", n, factory)
        return loopUntil(lambda: factory.failed)

    def step2(ignored):
        # a distinct name, so the listening port `p` is not shadowed
        proto = factory.protocol
        # assertEqual replaces the deprecated assertEquals alias
        self.assertEqual((proto.made, proto.closed), (1, 1))
        factory.reason.trap(error.ConnectionRefusedError)
        self.assertEqual(factory.stopped, 1)
        return self.cleanPorts(*self.ports)

    return d.addCallback(step1).addCallback(step2)
def testHostAddress(self):
    """Check that the client sees the server's listening address.

    Once the connector reaches the "connected" state, the listening
    port's getHost() must equal both the address recorded on the client
    factory and the peer address of the client transport.
    """
    f1 = MyServerFactory()
    p1 = reactor.listenTCP(0, f1, interface='127.0.0.1')
    n = p1.getHost().port
    self.ports.append(p1)

    f2 = MyOtherClientFactory()
    p2 = reactor.connectTCP('127.0.0.1', n, f2)

    d = loopUntil(lambda: p2.state == "connected")

    def check(ignored):
        # assertEqual replaces the deprecated assertEquals alias
        self.assertEqual(p1.getHost(), f2.address)
        self.assertEqual(p1.getHost(), f2.protocol.transport.getPeer())
        return p1.stopListening()

    def cleanup(ignored):
        # register the client transport so cleanPorts() handles it too
        self.ports.append(p2.transport)
        return self.cleanPorts(*self.ports)

    return d.addCallback(check).addCallback(cleanup)
def testWriter(self):
    """Check that data written by WriterProtocol reaches the client intact.

    Server and client factories are wrapped in WiredFactory so the test
    can wait for both sides to disconnect before inspecting the flags
    and the data the client collected.
    """
    f = protocol.Factory()
    f.protocol = WriterProtocol
    f.done = 0
    f.problem = 0
    wrappedF = WiredFactory(f)
    p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
    n = p.getHost().port
    self.ports.append(p)

    clientF = WriterClientFactory()
    wrappedClientF = WiredFactory(clientF)
    reactor.connectTCP("127.0.0.1", n, wrappedClientF)

    def check(ignored):
        # assertTrue/assertEqual replace the deprecated failUnless alias
        self.assertTrue(f.done, "writer didn't finish, it probably died")
        self.assertTrue(f.problem == 0, "writer indicated an error")
        self.assertTrue(clientF.done,
                        "client didn't see connection dropped")
        expected = "".join(["Hello Cleveland!\n",
                            "Goodbye", " cruel", " world", "\n"])
        # assertEqual shows both values when the comparison fails
        self.assertEqual(clientF.data, expected,
                         "client didn't receive all the data it expected")

    d = defer.gatherResults([wrappedF.onDisconnect,
                             wrappedClientF.onDisconnect])
    return d.addCallback(check)
def _testBuildProtocol(self, portno):
    """Connect a client bound to C{portno} and check the C{ran} marker.

    A server factory is started on an ephemeral port. Once it is
    listening, a client bound to ("127.0.0.1", portno) connects to it.
    After the connection is made the test case must have a C{ran}
    attribute; it is deleted again once the client disconnects.

    @param portno: local port the client connection is bound to.
    @return: a Deferred chained from the listening factory's deferred.
    """
    f = AServerFactory(self, IPv4Address('TCP', '127.0.0.1', portno))
    wrappedF = FireOnListenFactory(f)
    p = reactor.listenTCP(0, wrappedF)
    self.ports.append(p)

    def client(ignored):
        acf = AClientFactory(self, IPv4Address("TCP", "127.0.0.1",
                                               p.getHost().port))
        wired = WiredFactory(acf)
        reactor.connectTCP("127.0.0.1", p.getHost().port, wired,
                           bindAddress=("127.0.0.1", portno))
        d = wired.onConnect

        def _onConnect(ignored):
            self.ports.append(acf.protocol.transport)
            # assertTrue replaces the deprecated assert_ alias
            self.assertTrue(hasattr(self, "ran"))
            return wired.onDisconnect

        def _onDisconnect(ignored):
            del self.ran

        d.addCallback(_onConnect)
        d.addCallback(_onDisconnect)
        return d

    return wrappedF.deferred.addCallback(client)
def testStopTrying(self):
    """Start a server and a reconnecting client, then hand over to step 1.

    The server factory is given counters, a goal of 2 and a Deferred; the
    ReconnectingClientFactory is given a 0.2 delay and howManyTimes = 2.
    The remaining checks run in self._testStopTrying_1 when the server
    factory's Deferred fires.
    """
    server = Factory()
    server.protocol = In
    server.connections = 0
    server.allMessages = []
    server.goal = 2
    server.d = defer.Deferred()

    client = ReconnectingClientFactory()
    client.initialDelay = client.delay = 0.2
    client.protocol = Out
    client.howManyTimes = 2

    listening = self.port = reactor.listenTCP(0, server)
    port_number = listening.getHost().port
    reactor.connectTCP('127.0.0.1', port_number, client)

    done = server.d
    done.addCallback(self._testStopTrying_1, server, client)
    return server.d
def ftp_PORT(self, address):
    """Handle PORT by connecting a data channel to the given address.

    @param address: six comma-separated decimal numbers; the first four
        form the dotted-quad IP and the last two are the high and low
        bytes of the port.
    @return: the DTP factory's deferred, whose callbacks yield
        ENTERING_PORT_MODE on success or CANT_OPEN_DATA_CNX when the
        failure is a PortConnectionError.
    """
    # Build a real list: on Python 3 map() returns a lazy iterator that
    # can be neither sliced nor indexed.
    addr = [int(field) for field in address.split(',')]
    ip = '%d.%d.%d.%d' % tuple(addr[:4])
    port = addr[4] << 8 | addr[5]

    # if we have a DTP port set up, lose it.
    if self.dtpFactory is not None:
        self.cleanupDTP()

    self.dtpFactory = DTPFactory(pi=self, peerHost=self.transport.getPeer().host)
    self.dtpFactory.setTimeout(self.dtpTimeout)
    self.dtpPort = reactor.connectTCP(ip, port, self.dtpFactory)

    def connected(ignored):
        return ENTERING_PORT_MODE

    def connFailed(err):
        # any other failure type is re-raised by trap()
        err.trap(PortConnectionError)
        return CANT_OPEN_DATA_CNX

    return self.dtpFactory.deferred.addCallbacks(connected, connFailed)
def lookupZone(self, name, timeout = 10):
    """
    Perform an AXFR request. This is quite different from usual
    DNS requests. See http://cr.yp.to/djbdns/axfr-notes.html for
    more information.

    Returns a failed Deferred (IOError) when pickServer() yields no
    server. Otherwise a TCP connection to the picked server is started,
    a timeout call to self._timeoutZone is scheduled, and the returned
    Deferred has self._cbLookupZone chained with the connector.
    """
    server = self.pickServer()
    if server is None:
        return defer.fail(IOError('No domain name servers available'))
    host, port = server

    # A falsy timeout falls back to 10 for the scheduled timeout call.
    watchdog_delay = timeout or 10

    result = defer.Deferred()
    controller = AXFRController(name, result)
    factory = DNSClientFactory(controller, timeout)
    factory.noisy = False  # silence the factory's log output

    from twisted.internet import reactor
    connector = reactor.connectTCP(host, port, factory)
    controller.timeoutCall = reactor.callLater(
        watchdog_delay, self._timeoutZone,
        result, controller, connector, watchdog_delay)

    return result.addCallback(self._cbLookupZone, connector)
def __init__(self, url, outputfile, contextFactory=None, *args, **kwargs):
    """Start downloading ``url`` into ``outputfile``.

    The URL is parsed with client._parse() when it exists, otherwise
    with twisted.web.client's URI class (``_URI`` or ``URI``). An
    HTTPProgressDownloader is stored on self.factory and the connector
    on self.connection; https URLs are connected with connectSSL (using
    a default ssl.ClientContextFactory when ``contextFactory`` is None),
    everything else with connectTCP.
    """
    if not hasattr(client, '_parse'):
        # client._parse() is missing, so use the URI class instead
        try:
            from twisted.web.client import _URI as URI
        except ImportError:
            from twisted.web.client import URI
        parsed = URI.fromBytes(url)
        scheme, host = parsed.scheme, parsed.host
        port = parsed.port or (443 if scheme == 'https' else 80)
        _path = parsed.path
    else:
        scheme, host, port, _path = client._parse(url)

    self.factory = HTTPProgressDownloader(url, outputfile, *args, **kwargs)

    if scheme != "https":
        self.connection = reactor.connectTCP(host, port, self.factory)
        return

    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    self.connection = reactor.connectSSL(host, port, self.factory, contextFactory)
def getPage(url, contextFactory=None, *args, **kwargs):
    """Fetch ``url`` over TCP, adding Basic auth taken from the URL.

    When the parsed URL carries both a username and a password, the
    credentials are removed from the URL and sent as an ``Authorization``
    header instead, merged into any ``headers`` keyword argument.

    @return: the ``deferred`` attribute of the HTTPClientFactory.
    """
    # NOTE(review): contextFactory is never used and the connection is
    # always made with connectTCP, even for https URLs -- confirm intended.
    scheme, host, port, path, username, password = _parse(url)
    if username and password:
        url = scheme + '://' + host + ':' + str(port) + path
        basicAuth = encodestring("%s:%s" % (username, password))
        authHeader = "Basic " + basicAuth.strip()
        # Merge into a copy so the caller's headers dict is not modified
        # with credentials; kwargs.has_key() was removed in Python 3.
        headers = dict(kwargs.get("headers") or {})
        headers["Authorization"] = authHeader
        kwargs["headers"] = headers
    factory = HTTPClientFactory(url, *args, **kwargs)
    reactor.connectTCP(host, port, factory)
    return factory.deferred
def _think(self):
try:
if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
(host, port), = self.node.get_good_peers(1)
if self._host_to_ident(host) in self.attempts:
pass
elif host in self.node.bans and self.node.bans[host] > time.time():
pass
else:
#print 'Trying to connect to', host, port
reactor.connectTCP(host, port, self, timeout=5)
except:
log.err()
return random.expovariate(1/1)
def push_to_member(self, member: Member, ignore_for_statistics=False) -> None:
    """Push to the specified member.

    The payload is built while holding the hashgraph lock. The TCP
    connection is then started on the reactor thread through
    blockingCallFromThread, and only if the member has an address.

    :param member: the member to push to.
    :param ignore_for_statistics: when true, the factory is created with
        ``network=None`` instead of ``network=self``.
    """
    # member.address may be None (push() below checks for it), so it must
    # not be dereferenced unconditionally just for the log line.
    address = member.address
    if address is None:
        bptc.logger.debug('Push to {}... (no address)'.format(member.verify_key[:6]))
    else:
        bptc.logger.debug('Push to {}... ({}, {})'.format(member.verify_key[:6], address.host, address.port))

    with self.hashgraph.lock:
        data_string = self.generate_data_string(self.hashgraph.me,
                                                self.hashgraph.get_unknown_events_of(member),
                                                filter_members_with_address(self.hashgraph.known_members.values()))

    network = None if ignore_for_statistics else self
    factory = PushClientFactory(data_string, network=network, receiver=member)

    def push():
        if member.address is not None:
            reactor.connectTCP(member.address.host, member.address.port, factory)

    threads.blockingCallFromThread(reactor, push)
def main():
    """Prompt for IMAP4 connection details, connect, and run the reactor.

    The Deferred passed to the client factory is wired to
    cbServerGreeting (with the username and password), then
    ebConnection as errback, then cbClose for both outcomes.
    """
    hostname = raw_input('IMAP4 Server Hostname: ')
    port = raw_input('IMAP4 Server Port (the default is 143): ')
    username = raw_input('IMAP4 Username: ')
    password = util.getPassword('IMAP4 Password: ')

    # Honour the default advertised in the prompt: an empty answer used
    # to crash in int('').
    port = int(port) if port.strip() else 143

    onConn = defer.Deferred(
        ).addCallback(cbServerGreeting, username, password
        ).addErrback(ebConnection
        ).addBoth(cbClose)

    factory = SimpleIMAP4ClientFactory(username, onConn)

    from twisted.internet import reactor
    reactor.connectTCP(hostname, port, factory)
    reactor.run()
def _think(self):
try:
if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
(host, port), = self.node.get_good_peers(1)
if self._host_to_ident(host) in self.attempts:
pass
elif host in self.node.bans and self.node.bans[host] > time.time():
pass
else:
#print 'Trying to connect to', host, port
reactor.connectTCP(host, port, self, timeout=5)
except:
log.err()
return random.expovariate(1/1)
def __init__(self, url, outputfile, contextFactory=None, *args, **kwargs):
    """Start downloading ``url`` into ``outputfile``.

    The URL is parsed with client._parse() when it exists, otherwise
    with twisted.web.client's URI class (``_URI`` or ``URI``). An
    HTTPProgressDownloader is stored on self.factory and the connector
    on self.connection; https URLs are connected with connectSSL (using
    a default ssl.ClientContextFactory when ``contextFactory`` is None),
    everything else with connectTCP.
    """
    if not hasattr(client, '_parse'):
        # client._parse() is missing, so use the URI class instead
        try:
            from twisted.web.client import _URI as URI
        except ImportError:
            from twisted.web.client import URI
        parsed = URI.fromBytes(url)
        scheme, host = parsed.scheme, parsed.host
        port = parsed.port
        _path = parsed.path
    else:
        scheme, host, port, _path = client._parse(url)

    self.factory = HTTPProgressDownloader(url, outputfile, *args, **kwargs)

    if scheme != 'https':
        self.connection = reactor.connectTCP(host, port, self.factory)
        return

    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    self.connection = reactor.connectSSL(host, port, self.factory, contextFactory)
def getPage(url, contextFactory=None, *args, **kwargs):
    """Fetch ``url`` over TCP, adding Basic auth taken from the URL.

    When the parsed URL carries both a username and a password, the
    credentials are removed from the URL and sent as an ``Authorization``
    header instead, merged into any ``headers`` keyword argument.

    @return: the ``deferred`` attribute of the HTTPClientFactory.
    """
    # NOTE(review): contextFactory is never used and the connection is
    # always made with connectTCP, even for https URLs -- confirm intended.
    scheme, host, port, path, username, password = _parse(url)
    if username and password:
        url = scheme + '://' + host + ':' + str(port) + path
        basicAuth = encodestring("%s:%s" % (username, password))
        authHeader = "Basic " + basicAuth.strip()
        # Merge into a copy so the caller's headers dict is not modified
        # with credentials; kwargs.has_key() was removed in Python 3.
        headers = dict(kwargs.get("headers") or {})
        headers["Authorization"] = authHeader
        kwargs["headers"] = headers
    factory = HTTPClientFactory(url, *args, **kwargs)
    reactor.connectTCP(host, port, factory)
    return factory.deferred
def _think(self):
try:
if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
(host, port), = self.node.get_good_peers(1)
if self._host_to_ident(host) in self.attempts:
pass
elif host in self.node.bans and self.node.bans[host] > time.time():
pass
else:
#print 'Trying to connect to', host, port
reactor.connectTCP(host, port, self, timeout=5)
except:
log.err()
return random.expovariate(1/1)
def main():
    """Connect an EchoFactory client to localhost:8000 and run the reactor."""
    # NOTE(review): the original comments in this function were mis-encoded
    # ("???"). The notes below only list the identifiers that survived;
    # confirm the described call chain against the Twisted sources.
    echo_factory = EchoFactory()
    reactor.connectTCP("localhost", 8000, echo_factory)
    # Identifiers mentioned for the connect step:
    #   twisted.internet.selectreactor.SelectReactor
    #   twisted.internet.posixbase.PosixReactorBase
    #   connectTCP(self, host, port, factory, timeout=30, bindAddress=None)
    #   twisted.internet.tcp.Connector() together with the client's
    #   ClientFactory
    reactor.run()
    # Identifiers mentioned for the run step, in their original order:
    #   run -> startRunning (ReactorBase.startRunning)
    #   run -> mainLoop
    #   mainLoop -> SelectReactor.doIteration(t), which uses select.select
    #   self._doReadOrWrite
    #   twisted.internet.tcp.BaseClient().doConnect -> self._connectDone()
    #     -> self.protocol.makeConnection(self)
    #   self.connectionMade(), i.e. EchoClient().connectionMade()
def downloadPage(url, file, contextFactory=None, *args, **kwargs):
    """Fetch the page at C{url} and store it in C{file}.

    @param file: path to file on filesystem, or file-like object.
    @param contextFactory: SSL context factory used for C{https} URLs; a
        default C{ssl.ClientContextFactory} is created when it is C{None}.
    @return: the C{deferred} attribute of the HTTPDownloader factory.

    See HTTPDownloader to see what extra args can be passed.
    """
    scheme, host, port, _path = _parse(url)
    downloader = HTTPDownloader(url, file, *args, **kwargs)
    if scheme != 'https':
        reactor.connectTCP(host, port, downloader)
        return downloader.deferred
    # ssl is imported only when an https URL is actually requested
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, downloader, contextFactory)
    return downloader.deferred
def render(self, request):
    """Render this request, from my server.

    Always returns NOT_DONE_YET. When a publisher is already available the
    request is forwarded to it right away through callRemote('request').
    Otherwise the request is queued on self.pending and, unless a
    connection attempt is already in flight (self.waiting), a PB client
    connection is started over a UNIX socket (when self.host == "unix")
    or over TCP.
    """
    if self.publisher:
        issue = Issue(request)
        remote_call = self.publisher.callRemote('request', request)
        remote_call.addCallbacks(issue.finished, issue.failed)
        return NOT_DONE_YET

    self.pending.append(request)
    if self.waiting:
        # a connection attempt is already under way
        return NOT_DONE_YET

    self.waiting = 1
    client_factory = pb.PBClientFactory()
    connect_timeout = 10
    if self.host == "unix":
        # for UNIX sockets self.port is passed as the socket address
        reactor.connectUNIX(self.port, client_factory, connect_timeout)
    else:
        reactor.connectTCP(self.host, self.port, client_factory,
                           connect_timeout)
    root = client_factory.getRootObject()
    root.addCallbacks(self.connected, self.notConnected)
    return NOT_DONE_YET
def process(self):
    """Forward this request to the server named in self.uri.

    The URI is split into protocol, host and the remaining path/query.
    The port defaults to self.ports[protocol] unless the host part
    carries an explicit ``host:port``. A 'host' header is added when the
    incoming request has none, the request body is read in full, and a
    client factory from self.protocols[protocol] is connected over TCP.
    """
    parsed = urlparse.urlparse(self.uri)
    protocol = parsed[0]
    host = parsed[1]
    port = self.ports[protocol]
    if ':' in host:
        host, port = host.split(':')
        port = int(port)
    # Rebuild the URI without scheme and netloc; an empty result means "/".
    rest = urlparse.urlunparse(('', '') + parsed[2:])
    if not rest:
        rest = rest + '/'
    class_ = self.protocols[protocol]
    headers = self.getAllHeaders().copy()
    # `in` works on Python 2 and 3; dict.has_key() was removed in Python 3.
    if 'host' not in headers:
        headers['host'] = host
    self.content.seek(0, 0)
    s = self.content.read()
    clientFactory = class_(self.method, rest, self.clientproto, headers,
                           s, self)
    reactor.connectTCP(host, port, clientFactory)
def testTcpNoDelay(self):
    """Check that TCP_NODELAY can be toggled on both ends of a connection.

    A server is started on an ephemeral 127.0.0.1 port and a client
    connects to it. Once both protocols exist, each transport must report
    0, then 1 after setTcpNoDelay(1), then 0 again after setTcpNoDelay(0).
    """
    f = MyServerFactory()
    port = reactor.listenTCP(0, f, interface="127.0.0.1")
    self.n = port.getHost().port
    self.ports.append(port)
    clientF = MyClientFactory()
    reactor.connectTCP("127.0.0.1", self.n, clientF)

    # Wait until the server was called and the client protocol exists.
    d = loopUntil(lambda: (f.called > 0 and
                           getattr(clientF, 'protocol', None) is not None))

    def check(x):
        for p in clientF.protocol, f.protocol:
            transport = p.transport
            # assertEqual replaces the deprecated assertEquals alias
            self.assertEqual(transport.getTcpNoDelay(), 0)
            transport.setTcpNoDelay(1)
            self.assertEqual(transport.getTcpNoDelay(), 1)
            transport.setTcpNoDelay(0)
            self.assertEqual(transport.getTcpNoDelay(), 0)

    d.addCallback(check)
    # Clean up whether or not the checks passed.
    d.addBoth(lambda _: self.cleanPorts(clientF.protocol.transport, port))
    return d
def testClientStartStop(self):
    """Check that a client factory is started on connect and later stopped.

    A ClosingFactory listens on an ephemeral 127.0.0.1 port. Once the
    port reports connected, a ClientStartStopFactory is connected to it;
    it must be marked started immediately, and the test then waits for
    it to be marked stopped.
    """
    f = ClosingFactory()
    p = reactor.listenTCP(0, f, interface="127.0.0.1")
    self.n = p.getHost().port
    self.ports.append(p)
    f.port = p

    d = loopUntil(lambda: p.connected)

    def check(ignored):
        factory = ClientStartStopFactory()
        reactor.connectTCP("127.0.0.1", self.n, factory)
        # assertTrue replaces the deprecated assert_ alias
        self.assertTrue(factory.started)
        return loopUntil(lambda: factory.stopped)

    d.addCallback(check)
    d.addBoth(lambda _: self.cleanPorts(*self.ports))
    return d
def testReconnect(self):
    """Check that reconnecting from clientConnectionLost ends in a refusal.

    The client factory's clientConnectionLost hook calls connect() on the
    connector again. The test waits for factory.failed, then expects one
    made/closed protocol, a ConnectionRefusedError as the failure reason,
    and a single factory stop.
    """
    f = ClosingFactory()
    p = reactor.listenTCP(0, f, interface="127.0.0.1")
    n = p.getHost().port
    self.ports.append(p)
    f.port = p
    factory = MyClientFactory()

    d = loopUntil(lambda: p.connected)

    def step1(ignored):
        def clientConnectionLost(c, reason):
            c.connect()
        factory.clientConnectionLost = clientConnectionLost
        reactor.connectTCP("127.0.0.1", n, factory)
        return loopUntil(lambda: factory.failed)

    def step2(ignored):
        # a distinct name, so the listening port `p` is not shadowed
        proto = factory.protocol
        # assertEqual replaces the deprecated assertEquals alias
        self.assertEqual((proto.made, proto.closed), (1, 1))
        factory.reason.trap(error.ConnectionRefusedError)
        self.assertEqual(factory.stopped, 1)
        return self.cleanPorts(*self.ports)

    return d.addCallback(step1).addCallback(step2)
def testHostAddress(self):
    """Check that the client sees the server's listening address.

    Once the connector reaches the "connected" state, the listening
    port's getHost() must equal both the address recorded on the client
    factory and the peer address of the client transport.
    """
    f1 = MyServerFactory()
    p1 = reactor.listenTCP(0, f1, interface='127.0.0.1')
    n = p1.getHost().port
    self.ports.append(p1)

    f2 = MyOtherClientFactory()
    p2 = reactor.connectTCP('127.0.0.1', n, f2)

    d = loopUntil(lambda: p2.state == "connected")

    def check(ignored):
        # assertEqual replaces the deprecated assertEquals alias
        self.assertEqual(p1.getHost(), f2.address)
        self.assertEqual(p1.getHost(), f2.protocol.transport.getPeer())
        return p1.stopListening()

    def cleanup(ignored):
        # register the client transport so cleanPorts() handles it too
        self.ports.append(p2.transport)
        return self.cleanPorts(*self.ports)

    return d.addCallback(check).addCallback(cleanup)