def test_del_shutdown(self):
    """Check that deleting an executor lets its thread and workers finish.

    The statement order matters: the management thread and the worker
    processes are captured *before* the only reference to the executor is
    dropped, so they can still be joined afterwards.
    """
    pool = futures.ProcessPoolExecutor(max_workers=5)
    # Drain one small batch so the executor has actually done some work.
    list(pool.map(abs, range(-5, 5)))

    manager_thread = pool._queue_management_thread
    workers = pool._processes

    # Drop the last reference and force a collection pass.
    del pool
    gc.collect()

    manager_thread.join()
    for worker in workers:
        worker.join()
Python 类 ProcessPoolExecutor() 的实例源码
def with_process_pool_executor():
    """Apply ``is_prime`` to every entry of ``PRIMES`` on a 10-worker pool.

    Returns the results of ``executor.map`` collected into a list.
    """
    with ProcessPoolExecutor(10) as pool:
        results = pool.map(is_prime, PRIMES)
        # Collect while the pool is still open, then leave the ``with``.
        return list(results)
def __to_dp_matrix_mt(self, value_matrix):
    """Convert ``value_matrix`` column by column using worker processes.

    ``value_matrix`` is transposed with ``zip(*value_matrix)``; each column
    is submitted to ``_to_dp_list_helper`` together with its index, the
    column's type hint and ``self.strip_str_value``.  Each helper result is
    unpacked as ``(col_idx, value_dp_list)``.

    Returns:
        A list of row tuples: the converted columns, ordered by column
        index, transposed back into rows.

    Raises:
        Whatever a worker raises (re-raised by ``future.result()``), and any
        error raised while creating the pool.
    """
    from concurrent import futures

    col_data_mapping = {}

    try:
        # The ``with`` block already shuts the pool down on exit, so no
        # explicit ``executor.shutdown()`` is needed.  The previous
        # ``finally: executor.shutdown()`` raised UnboundLocalError (hiding
        # the real error) whenever the constructor itself failed.
        with futures.ProcessPoolExecutor(self.max_workers) as executor:
            future_list = [
                executor.submit(
                    _to_dp_list_helper, self, col_idx,
                    data_list, self.__get_col_type_hint(col_idx),
                    self.strip_str_value
                )
                for col_idx, data_list
                in enumerate(zip(*value_matrix))
            ]

            # Columns finish in arbitrary order; key them by index so the
            # original column order can be restored below.
            for future in futures.as_completed(future_list):
                col_idx, value_dp_list = future.result()
                col_data_mapping[col_idx] = value_dp_list
    finally:
        logger.debug("shutdown ProcessPoolExecutor")

    return list(zip(*[
        col_data_mapping[col_idx] for col_idx in sorted(col_data_mapping)
    ]))
processPoolExe.py 文件源码
项目:Learning-Concurrency-in-Python
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def main():
    """Run ``task`` for the inputs 2, 3 and 4 on a three-worker process pool.

    The returned futures are not kept, so an exception raised inside
    ``task`` is not reported here.
    """
    # The banner used to say "ThreadPoolExecutor" although a process pool
    # is what actually runs the tasks.
    print("Starting ProcessPoolExecutor")
    with ProcessPoolExecutor(max_workers=3) as executor:
        for value in (2, 3, 4):
            executor.submit(task, value)
    # Leaving the ``with`` block waits for the submitted tasks.
    print("All tasks complete")
processPool.py 文件源码
项目:Learning-Concurrency-in-Python
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def main():
    """Run ``task`` twice on a three-worker process pool.

    The pool is used as a context manager so that its worker processes are
    shut down when this function returns instead of being left for
    interpreter exit.  As a consequence the call blocks until both tasks
    have finished.
    """
    with ProcessPoolExecutor(max_workers=3) as executor:
        executor.submit(task)
        executor.submit(task)
def future_flush(self):
    """Run ``self.flush_to_file`` in a worker process and wait for it.

    Previously the method was *called* in the submitting process and its
    return value was handed to ``submit`` as if it were the callable, so
    the pool never ran the flush.

    Raises:
        Whatever ``flush_to_file`` raises, re-raised here by
        ``future.result()``; also any error from pickling the bound method.

    NOTE(review): the bound method (and therefore ``self``) is pickled and
    executed in another process, so state the flush changes on ``self`` is
    presumably not reflected in this process -- confirm that is acceptable.
    """
    with ProcessPoolExecutor() as executor:
        # Pass the callable itself, not the result of calling it.
        future = executor.submit(self.flush_to_file)
        # Surface failures instead of leaving them in an unread future.
        future.result()
