def clientConnectionLost(self, connector, reason):
    """React to the lower layers reporting that the connection is gone.

    When a shutdown is in progress (``self.dDown`` is set) and ``reason``
    is a ``ConnectionDone``, the loss is treated as the close we asked for
    and subscribers are notified with no failure. Any other loss is logged
    and handed to subscribers as ``reason``.

    The notification is not delivered inline: ``self._notify`` is scheduled
    on the clock with a zero delay. Finally the superclass implementation
    is invoked so it can handle reconnecting.
    """
    if self.dDown and reason.check(ConnectionDone):
        # We initiated the close, so this is not reported as a failure.
        logFormat, notifyReason = '%r: Connection Closed:%r:%r', None
    else:
        logFormat, notifyReason = '%r: clientConnectionLost:%r:%r', reason
    log.debug(logFormat, self, connector, reason)

    # Drop the protocol reference so nothing tries to send on a dead link.
    self.proto = None

    # Subscribers hear about the loss on the next pass through the clock.
    self._get_clock().callLater(0, self._notify, False, notifyReason)

    # The superclass takes care of reconnecting.
    ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
# Example source code for the Python class reactor()
def __init__(self, host, port, state, path):
    """
    Record the connection target and the circuit it is routed through.

    @param host: A hostname, used when connecting
    @type host: str
    @param port: The port number, used when connecting
    @type port: int
    @param state: Kept as C{self.state} and passed to
        C{get_orport_endpoint} to obtain C{self.or_endpoint}.
    @param path: A list of relay identities.
    @type path: list

    This endpoint will be routed through Tor over a circuit
    defined by path.
    """
    self.host, self.port, self.path, self.state = host, port, path, state
    # The OR-port endpoint is derived from the supplied state.
    self.or_endpoint = get_orport_endpoint(state)
def start_tor(config):
    """
    Launch tor using freshly chosen TCP ports for SocksPort and ControlPort.

    Two ports are requested one after the other from ``available_tcp_port``
    and stored on ``config`` as ``SocksPort`` and ``ControlPort``; any other
    options already present on ``config`` (a txtorcon.torconfig.TorConfig
    instance) are left as they are. Tor is then launched with that config.

    Returns a deferred that calls back with a txtorcon.torstate.TorState
    instance.
    """
    def store_port_as(option_name):
        # Build a callback that writes the fired port number onto ``config``.
        def store(port):
            setattr(config, option_name, port)
        return store

    def choose_random_ports():
        choosing = available_tcp_port(reactor)
        choosing.addCallback(store_port_as('SocksPort'))
        # Only ask for the second port once the first has been stored.
        choosing.addCallback(lambda _unused: available_tcp_port(reactor))
        choosing.addCallback(store_port_as('ControlPort'))
        return choosing

    def launch_and_get_state(_unused):
        launching = launch_tor(config, reactor, stdout=sys.stdout)
        launching.addCallback(
            lambda tpp: TorState(tpp.tor_protocol).post_bootstrap)
        return launching

    return choose_random_ports().addCallback(launch_and_get_state)
def reExec(self):
    """
    Restart this process in place.

    Logs the restart, makes a best-effort attempt to delete the pidfile,
    registers an ``os.execv`` of the current interpreter with the current
    command line to run after reactor shutdown, and then stops the reactor.
    """
    self.log.warn("SIGHUP received - restarting")

    try:
        self.log.info("Removing pidfile: {log_source.pidfilePath}")
        os.remove(self.pidfilePath)
    except OSError:
        # Best effort only: failing to remove the pidfile must not block
        # the restart.
        pass

    interpreter = sys.executable
    relaunchArgv = [interpreter] + sys.argv
    # The exec runs only after shutdown has completed.
    self.reactor.addSystemEventTrigger(
        "after", "shutdown", os.execv, interpreter, relaunchArgv
    )
    self.reactor.stop()
def _getPort(self):
    """
    Create the listening port for this service, start it and remember it.

    An L{InheritedSSLPort} is built when C{self.inherit} is true, otherwise
    a L{MaxAcceptSSLPort}. Both receive the first three entries of
    C{self.args}. The started port is stored as C{self.myPort} and returned.
    """
    from twisted.internet import reactor

    args = self.args
    if not self.inherit:
        # NOTE(review): this branch passes self.reactor while the inherited
        # branch passes the global reactor imported above - confirm that
        # the difference is intentional.
        listener = MaxAcceptSSLPort(
            args[0], args[1], args[2],
            self.backlog, self.interface, self.reactor
        )
    else:
        listener = InheritedSSLPort(args[0], args[1], args[2], reactor)

    listener.startListening()
    self.myPort = listener
    return listener
