def test_crash(self):
    """
    C{reactor.crash} should NOT fire shutdown triggers.

    A "before shutdown" trigger that records an event is registered, the
    reactor is run and then crashed, and the list of recorded events is
    asserted to still be empty.
    """
    fired = []

    def record_shutdown():
        fired.append(("before", "shutdown"))

    self.addTrigger("before", "shutdown", record_shutdown)

    # reactor.crash called from an "after-startup" trigger is too early
    # for the gtkreactor: gtk_mainloop is not yet running. Same is true
    # when called with reactor.callLater(0). Must be >0 seconds in the
    # future to let gtk_mainloop start first.
    # NOTE(review): the delay passed below is 0, which contradicts the
    # remark above -- confirm whether a small positive delay is intended.
    reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
    reactor.run()

    # assertFalse is the supported spelling of the deprecated failIf alias.
    self.assertFalse(
        fired,
        "reactor.crash invoked shutdown triggers, but it "
        "isn't supposed to.")
    # XXX Test that reactor.stop() invokes shutdown triggers
# Example source code for Python's callWhenRunning()
def makeService(self, options):
    """
    Construct a Tribler service.

    Builds a C{MultiService} named "Market", attaches a manhole service
    when C{options["manhole"]} is greater than zero, and schedules
    C{self.start_tribler(options)} for when the reactor is running.

    @param options: mapping of parsed options; the "manhole" entry is read
        and used as the telnet port number.
    @return: the assembled C{MultiService}.
    """
    market_service = MultiService()
    market_service.setName("Market")

    namespace = {}
    manhole_port = options["manhole"]
    if manhole_port > 0:
        manhole_config = {
            'namespace': namespace,
            # The telnet endpoint is bound to the loopback interface.
            'telnetPort': 'tcp:%d:interface=127.0.0.1' % manhole_port,
            'sshPort': None,
            # The passwd file is looked up next to this module.
            'passwd': os.path.join(os.path.dirname(__file__), 'passwd'),
        }
        market_service.addService(manhole_tap.makeService(manhole_config))

    reactor.callWhenRunning(self.start_tribler, options)
    return market_service
def _processClientActions(self):
    """
    Drain the queue of pending client actions and dispatch each one.

    Every queued C{(session, action)} pair is looked up in
    C{self.actionHandlers} by action name. A matching handler is scheduled
    with C{reactor.callWhenRunning}. When C{self.authRequired} is true the
    handler is only scheduled if its role is among the roles the session's
    user holds for the target server; otherwise a "RequestError" update is
    queued for the client. Unknown actions are logged as errors.
    """
    log.debug("Processing Client Actions...")
    while self.clientActions:
        session, action = self.clientActions.pop(0)
        servername = action['server'][0]
        action_name = action['action'][0]
        role, handler = self.actionHandlers.get(action_name, (None, None))

        if not handler:
            log.error("ClientActionHandler for action %s does not exixts..." % action_name)
            continue

        if self.authRequired:
            # A user with no entry for this server has no roles there.
            # Fall back to an empty tuple so the membership test denies
            # access instead of raising TypeError on ``role in None``.
            allowed_roles = self.authUsers[session.username].servers.get(servername) or ()
            if role not in allowed_roles:
                self.http._addUpdate(
                    servername=servername,
                    sessid=session.uid,
                    action="RequestError",
                    message="You do not have permission to execute this action.")
                continue

        reactor.callWhenRunning(handler, session, action)
def test_run_apt_update_report_timestamp(self):
    """
    The "package-reporter-result" message carries the time at which the
    apt update was run in its "report-timestamp" field.
    """
    store = self.broker_service.message_store
    store.set_accepted_types(["package-reporter-result"])
    self._make_fake_apt_update(err="")
    finished = Deferred()

    def assert_reported(ignore):
        # The fake clock was advanced by 10 before the update ran.
        self.assertMessages(
            store.get_pending_messages(),
            [{"type": "package-reporter-result",
              "report-timestamp": 10.0, "code": 0, "err": u""}])

    def do_test():
        self.reactor.advance(10)
        running = self.reporter.run_apt_update()
        running.addCallback(assert_reported)
        self.reactor.advance(0)
        # Hand the outcome (success or failure) over to the test result.
        running.chainDeferred(finished)

    reactor.callWhenRunning(do_test)
    return finished
def test_run_apt_update_report_apt_failure(self):
    """
    When L{PackageReporter.run_apt_update} fails, a message reporting the
    error is sent to the server, so the problem can be fixed centrally.
    """
    message_store = self.broker_service.message_store
    message_store.set_accepted_types(["package-reporter-result"])
    # The fake apt-update wrapper exits with code 2.
    self._make_fake_apt_update(code=2)
    outcome = Deferred()

    def do_test():
        update = self.reporter.run_apt_update()

        def verify_failure_reported(ignore):
            pending = message_store.get_pending_messages()
            self.assertMessages(
                pending,
                [{"type": "package-reporter-result",
                  "report-timestamp": 0.0, "code": 2, "err": u"error"}])

        update.addCallback(verify_failure_reported)
        self.reactor.advance(0)
        update.chainDeferred(outcome)

    reactor.callWhenRunning(do_test)
    return outcome
def test_run_apt_update_report_success(self):
    """
    L{PackageReporter.run_apt_update} reports success as well, so that the
    proper state of the client is known.
    """
    accepted = ["package-reporter-result"]
    message_store = self.broker_service.message_store
    message_store.set_accepted_types(accepted)
    self._make_fake_apt_update(err="message")
    test_done = Deferred()

    def check_success_message(ignore):
        self.assertMessages(
            message_store.get_pending_messages(),
            [{"type": "package-reporter-result",
              "report-timestamp": 0.0, "code": 0, "err": u"message"}])

    def do_test():
        apt_update = self.reporter.run_apt_update()
        apt_update.addCallback(check_success_message)
        self.reactor.advance(0)
        apt_update.chainDeferred(test_done)

    reactor.callWhenRunning(do_test)
    return test_done
def test_run_apt_update_touches_stamp_file(self):
    """
    After running the apt-update wrapper,
    L{PackageReporter.run_apt_update} touches a stamp file.
    """
    self.reporter.sources_list_filename = "/I/Dont/Exist"
    self._make_fake_apt_update()
    completed = Deferred()

    def stamp_file_exists(ignored):
        stamp_path = self.config.update_stamp_filename
        self.assertTrue(os.path.exists(stamp_path))

    def do_test():
        pending_update = self.reporter.run_apt_update()
        pending_update.addCallback(stamp_file_exists)
        self.reactor.advance(0)
        pending_update.chainDeferred(completed)

    reactor.callWhenRunning(do_test)
    return completed
def test_crash(self):
    """
    Crashing the reactor must not run shutdown triggers.

    The test registers a "before shutdown" trigger that appends to a list,
    runs the reactor, crashes it, and asserts the list stayed empty.
    """
    events = []
    marker = ("before", "shutdown")
    self.addTrigger("before", "shutdown", lambda: events.append(marker))

    # reactor.crash called from an "after-startup" trigger is too early
    # for the gtkreactor: gtk_mainloop is not yet running. Same is true
    # when called with reactor.callLater(0). Must be >0 seconds in the
    # future to let gtk_mainloop start first.
    # NOTE(review): the code schedules the crash with a delay of 0 despite
    # the comment above -- verify which of the two is correct.
    reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
    reactor.run()

    # failIf is a deprecated alias; assertFalse performs the same check.
    message = ("reactor.crash invoked shutdown triggers, but it "
               "isn't supposed to.")
    self.assertFalse(events, message)
    # XXX Test that reactor.stop() invokes shutdown triggers