def flatten_json(file, proc_func):
    '''A multi-processing wrapper for loading SQuAD data file.

    Loads ``file`` as JSON, takes its ``'data'`` entry and applies
    ``proc_func`` to every element of it on a process pool with
    ``args.threads`` workers.  Each call is expected to return an iterable
    of rows; all of them are concatenated, in input order, into one list.

    Returns:
        list: the flattened rows.
    '''
    from itertools import chain

    with open(file) as f:
        data = json.load(f)['data']
    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        # chain.from_iterable concatenates in linear time; the previous
        # ``sum(rows, [])`` rebuilt the accumulated list for every element.
        rows = list(chain.from_iterable(executor.map(proc_func, data)))
    return rows
def executor():
    """Create and return a ``ProcessPoolExecutor`` with default arguments."""
    pool = ProcessPoolExecutor()
    return pool
def main(workers=None):
    """Submit ``JOBS`` calls of ``sha(SIZE)`` to a process pool and time them.

    ``workers`` may be anything ``int()`` accepts; a falsy value is passed
    to the executor unchanged.  Every result is printed as its future
    completes, followed by a ``STATUS`` line with the pool's worker count
    and the elapsed wall-clock time.
    """
    workers = int(workers) if workers else workers
    started = time.time()

    with futures.ProcessPoolExecutor(workers) as pool:
        actual_workers = pool._max_workers
        pending = (pool.submit(sha, SIZE) for _ in range(JOBS))
        for finished in futures.as_completed(pending):
            print(finished.result())

    elapsed = time.time() - started
    print(STATUS.format(actual_workers, elapsed))
def download_many(cc_list):
    """Call ``download_one`` for each item of ``cc_list`` on a process pool.

    The items are handed out in sorted order.  Returns the number of
    results produced.
    """
    with futures.ProcessPoolExecutor() as pool:
        results = pool.map(download_one, sorted(cc_list))

    # Count the results by draining the iterator after the pool has closed.
    return sum(1 for _ in results)
def process_messages(self, messages):
    """Run ``self.func`` on each message through ``self.pool``.

    Args:
        messages: sized iterable of message objects.  ``message_id`` is
            read from a message only when processing it failed.

    Returns:
        list: the messages whose call completed without raising, in
        completion order.

    Raises:
        RuntimeError: re-raised (after logging) when the pool refuses a
            submission; messages submitted so far are not waited for.
        The timeout error of ``futures.as_completed`` is not caught here
        and propagates when ``self.timeout`` elapses.
    """
    future_to_message = {}
    processed = []

    self.logger.debug('processing %d messages', len(messages))
    for message in messages:
        # ThreadPoolExecutor/ProcessPoolExecutor raise RuntimeError when a
        # task is submitted while the pool is shutting down.  Log it and
        # re-raise, which stops any further submissions.  (Per the original
        # note, unhandled messages get requeued after the interval
        # configured on the queue's policy.)
        try:
            future = self.pool.submit(self.func, message)
        except RuntimeError:
            self.logger.exception('cannot submit jobs to pool')
            raise
        else:
            future_to_message[future] = message

    for future in futures.as_completed(future_to_message,
                                       timeout=self.timeout):
        message = future_to_message.pop(future)
        try:
            future.result()
        except Exception:
            # Only ordinary failures are logged and skipped; a bare
            # ``except`` here also swallowed KeyboardInterrupt/SystemExit.
            self.logger.exception('exception processing message %s',
                                  message.message_id)
        else:
            processed.append(message)

    return processed
def __init__(self, func, pool_size=4, timeout=None):
    """Initialise the base worker, then attach a process pool and a logger.

    Args:
        func: forwarded to the base class initialiser.
        pool_size: forwarded to the base class and also used as the
            ``max_workers`` of the process pool.
        timeout: forwarded to the base class initialiser.
    """
    base = super(ProcessPoolWorker, self)
    base.__init__(func, pool_size=pool_size, timeout=timeout)

    process_pool = futures.ProcessPoolExecutor(max_workers=pool_size)
    self.pool = process_pool
    self.logger = get_logger(__name__)