def __init__(self, reactor, transactionFactory, useWorkerPool=True, disableWorkProcessing=False):
    """
    Initialize a L{ControllerQueue}.

    @param reactor: kept as C{self.reactor}.
    @param transactionFactory: a 0- or 1-argument callable that produces an
        L{IAsyncTransaction}
    @param useWorkerPool: Whether to use a worker pool to manage load
        or instead take on all work ourselves (e.g. in single process mode)
    @param disableWorkProcessing: kept as C{self.disableWorkProcessing}.
        NOTE(review): presumably switches off work processing - confirm
        against the code that reads this flag.
    """
    super(ControllerQueue, self).__init__()

    # Collaborators and configuration supplied by the caller.
    self.reactor = reactor
    self.transactionFactory = transactionFactory
    if useWorkerPool:
        self.workerPool = WorkerConnectionPool()
    else:
        self.workerPool = None
    self.disableWorkProcessing = disableWorkProcessing

    # Bookkeeping, starting from "nothing has happened yet".
    self._lastMinPriority = WORK_PRIORITY_LOW
    self._timeOfLastWork = time.time()
    self._actualPollInterval = self.queuePollInterval
    self._inWorkCheck = False
    self._inOverdueCheck = False
def _overdueCheckLoop(self):
    """
    Run one overdue check and, while the service is still running, schedule
    the next one.

    Any exception raised by the check is logged and does not stop the loop.
    The pending call is kept in C{self._overdueCheckCall}, which is cleared
    on entry.

    NOTE(review): the body uses C{yield} and C{returnValue}, so it is
    presumably wrapped by an inlineCallbacks-style decorator that is not
    visible here - confirm.
    """
    # The call that invoked us has fired, so nothing is pending right now.
    self._overdueCheckCall = None

    if not self.running:
        returnValue(None)

    try:
        yield self._overdueCheck()
    except Exception as err:
        log.error("_overdueCheckLoop: {exc}", exc=err)

    # The service may have been stopped while the check was in flight.
    if not self.running:
        returnValue(None)

    interval = self.queueOverduePollInterval
    self._overdueCheckCall = self.reactor.callLater(
        interval, self._overdueCheckLoop
    )
def parseStreamServer(self, reactor, description, **options):
    """Build an AutobahnServerEndpoint wrapping the described endpoint."""
    # This endpoint plugin is meant for running a streaming protocol over
    # WebSocket over an underlying stream transport, for example:
    #
    #   endpoint = serverFromString(reactor,
    #       "autobahn:tcp\:9000\:interface\=0.0.0.0:url=ws\://localhost\:9000:compress=false")
    #
    # For that string, `parseStreamServer` is called with
    #
    #   description == tcp:9000:interface=0.0.0.0
    #
    # and
    #
    #   options == {'url': 'ws://localhost:9000', 'compress': 'false'}
    #
    # In other words, the `\:` escape folds the whole descriptor of the
    # underlying stream transport into the single (first) positional
    # argument.
    #
    # Note that the `\:` inside "url" is a separate use of escaping.
    #
    parsedOptions = _parseOptions(options)
    return AutobahnServerEndpoint(
        reactor, serverFromString(reactor, description), parsedOptions)
def start_initial(game):
    """Start the 'initial' state of the next round of ``game``.

    If ``get_round`` returns no round data, the game is stamped with an end
    time, saved, and its clients are redirected to the exit page. Otherwise
    the round is cached under the game id, ``initial`` is run, and
    ``game_state_checker`` is scheduled one second later with
    ``twisted_error`` attached as its errback.
    """
    round_data, users_plots = get_round(game)
    state = 'initial'

    if round_data is None:
        # No rounds left: close the game and send everyone to the exit.
        game.end_time = timezone.now()
        game.save()
        game.broadcast(action='redirect', url=reverse('interactive:exit'))
        return

    snapshot = {
        'state': state,
        'round_data': round_data,
        'users_plots': users_plots,
    }
    cache.set(game.id, snapshot)
    initial(game, round_data, users_plots)

    pending_check = task.deferLater(
        reactor, 1, game_state_checker, game, state, round_data, users_plots)
    pending_check.addErrback(twisted_error)
def throttled(func):
    """Decorator for AgentProxyMixIn.getTable to throttle requests.

    The first positional argument of the wrapped call must provide
    ``_last_request`` and ``throttle_delay``. If less than
    ``throttle_delay`` has passed since ``_last_request``, the call is
    deferred by the remaining time and the result of ``deferLater`` is
    returned; otherwise ``func`` is called straight away and its result
    returned. ``_last_request`` is set to the current time on every call.
    """
    @wraps(func)
    def _wrapper(*args, **kwargs):
        agent = args[0]
        remaining = (agent._last_request + agent.throttle_delay) - time.time()
        agent._last_request = time.time()

        if remaining > 0:
            # NOTE(review): "%sss" renders as e.g. "0.5ss" - possibly a
            # typo for "%ss"; the message is left unchanged here.
            _logger.debug("%sss delay due to throttling: %r", remaining, agent)
            return deferLater(reactor, remaining, func, *args, **kwargs)
        return func(*args, **kwargs)

    return _wrapper
