def getSourceFileObjects(kwargs_list, workers=None):
"""
Gets source file objects by applying each item in kwargs_list as
kwargs to the source parser class. Uses kwargs['filename'] to
determine whether the source is VHDL or Verilog/SystemVerilog.
"""
pool = Pool(workers)
async_results = []
for kwargs in kwargs_list:
if _isVhdl(kwargs['filename']):
cls = VhdlParser
else:
cls = VerilogParser
async_results += [pool.apply_async(cls, kwds=kwargs)]
pool.close()
pool.join()
results = [x.get() for x in async_results]
return results
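The snippet above collects results with apply_async plus get(); below is a self-contained sketch of that same pattern with a stand-in parser function (the file names and the 'library' keyword are illustrative, not part of the original project):

from multiprocessing.pool import ThreadPool

def _parse(filename, library='work'):
    # stand-in for constructing a VhdlParser/VerilogParser object
    return (library, filename)

kwargs_list = [{'filename': 'top.vhd'}, {'filename': 'core.sv', 'library': 'rtl'}]
pool = ThreadPool(4)
async_results = [pool.apply_async(_parse, kwds=kwargs) for kwargs in kwargs_list]
pool.close()
pool.join()
print([r.get() for r in async_results])   # [('work', 'top.vhd'), ('rtl', 'core.sv')]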
Python ThreadPool() class: example source code
def batch_geoparse(self, text_list):
"""
Batch geoparsing function. Takes in a list of text documents and returns a list of lists
of geoparsed entities. The speed improvement comes from using spaCy's `nlp.pipe` and from
multithreading calls to `geoparse`.
Parameters
----------
text_list : list of str
List of documents. The documents should not have been pre-processed by spaCy.
Returns
-------
processed : list of lists of dicts
The list is the same length as the input list of documents. Each element is a list of geolocated entities.
"""
nlped_docs = nlp.pipe(text_list, n_threads = self.n_threads)
pool = ThreadPool(self.n_threads)
processed = pool.map(self.geoparse, nlped_docs)
pool.close()
pool.join()
return processed
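A minimal, self-contained sketch of the map/close/join pattern used above; plain strings stand in for spaCy documents and fake_geoparse stands in for self.geoparse:

from multiprocessing.pool import ThreadPool

def fake_geoparse(doc):
    # stand-in for Geoparser.geoparse: one list of "entity" dicts per document
    return [{'word': token} for token in doc.split()]

docs = ["Berlin is in Germany", "Paris and Lyon are in France"]
pool = ThreadPool(4)
processed = pool.map(fake_geoparse, docs)   # results keep the order of the input list
pool.close()
pool.join()
assert len(processed) == len(docs)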
def process_method_on_list(method_to_run, items):
'''helper method that processes a method on each listitem with pooling if the system supports it'''
all_items = []
if SUPPORTS_POOL:
pool = ThreadPool()
try:
all_items = pool.map(method_to_run, items)
except Exception:
# catch exception to prevent threadpool running forever
log_msg(format_exc())
log_msg("Error in %s" % method_to_run)
pool.close()
pool.join()
else:
all_items = [method_to_run(item) for item in items]
all_items = filter(None, all_items)
return all_items
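A standalone sketch of the same "pool if available, serial fallback otherwise" idea; the worker and file names are made up, and the real helper keys the fallback off SUPPORTS_POOL rather than an exception:

from multiprocessing.pool import ThreadPool

def resize_artwork(item):
    # stand-in worker: return a transformed item, or None to drop it
    return item.upper() if item else None

items = ["fanart.jpg", "", "poster.jpg"]
try:
    pool = ThreadPool()
    results = pool.map(resize_artwork, items)
    pool.close()
    pool.join()
except Exception:
    results = [resize_artwork(item) for item in items]   # serial fallback
print([x for x in results if x])   # drop empty/None results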
def parse(d):
"""Check a dict keyed by the related calls against their expected values
Dict format:
Key:
tuple:
[0] - module from which the command is called
[1] - command which you are calling
[*] - index=x, where x is the index you wish to return
[*] - end=x, where x is the end of the range to return
[*] - all other args in the order the command is supposed
to receive them; keyword arguments are not supported
Value:
The expected return value
"""
if d == {} or d is None:
return True
if len(d) == 1:
return process(list(d.items())[0])
from multiprocessing.pool import ThreadPool
p = list(d.items())
r = ThreadPool().map(process, p)
return not (False in r)
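A hypothetical check table for parse(); each key is (module, command, *args) and each value is the expected return. How process() actually resolves the module and command is the project's own logic and is not shown here, so treat this only as a sketch of the expected dict shape:

checks = {
    ('math', 'sqrt', 16): 4.0,
    ('math', 'pow', 2, 10): 1024.0,
}
parse(checks)   # True only if every threaded process() call matches its expected value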
def _ResolveTombstones(jobs, tombstones, tombstone_symbolizer):
"""Resolve a list of tombstones.
Args:
jobs: the number of jobs to use with multithread.
tombstones: a list of tombstones.
tombstone_symbolizer: the symbolizer used to resolve the tombstones.
Returns:
A list of resolved tombstone data.
"""
if not tombstones:
logging.warning('No tombstones to resolve.')
return []
tombstone_symbolizer.UnzipAPKIfNecessary()
if len(tombstones) == 1:
data = [_ResolveTombstone([tombstones[0], tombstone_symbolizer])]
else:
pool = ThreadPool(jobs)
data = pool.map(
_ResolveTombstone,
[[tombstone, tombstone_symbolizer] for tombstone in tombstones])
resolved_tombstones = []
for tombstone in data:
resolved_tombstones.extend(tombstone)
return resolved_tombstones
def multiDownload(self):
# Download historical data for all symbols using a thread pool.
# The work is network-bound, so the pool is sized well above the CPU count.
starttime = datetime.datetime.now()
"""????????"""
self.get_symbol()
cx = self.Symbol_Db['equity'].find()
symbolSet = set([d['code'] for d in cx])  # the set of all symbol codes
p = ThreadPool(100)
p.map(self.downloadEquityAllData, symbolSet)
p.close()
p.join()
endtime = datetime.datetime.now()
print "??: " + str(endtime - starttime)
def run_per_file(config, ignore_paths=None, path=None, config_dir=None):
ignore_paths = ignore_paths or []
path = path or os.getcwd()
cmd = run_config(config, config_dir)
print(cmd)
run_cmds = []
patterns = PATTERNS.get(config.get('language'))
paths = all_filenames_in_dir(path=path, ignore_paths=ignore_paths)
for pattern in patterns:
for filepath in fnmatch.filter(paths, pattern):
run_cmds.append(cmd + [filepath])
pool = Pool()
def result(run_cmd):
_, out = run_command(run_cmd)
return run_cmd[-1], out
output = pool.map(result, run_cmds)
return output
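Note that result is a closure defined inside run_per_file; that works because the pool here is thread-based (as elsewhere on this page), so the mapped function never needs to be pickled, whereas a process-based multiprocessing.Pool would reject a nested function. A minimal sketch of the same closure-over-threads idea with a stand-in command runner:

from multiprocessing.pool import ThreadPool

def run_linters(run_cmds):
    def run_one(cmd):
        # stand-in for run_command(cmd): return (filename, output)
        return cmd[-1], "ok: " + " ".join(cmd)
    pool = ThreadPool(4)
    try:
        return pool.map(run_one, run_cmds)   # a nested function is fine with threads
    finally:
        pool.close()
        pool.join()

print(run_linters([["flake8", "a.py"], ["flake8", "b.py"]]))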
def perform_normalisation(self, ori_file_list, output_file_list, label_type="state_align", dur_file_list=None):
logger = logging.getLogger("perform_normalisation")
logger.info('perform linguistic feature extraction')
self.utterance_num = len(ori_file_list)
if self.utterance_num != len(output_file_list):
logger.error('the number of input and output linguistic files should be the same!\n')
sys.exit(1)
def _perform_normalisation(i):
if not dur_file_list:
self.extract_linguistic_features(ori_file_list[i], output_file_list[i], label_type)
else:
self.extract_linguistic_features(ori_file_list[i], output_file_list[i], label_type, dur_file_list[i])
pool = Pool()
pool.map(_perform_normalisation, range(self.utterance_num))
pool.close()
pool.join()
## the exact function that does the work
## needs to be implemented in the specific subclass;
## it writes the linguistic features directly to the output file
def __init__(self, n, probe_key, ignore_clock_skew=False, metadata_encoding=None, disable_action_probes=False):
# Each QR code takes about 1ms (and updates at 5fps). We do
# our best to ensure the QR is processed in time for the next
# step call (n/16 would put us right at the threshold).
self.pool = pool.ThreadPool(max(int(n/4), 1))
self.qr_pool = pool.ThreadPool(max(int(n/8), 1))
self.lock = threading.RLock()
self.instance_n = [None] * n
self.ignore_clock_skew = ignore_clock_skew
self.disable_action_probes = disable_action_probes
self.metadata_encoding = metadata_encoding
self.update(probe_key=probe_key, metadata_encoding=metadata_encoding)
# only used in flashgames right now
def lambda_handler(event, context):
LOGGER.debug("Received event: " + json.dumps(event, indent=2))
n_fail = 0
n_succ = 0
recs = event['Records']
tp = ThreadPool(min(len(recs),20))
try:
recs = [event_to_dynamo_images(x) for x in recs]
rc = tp.map_async(check_remove_queue,recs)
# identities can be bulk-deleted in groups of 60 via AWS API, so handle in this thread
check_remove_identities(recs)
rc.wait(max(context.get_remaining_time_in_millis()/1000.0 - 2.0,0))
if not rc.ready():
LOGGER.error("Timeout waiting on processors")
tp.terminate()
else:
n_del = len([x for x in rc.get() if x])
LOGGER.info("Processed {0} records, {1} queues deleted".format(len(recs),n_del))
finally:
tp.close()
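The handler above uses map_async plus a bounded wait() so it can give up before the Lambda deadline. The AWS-specific pieces (event_to_dynamo_images, check_remove_queue, the context object) are not shown, so here is a self-contained sketch of just that timeout pattern with stand-in work and a fixed deadline:

from multiprocessing.pool import ThreadPool
import time

def slow_task(i):
    time.sleep(0.1)
    return i * i

pool = ThreadPool(4)
try:
    rc = pool.map_async(slow_task, range(8))
    rc.wait(timeout=1.0)        # block at most 1 second (stand-in for the Lambda deadline)
    if not rc.ready():
        pool.terminate()        # deadline hit: stop waiting on outstanding work
    else:
        print(sum(rc.get()))    # 140
finally:
    pool.close()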
def test_members_get(self):
with self.app.app_context():
# create collection, members
c_obj = self.mock.collection(description={'something':'abcdefghi123ö'})
m_objs = [self.mock.member() for i in range(5)]
# add collection, members
self.app.db.set_collection(c_obj)
# for m_obj in m_objs:
self.app.db.set_member(c_obj.id, m_objs)
# pool = ThreadPool(50)
# pool.map(lambda m_obj: self.app.db.set_member(c_obj.id, m_obj), m_objs)
# GET members
response = self.get("collections/"+urllib.parse.quote_plus(c_obj.id)+"/members")
# assert 200 OK
self.assertEqual(response.status_code, 200)
sortedResponse = [r.dict() for r in sorted(json.loads(response.data)['contents'], key=lambda x: x.id)]
sortedMocks = [m.dict() for m in sorted(m_objs, key=lambda x: x.id)]
for i in range(len(sortedMocks)):
self.assertDictEqual(sortedResponse[i], sortedMocks[i])
def _find_get_func_for_client(client):
'''Return the "get" function corresponding to client'''
if client is None:
return get_sync
elif Executor and isinstance(client, Executor):
def get(*args, **kwargs):
pbar = ProgressBar()
pbar.register()
out = client.get(*args, **kwargs)
pbar.unregister()
return out
return get
elif isinstance(client, ThreadPool):
return dask_threaded_get
else:
raise ValueError('client argument is not a ThreadPool, dask distributed Executor, or None')
def client_context(dask_client=None, dask_scheduler=None):
'''client_context creates a dask distributed or threadpool client or None
Parameters:
dask_client: str from choices ("DISTRIBUTED", 'THREAD_POOL', 'SERIAL')
or None to take DASK_CLIENT from environment
dask_scheduler: Distributed scheduler url or None to take
DASK_SCHEDULER from environment
'''
env = parse_env_vars()
dask_client = dask_client or env.get('DASK_CLIENT', 'DISTRIBUTED')
dask_scheduler = dask_scheduler or env.get('DASK_SCHEDULER')
if dask_client == 'DISTRIBUTED':
client = Executor(dask_scheduler) if dask_scheduler else Executor()
elif dask_client == 'THREAD_POOL':
client = ThreadPool(env.get('DASK_THREADS'))
elif dask_client == 'SERIAL':
client = None
else:
raise ValueError('Did not expect DASK_CLIENT to be {}'.format(dask_client))
get_func = _find_get_func_for_client(client)
with da.set_options(get=get_func):  # use the scheduler matching the chosen client
yield client
def parallel_apply_method(method, nodes, sample_rate=1, duration=1, leaves_only=False):
"""
Apply wrapped-method "method" to every node in "nodes", "sample_rate" times per second, for "duration" seconds.
Returns a list of results for each time slice. Each time slice result is a wrapped-method result tuple
(node, return value, exception)
"""
if leaves_only:
nodes = [x for x in nodes if x.get_property('#units') != "PathNode"]
if not nodes:
return {}
with ThreadPool(len(nodes)) as pool:
time_slice_results = queue.Queue()
def apply_time_slice():
time_slice_results.put(pool.map_async(method, nodes))
num_slices = int(duration * sample_rate)
slice_times = [slice_number / sample_rate for slice_number in range(num_slices)]
time_slice_threads = [threading.Timer(time, apply_time_slice) for time in slice_times]
complete_all_threads(time_slice_threads)
return consume_queue(time_slice_results)
def make_concurrent_calls(*calls):
"""
If you need to make multiple concurrent calls, potentially to
different functions, or with different kwargs each time.
Args:
*calls (Iterable[Tuple[Union[Callable, str], dict]]) - list of
(func or dotted func path, kwargs) tuples to call concurrently
Returns:
List[Any] - return values from each call in `calls`
(results are returned in same order as supplied)
"""
pool = Pool(len(calls))
results = []
for func, kwargs in calls:
results.append(
pool.apply_async(test_call, args=(func,), kwds=kwargs)
)
pool.close()
pool.join()
# add a bit of extra timeout to allow process terminate cleanup to run
# (because we also have an inner timeout on our ProcessManager thread join)
return [result.get(timeout=SUBPROCESS_TIMEOUT + 2) for result in results]
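A self-contained sketch of the apply_async-per-call pattern this helper uses; the wrapper and target functions are stand-ins, since test_call and SUBPROCESS_TIMEOUT belong to the original test harness and are not shown:

from multiprocessing.pool import ThreadPool

def call_with_kwargs(func, **kwargs):
    # stand-in for test_call: simply invoke the target with its kwargs
    return func(**kwargs)

def add(a, b):
    return a + b

def greet(name):
    return "hello " + name

calls = [(add, {'a': 2, 'b': 3}), (greet, {'name': 'world'})]
pool = ThreadPool(len(calls))
async_results = [pool.apply_async(call_with_kwargs, args=(func,), kwds=kwargs)
                 for func, kwargs in calls]
pool.close()
pool.join()
print([r.get(timeout=5) for r in async_results])   # [5, 'hello world'], in submission order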
def map_progress(func, targets, n_threads):
"""
Process targets in multi-threaded mode with progress bar
"""
progress.set_n_total(len(targets))
pool = ThreadPool(processes=n_threads)
ret = []
try:
ret = pool.map(func, targets)
except Exception as exc:
Logger.error('Unexpected exception while processing targets: {}'.format(exc), exc_info=True)
finally:
progress.finish()
return list(zip(targets, ret))
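The original pairs pool.map with a shared progress object; a self-contained sketch of the same idea uses imap instead, so progress can be printed as each item completes (the upload function and targets are stand-ins):

from multiprocessing.pool import ThreadPool

def upload(target):
    # stand-in for the real per-target work
    return target.upper()

targets = ['a', 'b', 'c', 'd']
pool = ThreadPool(processes=2)
try:
    results = []
    for n, res in enumerate(pool.imap(upload, targets), 1):
        print("{}/{} done".format(n, len(targets)))   # simple progress readout
        results.append(res)
finally:
    pool.close()
    pool.join()
print(list(zip(targets, results)))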
test_parsers.py (project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda, author: SignalMedia)
def test_multithread_stringio_read_csv(self):
# GH 11786
max_row_range = 10000
num_files = 100
bytes_to_df = [
'\n'.join(
['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
).encode() for j in range(num_files)]
files = [BytesIO(b) for b in bytes_to_df]
# Read all files in many threads
pool = ThreadPool(8)
results = pool.map(pd.read_csv, files)
first_result = results[0]
for result in results:
tm.assert_frame_equal(first_result, result)
def lanczosSubPixShiftStack( imageStack, translations, n_threads=16 ):
"""
Does subpixel translation shifts for a stack of images, using a ThreadPool to distribute the load.
I could make this a general function utility by passing in the function handle.
"""
tPool = ThreadPool( n_threads )
if imageStack.ndim != 3:
raise ValueError( "lanczosSubPixShiftStack() only works on image stacks with Z-axis as the zero dimension" )
slices = imageStack.shape[0]
# Build parameters list for the threaded processeses, consisting of index
tArgs = [None] * slices
for J in np.arange(slices):
tArgs[J] = (J, imageStack, translations)
# All operations are done 'in-place'
tPool.map( lanczosIndexedShift, tArgs )
tPool.close()
tPool.join()
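Because threads share memory, each worker can write its slice of the stack in place, with the (index, array) pair passed as one argument. A self-contained sketch of that idea, with a trivial in-place operation standing in for lanczosIndexedShift:

import numpy as np
from multiprocessing.pool import ThreadPool

def indexed_negate(args):
    # stand-in for lanczosIndexedShift: operate in place on one slice
    j, stack = args
    stack[j] *= -1.0

imageStack = np.ones([5, 8, 8], dtype=np.float32)
tPool = ThreadPool(4)
tPool.map(indexed_negate, [(j, imageStack) for j in range(imageStack.shape[0])])
tPool.close()
tPool.join()
assert np.all(imageStack == -1.0)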
def get_page_torrents(page_links, workers, numbers):
""" given a list of links containing individual
torrent info pages,
return a list containing Torrent objects
"""
pool = Pool(processes=workers)
while len(page_links) > numbers:
page_links.pop()
assert (len(page_links) != 0), 'Number of torrent pages equals to 0!'
torrents = pool.map(get_torrent_info, page_links)
#torrents = map(get_torrent_info, page_links)
#torrents = [pool.apply(get_torrent_info, args=(x,)) for x in page_links]
pool.close()
pool.join()
return torrents
def re_run_everything(self):
if self.rerunning:
print 'already running...'
return
self.rerunning = True
utts = []
for sess in self.get_all_sessions():
utts.extend(self.get_session_utterances(sess['_id']))
# unleash the threads...
p = Pool(multiprocessing.cpu_count())
# TODO: would be good to have some sort of identifier so that
# these jobs can be cancelled if new commands are added.
print 'starting re_run_everything'
p.map(self.re_run, utts)
p.close()
self.rerunning = False
print 'finished'
def handle(self, *args, **options):
# Setup logger with levels and path
log_path = os.path.join(options['log'], 'riverscope', __name__ + '_log.txt')
if options['debug']:
LOG.set_print_handler_level(logging.DEBUG)
LOG.set_file_handler(log_path, logging.DEBUG)
else:
LOG.set_print_handler_level(logging.INFO)
LOG.set_file_handler(log_path, logging.DEBUG)
time_start = utils.start_timer()
pool = ThreadPool(100)
# TODO http://stackoverflow.com/questions/2632520/what-is-the-fastest-way-to-send-100-000-http-requests-in-python
results = pool.map(get_readings, zip(Stations.objects.all(), repeat(options['lastn'])))
clean_results = list(filter(None, results))
station_readings = [s for sl in clean_results for s in sl]
with transaction.atomic():
StationReadings.objects.all().delete()
StationReadings.objects.bulk_create(station_readings)
time_diff = utils.end_timer(time_start)
LOG.info('Added {} readings in {}'.format(len(station_readings), time_diff))
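The management command above fans one argument list out with zip(..., repeat(...)) so every station is paired with the same lastn value. A self-contained sketch of that argument-pairing pattern, with a stand-in for get_readings and made-up station names:

from itertools import repeat
from multiprocessing.pool import ThreadPool

def get_last_n(args):
    station, lastn = args          # each work item is a (station, lastn) pair
    return [(station, i) for i in range(lastn)]

stations = ['thames-1', 'severn-2', 'ouse-3']
pool = ThreadPool(10)
results = pool.map(get_last_n, zip(stations, repeat(2)))
pool.close()
pool.join()
readings = [r for sublist in filter(None, results) for r in sublist]  # flatten, drop empties
print(len(readings))   # 6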
def setup(self,bottom,top):
#self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
self.top_names = ['data_a', 'data_p', 'data_n']
params = eval(self.param_str)
# Check the paramameters for validity.
check_params(params)
# store input as class variables
self.batch_loader = BatchLoader(params)
self.batch_size = params['batch_size']
self.pool = ThreadPool(processes=1)
self.thread_results = self.pool.apply_async(\
self.batch_loader.load_next_batch, ())
# reshape
top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[2].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
#top[3].reshape(params['batch_size'], 3) #label of anchor,pos & neg example
print_info('Triplet data layer',params)
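This Caffe data layer (and the similar ones below) uses a single-worker ThreadPool as a background prefetcher: apply_async starts loading the next batch while the network consumes the current one, and the layer's forward pass (not shown here) calls self.thread_results.get() and then queues the next load. A self-contained sketch of that prefetch pattern with a stand-in loader:

import time
from multiprocessing.pool import ThreadPool

class BatchPrefetcher(object):
    def __init__(self, load_batch):
        self.load_batch = load_batch
        self.pool = ThreadPool(processes=1)              # one background loader thread
        self.pending = self.pool.apply_async(self.load_batch, ())

    def next_batch(self):
        batch = self.pending.get()                       # wait for the prefetched batch
        self.pending = self.pool.apply_async(self.load_batch, ())  # queue the next one
        return batch

def slow_load():
    time.sleep(0.05)                                     # stand-in for disk/LMDB I/O
    return [0, 1, 2, 3]

prefetcher = BatchPrefetcher(slow_load)
for _ in range(3):
    print(prefetcher.next_batch())                       # loading overlaps with this loop's work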
def setup(self,bottom,top):
#self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
self.top_names = ['data_s', 'data_i', 'label_s','label_i']
params = eval(self.param_str)
# Check the parameters for validity.
check_params(params)
# store input as class variables
self.batch_loader = BatchLoader(params)
self.batch_size = params['batch_size']
#1
self.pool = ThreadPool(processes=1)
self.thread_results = self.pool.apply_async(\
self.batch_loader.load_next_batch, ())
# reshape
top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[2].reshape(params['batch_size'], 1)
top[3].reshape(params['batch_size'], 1)
if 'verbose' not in params:
print_info('2-branch data layer',params)
def setup(self,bottom,top):
self.top_names = ['data', 'label']
params = eval(self.param_str)
# Check the parameters for validity.
check_params(params)
# store input as class variables
self.batch_loader = BatchLoader(params)
self.batch_size = params['batch_size']
#1
self.pool = ThreadPool(processes=1)
self.thread_results = self.pool.apply_async(self.batch_loader.load_next_batch, ())
# reshape
top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[1].reshape(params['batch_size'])
print_info('Data layer',params)
def setup(self,bottom,top):
#self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
self.top_names = ['data_a', 'data_p', 'data_n']
params = eval(self.param_str)
# Check the parameters for validity.
check_params(params)
# store input as class variables
self.batch_loader = BatchLoader(params)
self.batch_size = params['batch_size']
#1
self.pool = ThreadPool(processes=1)
self.thread_results = self.pool.apply_async(\
self.batch_loader.load_next_batch, ())
# reshape
top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[2].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
#top[3].reshape(params['batch_size'], 3) #label of anchor,pos & neg example
if 'verbose' not in params:
print_info('Triplet data layer',params)
def __init__(self, params):
self.batch_size = params['batch_size']
self.outshape = params['shape']
self.lmdb = lmdbs(params['source'])
self.labels = self.lmdb.get_label_list()
self.img_mean = biproto2py(params['mean_file']).squeeze()
self.NIMGS = len(self.labels)
assert self.NIMGS % self.batch_size == 0, 'NIMGS {} not divisible by batch size {}'.format(
self.NIMGS, self.batch_size)
self.num_batches = self.NIMGS/self.batch_size
self._cur = 0 # current batch
self.labels_tab = self.labels.reshape((self.num_batches,self.batch_size))
# this class does some simple data-manipulations
self.img_augment = SimpleAugment(mean=self.img_mean,shape=params['shape'],
scale = params['scale'])
#create threadpools for parallel augmentation
#self.pool = ThreadPool() #4
def __init__(self, params):
self.batch_size = params['batch_size']
self.outshape = params['shape']
self.lmdb = lmdbs(params['source'])
self.labels = self.lmdb.get_label_list()
self.img_mean = biproto2py(params['mean_file']).squeeze()
self.NIMGS = len(self.labels)
self.num_batches = int(np.ceil(self.NIMGS/float(self.batch_size)))
self._cur = 0 # current batch
# this class does some simple data-manipulations
self.img_augment = SimpleAugment(mean=self.img_mean,shape=params['shape'],
scale = params['scale'])
#create threadpools for parallel augmentation
#self.pool = ThreadPool() #4
def setup(self,bottom,top):
#self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
self.top_names = ['data_a', 'data_p', 'label_a', 'label_p']
params = eval(self.param_str)
# Check the parameters for validity.
check_params(params)
# store input as class variables
self.batch_loader = BatchLoader(params)
self.batch_size = params['batch_size']
#1
self.pool = ThreadPool(processes=1)
self.thread_results = self.pool.apply_async(\
self.batch_loader.load_next_batch, ())
# reshape
top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[1].reshape(2*params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[2].reshape(params['batch_size'], 1) #label of anchor
top[3].reshape(2*params['batch_size'], 1) #label of pos and neg
if 'verbose' not in params:
print_info('Triplet data layer',params)
def setup(self,bottom,top):
#self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
self.top_names = ['data_a', 'data_p', 'data_n']
params = eval(self.param_str)
# Check the parameters for validity.
check_params(params)
# store input as class variables
self.batch_loader = BatchLoader(params)
self.batch_size = params['batch_size']
#1
self.pool = ThreadPool(processes=1)
self.thread_results = self.pool.apply_async(\
self.batch_loader.load_next_batch, ())
self.batch_loader_refresh = False
# reshape
top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
top[2].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
#top[3].reshape(params['batch_size'], 3) #label of anchor,pos & neg example
if 'verbose' not in params:
print_info('Triplet data layer',params)