def main():
    """
    Command-line entry point: serve a resource as a mock HTTP server.

    The single positional argument is a dotted path ``module.Name``. The
    current directory is appended to ``sys.path``, the module is imported,
    the named attribute is called with no arguments and the result is
    served on ``PORT``. Once the reactor is running, the listening address
    is printed.

    Exits through ``parser.error`` when the argument is not of the form
    ``module.Name``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    args = parser.parse_args()

    # Reject arguments without both a module part and an attribute part
    # up front, instead of failing with an unpacking ValueError.
    module_name, _, name = args.resource.rpartition('.')
    if not module_name or not name:
        parser.error(
            "resource must be a dotted path like 'module.Name', "
            "got %r" % (args.resource,))

    sys.path.append('.')
    resource = getattr(import_module(module_name), name)()
    http_port = reactor.listenTCP(PORT, Site(resource))

    def print_listening():
        host = http_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, host.host, host.port))

    reactor.callWhenRunning(print_listening)
    reactor.run()
def runcase(alice_class, carol_class, fail_alice_state=None, fail_carol_state=None):
    """
    Set up one server-side and several client-side participants, then run
    the reactor with a mining loop.

    ``num_alices + 1`` wallets are created; the last one is used for the
    server-side participant (built with ``carol_class``) and the others
    for the client-side participants (built with ``alice_class``).

    Returns a tuple ``(alices, carol_bbmb, wallet)`` where ``alices`` is
    the list of ``main_cs`` results for the clients, ``carol_bbmb`` is the
    ``main_cs`` result for the server and ``wallet`` is the ``'wallet'``
    entry of the server's wallet record.
    """
    server_options = Options()
    all_wallets = make_wallets(num_alices + 1,
                               wallet_structures=wallet_structures,
                               mean_amt=funding_amount)
    server_wallet = all_wallets[num_alices]
    server_args = ["dummy"]
    carol_bbmb = main_cs((server_wallet['seed'], server_args, server_options,
                          False, None, carol_class, None, fail_carol_state))

    client_options = Options()
    client_options.serve = False
    alices = []
    for idx in range(num_alices):
        client_args = ["dummy", amounts[idx]]
        if dest_addr:
            client_args.append(dest_addr)
        client_data = (all_wallets[idx]['seed'], client_args, client_options,
                       False, alice_class, None, fail_alice_state, None)
        alices.append(main_cs(client_data))

    mining_loop = task.LoopingCall(miner)
    reactor.callWhenRunning(start_mining, mining_loop)
    reactor.run()
    return (alices, carol_bbmb, server_wallet['wallet'])
def run(self):
    """
    Pick the operating mode from the options and run the reactor.

    Signal handler installation is scheduled for reactor start-up. Then
    exactly one of the single-job, multi-process, worker or regular
    scheduling setups is performed, the thread pool size is suggested to
    the reactor, ``self.shutdown`` is registered as an "after shutdown"
    trigger, and the reactor is run.
    """
    reactor.callWhenRunning(self.install_sighandlers)

    opts = self.options
    if opts.netbox:
        self.setup_single_job()
    elif opts.multiprocess:
        self.setup_multiprocess(opts.multiprocess, opts.max_jobs)
    elif opts.worker:
        self.setup_worker()
    else:
        self.setup_scheduling()

    pool_size = self.options.threadpoolsize
    reactor.suggestThreadPoolSize(pool_size)
    reactor.addSystemEventTrigger("after", "shutdown", self.shutdown)
    reactor.run()
def setup_scheduling(self):
    """
    Prepare regular, single-process job scheduling.

    Imports the plugins, installs an inline work pool, schedules the job
    scheduler to start once the reactor runs, and registers a job-logging
    hook and a reload hook.
    """
    self._logger.info("Starting scheduling in single process")

    # NOTE: JobScheduler is imported locally because it will in turn import
    # twistedsnmp. Twistedsnmp is stupid enough to call
    # logging.basicConfig(). If imported before our own loginit, this
    # causes us to have two StreamHandlers on the root logger, duplicating
    # every log statement.
    from .schedule import JobScheduler

    plugins.import_plugins()
    self.work_pool = pool.InlinePool()

    start_scheduler = JobScheduler.initialize_from_config_and_run
    reactor.callWhenRunning(
        start_scheduler, self.work_pool, self.options.onlyjob)

    def log_scheduler_jobs():
        JobScheduler.log_active_jobs(logging.INFO)

    def reload_netboxes():
        JobScheduler.reload()

    self.job_loggers.append(log_scheduler_jobs)
    self.reloaders.append(reload_netboxes)
def setup_multiprocess(self, process_count, max_jobs):
    """
    Prepare job scheduling backed by a pool of worker processes.

    :param process_count: first argument passed to ``pool.WorkerPool``.
    :param max_jobs: second argument passed to ``pool.WorkerPool``.

    Besides creating the pool, this schedules the job scheduler to start
    once the reactor runs and registers two job-logging hooks (active jobs
    and the pool summary) and a reload hook.
    """
    self._logger.info("Starting multi-process setup")
    from .schedule import JobScheduler

    plugins.import_plugins()
    threadpool_size = self.options.threadpoolsize
    self.work_pool = pool.WorkerPool(process_count, max_jobs, threadpool_size)

    start_scheduler = JobScheduler.initialize_from_config_and_run
    reactor.callWhenRunning(
        start_scheduler, self.work_pool, self.options.onlyjob)

    def log_scheduler_jobs():
        JobScheduler.log_active_jobs(logging.INFO)

    def reload_netboxes():
        JobScheduler.reload()

    self.job_loggers.append(log_scheduler_jobs)
    self.job_loggers.append(self.work_pool.log_summary)
    self.reloaders.append(reload_netboxes)
def test_synchronousStop(self):
    """
    L{task.react} copes with the reactor being stopped immediately before
    the L{Deferred} returned by C{main} fires, and exits with code 0.
    """
    def stopThenFire(reactor):
        finished = defer.Deferred()

        def stop():
            # Stop first, then fire: the order under test.
            reactor.stop()
            finished.callback(None)

        reactor.callWhenRunning(stop)
        return finished

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, stopThenFire, [], _reactor=fakeReactor)
    self.assertEqual(0, exitError.code)
def main():
    """
    Run a mock HTTP server for the resource named on the command line.

    The positional ``resource`` argument is a dotted path whose last
    component names a callable inside the module given by the preceding
    components. With the current directory appended to ``sys.path``, the
    module is imported, the callable is invoked without arguments and the
    result is served on ``PORT``. The listening address is printed when
    the reactor starts.

    A malformed ``resource`` argument is reported via ``parser.error``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    args = parser.parse_args()

    # A value lacking a module part or an attribute part used to surface
    # as an unpacking ValueError; report it as a usage error instead.
    module_name, _, name = args.resource.rpartition('.')
    if not (module_name and name):
        parser.error(
            "expected a dotted path such as 'package.module.Resource', "
            "got %r" % (args.resource,))

    sys.path.append('.')
    module = import_module(module_name)
    resource = getattr(module, name)()
    http_port = reactor.listenTCP(PORT, Site(resource))

    def print_listening():
        host = http_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, host.host, host.port))

    reactor.callWhenRunning(print_listening)
    reactor.run()
