def test_config_apt_update_interval(self):
    """
    L{PackageReporter} uses the C{apt_update_interval} configuration
    parameter to check the age of the update stamp file.
    """
    self.config.apt_update_interval = 1234
    message_store = self.broker_service.message_store
    message_store.set_accepted_types(["package-reporter-result"])
    # Record every interval the reporter checks against; returning False
    # means "not expired", so apt-update is skipped and no message is sent.
    intervals = []

    def apt_update_timeout_expired(interval):
        intervals.append(interval)
        return False

    deferred = Deferred()
    # Stub out both triggers that would force an apt-update run.
    self.reporter._apt_sources_have_changed = lambda: False
    self.reporter._apt_update_timeout_expired = apt_update_timeout_expired

    def do_test():
        result = self.reporter.run_apt_update()

        def callback(ignore):
            # No update ran, so no result message; the configured value
            # must have been passed to the timeout check.
            self.assertMessages(message_store.get_pending_messages(), [])
            self.assertEqual([1234], intervals)
        result.addCallback(callback)
        result.chainDeferred(deferred)

    # Run once the reactor is up; the returned Deferred keeps the test alive.
    reactor.callWhenRunning(do_test)
    return deferred
# Example source snippets for reactor.callWhenRunning() usage
def test_store_messages(self):
    """
    L{FakeGlobalReporter} stores messages which are sent.
    """
    message_store = self.broker_service.message_store
    message_store.set_accepted_types(["package-reporter-result"])
    # Fake apt-update script: writes "error" to stderr, "output" to
    # stdout, and exits successfully.
    self.reporter.apt_update_filename = self.makeFile(
        "#!/bin/sh\necho -n error >&2\necho -n output\nexit 0")
    os.chmod(self.reporter.apt_update_filename, 0o755)
    deferred = Deferred()

    def do_test():
        self.reporter.get_session_id()
        result = self.reporter.run_apt_update()
        # Let any zero-delay scheduled calls fire on the test clock.
        self.reactor.advance(0)

        def callback(ignore):
            message = {"type": "package-reporter-result",
                       "report-timestamp": 0.0, "code": 0, "err": u"error"}
            self.assertMessages(
                message_store.get_pending_messages(), [message])
            # The same message must also be persisted in the store's
            # database, bpickle-encoded, as the first row.
            stored = list(self.store._db.execute(
                "SELECT id, data FROM message").fetchall())
            self.assertEqual(1, len(stored))
            self.assertEqual(1, stored[0][0])
            self.assertEqual(message, bpickle.loads(bytes(stored[0][1])))
        result.addCallback(callback)
        result.chainDeferred(deferred)

    reactor.callWhenRunning(do_test)
    return deferred
def setupFacade(config):
    """Get the L{Facade} instance to use in the API service."""
    from fluiddb.api.facade import Facade
    from fluiddb.util.transact import Transact

    # Pool size comes from configuration; threads are started only once
    # the reactor is running and are stopped during reactor shutdown.
    pool = ThreadPool(minthreads=0,
                      maxthreads=int(config.get('service', 'max-threads')))
    reactor.callWhenRunning(pool.start)
    reactor.addSystemEventTrigger('during', 'shutdown', pool.stop)

    session_factory = FluidinfoSessionFactory(
        'API-%s' % config.get('service', 'port'))
    return Facade(Transact(pool), session_factory)
def make(self, dependency_resources):
    """Create and start a new thread pool."""
    from twisted.internet import reactor
    global _threadPool
    # Lazily build a single-thread pool shared by all callers; start it
    # with the reactor and stop it again during reactor shutdown.
    if _threadPool is None:
        pool = ThreadPool(minthreads=1, maxthreads=1)
        reactor.callWhenRunning(pool.start)
        reactor.addSystemEventTrigger('during', 'shutdown', pool.stop)
        _threadPool = pool
    return _threadPool
def main():
    """Serve the resource named on the command line as a mock HTTP server."""
    argp = argparse.ArgumentParser()
    argp.add_argument('resource')
    options = argp.parse_args()
    # "pkg.module.Name" -> import pkg.module, then instantiate Name().
    module_name, class_name = options.resource.rsplit('.', 1)
    sys.path.append('.')
    resource = getattr(import_module(module_name), class_name)()
    port = reactor.listenTCP(PORT, Site(resource))

    def announce():
        addr = port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, addr.host, addr.port))

    reactor.callWhenRunning(announce)
    reactor.run()
