def test_mp():
pool = mppool.Pool(4)
start_time = time.time()
lengths = pool.map(worker, range(4))
finish_time = time.time()
print('Multiprocessing: total_length={}, time={:.2f}s.'.format(sum(lengths), finish_time - start_time))
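# `worker` is referenced above but not shown. A minimal sketch of the kind of
# CPU-bound callable and of a serial baseline such a timing comparison assumes;
# both definitions below are illustrative, not taken from the original project.
import time
import multiprocessing.pool as mppool

def worker(index):
    # hypothetical stand-in: do some CPU-bound work and return a "length"
    return sum(len(str(i)) for i in range(10 ** 5))

def test_serial():
    start_time = time.time()
    lengths = [worker(i) for i in range(4)]
    finish_time = time.time()
    print('Serial: total_length={}, time={:.2f}s.'.format(
        sum(lengths), finish_time - start_time))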
def store_samples(self, directory, preprocess_fnc):
"""
Read audio files from `directory` and store the preprocessed version in preprocessed/`directory`
Args:
directory: the sub-directory to read from
preprocess_fnc: The preprocessing function to use
"""
out_directory = self._get_directory(preprocess_fnc, directory)
if not os.path.exists(out_directory):
os.makedirs(out_directory)
audio_files = list(iglob_recursive(self._data_directory + '/' + directory, '*.flac'))
with Pool(processes=multiprocessing.cpu_count()) as pool:
transcript_dict = self._transcript_dict
for audio_file in audio_files:
audio_id = self._extract_audio_id(audio_file)
transcript_entry = transcript_dict[audio_id]
transform_args = (audio_file, preprocess_fnc, transcript_entry, out_directory)
pool.apply_async(SpeechCorpusReader._transform_and_store_sample, transform_args,
error_callback=self._preprocessing_error_callback)
pool.close()
pool.join()
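# A hedged usage sketch for store_samples above; the reader instance, the data
# directory, the 'train' sub-directory and `normalize_audio` are illustrative
# assumptions. Only the store_samples signature comes from the snippet itself.
def example_store_samples(reader, normalize_audio):
    # `reader` is assumed to be an instance of the class that defines
    # store_samples above (e.g. SpeechCorpusReader)
    reader.store_samples('train', preprocess_fnc=normalize_audio)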
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
'''
from multiprocessing.pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
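# A small usage sketch for the wrapper above: the initializer runs once in each
# worker process before it starts picking up tasks. The worker function and the
# initializer below are illustrative only.
def _example_init(level):
    print('worker starting with verbosity', level)

def _example_square(x):
    return x * x

def example_pool_factory():
    p = Pool(processes=2, initializer=_example_init, initargs=(1,))
    try:
        return p.map(_example_square, range(8))
    finally:
        p.close()
        p.join()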
def gen_set():
    # flatten the os.walk output into individual file names
    img_files = [f for _, _, files in os.walk(BASEDIR) for f in files]
with Pool(processes=pool_size) as pool:
for idx, image in enumerate(img_files):
image_path = os.path.join(BASEDIR, image)
            for sample_options in ALG:
                # note: pool.apply blocks until each call completes, so these
                # samples are generated one at a time
                pool.apply(save_training_sampl, (image_path, idx, sample_options))
def store_training(out):
start = time.time()
    # flatten the os.walk output into individual file names
    img_files = [f for _, _, files in os.walk(TRAINDIR) for f in files]
train_combs = itertools.combinations(img_files, 2)
with Pool(processes=pool_size) as pool:
vectors = pool.map(get_images_diff_vectors, train_combs)
df = pd.DataFrame(vectors)
df.to_csv(out, index=False)
finished = time.time() - start
    click.echo("Finished in %s seconds" % finished)
def process_mp(texts, args, pool=None):
if pool is None:
pool = mp.Pool(args.n_processes,
initializer=mp_initialize,
initargs=(args, ))
iterator = chunks(enumerate(texts),
n=args.n_processes,
k=args.n_processes * 1000)
if args.progress:
t = tqdm.tqdm()
else:
t = None
results = []
for batches in iterator:
n_items = sum(len(x) for x in batches)
result = pool.map_async(process, batches)
result = result.get()
result = [i for batch in result for i in batch]
result.sort(key=lambda x: x[0])
idx, result = zip(*result)
results.extend(result)
if args.progress:
t.update(n_items)
return results
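# `chunks` is not defined in the snippet above. A plausible sketch, assuming it
# pulls up to k items at a time from the iterable and splits them into at most
# n roughly equal batches; this is inferred from how `batches` is consumed
# above, not the original helper.
import itertools

def chunks(iterable, n, k):
    iterator = iter(iterable)
    while True:
        block = list(itertools.islice(iterator, k))
        if not block:
            break
        size = max(1, (len(block) + n - 1) // n)
        yield [block[i:i + size] for i in range(0, len(block), size)]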
def pool(md5s):
    pool = Pool(processes=10)
    pool.map(downloader, md5s)
pool.close()
pool.join()
def kill_tasks(tasks, tags=None):
#
import glob
from multiprocessing.pool import ThreadPool as Pool
if not tasks:
tasks = glob.glob(os.path.join(os.path.expanduser('~'), '.sos', 'tasks', '*.task'))
all_tasks = [os.path.basename(x)[:-5] for x in tasks]
else:
all_tasks = []
for t in tasks:
matched = glob.glob(os.path.join(os.path.expanduser('~'), '.sos', 'tasks', f'{t}*.task'))
matched = [os.path.basename(x)[:-5] for x in matched]
if not matched:
env.logger.warning(f'{t} does not match any existing task')
else:
all_tasks.extend(matched)
if tags:
        all_tasks = [x for x in all_tasks
                     if any(tag in tags for tag in taskTags(x).split(' '))]
if not all_tasks:
env.logger.warning('No task to kill')
return
all_tasks = sorted(list(set(all_tasks)))
p = Pool(len(all_tasks))
killed = p.map(kill_task, all_tasks)
for s, t in zip(killed, all_tasks):
print(f'{t}\t{s}')
def init_pool(self, worker_count):
return Pool(worker_count)
def add_extracted_text(xs):
with Pool() as pool:
for doc, features in zip(
xs, pool.imap(extract_features, xs, chunksize=10)):
doc.update(features)
def build(in_dir, out_file, pool_size):
with closing(AbstractDB(out_file, protocol=-1)) as db:
target_files = [f for f in sorted(os.listdir(in_dir)) if f.endswith('ttl.gz')]
with closing(Pool(pool_size)) as pool:
f = partial(_process_file, in_dir=in_dir)
for ret in pool.imap(f, target_files):
for (key, obj) in ret:
db[key] = obj
def pool(self):
return Pool(
processes=self.processes,
initializer=initializer,
initargs=self.initargs,
maxtasksperchild=self.maxtasks,
context=get_context('forkserver'),
)
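# For reference, an equivalent construction through the context object itself:
# get_context('forkserver').Pool(...) is the documented multiprocessing API.
# The parameter names mirror the property above, and `initializer` is assumed
# to be the same module-level callable used there.
def example_forkserver_pool(processes, initargs, maxtasks):
    ctx = get_context('forkserver')
    return ctx.Pool(processes=processes,
                    initializer=initializer,
                    initargs=initargs,
                    maxtasksperchild=maxtasks)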
def run_pp(n, body):
"""Process Pool.map"""
from multiprocessing.pool import Pool
global reused_pool, numthreads
global args
if 'reused_pool' not in globals():
log.debug("Creating Pool(%s)" % numthreads)
reused_pool = Pool(int(numthreads))
reused_pool.map(body, n)
def main(directory, convert_directory, test, crop_size, extension):
try:
os.mkdir(convert_directory)
except OSError:
pass
filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
for f in fn if f.endswith('jpeg') or f.endswith('tiff')]
filenames = sorted(filenames)
if test:
names = data.get_names(filenames)
y = data.get_labels(names)
for f, level in zip(filenames, y):
if level == 1:
try:
img = convert(f, crop_size)
img.show()
Image.open(f).show()
                    real_raw_input = vars(__builtins__).get('raw_input', input)
real_raw_input('enter for next')
except KeyboardInterrupt:
exit(0)
print("Resizing images in {} to {}, this takes a while."
"".format(directory, convert_directory))
n = len(filenames)
# process in batches, sometimes weird things happen with Pool on my machine
batchsize = 500
batches = n // batchsize + 1
pool = Pool(N_PROC)
args = []
for f in filenames:
args.append((convert, (directory, convert_directory, f, crop_size,
extension)))
for i in range(batches):
print("batch {:>2} / {}".format(i + 1, batches))
pool.map(process, args[i * batchsize: (i + 1) * batchsize])
pool.close()
print('done')
def query_all_tweets(query):
"""
Queries *all* tweets in the history of twitter for the given query. This
will run in parallel for each ~10 days.
:param query: A twitter advanced search query.
:return: A list of tweets.
"""
year = 2006
month = 3
limits = []
while date(year=year, month=month, day=1) < date.today():
nextmonth = month + 1 if month < 12 else 1
nextyear = year + 1 if nextmonth == 1 else year
limits.append(
(date(year=year, month=month, day=1),
date(year=year, month=month, day=10))
)
limits.append(
(date(year=year, month=month, day=10),
date(year=year, month=month, day=20))
)
limits.append(
(date(year=year, month=month, day=20),
date(year=nextyear, month=nextmonth, day=1))
)
year, month = nextyear, nextmonth
queries = ['{} since:{} until:{}'.format(query, since, until)
for since, until in reversed(limits)]
pool = Pool(20)
all_tweets = []
try:
for new_tweets in pool.imap_unordered(query_tweets_once, queries):
all_tweets.extend(new_tweets)
logging.info("Got {} tweets ({} new).".format(
len(all_tweets), len(new_tweets)))
except KeyboardInterrupt:
logging.info("Program interrupted by user. Returning all tweets "
"gathered so far.")
return sorted(all_tweets)
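# A hedged usage sketch for query_all_tweets; the query string below is
# illustrative, and query_tweets_once is assumed to be defined elsewhere in
# the same module.
def example_query_all():
    tweets = query_all_tweets('data science')
    print('collected {} tweets'.format(len(tweets)))
    return tweets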
def main(directory, convert_directory, test, crop_size, extension):
try:
os.mkdir(convert_directory)
except OSError:
pass
supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif'])
filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
for f in fn if f.split('.')[-1].lower() in supported_extensions]
filenames = sorted(filenames)
if test:
names = data.get_names(filenames)
y = data.get_labels(names)
for f, level in zip(filenames, y):
if level == 1:
try:
img = convert(f, crop_size)
img.show()
Image.open(f).show()
real_raw_input = vars(__builtins__).get('raw_input', input)
real_raw_input('enter for next')
except KeyboardInterrupt:
exit(0)
print("Resizing images in {} to {}, this takes a while."
"".format(directory, convert_directory))
n = len(filenames)
# process in batches, sometimes weird things happen with Pool on my machine
batchsize = 500
batches = n // batchsize + 1
pool = Pool(N_PROC)
args = []
for f in filenames:
args.append((convert, (directory, convert_directory, f, crop_size,
extension)))
for i in range(batches):
print("batch {:>2} / {}".format(i + 1, batches))
pool.map(process, args[i * batchsize: (i + 1) * batchsize])
pool.close()
print('done')
def main(directory, convert_directory, test, crop_size, extension):
try:
os.mkdir(convert_directory)
except OSError:
pass
supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif'])
# filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
# for f in fn if f.split('.')[-1].lower() in supported_extensions]
filenames = [each for each in os.listdir(
directory) if each.endswith('.jpg')]
filenames = [os.path.join(directory, filename.strip(
'\n')) for filename in filenames]
# with open('/home/artelus_server/data/segment_artelus/train.txt', 'r') as f:
# filenames = f.readlines()
# filenames = [os.path.join(directory, filename.strip(
# '\n') + '.jpg') for filename in filenames]
filenames = sorted(filenames)
if test:
names = data.get_names(filenames)
y = data.get_labels(names)
for f, level in zip(filenames, y):
if level == 1:
try:
img = convert(f, crop_size)
img.show()
Image.open(f).show()
real_raw_input = vars(__builtins__).get('raw_input', input)
real_raw_input('enter for next')
except KeyboardInterrupt:
exit(0)
print("Resizing images in {} to {}, this takes a while."
"".format(directory, convert_directory))
n = len(filenames)
# process in batches, sometimes weird things happen with Pool on my machine
batchsize = 500
batches = n // batchsize + 1
pool = Pool(N_PROC)
args = []
for f in filenames:
label_f = f[:-4] + '_final_mask.png'
args.append((convert, (directory, convert_directory, f, label_f, crop_size,
extension)))
for i in range(batches):
print("batch {:>2} / {}".format(i + 1, batches))
pool.map(process, args[i * batchsize: (i + 1) * batchsize])
pool.close()
print('done')
def fit(self, X, y):
labels = list(set(y))
if len(labels) != 2:
raise Exception("A binary setup is required")
min_count = X.shape[0]
self._min_label = None
for label in labels:
count = list(y).count(label)
if count <= min_count:
min_count = count
self._min_label = label
if self._reference_label is None:
self._reference_label = self._min_label
    if self._reference_label not in labels:
raise Exception("Reference label does not appear in training data")
if min_count >= self._n_folds:
cv = cross_validation.StratifiedKFold(y, n_folds=min(X.shape[0], self._n_folds), shuffle=True,
random_state=self._seed)
else:
cv = cross_validation.KFold(X.shape[0], n_folds=min(X.shape[0], self._n_folds), shuffle=True,
random_state=self._seed)
tp = 0
fp = 0
ptp = 0
pfn = 0
pfp = 0
ptn = 0
pool = Pool(processes=10)
requests = list()
for train_cv, test_cv in cv:
requests.append((X, y, train_cv, test_cv))
    fold_results = pool.map(self._fit_fold, requests)
    for fold_tp, fold_fp, fold_ptp, fold_pfn, fold_pfp, fold_ptn in fold_results:
        # accumulate the per-fold confusion counts into the running totals
        tp += fold_tp
        fp += fold_fp
        ptp += fold_ptp
        pfn += fold_pfn
        pfp += fold_pfp
        ptn += fold_ptn
pool.close()
positives = min_count
negatives = X.shape[0] - positives
self._tpr = tp / positives
self._fpr = fp / negatives
self._ptpr = ptp / (ptp + pfn)
self._pfpr = pfp / (pfp + ptn)
self._clf.fit(X, y)
if self._clf.classes_[0] == self._min_label:
self._pos_idx = 0
self._neg_idx = 1
else:
self._neg_idx = 0
self._pos_idx = 1
def __call__(self, a):
m = _get_backing_memmap(a)
if m is not None:
# a is already backed by a memmap file, let's reuse it directly
return _reduce_memmap_backed(a, m)
if (not a.dtype.hasobject
and self._max_nbytes is not None
and a.nbytes > self._max_nbytes):
# check that the folder exists (lazily create the pool temp folder
# if required)
try:
os.makedirs(self._temp_folder)
os.chmod(self._temp_folder, FOLDER_PERMISSIONS)
except OSError as e:
if e.errno != errno.EEXIST:
raise e
# Find a unique, concurrent safe filename for writing the
# content of this array only once.
basename = "%d-%d-%s.pkl" % (
os.getpid(), id(threading.current_thread()), hash(a))
filename = os.path.join(self._temp_folder, basename)
# In case the same array with the same content is passed several
# times to the pool subprocess children, serialize it only once
# XXX: implement an explicit reference counting scheme to make it
# possible to delete temporary files as soon as the workers are
# done processing this data.
if not os.path.exists(filename):
if self.verbose > 0:
                print("Memmapping (shape=%r, dtype=%s) to new file %s" % (
                    a.shape, a.dtype, filename))
for dumped_filename in dump(a, filename):
os.chmod(dumped_filename, FILE_PERMISSIONS)
if self._prewarm:
# Warm up the data to avoid concurrent disk access in
# multiple children processes
load(filename, mmap_mode=self._mmap_mode).max()
elif self.verbose > 1:
                print("Memmapping (shape=%s, dtype=%s) to old file %s" % (
                    a.shape, a.dtype, filename))
# The worker process will use joblib.load to memmap the data
return (load, (filename, self._mmap_mode))
else:
# do not convert a into memmap, let pickler do its usual copy with
# the default system pickler
if self.verbose > 1:
print("Pickling array (shape=%r, dtype=%s)." % (
a.shape, a.dtype))
return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))
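# The (load, (filename, mmap_mode)) tuple returned above follows the standard
# pickle __reduce__ convention: a callable plus the arguments needed to rebuild
# the object in the worker process. A minimal, self-contained illustration of
# that convention (not joblib's actual reducer registration):
class _LazyFileText(object):
    """Illustrative object that unpickles by re-reading its backing file."""

    def __init__(self, path):
        self.path = path
        with open(path) as f:
            self.text = f.read()

    def __reduce__(self):
        # rebuild from the path instead of shipping the file contents
        return (_LazyFileText, (self.path,))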
###############################################################################
# Enable custom pickling in Pool queues
def multithread_failsafe(fn, args=[[]], pool_type=Pool,
processes=_cpus, maxtasksperchild=1, chunksize=1,
verbose=True):
    '''Asynchronous multithreading that does not break on individual errors.
    Instead, the error is printed with its traceback and that input is
    disregarded. Unfortunately, due to context-management restrictions, (as
    far as I can tell) both generators are needed even though the only
    difference is the maxtasksperchild arg.'''
    # Generators that yield the next completed task. While execution of
    # individual tasks is asynchronous, iterating through the results is not.
def process_generator(pool_type):
with pool_type(processes, maxtasksperchild=maxtasksperchild) as pool:
result_objs = (pool.apply_async(fn, arg) for arg in args)
for r in result_objs:
try:
yield r.get()
except GeneratorExit as g:
raise g
except:
if verbose:
print('######BEGIN TRACEBACK######')
traceback.print_exc()
print('######END TRACEBACK######')
print()
continue
def thread_generator(pool_type):
with pool_type(processes) as pool:
result_objs = (pool.apply_async(fn, arg) for arg in args)
for r in result_objs:
try:
yield r.get()
except GeneratorExit as g:
raise g
except:
if verbose:
print('######BEGIN TRACEBACK######')
traceback.print_exc()
print('######END TRACEBACK######')
print()
continue
# ThreadPools do not take a maxtasksperchild argument,
# so we need to conditionally construct a generator
if issubclass(pool_type, ThreadPool):
return thread_generator(pool_type)
else:
return process_generator(pool_type)
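# A hedged usage sketch for multithread_failsafe above: the flaky function is
# illustrative, and the results simply skip any input that raised.
def _example_reciprocal(x):
    return 1.0 / x  # raises ZeroDivisionError for x == 0

def example_failsafe():
    results = multithread_failsafe(_example_reciprocal,
                                   args=[(x,) for x in range(-2, 3)],
                                   pool_type=ThreadPool,
                                   verbose=False)
    return list(results)  # the x == 0 input is dropped, not fatal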