def test_runsUntilAsyncCallback(self):
    """
    L{task.react} keeps the reactor running until the L{Deferred} returned
    by the function it was given fires with a result, and then stops it.
    """
    elapsed = []
    fakeReactor = _FakeReactor()

    def main(reactor):
        # One marker call at t=1, then the Deferred fires at t=2.
        done = defer.Deferred()
        reactor.callLater(1, elapsed.append, True)
        reactor.callLater(2, done.callback, None)
        return done

    raised = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)
    self.assertEqual(0, raised.code)
    # The t=1 call ran and the fake clock advanced to the t=2 callback.
    self.assertEqual(elapsed, [True])
    self.assertEqual(fakeReactor.seconds(), 2)
# Example source code for Python's react()
def test_runsUntilAsyncErrback(self):
    """
    L{task.react} keeps the reactor running until the L{defer.Deferred}
    returned by the function it was given fires with a failure; it then
    stops the reactor and reports the error.
    """
    class ExpectedException(Exception):
        pass

    fakeReactor = _FakeReactor()

    def main(reactor):
        # Fail the Deferred one (fake) second after startup.
        pending = defer.Deferred()
        reactor.callLater(1, pending.errback, ExpectedException())
        return pending

    raised = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)
    self.assertEqual(1, raised.code)
    # Exactly one ExpectedException must have been logged.
    logged = self.flushLoggedErrors(ExpectedException)
    self.assertEqual(len(logged), 1)
def test_runsUntilSyncErrback(self):
    """
    L{task.react} returns promptly when the L{defer.Deferred} handed back
    by the function it was given has already failed by the time it is
    returned.
    """
    class ExpectedException(Exception):
        pass

    fakeReactor = _FakeReactor()

    def main(reactor):
        # Already-failed Deferred: nothing is left to wait for.
        return defer.fail(ExpectedException())

    raised = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)
    self.assertEqual(1, raised.code)
    # No time passed on the fake clock.
    self.assertEqual(fakeReactor.seconds(), 0)
    logged = self.flushLoggedErrors(ExpectedException)
    self.assertEqual(len(logged), 1)
def test_singleStopCallback(self):
    """
    L{task.react} does not attempt to stop the reactor a second time when
    the L{defer.Deferred} returned by the function it was given only fires
    after the reactor has already been stopped.
    """
    fakeReactor = _FakeReactor()

    def main(reactor):
        # Stop the reactor at t=1; the Deferred only fires as part of the
        # shutdown sequence that the stop triggers.
        reactor.callLater(1, reactor.stop)
        firedOnShutdown = defer.Deferred()
        reactor.addSystemEventTrigger(
            'during', 'shutdown', firedOnShutdown.callback, None)
        return firedOnShutdown

    raised = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)
    self.assertEqual(fakeReactor.seconds(), 1)
    self.assertEqual(0, raised.code)
def test_synchronousStop(self):
    """
    L{task.react} copes with the reactor being stopped immediately before
    the returned L{Deferred} fires.
    """
    fakeReactor = _FakeReactor()

    def main(reactor):
        finished = defer.Deferred()

        def stopThenFire():
            # Order matters: stop the reactor first, then fire.
            reactor.stop()
            finished.callback(None)

        reactor.callWhenRunning(stopThenFire)
        return finished

    raised = self.assertRaises(
        SystemExit, task.react, main, [], _reactor=fakeReactor)
    self.assertEqual(0, raised.code)
def main():
    """
    Command-line entry point for the plugin connectivity test.

    Builds the argument parser, loads the stethoscope configuration, sets
    up logbook logging to the requested file, copies the ``--debug`` flag
    into both the ``DEBUG`` and ``TESTING`` config entries, and finally
    hands ``_main`` to ``task.react`` with ``(args, config)`` as arguments.
    """
    cli = argparse.ArgumentParser(
        description="""Test connectivity for the plugins which support connectivity tests."""
    )
    cli.add_argument('--log-file', dest='logfile', default='connectivity.log')
    cli.add_argument('--debug', dest="debug", action="store_true", default=False)
    cli.add_argument(
        '--namespaces',
        dest='namespaces',
        type=str,
        nargs='+',
        default=DEFAULT_NAMESPACES,
        help='Namespaces for plugins to test.',
    )
    cli.add_argument(
        '--plugins',
        dest='plugin_names',
        type=str,
        nargs='+',
        default=None,
        help='Names of plugins to test.',
    )

    # The configuration is loaded before the command line is parsed.
    config = stethoscope.api.factory.get_config()
    args = cli.parse_args()

    config['LOGBOOK'] = stethoscope.utils.setup_logbook(args.logfile)
    config['LOGBOOK'].push_application()

    # A single flag drives both the DEBUG and TESTING settings.
    for flag_name in ('DEBUG', 'TESTING'):
        config[flag_name] = args.debug

    task.react(_main, (args, config))
def adoptConnection(socket, address):
    """
    Run C{RLXProtocolFactory.buildConnection} under L{task.react}, passing
    it C{socket} and C{address} as its extra positional arguments.
    """
    # Imported at call time, as in the original.
    from twisted.internet import task

    connectionArgs = (socket, address)
    task.react(RLXProtocolFactory.buildConnection, connectionArgs)