def twistedinteract(self):
    """
    Drive the interactive loop from the Twisted reactor.

    A reader selecting on file descriptor 0 is registered; each time input
    is ready it calls ``self.handle1()`` and stops the reactor on
    ``EOFError``. ``self.prepare()`` is called before the reactor runs and
    ``self.restore()`` is always called afterwards.
    """
    import signal
    from twisted.internet import reactor
    from twisted.internet.abstract import FileDescriptor

    console = self

    class Me(FileDescriptor):
        def fileno(self):
            """ We want to select on FD 0 """
            return 0

        def doRead(self):
            """Handle one unit of input; stop the reactor at end of input."""
            try:
                console.handle1()
            except EOFError:
                reactor.stop()

    stdin_reader = Me()
    reactor.addReader(stdin_reader)
    # Once the reactor is running, put Python's default SIGINT handler
    # back in place.
    reactor.callWhenRunning(
        signal.signal, signal.SIGINT, signal.default_int_handler)

    self.prepare()
    try:
        reactor.run()
    finally:
        self.restore()
def configure():
    """
    Configure the command with its REST server and start listening.

    Calls ``command.configure(rest_server=True)``, listens on the TCP port
    given by ``args.rest_api_port`` with the site built by
    ``create_site``, and registers a start-up message to be printed once
    the reactor is running.
    """
    params, pipes, args = command.configure(rest_server=True)
    port = args.rest_api_port
    reactor.listenTCP(port, create_site(params, pipes))

    # Used as a decorator, callWhenRunning registers the function straight
    # away; the name is then bound to whatever callWhenRunning returns.
    @reactor.callWhenRunning
    def startup_message():
        # The parenthesised single-argument form prints the same text under
        # the Python 2 print statement and is also valid on Python 3.
        print('Packet Queue is running. Configure at http://localhost:%i' % port)
        sys.stdout.flush()
