def run(self):
    """Set up the selected operating mode and start the reactor.

    Installation of the signal handlers is scheduled for when the reactor
    is running.  One setup method is then picked from ``self.options``,
    checked in this order: ``netbox``, ``multiprocess``, ``worker``; when
    none of them is set, regular scheduling is set up.  Finally the thread
    pool size is suggested to the reactor, ``self.shutdown`` is registered
    as an "after shutdown" trigger, and ``reactor.run()`` is called.
    """
    reactor.callWhenRunning(self.install_sighandlers)

    mode = self.options
    if mode.netbox:
        self.setup_single_job()
    elif mode.multiprocess:
        self.setup_multiprocess(mode.multiprocess, mode.max_jobs)
    elif mode.worker:
        self.setup_worker()
    else:
        self.setup_scheduling()

    pool_size = self.options.threadpoolsize
    reactor.suggestThreadPoolSize(pool_size)
    reactor.addSystemEventTrigger("after", "shutdown", self.shutdown)
    reactor.run()
python类suggestThreadPoolSize()的实例源码
def testSuggestThreadPoolSize(self):
    """
    Try to change the maximum number of threads.

    After each suggestion, the reactor's thread pool maximum is expected
    to equal the suggested size.
    """
    reactor.suggestThreadPoolSize(34)
    self.assertEqual(reactor.threadpool.max, 34)
    reactor.suggestThreadPoolSize(4)
    self.assertEqual(reactor.threadpool.max, 4)
def setUp(self):
    """Suggest a thread pool size of 8 to the reactor."""
    suggested_size = 8
    reactor.suggestThreadPoolSize(suggested_size)
def tearDown(self):
    """Suggest a thread pool size of 0 to the reactor."""
    suggested_size = 0
    reactor.suggestThreadPoolSize(suggested_size)
def do_cleanThreads(cls):
    """Stop the reactor's thread pool and replace it with a started one.

    Nothing happens unless the reactor provides ``IReactorThreads``.  In
    that case a pool size of 0 is suggested; if the reactor then holds a
    truthy ``threadpool`` attribute, that pool is stopped and a new
    ``ThreadPool(0, 10)`` is installed and started in its place.
    """
    from twisted.internet import reactor

    if not interfaces.IReactorThreads.providedBy(reactor):
        return

    reactor.suggestThreadPoolSize(0)
    if not hasattr(reactor, 'threadpool') or not reactor.threadpool:
        return

    reactor.threadpool.stop()
    reactor.threadpool = None
    # The thread pool is a private attribute of the reactor, so it must
    # not be left torn down: install a replacement and start it again.
    replacement = threadpool.ThreadPool(0, 10)
    reactor.threadpool = replacement
    replacement.start()
def testSuggestThreadPoolSize(self):
    """
    Try to change the maximum number of threads.

    After each suggestion, the reactor's thread pool maximum is expected
    to equal the suggested size.
    """
    reactor.suggestThreadPoolSize(34)
    self.assertEqual(reactor.threadpool.max, 34)
    reactor.suggestThreadPoolSize(4)
    self.assertEqual(reactor.threadpool.max, 4)
def setUp(self):
    """Suggest a thread pool size of 8 to the reactor."""
    suggested_size = 8
    reactor.suggestThreadPoolSize(suggested_size)
def tearDown(self):
    """Suggest a thread pool size of 0 to the reactor."""
    suggested_size = 0
    reactor.suggestThreadPoolSize(suggested_size)
def do_cleanThreads(cls):
    """Stop the reactor's thread pool and replace it with a started one.

    Nothing happens unless the reactor provides ``IReactorThreads``.  In
    that case a pool size of 0 is suggested; if the reactor then holds a
    truthy ``threadpool`` attribute, that pool is stopped and a new
    ``ThreadPool(0, 10)`` is installed and started in its place.
    """
    from twisted.internet import reactor

    if not interfaces.IReactorThreads.providedBy(reactor):
        return

    reactor.suggestThreadPoolSize(0)
    if not hasattr(reactor, 'threadpool') or not reactor.threadpool:
        return

    reactor.threadpool.stop()
    reactor.threadpool = None
    # The thread pool is a private attribute of the reactor, so it must
    # not be left torn down: install a replacement and start it again.
    replacement = threadpool.ThreadPool(0, 10)
    reactor.threadpool = replacement
    replacement.start()