def __init__(self):
    """Create the crawl/output helpers, an 8-worker pool and the counters."""
    self.crawl = CrawlProcess()
    self.output = OutPutProcess()
    self.crawl_pool = ProcessPoolExecutor(max_workers=8)
    # NOTE(review): the original inline comment on crawl_deep was lost to
    # mis-encoding; presumably it is a crawl-depth limit -- confirm.
    self.crawl_deep, self.crawl_cur_count = 100, 0
def combine_preds_2_csv(preds1, preds2, fn_out='/tmp/combo.csv', wgts=None):
    """Combine two prediction collections pairwise and write them as CSV.

    Both inputs are sorted by the first element of each item, zipped
    together, and each pair is passed to ``add_pred_series`` (with ``wgts``
    bound as a keyword) on a six-worker process pool.  Every returned line
    is written to ``fn_out`` after a ``VideoId,LabelConfidencePairs``
    header.  Progress timestamps are printed along the way.

    Args:
        preds1, preds2: iterables of indexable items, sorted on item ``[0]``.
        fn_out: path of the CSV file to (over)write.
        wgts: forwarded to ``add_pred_series`` as ``wgts``.

    Returns:
        None
    """
    t1 = datetime.now()
    print('start combination at ', t1)
    add_pred_series_wgts = functools.partial(add_pred_series, wgts=wgts)
    preds1 = sorted(preds1, key=lambda x: x[0])
    preds2 = sorted(preds2, key=lambda x: x[0])
    t2 = datetime.now()
    print('sorted preds2 at ', t2)
    cnt = 0
    # Use the pool as a context manager so its workers are always shut
    # down; previously the executor was created and never released.
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        lines = executor.map(add_pred_series_wgts, zip(preds1, preds2))
        # ``map`` returns immediately; the lines are actually produced
        # while the loop below consumes them.
        t2 = datetime.now()
        print('finished adding lines at ', t2)
        with open(fn_out, 'w') as fout:
            fout.write('VideoId,LabelConfidencePairs\n')
            for line in lines:
                fout.write(line+'\n')
                cnt += 1
    print('{} prediction lines were written to {}'.format(cnt, fn_out))
    t3 = datetime.now()
    print('finished combination at', t3)
    print('Total run time: {}'.format(t3 - t1))
    return None
def main(unused_argv):
    """Process every selected input file that has no output file yet.

    Input files are those matching ``FLAGS.input_data_pattern``, sliced by
    ``FLAGS.file_from``/``FLAGS.file_to``.  An input counts as already done
    when ``FLAGS.output_path`` holds a ``C*tfrecord`` file whose base name,
    with ``'CAtr'`` replaced by ``'Atr'``, equals the input's base name.
    Each remaining file is passed to ``process_one_file`` as an
    ``(input_path, output_path)`` tuple, where the output path is
    ``FLAGS.output_path + 'C' + <input base name>``.

    With ``FLAGS.parallel`` set the work runs on a two-worker process pool;
    otherwise it runs serially.  In both modes an exception raised by
    ``process_one_file`` propagates and "done" is logged only after all
    files have been handled.
    """
    print("tensorflow version: %s" % tf.__version__)
    all_frame_files = gfile.Glob(FLAGS.input_data_pattern)
    f_fullpath = all_frame_files[FLAGS.file_from : FLAGS.file_to]
    f_fns = [x.split('/')[-1] for x in f_fullpath]

    exist_files = gfile.Glob(FLAGS.output_path + "C*tfrecord")
    # A set keeps the membership test below constant-time per file.
    exist_fn = {x.split('/')[-1].replace('CAtr', 'Atr') for x in exist_files}

    yet_2_split = [x for x, y in zip(f_fullpath, f_fns) if y not in exist_fn]
    vf = [FLAGS.output_path + 'C' + x.split('/')[-1] for x in yet_2_split]

    mylog('number of files suggested: %d'%len(f_fullpath))
    mylog('number of files yet to process: %d'%len(yet_2_split))

    if FLAGS.parallel:
        from concurrent import futures
        with futures.ProcessPoolExecutor(max_workers=2) as executor:
            # Drain the result iterator: this waits for every file and
            # re-raises worker errors, which were silently dropped when the
            # iterator returned by map() was discarded.
            for _ in executor.map(process_one_file, zip(yet_2_split, vf)):
                pass
    else:
        for filenames in zip(yet_2_split, vf):
            process_one_file(filenames)
    mylog("done")