def __init__(self, configuration):
    """Build an IPv8 node (endpoint, keys, overlays, walkers) from a dict.

    ``configuration`` is expected to carry the keys read below: 'port',
    'keys', 'logger', 'overlays' and 'walker_interval'.
    """
    self.endpoint = UDPEndpoint(configuration['port'])
    self.endpoint.open()
    self.network = Network()
    # Load/generate keys
    self.keys = {}
    for key_block in configuration['keys']:
        if key_block['file'] and isfile(key_block['file']):
            # Reuse the previously persisted private key.
            with open(key_block['file'], 'r') as f:
                self.keys[key_block['alias']] = Peer(ECCrypto().key_from_private_bin(f.read()))
        else:
            # No key on disk yet: generate a fresh one...
            self.keys[key_block['alias']] = Peer(ECCrypto().generate_key(key_block['generation']))
            if key_block['file']:
                # ...and persist it so the identity survives restarts.
                with open(key_block['file'], 'w') as f:
                    f.write(self.keys[key_block['alias']].key.key_to_bin())
    # Setup logging
    logging.basicConfig(**configuration['logger'])
    self.strategies = []
    self.overlays = []
    for overlay in configuration['overlays']:
        # Resolve class names through the module-level registries.
        overlay_class = _COMMUNITIES[overlay['class']]
        my_peer = self.keys[overlay['key']]
        overlay_instance = overlay_class(my_peer, self.endpoint, self.network, **overlay['initialize'])
        self.overlays.append(overlay_instance)
        for walker in overlay['walkers']:
            strategy_class = _WALKERS[walker['strategy']]
            args = walker['init']
            target_peers = walker['peers']
            self.strategies.append((strategy_class(overlay_instance, **args), target_peers))
        # Each on_start entry is [method_name, *args], invoked on the
        # overlay once the reactor is running.
        for config in overlay['on_start']:
            reactor.callWhenRunning(getattr(overlay_instance, config[0]), *config[1:])
    # Drive on_tick periodically; False = don't fire immediately.
    self.state_machine_lc = LoopingCall(self.on_tick).start(configuration['walker_interval'], False)
def makeService(self, options):
    """
    Construct a IPv8 service.
    """
    service = MultiService()
    service.setName("IPv8")
    # Actual startup is deferred until the reactor is spinning.
    reactor.callWhenRunning(self.start_ipv8, options)
    return service
def runReactor(self):
    """Start the Twisted reactor, scheduling whenRunning at startup."""
    from twisted.internet import reactor

    # whenRunning fires once the reactor loop is actually up.
    reactor.callWhenRunning(self.whenRunning)
    self.log.info("Starting reactor...")
    reactor.run()
def twistedinteract(self):
    """Run an interactive loop on stdin using the Twisted reactor.

    Wraps stdin (FD 0) in a reader registered with the reactor; each
    readable event is handed to handle1().  EOF stops the reactor, and
    restore() always runs afterwards to undo prepare().
    """
    from twisted.internet import reactor
    from twisted.internet.abstract import FileDescriptor
    import signal
    # Capture self for use inside the nested class below.
    outerself = self

    class Me(FileDescriptor):
        def fileno(self):
            """ We want to select on FD 0 """
            return 0

        def doRead(self):
            """called when input is ready"""
            try:
                outerself.handle1()
            except EOFError:
                # End of input: shut the whole loop down.
                reactor.stop()

    reactor.addReader(Me())
    # Restore default Ctrl-C behaviour once the reactor is running, so
    # SIGINT interrupts instead of being swallowed by Twisted.
    reactor.callWhenRunning(signal.signal,
                            signal.SIGINT,
                            signal.default_int_handler)
    self.prepare()
    try:
        reactor.run()
    finally:
        self.restore()