# pylint: disable=R0903
def service(description, factory, reactor=None):
    """
    Build the service that listens according to a description.

    @param description: The description of the listening port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: C{str}

    @param factory: The protocol factory which will build protocols for
        connections to this service.
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @param reactor: The reactor handed to C{serverFromString}; when C{None}
        the global reactor is imported and used.

    @rtype: C{twisted.application.service.IService}
    @return: the service corresponding to a description of a reliable stream
        server.

    @see: L{twisted.internet.endpoints.serverFromString}
    """
    if reactor is None:
        from twisted.internet import reactor

    endpoint = endpoints.serverFromString(reactor, description)
    svc = StreamServerEndpointService(endpoint, factory)
    svc._raiseSynchronously = True
    return svc
def listen(description, factory):
    """
    Start listening on the port that a description names.

    @param description: The description of the connecting port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: L{str}

    @param factory: The protocol factory which will build protocols on
        connection.
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @rtype: L{twisted.internet.interfaces.IListeningPort}
    @return: the port corresponding to a description of a reliable virtual
        circuit server.

    @see: L{twisted.internet.endpoints.serverFromString}
    """
    from twisted.internet import reactor

    name, args, kw = endpoints._parseServer(description, factory)
    # The parsed name selects the reactor's matching listen<Name> method.
    listenMethod = getattr(reactor, 'listen' + name)
    return listenMethod(*args, **kw)
def test_storage_dir_required(self):
    """
    Running the program with no arguments exits with code 2, because one
    argument is required.
    """
    exits_with_code_2 = ExpectedException(
        SystemExit, MatchesStructure(code=Equals(2)))
    with exits_with_code_2:
        main(reactor, raw_args=[])
def test_storage_dir_provided(self):
    """
    When the program is run with an argument, it should start up and run.
    The program is expected to fail because it is unable to connect to
    Marathon.

    This test takes a while because we have to let txacme go through its
    initial sync (registration + issuing of 0 certificates) before things
    can be halted.
    """
    temp_dir = self.useFixture(TempDir())
    raw_args = [
        temp_dir.path,
        '--acme', LETSENCRYPT_STAGING_DIRECTORY.asText(),
        '--marathon', 'http://localhost:28080'  # An address we can't reach
    ]
    yield main(reactor, raw_args=raw_args)

    # A 'certs' directory should have been created
    certs_dir = temp_dir.join('certs')
    self.assertThat(os.path.isdir(certs_dir), Equals(True))

    # A default certificate should have been created
    default_cert = temp_dir.join('default.pem')
    self.assertThat(os.path.isfile(default_cert), Equals(True))

    # The connection to Marathon is expected to have been refused
    flush_logged_errors(ConnectionRefusedError)
def test_default_reactor(self):
    """
    A reactor passed to default_reactor is handed back unchanged.
    """
    supplied = Clock()
    returned = default_reactor(supplied)
    assert_that(returned, Is(supplied))
def test_default_reactor_not_provided(self):
    """
    Without a reactor, default_reactor falls back to the default reactor.
    """
    returned = default_reactor(None)
    assert_that(returned, Is(reactor))
def test_default_client_not_provided(self):
    """
    When default_client is not passed a client, it should return a default
    client.
    """
    returned = default_client(None, reactor)
    assert_that(returned, IsInstance(treq_HTTPClient))
def setUp(self):
    """
    Wire the client under test to a fake HTTP server.

    Creates C{self.requests}, a fake server driven by
    C{self.handle_request}, and C{self.client} built from the fake server's
    agent.
    """
    super(TestHTTPClientBase, self).setUp()

    self.requests = DeferredQueue()
    self.fake_server = FakeHttpServer(self.handle_request)

    agent = self.fake_server.get_agent()
    self.client = self.get_client(treq_HTTPClient(agent))

    # Spin the reactor once at the end of each test to clean up any
    # cancelled deferreds
    self.addCleanup(wait0)
def default_reactor(reactor):
    """
    Return ``reactor``, or the global Twisted reactor when it is ``None``.

    The global reactor is only imported when it is actually needed.
    """
    if reactor is not None:
        return reactor

    from twisted.internet import reactor as global_reactor
    return global_reactor
def default_client(client, reactor):
    """
    Return ``client``, or build a default one when it is ``None``.

    The default is a ``treq_HTTPClient`` wrapping a
    ``twisted.web.client.Agent`` created with the provided reactor.
    """
    if client is not None:
        return client

    from twisted.web.client import Agent
    return treq_HTTPClient(Agent(reactor))