def test_suggestThreadPoolSize(self):
    """
    Suggest two different pool sizes in turn and check after each one
    that the reactor's thread pool maximum matches the suggestion.
    """
    for suggested in (34, 4):
        reactor.suggestThreadPoolSize(suggested)
        self.assertEqual(reactor.threadpool.max, suggested)
def setUp(self):
    """Suggest a thread pool size of 8 to the reactor."""
    suggested_size = 8
    reactor.suggestThreadPoolSize(suggested_size)
def tearDown(self):
    """Suggest a thread pool size of 0 to the reactor."""
    suggested_size = 0
    reactor.suggestThreadPoolSize(suggested_size)
def startService(self):
    """Start ``self.worker_count`` channel workers in reactor threads.

    The reactor is asked for a thread pool of ``worker_count + 3``
    threads (presumably headroom beyond the workers themselves -- TODO
    confirm).  The default channel layer's router is checked with a
    ``StaticFilesConsumer`` as HTTP consumer, and each worker is recorded
    in ``self._workers`` together with the deferred of its ``run`` call.
    """
    from channels import DEFAULT_CHANNEL_LAYER, channel_layers
    from channels.staticfiles import StaticFilesConsumer
    from channels.worker import Worker

    reactor.suggestThreadPoolSize(self.worker_count + 3)

    layer = channel_layers[DEFAULT_CHANNEL_LAYER]
    layer.router.check_default(http_consumer=StaticFilesConsumer())

    for _index in range(self.worker_count):
        worker = Worker(layer, signal_handlers=False)
        running = threads.deferToThread(worker.run)
        self._workers.append((worker, running))
def main():
    """Build the collector daemon, size the reactor thread pool, and run.

    The daemon is constructed from a ``Preferences`` instance and a
    per-datasource-instance task splitter that produces
    ``PythonCollectionTask`` tasks.  The thread pool size comes from
    ``preferences.options.threadPoolSize``.
    """
    preferences = Preferences()
    splitter = PerDataSourceInstanceTaskSplitter(
        SimpleTaskFactory(PythonCollectionTask))
    daemon = CollectorDaemon(preferences, splitter)

    requested_threads = preferences.options.threadPoolSize

    # The Twisted version shipped with Zenoss 4.1 lacks this method, so
    # only call it when the reactor actually offers it.
    if hasattr(reactor, 'suggestThreadPoolSize'):
        reactor.suggestThreadPoolSize(requested_threads)

    daemon.run()
def __init__(self, config, component_builder=None, testing=False):
    """Configure logging, create the data router, and size the thread pool.

    ``config`` is read for ``log_file``, ``log_level`` and
    ``num_threads``, and its ``view()`` output is logged at debug level.
    ``component_builder`` is passed on to ``_create_data_router``;
    ``testing`` is stored on the instance as ``_testing``.
    """
    log_file = config['log_file']
    log_level = config['log_level']
    logging.basicConfig(filename=log_file, level=log_level)
    logging.captureWarnings(True)

    logger.debug("Configuration: " + config.view())
    logger.debug("Creating a new data router")

    self.config = config
    self.data_router = self._create_data_router(config, component_builder)
    self._testing = testing

    # The reactor is asked for five threads per configured thread.
    thread_budget = config['num_threads'] * 5
    reactor.suggestThreadPoolSize(thread_budget)