def main():
    """Run a mock HTTP server for the resource given on the command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    parsed = parser.parse_args()
    sys.path.append('.')
    # Split a dotted path into module and attribute, then instantiate.
    mod_path, attr = parsed.resource.rsplit('.', 1)
    resource = getattr(import_module(mod_path), attr)()
    listening_port = reactor.listenTCP(PORT, Site(resource))

    def report():
        endpoint = listening_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, endpoint.host, endpoint.port))

    reactor.callWhenRunning(report)
    reactor.run()
def setup_worker(self):
    """Set up this process as a job-running worker."""
    # Plugins are imported here rather than at module import time: they
    # pull in twistedsnmp, which calls logging.basicConfig() on import.
    # Importing before our own log initialization would leave two
    # StreamHandlers on the root logger, duplicating every log line.
    self._logger.info("Starting worker process")
    plugins.import_plugins()

    def _register_with_pool():
        handler = pool.initialize_worker()
        self.job_loggers.append(handler.log_jobs)

    reactor.callWhenRunning(_register_with_pool)
def setup_single_job(self):
    "Sets up a single job run with exit when done"
    from .jobs import JobHandler
    from . import config

    def _run_job():
        # Look up the requested job descriptor by name.
        descriptors = dict((d.name, d) for d in config.get_jobs())
        job = descriptors[self.options.onlyjob]
        self._log_context = dict(job=job.name,
                                 sysname=self.options.netbox.sysname)
        job_handler = JobHandler(job.name, self.options.netbox.id,
                                 plugins=job.plugins,
                                 interval=job.interval)
        deferred = maybeDeferred(job_handler.run)
        # addBoth: _log_job sees either the result or a Failure; the
        # reactor is stopped afterwards in either case.
        deferred.addBoth(_log_job, job_handler, interval=job.interval)
        deferred.addBoth(lambda x: reactor.stop())

    def _log_job(result, handler, interval):
        success = not isinstance(result, Failure)
        # NOTE(review): `success if result else None` logs None whenever
        # the job result is falsy, even on success — possibly intended
        # as "no result to report", but confirm it isn't meant to be
        # `success if success else None`.
        schedule.log_job_externally(handler, success if result else None,
                                    interval)

    plugins.import_plugins()
    self._logger.info("Running single %r job for %s",
                      self.options.onlyjob, self.options.netbox)
    # Schedule the one-shot run once the reactor is up.
    reactor.callWhenRunning(_run_job)
def main():
    """Configure logging, start the dashboard daemon, and run the reactor."""
    logging.basicConfig(
        format='%(asctime)s:%(name)s:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO,
    )
    opts = parse_options()
    daemon = DashDaemon(opts.consul, opts.kafka, opts.grafana_url, opts.topic)
    reactor.callWhenRunning(daemon.start)
    reactor.run()
    # Reached only after the reactor has been stopped.
    log.info("completed!")
def start_reactor(self):
    """Run the reactor; log at startup and tear down components on shutdown."""
    def _announce():
        self.log.info('twisted-reactor-started')

    reactor.addSystemEventTrigger('before', 'shutdown',
                                  self.shutdown_components)
    reactor.callWhenRunning(_announce)
    reactor.run()
def main():
    """Entry point: run the Kafka consumer example under the reactor."""
    log_format = ('%(asctime)s:%(name)s:'
                  '%(levelname)s:%(process)d:%(message)s')
    logging.basicConfig(format=log_format, level=logging.INFO)
    args = parse_options()
    example = ConsumerExample(args.consul, args.topic, int(args.runtime))
    reactor.callWhenRunning(example.start)
    reactor.run()
    # Logged once the reactor loop has exited.
    log.info("completed!")
def start_reactor(self):
    """Block in the reactor loop until shutdown, hooking start/stop events."""
    reactor.callWhenRunning(
        lambda: self.log.info('twisted-reactor-started'))
    # Components are torn down just before the reactor exits.
    reactor.addSystemEventTrigger(
        'before', 'shutdown', self.shutdown_components)
    reactor.run()
def start_reactor(self):
    """Start the Twisted reactor and register startup/shutdown hooks."""
    def _log_started():
        self.log.info('twisted-reactor-started')

    reactor.callWhenRunning(_log_started)
    reactor.addSystemEventTrigger('before', 'shutdown',
                                  self.shutdown_components)
    reactor.run()