def run(self):
    """
    Run the reactor, opening a UDP port with a fresh ``ClientProtocol``
    for every item popped from ``self.queue`` while ``self.spin`` is true,
    and stop the reactor once ``self.spin`` becomes false.
    """
    def pump_queue():
        # NOTE(review): this loop polls continuously without yielding back
        # to the reactor; presumably self.spin and self.queue are updated
        # from elsewhere while it runs -- confirm against the callers.
        while self.spin:
            if len(self.queue) == 0:
                continue
            reactor.listenUDP(
                0,
                ClientProtocol(self.ip, self.port, self.queue.pop()))
        reactor.stop()

    reactor.callWhenRunning(pump_queue)
    reactor.run(installSignalHandlers=False)
def startService(self):
    """
    Create the dispatcher connection bound to ``ENDPOINT`` and start the
    service.

    The connection's ``do_greet`` is scheduled for when the reactor is
    running, then the base ``Service.startService`` is invoked.
    """
    self._conn = _DispatcherREPConnection(
        ZmqFactory(),
        ZmqEndpoint(ZmqEndpointType.bind, ENDPOINT),
        self._core)
    reactor.callWhenRunning(self._conn.do_greet)
    service.Service.startService(self)
def main():
    """
    Schedule ``execute`` on the reactor, install a SIGINT handler and run
    the reactor.
    """
    def stop_and_exit(signum, frame):
        # Stop the reactor if it is still running, then exit with status 0.
        if reactor.running:
            reactor.stop()
        sys.exit(0)

    # execute is deferred via callLater(0) once the reactor is running.
    reactor.callWhenRunning(reactor.callLater, 0, execute)
    signal.signal(signal.SIGINT, stop_and_exit)
    reactor.run()
def doAction(self, request):
    """
    Queue the request's arguments as a client action and acknowledge it.

    The ``(session, request.args)`` pair is appended to the Monast action
    queue, queue processing is scheduled on the reactor, and "OK" is
    written back before the request is finished.
    """
    monast = self.monast
    pending_action = (request.getSession(), request.args)
    monast.clientActions.append(pending_action)
    reactor.callWhenRunning(monast._processClientActions)

    request.write("OK")
    request.finish()
##
## Monast AMI
##
def __start(self):
    """
    Log the start-up and schedule ``self.connect`` for every entry of
    ``self.servers`` once the reactor is running.
    """
    log.info("Starting Monast Services...")
    connect = self.connect
    for name in self.servers:
        reactor.callWhenRunning(connect, name)
def test_gpg_verify_with_non_zero_gpg_exit_code(self):
    """
    When the gpg binary exits with a non-zero code, the L{Deferred}
    returned by L{gpg_verify} fails with a message quoting the output,
    error output and exit code, and the temporary gpg home is removed.
    """
    # A stand-in gpg that prints to both streams and exits with 1.
    fake_gpg = self.makeFile("#!/bin/sh\necho out; echo err >&2; exit 1\n")
    os.chmod(fake_gpg, 0o755)
    gpg_home = self.makeDir()
    finished = Deferred()

    def check_failure(failure):
        self.assertEqual(str(failure.value),
                         "%s failed (out='out\n', err='err\n', "
                         "code='1')" % fake_gpg)
        self.assertFalse(os.path.exists(gpg_home))

    def do_test():
        with mock.patch("tempfile.mkdtemp") as mkdtemp_mock:
            mkdtemp_mock.return_value = gpg_home
            verification = gpg_verify(
                "/some/file", "/some/signature", gpg=fake_gpg)
            # Success is not expected here; it fails the test.
            verification.addCallback(self.fail)
            verification.addErrback(check_failure)
            verification.chainDeferred(finished)

    reactor.callWhenRunning(do_test)
    return finished
