def concurrent_get_release_status(targets, timeout=4):
    """
    Args:
        targets (list of tuples): a list of (host, target_path)
    """
    if len(targets) == 0:
        return []

    # Workaround for http://bugs.python.org/issue7980
    import _strptime  # noqa

    pool = multiprocessing.dummy.Pool(min(20, len(targets)))

    def _inner_get_release_status(target):
        host, path = target
        return get_release_status(host, path, timeout)

    try:
        return pool.map(_inner_get_release_status, targets, chunksize=1)
    finally:
        pool.close()
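A minimal usage sketch, assuming get_release_status(host, path, timeout) is defined elsewhere in the module and multiprocessing.dummy is imported at module level; the hosts and paths below are hypothetical:

# Hypothetical example: poll two hosts for release status in parallel.
targets = [
    ("build01.example.com", "/releases/current"),
    ("build02.example.com", "/releases/current"),
]
# pool.map preserves input order, so results line up with targets.
statuses = concurrent_get_release_status(targets, timeout=4)
for (host, path), status in zip(targets, statuses):
    print(host, path, status)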
def dns_bulk_resolve(candidates, reverse=False, ip_version=None, threads=50):
    """
    Resolve a list of host names to IPs or, if reverse is True, IPs to
    host names.  Return a map of each result keyed to its candidate.

    WARNING: This function will create a pool of up to 'threads' threads.
    """
    # This is based loosely on http://stackoverflow.com/a/34377198
    if reverse and ip_version is not None:
        raise ValueError("Unable to force IP version when reverse-resolving")
    if ip_version is None:
        ip_version = 4
    __check_ip_version__(ip_version)

    result = {}
    if len(candidates) == 0:
        return result

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    pool = multiprocessing.dummy.Pool(processes=min(len(candidates), threads))
    candidate_args = [(candidate, ip_version) for candidate in candidates]

    # Each worker returns a (candidate, resolved) pair, keyed back into
    # the result map.
    for candidate, resolved in pool.imap(
            __reverser__ if reverse else __forwarder__,
            candidate_args,
            chunksize=1):
        result[candidate] = resolved
    pool.close()
    return result
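A hedged usage sketch; it assumes the module-level helpers __forwarder__, __reverser__ and __check_ip_version__ referenced above exist, and the printed values are illustrative only:

# Hypothetical example: bulk forward and reverse resolution.
forward = dns_bulk_resolve(["www.example.com", "www.example.org"])
print(forward)   # e.g. {'www.example.com': '93.184.216.34', ...}

backward = dns_bulk_resolve(["8.8.8.8"], reverse=True)
print(backward)  # e.g. {'8.8.8.8': 'dns.google'}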
def api_ping_list(hosts, bind=None, timeout=None, threads=10):
    """
    Ping a list of hosts and return a map of each host to its status.
    """
    if len(hosts) == 0:
        return {}

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    pool = multiprocessing.dummy.Pool(processes=min(len(hosts), threads))
    pool_args = [(host, timeout) for host in hosts]
    result = {}

    def ping_one(arg):
        host, timeout = arg
        up, _ = api_ping(host, bind=bind, timeout=timeout)
        return (host, up)

    for host, state in pool.imap(ping_one, pool_args, chunksize=1):
        result[host] = state
    pool.close()
    return result
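A usage sketch, assuming api_ping(host, bind=..., timeout=...) is the single-host check defined above and returns an (up, details) tuple as the inner function implies; the host names are hypothetical:

# Hypothetical example: ping two hosts in parallel.
states = api_ping_list(["host-a.example.net", "host-b.example.net"], timeout=2)
for host, up in states.items():
    print("%s is %s" % (host, "up" if up else "down"))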
def api_has_services(hosts, timeout=5, bind=None, threads=10):
    """
    Check in parallel whether each host offers the bwctl and pscheduler
    services (a parallel rendition of the two functions above).
    Returns a hash of host names and per-service results.
    """
    if len(hosts) == 0:
        return {}

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    pool = multiprocessing.dummy.Pool(processes=min(len(hosts), threads))

    def check_one(arg):
        host, service, function = arg
        return (host, service, function(host, timeout=timeout, bind=bind))

    args = []
    result = {}
    for host in hosts:
        args.extend([
            (host, "bwctl", api_has_bwctl),
            (host, "pscheduler", api_has_pscheduler)
        ])
        result[host] = {
            "bwctl": None,
            "pscheduler": None
        }

    for host, service, state in pool.imap(check_one, args, chunksize=1):
        result[host][service] = state
    pool.close()
    return result
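A usage sketch, assuming api_has_bwctl and api_has_pscheduler are the single-host checks referenced in the docstring; the host and result values are hypothetical:

# Hypothetical example: probe one host for both services.
services = api_has_services(["perfsonar.example.edu"], timeout=5)
print(services)
# e.g. {'perfsonar.example.edu': {'bwctl': False, 'pscheduler': True}}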
def _all(func, hosts):
    '''
    Internal function that applies 'func' to every host in parallel.
    '''
    all_instances = []
    # Threads should likely scale with cores or interfaces.
    cpus = multiprocessing.cpu_count()
    threads = 4 * cpus
    log.debug('multi._all cpus count={}, thread count={}'.format(cpus, threads))
    pool = multiprocessing.dummy.Pool(threads)
    for instance in pool.map(func, hosts):
        all_instances.append(instance)
    pool.close()
    return all_instances
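A usage sketch; the per-host function below is a placeholder, and the snippet assumes multiprocessing.dummy and a module-level log object are set up as implied above:

# Hypothetical example: fan a per-host operation out over several hosts.
def uptime(host):
    # Stand-in for a real per-host call (SSH command, API request, ...).
    return (host, "ok")

print(_all(uptime, ["node1", "node2", "node3"]))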
def __init__(self, dataset, feedin_shape, collate_fn=default_collate, threads=1, shuffle=False):
    super(DataLoader, self).__init__()
    self.dataset = dataset
    self.threads = threads
    self.collate_fn = collate_fn(feedin_shape)
    # self.collate_fn = self.default_collate_fn

    # Shape-related variables
    self.data_shapes = feedin_shape['data']
    self.label_shapes = feedin_shape['label']
    self.batch_size = feedin_shape['batch_size']

    # Loader-related variables
    self.current = 0
    self.total = len(self.dataset)
    self.shuffle = shuffle
    self.map_index = list(range(self.total))

    # Prepare for loading
    self.get_batch = self.get_batch_single_thread
    if self.threads > 1:  # multi-threaded read
        from multiprocessing.dummy import Pool as ThreadPool
        # self.pool = multiprocessing.Pool(self.threads)
        self.pool = ThreadPool(self.threads)
        self.get_batch = self.get_batch_multi_thread
    self.reset()
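A hypothetical construction sketch; the feedin_shape layout follows the keys read in __init__, and my_dataset plus the default_collate factory are assumed to exist elsewhere in this codebase:

# Hypothetical example: build a 4-thread loader over an existing dataset.
feedin_shape = {
    'data': [(3, 224, 224)],   # input tensor shapes
    'label': [(1,)],           # label shapes
    'batch_size': 32,
}
loader = DataLoader(my_dataset, feedin_shape, threads=4, shuffle=True)
batch = loader.get_batch()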
def test_main(run=None):
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()  # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc: tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc: tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc: tc.__name__) +
        testcases_other
    )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
    # module during these tests is at least platform dependent and possibly
    # non-deterministic on any given platform. So we don't mind if the listed
    # warnings aren't actually raised.
    with test_support.check_py3k_warnings(
            (".+__(get|set)slice__ has been removed", DeprecationWarning),
            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
            quiet=True):
        run(suite)

    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool