def convert_dataset(path, filemap, name, num_processes, max_num_support, max_tokens, is_web=True):
with open(path, 'rb') as f:
dataset = pickle.load(f)
if num_processes == 1:
instances = process((dataset, filemap, max_num_support, max_tokens, is_web), True)
else:
chunk_size = 1000
executor = ProcessPoolExecutor(num_processes)
instances = []
        done = 0  # number of examples processed so far (avoid reusing the chunk index name)
        for processed in executor.map(
                process, [(dataset[i * chunk_size:(i + 1) * chunk_size], filemap, max_num_support, max_tokens, is_web)
                          for i in range(len(dataset) // chunk_size + 1)]):
            instances.extend(processed)
            done += chunk_size
            print("%d/%d done" % (min(len(dataset), done), len(dataset)))
return {"meta": {"source": name}, 'instances': instances}
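The snippet above assumes a module-level `process` callable that is picklable (a requirement for `executor.map` with a `ProcessPoolExecutor`), takes one argument tuple, and returns a list of converted instances. The actual conversion logic is not shown here; the following is only a sketch of the expected shape, with a placeholder body.
def process(args, verbose=False):
    # Unpack the single tuple argument used by both the serial and the pooled path.
    dataset_chunk, filemap, max_num_support, max_tokens, is_web = args
    instances = []
    for example in dataset_chunk:
        # Placeholder: the real conversion of one raw example would happen here.
        instances.append({"raw": example})
    if verbose:
        print("processed %d examples" % len(instances))
    return instances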
Example source code for Python's ProcessPoolExecutor class
def run(self):
cls = MultiprocessRunner
started = False
if self not in cls.running_instances:
cls.running_instances.add(self)
if not cls.executor_instance:
self.executor = futures.ProcessPoolExecutor(
max_workers=self.max_workers)
cls.executor_instance = self
started = True
elif not cls.executor_instance.running:
cls.executor_instance.executor = futures.ProcessPoolExecutor(
max_workers=self.max_workers)
started = True
self.executor = cls.executor_instance.executor
if started:
workers = self.executor._max_workers # Derived from cpu_count()
logging.debug(
_("Starting the MultiprocessRunner executor with %s worker "
"processes.") % workers)
poolImprovement.py (project: Learning-Concurrency-in-Python, author: PacktPublishing)
def main():
t1 = timeit.default_timer()
with ProcessPoolExecutor(max_workers=4) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
print("{} Seconds Needed for ProcessPoolExecutor".format(timeit.default_timer() - t1))
t2 = timeit.default_timer()
with ThreadPoolExecutor(max_workers=4) as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
print("{} Seconds Needed for ThreadPoolExecutor".format(timeit.default_timer() - t2))
t3 = timeit.default_timer()
for number in PRIMES:
isPrime = is_prime(number)
print("{} is prime: {}".format(number, isPrime))
print("{} Seconds needed for single threaded execution".format(timeit.default_timer()-t3))
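This benchmark depends on a `PRIMES` list, an `is_prime` function, and a few imports that are not part of the snippet. A minimal, self-contained set of definitions, closely following the well-known `concurrent.futures` documentation example, could look like this:
import math
import timeit
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]

def is_prime(n):
    # Trial division up to sqrt(n); deliberately CPU-bound so the process pool
    # outperforms the thread pool under the GIL.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    # The guard is required on platforms that spawn worker processes.
    main()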
def run_concurrently( queue ):
start = time.time()
cpus = mp.cpu_count()
qsize = queue.qsize()
procs = []
with ProcessPoolExecutor( cpus ) as executor:
        for n in range( qsize ):
proc = mp.Process( target=run_plugin, args=( queue.get(),) )
procs.append( proc )
proc.start()
time.sleep( 0.05 )
for proc in procs:
proc.join()
time.sleep( 0.05 )
#end = '[+] Ends {:30} {}: {:.2f}s'.format( 'Concurrency of', qsize, 'tasks',time.time() - start)
t = '{:.2f}s'.format( time.time() - start )
end = '[+] Ends [ {} ] Concurrent Tasks'.format( qsize )
    print('\033[1;32;40m' + '{:35}--> {}{}'.format(end, t, '\n'))
    print('{}{}'.format( '-' * 48, '\n' ))
#print '{}{}{}{}'.format( end, '\n', '-' * 48, '\n' )
return
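Note that the `ProcessPoolExecutor` created above is never actually used: the work is fanned out through manually managed `mp.Process` objects. For comparison, below is a sketch of the same fan-out expressed through the executor itself; it assumes `run_plugin` and the queued plugin objects are picklable, and it drains the queue in the parent before submitting.
import time
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def run_concurrently_with_executor(queue):
    start = time.time()
    plugins = [queue.get() for _ in range(queue.qsize())]
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        futures_list = [executor.submit(run_plugin, plugin) for plugin in plugins]
        for fut in futures_list:
            fut.result()  # blocks until done and re-raises any worker exception
    print('[+] Ends [ {} ] Concurrent Tasks --> {:.2f}s'.format(len(plugins), time.time() - start))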
def main(workers=None):
if workers:
workers = int(workers)
t0 = time.time()
with futures.ProcessPoolExecutor(workers) as executor:
actual_workers = executor._max_workers
to_do = []
for i in range(JOBS, 0, -1):
size = SIZE + int(SIZE / JOBS * (i - JOBS/2))
job = executor.submit(arcfour_test, size, KEY)
to_do.append(job)
for future in futures.as_completed(to_do):
res = future.result()
print('{:.1f} KB'.format(res/2**10))
print(STATUS.format(actual_workers, time.time() - t0))
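The constants `JOBS`, `SIZE`, `KEY`, `STATUS` and the worker `arcfour_test` come from the surrounding module and are not shown. The values below are illustrative assumptions only, and `arcfour(key, data)` stands in for whatever RC4 implementation the project provides:
JOBS = 12                # how many futures to submit (assumed value)
SIZE = 2 ** 18           # baseline payload size in bytes (assumed value)
KEY = b'example key'     # cipher key handed to every job (assumed value)
STATUS = '{} workers, elapsed time: {:.2f}s'

def arcfour_test(size, key):
    # Round-trip `size` bytes through the assumed arcfour(key, data) stream
    # cipher and return the payload size on success.
    data = bytes(bytearray(range(256)) * (size // 256 + 1))[:size]
    cypher_text = arcfour(key, data)
    clear_text = arcfour(key, cypher_text)
    assert clear_text == data, 'arcfour round trip failed'
    return size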
def save_month(year_month, verbose):
year, month = [int(s) for s in year_month.split('-')]
total_size = 0
img_count = 0
dates = potd.list_days_of_month(year, month)
with futures.ProcessPoolExecutor(max_workers=100) as executor:
downloads = dict((executor.submit(potd.save_one, date, verbose), date)
for date in dates)
for future in futures.as_completed(downloads):
date = downloads[future]
if future.exception() is not None:
print('%r generated an exception: %s' % (date,
future.exception()))
else:
img_size = future.result()
total_size += img_size
img_count += 1
print('%r OK: %r' % (date, img_size))
return img_count, total_size
demo_process_pool_executor.py (project: SmallReptileTraining, author: yanbober)
def runner(self):
process_pool = ProcessPoolExecutor(max_workers=4)
futures = dict()
for url in self.urls:
future = process_pool.submit(self.get_web_content, url)
futures[future] = url
for future in concurrent.futures.as_completed(futures):
url = futures[future]
try:
data = future.result()
except Exception as e:
print('Run process url ('+url+') error. '+str(e))
else:
            print(url + ': request data ok, size=' + str(len(data)))
    process_pool.shutdown()  # release the worker processes once all futures are done
    print('Finished!')
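`get_web_content` is an instance method executed in worker processes, so both the method and the enclosing object must be picklable. A plausible minimal implementation, assuming plain `urllib`, might be:
import urllib.request

def get_web_content(self, url):
    # Fetch one URL and return the raw response body as bytes.
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read()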
def combine_pred_csv(fn1, fn2, fn_out='/tmp/combo.csv', wgts=None):
""" linear add the probabilities from two prediction.csv files.
inputs:
fn1, fn2: files to be combined.
fn_out: output file name
wgts: a list of two values, for example, [0.5, 0.5]
output:
no return values
"""
executor = futures.ProcessPoolExecutor(max_workers=2)
t1 = datetime.now()
print('start combination at ', t1)
preds1, preds2 = executor.map(file_2_series, (fn1, fn2))
t2 = datetime.now()
print('files read by', t2)
return combine_preds_2_csv(preds1, preds2, fn_out, wgts)
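`file_2_series` and `combine_preds_2_csv` are helpers defined elsewhere in the project. The sketch below only illustrates the contract the snippet relies on (a picklable, module-level reader and a weighted linear blend); the column layout is an assumption.
import pandas as pd

def file_2_series(fn):
    # Read one prediction CSV; assumes an 'id' column plus probability columns.
    return pd.read_csv(fn).set_index('id')

def combine_preds_2_csv(preds1, preds2, fn_out, wgts=None):
    w1, w2 = wgts if wgts else (0.5, 0.5)
    combo = w1 * preds1 + w2 * preds2   # linear blend of the two predictions
    combo.to_csv(fn_out)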
def hard_blow():
# Note: This method might perform more tasks then `num_tasks`,
# if the values of `num_tasks`, `num_threads` are chosen such that they
# are not multiples of `num_procs`
# TODO: WIP
num_tasks = 10000
num_procs = 4
threads_per_proc = 10
tasks_per_proc = int(math.ceil(num_tasks / num_procs))
futrs = []
with ProcessPoolExecutor(max_workers=num_procs) as pe:
for _ in range(num_procs):
fut = pe.submit(_task_for_proc, (threads_per_proc, tasks_per_proc))
futrs.append(fut)
print('Waiting for futures: main')
concurrent.futures.wait(futrs)
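`_task_for_proc` is the per-process worker referenced above: each of the `num_procs` processes receives one `(threads_per_proc, tasks_per_proc)` tuple and is expected to fan its share of tasks out over threads of its own. A minimal sketch, with `_do_one_task` as a placeholder workload:
from concurrent.futures import ThreadPoolExecutor

def _do_one_task(i):
    return i * i  # placeholder work

def _task_for_proc(args):
    threads_per_proc, tasks_per_proc = args
    with ThreadPoolExecutor(max_workers=threads_per_proc) as te:
        results = list(te.map(_do_one_task, range(tasks_per_proc)))
    return len(results)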
def get_fisher_vectors_from_folder(self, folder, limit):
"""
:param str folder: Folder Name
:param int limit: Number of images to read from each folder
:return: fisher vectors for images in given folder
:rtype: np.array
"""
files = glob.glob(folder + "/*.jpg")[:limit]
with ProcessPoolExecutor() as pool:
futures = pool.map(self._worker, files)
        desc = 'Creating Fisher Vectors for {} images of folder {}'.format(len(files), os.path.split(folder)[-1])
futures = tqdm.tqdm(futures, total=len(files), desc=desc, unit='image', ncols=120)
vectors = [f for f in futures if f is not None and len(f) > 0]
max_shape = np.array([v.shape[0] for v in vectors]).max()
vectors = [v for v in vectors if v.shape[0] == max_shape]
# return np.array(vectors) # Can't do np.float32, because all images may not have same number of features
return np.float32(vectors)
def folder(self, folder, limit):
"""
:param folder: Name of the folder containing images
:type folder: str
:param limit: Number of images to be read from given folder
:type limit: int
:return: List of descriptors of the given images
:rtype: np.array
"""
files = glob.glob(folder + "/*.jpg")[:limit]
with ProcessPoolExecutor() as executor:
futures = executor.map(self.image_file, files)
futures = tqdm.tqdm(futures, total=len(files), desc='Calculating descriptors')
descriptors = [f for f in futures]
# descriptors = [self.image_file(file) for file in files]
descriptors = list(filter(lambda x: x is not None, descriptors))
return np.concatenate(descriptors)
def __call__(self):
os.makedirs(self.output_folder, exist_ok=True)
chunk_slices = calculate_chunk_slices(self.videos_per_chunk,
len(self.adapter))
gulp_directory = GulpDirectory(self.output_folder)
new_chunks = gulp_directory.new_chunks(len(chunk_slices))
chunk_writer = ChunkWriter(self.adapter)
with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
result = executor.map(chunk_writer.write_chunk,
new_chunks,
chunk_slices)
for r in tqdm(result,
desc='Chunks finished',
unit='chunk',
dynamic_ncols=True,
total=len(chunk_slices)):
pass
def __init__(
self,
device_list=[],
connection_list=[],
        loop: asyncio.AbstractEventLoop = None,
        executor: futures.Executor = None
):
if loop:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
if executor:
self.executor = executor
else:
self.executor = futures.ProcessPoolExecutor()
self.loop.set_default_executor(self.executor)
self.device_list = device_list
self.connection_list = connection_list
def fit(self, features, targets):
"""
Trains self.nb_trees number of decision trees.
:param features: Array-like object of shape (nb_samples, nb_features)
containing the training examples
:param targets: Array-like object of shape (nb_samples) containing the
training targets.
"""
if not self.nb_samples:
self.nb_samples = int(len(features) / 10)
with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
random_features = []
for x in range(self.nb_trees):
idxs = np.random.choice(np.arange(len(features)), self.nb_samples, replace=True)
try:
chosen_features = itemgetter(*idxs)(features)
chosen_targets = itemgetter(*idxs)(targets)
            except Exception:  # fall back for pandas inputs that reject itemgetter indexing
chosen_features = features.iloc[idxs].as_matrix()
chosen_targets = targets.iloc[idxs].as_matrix()
random_features.append((x, chosen_features, chosen_targets))
self.trees = list(executor.map(self.train_tree, random_features))
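`executor.map(self.train_tree, random_features)` requires `self.train_tree` (and therefore the whole estimator object) to be picklable, and each worker receives one `(index, features, targets)` tuple. The real tree-training code is not shown; a hypothetical sketch, with `DecisionTree` as a stand-in class:
def train_tree(self, data):
    # One bootstrap sample per tree: unpack the tuple built in fit().
    idx, features, targets = data
    tree = DecisionTree(max_depth=self.max_depth)  # assumed tree implementation
    tree.fit(features, targets)
    return tree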
def sched_downloads(d_set,dl_dir,num_threads,vids):
d_set_dir = dl_dir+'/'+d_set+'/'
# Make the directory for this dataset
check_call(' '.join(['mkdir', '-p', d_set_dir]), shell=True)
# Tell the user when downloads were started
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# Download and cut in parallel threads giving
with futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
fs = [executor.submit(dl_and_cut,vid) for vid in vids]
for i, f in enumerate(futures.as_completed(fs)):
# Write progress to error so that it can be seen
sys.stderr.write( \
"Downloaded video: {} / {} \r".format(i, len(vids)))
print( d_set+': All videos downloaded' )
def simulate_walks(self,num_walks,walk_length):
# for large graphs, it is serially executed, because of memory use.
if(len(self.G) > 500000):
with ProcessPoolExecutor(max_workers=1) as executor:
job = executor.submit(generate_random_walks_large_graphs,num_walks,walk_length,self.workers,self.G.keys())
job.result()
else:
with ProcessPoolExecutor(max_workers=1) as executor:
job = executor.submit(generate_random_walks,num_walks,walk_length,self.workers,self.G.keys())
job.result()
return
def repeat(f, reps, cpus, **kwargs):
if reps == 1:
f(**kwargs)
return
fname = f.__name__
print("Starting {} {} times with:".format(fname, reps))
print(kwargs)
if cpus == 1:
for _ in range(reps):
try:
f(**kwargs)
except Exception as e:
warnings.warn(str(e))
else:
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor, as_completed
if cpus < 1:
cpus = cpu_count()
with ProcessPoolExecutor(cpus) as executor:
futures = [executor.submit(f, **kwargs) for _ in range(reps)]
for fut in as_completed(futures):
if fut.exception():
warnings.warn(str(fut.exception()))
print("Finished")
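A usage sketch with a hypothetical workload: the callable must be defined at module level so it can be pickled for the worker processes, and on spawn-based platforms the call itself should sit under a `__main__` guard.
def simulate(seed=0, steps=1000):
    total = 0
    for i in range(steps):
        total += (i * seed) % 7
    return total

if __name__ == '__main__':
    repeat(simulate, reps=8, cpus=4, seed=42, steps=10000)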
def start(self, slave_addr, task):
self._task = task
def _start(id, slave_addr, task):
from multiprocessing import Process
import multiprocessing
#multiprocessing.set_start_method('spawn')
Process(target=_worker_main, args=(id, slave_addr, task)).start()
from concurrent.futures import ProcessPoolExecutor
print("[Worker {0}] Create".format(self.id))
_start(self.id, slave_addr, task)
#executor = ProcessPoolExecutor()
#loop = asyncio.get_event_loop()
#asyncio.ensure_future(loop.run_in_executor(ProcessPoolExecutor(), _worker_main, self.id, slave_addr, task))
#asyncio.ensure_future(_start(self.id, slave_addr, task))
#yield from asyncio.sleep(10)
print("***")
def start(self, arbiter_host, arbiter_port, start_port, cores):
logger = logging.getLogger('cluster.worker')
logger.addHandler(MPLogHandler('/tmp/node.log'))
cpus = multiprocessing.cpu_count()
if cores > 0:
cores = min(cpus, cores)
else:
cores = max(1, cpus - cores)
logger.info('starting {} workers'.format(cores))
with ProcessPoolExecutor(max_workers=cores) as executor:
port = start_port
for _ in range(cores):
executor.submit(start_worker, arbiter_host, arbiter_port, port)
port += 1
        while True:
            pass  # busy-wait that keeps the parent (and the pool's workers) alive
def assign_fitnesses(self, targets):
self._params['evals'] = len(targets)
px_parameters = zip([self._params['specification']] * len(targets),
[self._params['sequence']] * len(targets),
[self.parse_individual(x) for x in targets])
if (self._params['processors'] == 1) or (sys.platform == 'win32'):
fitnesses = map(self.evaluation_function, px_parameters)
else:
with futures.ProcessPoolExecutor(
max_workers=self._params['processors']) as executor:
fitnesses = executor.map(
self.evaluation_function, px_parameters)
tars_fits = list(zip(targets, fitnesses))
if 'log_params' in self._params:
if self._params['log_params']:
self.parameter_log.append(
[(self.parse_individual(x[0]), x[1]) for x in tars_fits])
for ind, fit in tars_fits:
ind.fitness.values = (fit,)
def build_from_path(in_dir, out_dir, num_workers=1, tqdm=lambda x: x):
executor = ProcessPoolExecutor(max_workers=num_workers)
futures = []
index = 1
for book in books:
with open(os.path.join(in_dir, book, 'sentence_index.txt')) as f:
for line in f:
parts = line.strip().split('\t')
                if line[0] != '#' and len(parts) == 8 and float(parts[3]) > _min_confidence:
wav_path = os.path.join(in_dir, book, 'wav', '%s.wav' % parts[0])
labels_path = os.path.join(in_dir, book, 'lab', '%s.lab' % parts[0])
text = parts[5]
task = partial(_process_utterance, out_dir, index, wav_path, labels_path, text)
futures.append(executor.submit(task))
index += 1
results = [future.result() for future in tqdm(futures)]
return [r for r in results if r is not None]
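`_process_utterance` is the real feature-extraction worker, wrapped with `functools.partial` so a zero-argument task can be submitted. Its body is project specific; the sketch below only fixes the signature and the convention that `None` marks a skipped utterance.
def _process_utterance(out_dir, index, wav_path, labels_path, text):
    out_path = os.path.join(out_dir, 'utterance-%05d.npy' % index)
    # Placeholder: load wav_path / labels_path, compute features, save to out_path.
    return (os.path.basename(out_path), text)  # metadata row, or None to skip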
def __init__(self, config_file, poller_pool, annotator_pool):
super(PollerResource).__init__()
# Use process pollers as netsnmp is not behaving well using just threads
logging.debug('Starting poller pool ...')
self.poller_executor = futures.ProcessPoolExecutor(
max_workers=poller_pool)
# Start MIB resolver after processes above (or it will fork it as well)
logging.debug('Initializing MIB resolver ...')
import mibresolver
self.resolver = mibresolver
logging.debug('Starting annotation pool ...')
# .. but annotators are just CPU, so use lightweight threads.
self.annotator_executor = futures.ThreadPoolExecutor(
max_workers=annotator_pool)
self.config_file = config_file
def download_chunks(self, max_workers=30):
print('Will now download chunks.')
with ProcessPoolExecutor(max_workers=max_workers) as executor:
result = list(executor.map(self.download_chunk,
zip(self.urls,
self.md5sums,
self.output_files)))
DownloadResultProcessor.process_and_print(result)
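`download_chunk` receives one `(url, md5sum, output_file)` tuple per call via `executor.map`. A minimal, standard-library-only sketch of such a worker (the real project's retry and result handling is not shown):
import hashlib
import urllib.request

def download_chunk(self, job):
    url, md5sum, output_file = job
    urllib.request.urlretrieve(url, output_file)
    with open(output_file, 'rb') as f:
        checksum_ok = hashlib.md5(f.read()).hexdigest() == md5sum
    return (output_file, checksum_ok)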
def _process_with_multiprocessing(self, X: Union[pd.DataFrame, np.ndarray], n_refs: int, cluster_array: np.ndarray):
"""
Process calling of .calculate_gap() method using the multiprocessing library
"""
with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
jobs = [executor.submit(self._calculate_gap, X, n_refs, n_clusters)
for n_clusters in cluster_array
]
for future in as_completed(jobs):
gap_value, k = future.result()
yield (gap_value, k)
def parallel_varfeatures(lclist,
outdir,
maxobjects=None,
timecols=None,
magcols=None,
errcols=None,
mindet=1000,
lcformat='hat-sql',
nworkers=None):
'''
This runs varfeatures in parallel for all light curves in lclist.
'''
# make sure to make the output directory if it doesn't exist
if not os.path.exists(outdir):
os.makedirs(outdir)
if maxobjects:
lclist = lclist[:maxobjects]
tasks = [(x, outdir, timecols, magcols, errcols, mindet, lcformat)
for x in lclist]
with ProcessPoolExecutor(max_workers=nworkers) as executor:
resultfutures = executor.map(varfeatures_worker, tasks)
results = [x for x in resultfutures]
resdict = {os.path.basename(x):y for (x,y) in zip(lclist, results)}
return resdict
def parallel_cp(pfpicklelist,
outdir,
lcbasedir,
lclistpkl=None,
nbrradiusarcsec=30.0,
maxobjects=None,
lcformat='hat-sql',
timecols=None,
magcols=None,
errcols=None,
nworkers=32):
'''This drives the parallel execution of runcp for a list of periodfinding
result pickles.
'''
if not os.path.exists(outdir):
os.mkdir(outdir)
if maxobjects:
pfpicklelist = pfpicklelist[:maxobjects]
tasklist = [(x, outdir, lcbasedir,
{'lcformat':lcformat,
'timecols':timecols,
'magcols':magcols,
'errcols':errcols,
'lclistpkl':lclistpkl,
'nbrradiusarcsec':nbrradiusarcsec}) for
x in pfpicklelist]
resultfutures = []
results = []
with ProcessPoolExecutor(max_workers=nworkers) as executor:
resultfutures = executor.map(runcp_worker, tasklist)
results = [x for x in resultfutures]
        executor.shutdown()  # redundant: exiting the with-block already shuts the pool down
return results
def transform(self, io, **kwargs):
return self.executor_fork(ProcessPoolExecutor, io, **kwargs)
def test_custom_executor_class(self):
cfg = load("tests/data/test_config.yml")
obj = cfg.executor_class()
assert isinstance(obj, ProcessPoolExecutor)
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for p in processes:
p.join()