def test_upgrade_with_failure(self):
    """
    The L{ReleaseUpgrader.upgrade} sends a message with failed status
    field if the upgrade-tool exits with non-zero code.
    """
    self.upgrader.logs_directory = self.makeDir()
    upgrade_tool_directory = self.config.upgrade_tool_directory
    upgrade_tool_filename = os.path.join(upgrade_tool_directory, "karmic")
    # Write a stand-in upgrade tool that prints to both streams and exits
    # with code 3. The context manager closes the file even if the write
    # raises.
    with open(upgrade_tool_filename, "w") as fd:
        fd.write("#!/bin/sh\n"
                 "echo out\n"
                 "echo err >&2\n"
                 "exit 3")
    os.chmod(upgrade_tool_filename, 0o755)

    deferred = Deferred()

    def do_test():
        result = self.upgrader.upgrade("karmic", 100)

        def check_result(ignored):
            result_text = (u"=== Standard output ===\n\nout\n\n\n"
                           "=== Standard error ===\n\nerr\n\n\n")
            self.assertMessages(self.get_pending_messages(),
                                [{"type": "operation-result",
                                  "operation-id": 100,
                                  "status": FAILED,
                                  "result-text": result_text,
                                  "result-code": 3}])

        result.addCallback(check_result)
        # Propagate success or failure to the Deferred the test returns.
        result.chainDeferred(deferred)

    reactor.callWhenRunning(do_test)
    return deferred
def test_run_apt_update(self, warning_mock):
    """
    L{PackageReporter.run_apt_update} runs apt-update: the fake wrapper's
    output, error output and exit code are returned, no warning is logged
    and the expected debug messages are emitted.
    """
    missing_path = "/I/Dont/Exist"
    self.reporter.sources_list_filename = missing_path
    self.reporter.sources_list_directory = missing_path
    self._make_fake_apt_update()

    debug_patcher = mock.patch.object(reporter.logging, "debug")
    debug_mock = debug_patcher.start()
    self.addCleanup(debug_patcher.stop)

    finished = Deferred()

    def check_outcome(args):
        out, err, code = args
        self.assertEqual("output", out)
        self.assertEqual("error", err)
        self.assertEqual(0, code)
        self.assertFalse(warning_mock.called)
        expected_calls = [
            mock.call(
                "Checking if ubuntu-release-upgrader is running."),
            mock.call(
                "'%s' exited with status 0 (out='output', err='error')"
                % self.reporter.apt_update_filename),
        ]
        debug_mock.assert_has_calls(expected_calls)

    def do_test():
        update = self.reporter.run_apt_update()
        update.addCallback(check_outcome)
        self.reactor.advance(0)
        update.chainDeferred(finished)

    reactor.callWhenRunning(do_test)
    return finished
def test_run_apt_update_report_no_sources(self):
    """
    L{PackageReporter.run_apt_update} reports a failure when apt succeeds
    but no APT sources are defined. APT itself doesn't fail in that case,
    but a failure is faked in order to re-use the PackageReporterAlert on
    the server.
    """
    self.facade.reset_channels()
    store = self.broker_service.message_store
    store.set_accepted_types(["package-reporter-result"])
    self._make_fake_apt_update()
    done = Deferred()

    def check_no_sources_reported(ignore):
        sources = (self.reporter.sources_list_filename,
                   self.reporter.sources_list_directory)
        error = "There are no APT sources configured in %s or %s." % sources
        self.assertMessages(
            store.get_pending_messages(),
            [{"type": "package-reporter-result",
              "report-timestamp": 0.0, "code": 1, "err": error}])

    def do_test():
        running = self.reporter.run_apt_update()
        running.addCallback(check_no_sources_reported)
        self.reactor.advance(0)
        running.chainDeferred(done)

    reactor.callWhenRunning(do_test)
    return done
def test_run_apt_update_no_run_in_interval(self):
    """
    L{PackageReporter.run_apt_update} logs a debug message when apt-update
    doesn't run because the interval has not passed.
    """
    self.reporter._apt_sources_have_changed = lambda: False
    # An existing stamp file is created for this scenario.
    self.makeFile("", path=self.config.update_stamp_filename)

    debug_patcher = mock.patch.object(reporter.logging, "debug")
    debug_mock = debug_patcher.start()
    self.addCleanup(debug_patcher.stop)

    test_done = Deferred()

    def check_not_run(args):
        out, err, code = args
        self.assertEqual("", out)
        self.assertEqual("", err)
        self.assertEqual(0, code)
        expected_message = ("'%s' didn't run, conditions not met"
                            ) % self.reporter.apt_update_filename
        debug_mock.assert_called_once_with(expected_message)

    def do_test():
        attempt = self.reporter.run_apt_update()
        attempt.addCallback(check_not_run)
        self.reactor.advance(0)
        attempt.chainDeferred(test_done)

    reactor.callWhenRunning(do_test)
    return test_done