def _benchmark_par(self):
    """Replace ``self.runs`` with the results of ``_dispatch_wrap`` per run.

    All runs are submitted to a process pool of ``config.n_jobs`` workers
    first; the results are then gathered in submission order.
    """
    with ProcessPoolExecutor(max_workers=config.n_jobs) as pool:
        pending = []
        for run in self.runs:
            pending.append(pool.submit(_dispatch_wrap, run))

        finished = []
        for job in pending:
            finished.append(job.result())
        self.runs = finished
def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files.

    Runs find_robots on every "*.log.gz" file in logdir using a
    process pool and returns the union of the results as a set.
    '''
    pattern = logdir + "/*.log.gz"
    log_files = glob.glob(pattern)
    with futures.ProcessPoolExecutor() as pool:
        per_file = pool.map(find_robots, log_files)
        # Merge the per-file results into one set before the pool closes.
        return set().union(*per_file)
def test_context_manager_shutdown(self):
    """Check a ``with``-managed pool maps correctly and its workers end.

    The worker processes are captured inside the ``with`` block and joined
    after it has exited.
    """
    expected = [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]
    with futures.ProcessPoolExecutor(max_workers=5) as pool:
        workers = pool._processes
        actual = list(pool.map(abs, range(-5, 5)))
        self.assertEqual(actual, expected)

    for worker in workers:
        worker.join()
def test_del_shutdown(self):
    """Check that a deleted executor's thread and workers can be joined.

    The management thread and worker processes are captured while the
    executor is still referenced; the executor is then deleted and a
    collection pass is forced before joining them.
    """
    pool = futures.ProcessPoolExecutor(max_workers=5)
    # Consume one batch of results so the executor has been used.
    list(pool.map(abs, range(-5, 5)))

    manager_thread = pool._queue_management_thread
    workers = pool._processes

    # Remove the only reference, then collect.
    del pool
    test.support.gc_collect()

    manager_thread.join()
    for worker in workers:
        worker.join()
# Walk the /Products/ tree of the FTP host files.dlink.com.au and submit each
# firmware path found to download() on a process pool.
# NOTE(review): indentation is missing from this copy of the block, so the
# nesting of the with/for/try statements below must be restored before it can
# run; the code itself is left untouched here.
def main():
# Create the localstor directory; an existing one is not an error.
os.makedirs(localstor, exist_ok=True)
with ProcessPoolExecutor() as executor:
# Anonymous login with an empty password.
with ftputil.FTPHost('files.dlink.com.au', 'anonymous', '') as host:
# Start the CSV file list with its header row ('w' truncates any old file).
with open('au_dlink_ftp_filelist.csv', 'w') as fout:
cw = csv.writer(fout)
cw.writerow(["ftpurl", "fsize", "fdate", "file_sha1", "file_md5"])
models = host.listdir('/Products/')
for model in models:
# Only directories can be product models.
if not host.path.isdir('/Products/%(model)s'%locals()):
continue
# Skip names that do not start with a letter (case-insensitive).
if not re.match(r'[A-Z]+', model, re.I):
continue
revs = host.listdir('/Products/%(model)s/'%locals())
for rev in revs:
# Only entries named REV_<something> are revisions.
if not re.match(r'REV_\w+', rev, re.I):
continue
try:
fwitems = host.listdir('/Products/%(model)s/%(rev)s/Firmware/'%locals())
# NOTE(review): bare except -- a revision whose Firmware/ listing fails for
# any reason (including KeyboardInterrupt/SystemExit) is skipped silently.
except:
continue
try:
for fwitem in fwitems:
print('visiting /Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/'%locals())
try:
# NOTE(review): every other listing in this function uses host.listdir; this
# one calls host.path.listdir. If that attribute does not exist the call
# raises and the bare except below takes over, so the loop over fw_files
# would never run -- confirm against the ftputil API.
fw_files = host.path.listdir('/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/'%locals())
for fw_file in fw_files:
host.keep_alive()
executor.submit(download, '/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/%(fw_file)s'%locals())
# Listing failed: if fwitem is itself a plain file, download it directly.
# NOTE(review): bare except also hides unrelated errors raised in the try.
except:
if host.path.isfile('/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s'%locals()):
executor.submit(download,'/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s'%locals())
# Any other error for this revision is printed and the crawl carries on.
except Exception as ex:
print(ex)