def test():
    """
    Generator that starts two connections in turn through ``react``.

    The first connection runs ``main1`` over a msgpack rawsocket transport
    on a Unix socket; the second runs ``main2`` over a WebSocket transport
    on local TCP port 8080. The result of each ``react`` call is yielded.

    NOTE(review): if ``react`` here is ``twisted.internet.task.react``, it
    exits the process once its Deferred fires, so the second connection
    may never be started. Confirm which ``react`` is in scope.
    """
    unix_rawsocket = {
        'type': 'rawsocket',
        'serializer': 'msgpack',
        'endpoint': {
            'type': 'unix',
            'path': '/tmp/cb1.sock',
        },
    }
    tcp_websocket = {
        'type': 'websocket',
        'url': 'ws://127.0.0.1:8080/ws',
        'endpoint': {
            'type': 'tcp',
            'host': '127.0.0.1',
            'port': 8080,
        },
    }

    # Each Connection receives one transport dict, not a list of them.
    first = Connection(main1, transports=unix_rawsocket)
    yield react(first.start)

    second = Connection(main2, transports=tcp_websocket)
    yield react(second.start)
def test_runsUntilSyncCallback(self):
    """
    L{task.react} returns promptly when the L{Deferred} handed back by the
    function it was given has already fired with a result by the time it
    is returned.
    """
    fakeReactor = _FakeReactor()

    def main(reactor):
        # Already-fired Deferred: nothing is left to wait for.
        return defer.succeed(None)

    raised = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)
    self.assertEqual(0, raised.code)
    # No time passed on the fake clock.
    self.assertEqual(fakeReactor.seconds(), 0)
def test_arguments(self):
    """
    L{task.react} hands the elements of the list it is given to the
    function it is given as positional arguments, after the reactor.
    """
    received = []
    fakeReactor = _FakeReactor()

    def main(reactor, x, y, z):
        # Record what was passed so the test can inspect it afterwards.
        received.extend((x, y, z))
        return defer.succeed(None)

    raised = self.assertRaises(
        SystemExit, task.react, main, [1, 2, 3], _reactor=fakeReactor)
    self.assertEqual(0, raised.code)
    self.assertEqual(received, [1, 2, 3])
def test_defaultReactor(self):
    """
    When L{task.react} is not given a reactor argument, it uses
    L{twisted.internet.reactor}.
    """
    def main(reactor):
        # Remember which reactor task.react supplied.
        self.passedReactor = reactor
        return defer.succeed(None)

    installed = _FakeReactor()
    with NoReactor():
        # Install the fake inside NoReactor so task.react picks it up.
        installReactor(installed)
        raised = self.assertRaises(SystemExit, task.react, main, [])
        self.assertEqual(0, raised.code)
    self.assertIs(installed, self.passedReactor)
def test_exitWithDefinedCode(self):
    """
    When the function given to L{task.react} fails with a C{SystemExit},
    L{task.react} exits with the code carried by that error.
    """
    fakeReactor = _FakeReactor()

    def main(reactor):
        # Fail with an explicit, recognisable exit code.
        return defer.fail(SystemExit(23))

    raised = self.assertRaises(
        SystemExit, task.react, main, [], _reactor=fakeReactor)
    self.assertEqual(23, raised.code)
def test_asynchronousStop(self):
    """
    L{task.react} copes with the reactor being stopped while the returned
    L{Deferred} never fires.
    """
    fakeReactor = _FakeReactor()

    def main(reactor):
        # Stop at t=1 and return a Deferred that nothing ever fires.
        reactor.callLater(1, reactor.stop)
        return defer.Deferred()

    raised = self.assertRaises(
        SystemExit, task.react, main, [], _reactor=fakeReactor)
    self.assertEqual(0, raised.code)
def main():
    """
    Command-line entry point for the batch record pull.

    Builds the argument parser, loads the stethoscope configuration,
    applies ``--timeout`` to the ``BITFIT_TIMEOUT`` and ``JAMF_TIMEOUT``
    config entries, sets up logbook logging to the requested file, copies
    the ``--debug`` flag into both ``DEBUG`` and ``TESTING``, registers a
    YAML representer for Arrow objects on the default and safe dumpers,
    and finally hands ``_main`` to ``task.react`` with ``(args, config)``.
    """
    cli = argparse.ArgumentParser(
        description="""Pull records for a batch of users and submit to external services."""
    )
    cli.add_argument('--timeout', dest="timeout", type=int, default=10)
    cli.add_argument(
        '--limit',
        dest="limit",
        type=int,
        default=10,
        help="""Retrieve data for at most this many users concurrently.""",
    )
    cli.add_argument('--log-file', dest='logfile', default='batch.log')
    cli.add_argument('input', nargs='?', type=argparse.FileType('r'), default=None)
    cli.add_argument('--collect-only', dest="collect_only", action="store_true")
    cli.add_argument('--debug', dest="debug", action="store_true", default=False)

    # The configuration is loaded before the command line is parsed.
    config = stethoscope.api.factory.get_config()
    args = cli.parse_args()

    # The same timeout is applied to each of these plugins.
    for plugin_prefix in ('BITFIT', 'JAMF'):
        config[plugin_prefix + '_TIMEOUT'] = args.timeout

    config['LOGBOOK'] = stethoscope.utils.setup_logbook(args.logfile)
    config['LOGBOOK'].push_application()

    # A single flag drives both the DEBUG and TESTING settings.
    for flag_name in ('DEBUG', 'TESTING'):
        config[flag_name] = args.debug

    # Register the Arrow representer on the default dumper and on SafeDumper.
    arrow_type = arrow.arrow.Arrow
    yaml.add_representer(arrow_type, arrow_representer)
    yaml.SafeDumper.add_representer(arrow_type, arrow_representer)

    task.react(_main, (args, config))
def run():
    """Entry point: run ``oonideckgen`` under ``task.react``."""
    entry_point = oonideckgen
    task.react(entry_point)
def run():
    """Entry point: run ``oonireport`` under ``task.react``."""
    entry_point = oonireport
    task.react(entry_point)
def run():
    """Entry point: run ``ooniprobe`` under ``task.react``."""
    entry_point = ooniprobe
    task.react(entry_point)