def test_run_apt_update_no_run_update_notifier_stamp_in_interval(self):
    """
    L{PackageReporter.run_apt_update} doesn't run apt-update if the
    interval has passed but the stamp file from update-notifier-common
    reports that 'apt-get update' has been run within the interval.
    """
    self.reporter._apt_sources_have_changed = lambda: False

    # The interval for the apt-update stamp file is expired.
    stamp_path = self.config.update_stamp_filename
    self.makeFile("", path=stamp_path)
    expired_time = time.time() - self.config.apt_update_interval - 1
    os.utime(stamp_path, (expired_time, expired_time))

    # The interval for the update-notifier-common stamp file is not
    # expired.
    self.reporter.update_notifier_stamp = self.makeFile("")

    debug_patcher = mock.patch.object(reporter.logging, "debug")
    debug_mock = debug_patcher.start()
    self.addCleanup(debug_patcher.stop)

    completed = Deferred()

    def verify_skipped(args):
        out, err, code = args
        self.assertEqual("", out)
        self.assertEqual("", err)
        self.assertEqual(0, code)
        debug_mock.assert_called_once_with(
            ("'%s' didn't run, conditions not met"
             ) % self.reporter.apt_update_filename)

    def do_test():
        pending = self.reporter.run_apt_update()
        pending.addCallback(verify_skipped)
        self.reactor.advance(0)
        pending.chainDeferred(completed)

    reactor.callWhenRunning(do_test)
    return completed
def test_run_apt_update_runs_interval_expired(self):
    """
    L{PackageReporter.run_apt_update} runs when both the apt-update and
    the update-notifier-common stamp files are present and the time
    interval has passed.
    """
    self.reporter._apt_sources_have_changed = lambda: False
    expired_time = time.time() - self.config.apt_update_interval - 1
    expired_times = (expired_time, expired_time)

    # The interval for both stamp files is expired.
    self.makeFile("", path=self.config.update_stamp_filename)
    os.utime(self.config.update_stamp_filename, expired_times)
    self.reporter.update_notifier_stamp = self.makeFile("")
    os.utime(self.reporter.update_notifier_stamp, expired_times)

    store = self.broker_service.message_store
    store.set_accepted_types(["package-reporter-result"])
    self._make_fake_apt_update(err="message")
    outcome = Deferred()

    def check_result_reported(ignore):
        self.assertMessages(
            store.get_pending_messages(),
            [{"type": "package-reporter-result",
              "report-timestamp": 0.0, "code": 0, "err": u"message"}])

    def do_test():
        update = self.reporter.run_apt_update()
        update.addCallback(check_result_reported)
        self.reactor.advance(0)
        update.chainDeferred(outcome)

    reactor.callWhenRunning(do_test)
    return outcome
def test_run_apt_update_error_no_cache_files(self):
    """
    L{PackageReporter.run_apt_update} succeeds when the command fails
    because cache files are not found: the reported message has code 0 and
    an empty error text.
    """
    store = self.broker_service.message_store
    store.set_accepted_types(["package-reporter-result"])

    # Error output given to the fake apt-update for this scenario.
    missing_cache_error = (
        "E: Problem renaming the file "
        "/var/cache/apt/srcpkgcache.bin.Pw1Zxy to "
        "/var/cache/apt/srcpkgcache.bin - rename (2: No such file "
        "or directory)\n"
        "E: Problem renaming the file "
        "/var/cache/apt/pkgcache.bin.wz8ooS to "
        "/var/cache/apt/pkgcache.bin - rename (2: No such file "
        "or directory)\n"
        "E: The package lists or status file could not be parsed "
        "or opened.")
    self._make_fake_apt_update(
        code=2, out="not important", err=missing_cache_error)
    finished = Deferred()

    def check_reported_as_success(ignore):
        self.assertMessages(
            store.get_pending_messages(),
            [{"type": "package-reporter-result",
              "report-timestamp": 0.0, "code": 0, "err": u""}])

    def do_test():
        running = self.reporter.run_apt_update()
        running.addCallback(check_reported_as_success)
        self.reactor.advance(0)
        running.chainDeferred(finished)

    reactor.callWhenRunning(do_test)
    return finished