def start_search_tasks():
    """
    Stop any running search tasks, then start all configured searches
    concurrently.

    Every task currently held in the module-level ``SEARCH_TASKS`` list
    is stopped and the list is reset.  The reactor thread pool is then
    sized to the number of entries in ``CONFIG['Searches']`` and a
    ``Search`` object is built and started for each entry.  Any exception
    raised while starting the searches is logged, not propagated.
    """
    global SEARCH_TASKS
    logging.info("(Re)populated config collections from config file. "
                 "Cancelling previous loops and restarting them again with the new config.")
    for looping_task in SEARCH_TASKS:
        logging.info("Cancelling this loop: %r", looping_task)
        looping_task.stop()
    SEARCH_TASKS = []

    searches = CONFIG['Searches'].values()
    search_count = len(searches)
    logging.info("Search count: %d", search_count)
    reactor.suggestThreadPoolSize(search_count)

    try:
        for search in searches:
            search_obj = Search(SERVICE_CLASS_MAPPER.get(search['destination']['service']), search,
                                CONFIG)
            do_search_concurrently(search_obj)
    except Exception as exception:
        # Log the exception object itself: ``exception.message`` does not
        # exist on Python 3 exceptions, so reading it here would raise
        # AttributeError from inside the handler and hide the real error.
        logging.exception("Exception occurred while processing search. %s", exception)
def __init__(self, config, component_builder=None, testing=False):
    """Configure logging, create the data router, and size the thread pool.

    ``config`` is read for ``log_file``, ``log_level`` and
    ``num_threads``, and its ``view()`` output is logged at debug level.
    ``component_builder`` is passed on to ``_create_data_router``;
    ``testing`` is stored on the instance as ``_testing``.
    """
    log_file = config['log_file']
    log_level = config['log_level']
    logging.basicConfig(filename=log_file, level=log_level)
    logging.captureWarnings(True)

    logger.debug("Configuration: " + config.view())
    logger.debug("Creating a new data router")

    self.config = config
    self.data_router = self._create_data_router(config, component_builder)
    self._testing = testing

    # The reactor is asked for five threads per configured thread.
    thread_budget = config['num_threads'] * 5
    reactor.suggestThreadPoolSize(thread_budget)
def main():
    """Parse command line options, set up the node, and run the prompt.

    ``--config`` cannot be combined with ``--mainnet``/``--privnet``, and
    ``--mainnet`` cannot be combined with ``--privnet``; either conflict
    prints a message and exits with status 1.  After the settings and the
    optional default theme are applied, a ``LevelDBBlockchain`` is
    registered, the prompt interface is started in a reactor thread, the
    node leader is started, and the reactor is run.
    """
    # Local import: ``sys.exit`` is used in place of the ``exit`` helper,
    # which is injected by the ``site`` module for interactive use and is
    # not guaranteed to exist in every run mode.
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--mainnet", action="store_true", default=False,
                        help="Use MainNet instead of the default TestNet")
    parser.add_argument("-p", "--privnet", action="store_true", default=False,
                        help="Use PrivNet instead of the default TestNet")
    parser.add_argument("-c", "--config", action="store", help="Use a specific config file")
    parser.add_argument("-t", "--set-default-theme", dest="theme",
                        choices=["dark", "light"], help="Set the default theme to be loaded from the config file. Default: 'dark'")
    parser.add_argument('--version', action='version',
                        version='neo-python v{version}'.format(version=__version__))

    args = parser.parse_args()

    if args.config and (args.mainnet or args.privnet):
        print("Cannot use both --config and --mainnet/--privnet arguments, please use only one.")
        sys.exit(1)

    if args.mainnet and args.privnet:
        print("Cannot use both --mainnet and --privnet arguments")
        sys.exit(1)

    # Setup depending on command line arguments. By default, the testnet settings are already loaded.
    if args.config:
        settings.setup(args.config)
    elif args.mainnet:
        settings.setup_mainnet()
    elif args.privnet:
        settings.setup_privnet()

    if args.theme:
        preferences.set_theme(args.theme)

    # Instantiate the blockchain and subscribe to notifications
    blockchain = LevelDBBlockchain(settings.LEVELDB_PATH)
    Blockchain.RegisterBlockchain(blockchain)

    # Start the prompt interface
    cli = PromptInterface()

    # Run the prompt in a reactor thread, then start the node and the reactor
    reactor.suggestThreadPoolSize(15)
    reactor.callInThread(cli.run)
    NodeLeader.Instance().Start()
    reactor.run()