def update_row_param(self, phi_csr, mu0, c, v, r_prev, u_prev, phi_r, phi_u, num_process):
nrow = self.y_csr.shape[0]
# Update 'r' and 'u' block-wise, in parallel when num_process > 1.
if num_process == 1:
r, u = self.update_row_param_blockwise(self.y_csr, phi_csr, mu0, c, v, r_prev, u_prev, phi_r, phi_u)
else:
n_block = num_process
block_ind = np.linspace(0, nrow, 1 + n_block, dtype=int)
ru = joblib.Parallel(n_jobs=num_process)(
joblib.delayed(self.update_row_param_blockwise)(
self.y_csr[block_ind[m]:block_ind[m + 1], :],
phi_csr[block_ind[m]:block_ind[m + 1], :],
mu0, c, v,
r_prev[block_ind[m]:block_ind[m + 1]],
u_prev[block_ind[m]:block_ind[m + 1]],
phi_r[block_ind[m]:block_ind[m + 1]],
phi_u)
for m in range(n_block))
r = np.concatenate([ru_i[0] for ru_i in ru])
u = np.vstack([ru_i[1] for ru_i in ru])
return r, u
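The block-wise pattern above — split the row indices with np.linspace, send each slice to joblib.Parallel, then reassemble the pieces with np.concatenate / np.vstack — can be reduced to a small self-contained sketch. The toy block_stats function and the array shapes below are illustrative assumptions, not part of the original class.

import numpy as np
import joblib

def block_stats(block):
    # Toy per-block computation standing in for update_row_param_blockwise.
    return block.mean(axis=1), block.std(axis=1)

def blockwise_update(X, num_process=4):
    nrow = X.shape[0]
    # Same slicing scheme as above: num_process + 1 boundaries over the rows.
    block_ind = np.linspace(0, nrow, 1 + num_process, dtype=int)
    results = joblib.Parallel(n_jobs=num_process)(
        joblib.delayed(block_stats)(X[block_ind[m]:block_ind[m + 1], :])
        for m in range(num_process))
    # Reassemble the per-block outputs in the original row order.
    means = np.concatenate([r[0] for r in results])
    stds = np.concatenate([r[1] for r in results])
    return means, stds

if __name__ == '__main__':
    X = np.random.rand(1000, 20)
    means, stds = blockwise_update(X, num_process=4)
    assert means.shape == (1000,) and stds.shape == (1000,)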
Python Parallel() usage examples
Source file: matrix_factorization.py
Project: probabilistic-matrix-factorization (author: aki-nishimura)
def gen_batch_in_thread(img_map, df_cap, vocab_size, n_jobs=4,
size_per_thread=32):
imgs, curs, nxts, seqs, vhists = [], [], [], [], []
returns = Parallel(n_jobs=n_jobs, backend='threading')(
delayed(generate_batch)
(img_map, df_cap, vocab_size, size=size_per_thread)
for i in range(0, n_jobs))
for batch in returns:
imgs.extend(batch[0])
curs.extend(batch[1])
nxts.extend(batch[2])
seqs.extend(batch[3])
vhists.extend(batch[4])
return np.array(imgs), np.array(curs).reshape((-1, 1)), np.array(nxts), \
np.array(seqs), np.array(vhists)
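A minimal, hedged sketch of the same threading-backend pattern: each worker produces one batch and the results are merged afterwards. make_batch, the batch contents, and the shapes are placeholders, not the generate_batch used above.

import numpy as np
from joblib import Parallel, delayed

def make_batch(seed, size):
    # Placeholder worker: returns (inputs, targets) for one batch.
    rng = np.random.default_rng(seed)
    x = rng.random((size, 8))
    y = rng.integers(0, 10, size)
    return x, y

def gen_batches_threaded(n_jobs=4, size_per_thread=32):
    # backend='threading' keeps everything in one process, which is fine when
    # the per-batch work is I/O-bound or releases the GIL (e.g. NumPy).
    results = Parallel(n_jobs=n_jobs, backend='threading')(
        delayed(make_batch)(i, size_per_thread) for i in range(n_jobs))
    xs, ys = zip(*results)
    return np.vstack(xs), np.concatenate(ys)

if __name__ == '__main__':
    x, y = gen_batches_threaded()
    print(x.shape, y.shape)  # (128, 8) (128,)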
def evaluate(input_path, n_jobs):
aud, ann = zip(*crema.utils.get_ann_audio(input_path))
test_idx = set(pd.read_json('index_test.json')['id'])
# drop anything not in the test set
ann = [ann_i for ann_i in ann if crema.utils.base(ann_i) in test_idx]
aud = [aud_i for aud_i in aud if crema.utils.base(aud_i) in test_idx]
stream = tqdm(zip(ann, aud), desc='Evaluating test set', total=len(ann))
results = Parallel(n_jobs=n_jobs)(delayed(track_eval)(ann_i, aud_i)
for ann_i, aud_i in stream)
df = pd.DataFrame.from_dict(dict(results), orient='index')
print('Results')
print('-------')
print(df.describe())
df.to_json(os.path.join(OUTPUT_PATH, 'test_scores.json'))
Source file: matrix_factorization.py
Project: probabilistic-matrix-factorization (author: aki-nishimura)
def update_col_param(self, phi_csc, mu0, r, u, c_prev, v_prev, phi_c, phi_v, num_process):
ncol = self.y_csc.shape[1]
if num_process == 1:
c, v = self.update_col_param_blockwise(self.y_csc, phi_csc, mu0, r, u, c_prev, v_prev, phi_c, phi_v)
else:
# Update 'c' and 'v' block-wise in parallel.
n_block = num_process
block_ind = np.linspace(0, ncol, 1 + n_block, dtype=int)
cv = joblib.Parallel(n_jobs=num_process)(
joblib.delayed(self.update_col_param_blockwise)(
self.y_csc[:, block_ind[m]:block_ind[m + 1]],
phi_csc[:, block_ind[m]:block_ind[m + 1]],
mu0, r, u,
c_prev[block_ind[m]:block_ind[m + 1]],
v_prev[block_ind[m]:block_ind[m + 1]],
phi_c[block_ind[m]:block_ind[m + 1]],
phi_v)
for m in range(n_block))
c = np.concatenate([cv_j[0] for cv_j in cv])
v = np.vstack([cv_j[1] for cv_j in cv])
return c, v
def _fit_single_job(self, job_list, X, y):
cv_results_ = {}
# for i, (train_index, test_index) in job_list:
# LOG.info("Training fold %d", i + 1)
#
# slave_result_ = self._worker(
# i, X, y, train_index, test_index)
#
# _build_cv_results(cv_results_, **slave_result_)
slave_results = jl.Parallel(n_jobs=self.n_jobs)(
jl.delayed(_worker)(self, i, X, y, train_index, test_index)
for i, (train_index, test_index) in job_list)
for slave_result_ in slave_results:
_build_cv_results(cv_results_, **slave_result_)
self.cv_results_ = cv_results_
def degreetocart(data_f1):
global df2
df2 = data_f1.copy()
print "phase 1"
df2['X'] = np.nan
df2['Y'] = np.nan
df2['Z'] = np.nan
df2 = df2.astype(float)
print "phase 2"
num_cores = multiprocessing.cpu_count()
results_x = Parallel(n_jobs=num_cores)(delayed(xloop)(i) for i in xrange(0,len(df2)))
print "phase 3"
#print results_x
#print results_x
#print " this is "
#print results_x[0]
results_y = Parallel(n_jobs=num_cores)(delayed(yloop)(i) for i in xrange(0,len(df2)))
print "phase 4"
results_z = Parallel(n_jobs=num_cores)(delayed(zloop)(i) for i in xrange(0,len(df2)))
print "phase 5"
#print results_y
#Parallel(n_jobs=num_cores)(delayed(adjloop)(i) for i in xrange(0,len(df2)))
for i in xrange(0,len(df2)):
print i
df2['X'][i] = results_x[i]
df2['Y'][i] = results_y[i]
df2['Z'][i] = results_z[i]
def get_distilled_labels(filenames):
result_labels = []
print("Creating labels")
result_labels = Parallel(n_jobs=num_cores)(delayed(make_label)(long_filename) for long_filename in tqdm(filenames))
return result_labels
# This function receives paths to images and lines from a file with labels,
# and returns only the paths to images that have a corresponding label.
def get_information(ws, x, label, num_of_bins, interval_information_display, model, layerSize,
calc_parallel=True, py_hats=0):
"""Calculate the information for the network for all the epochs and all the layers"""
print('Start calculating the information...')
bins = np.linspace(-1, 1, num_of_bins)
label = np.array(label).astype(float)  # np.float was removed from recent NumPy releases
pys, pys1, p_y_given_x, b1, b, unique_a, unique_inverse_x, unique_inverse_y, pxs = extract_probs(label, x)
if calc_parallel:
params = np.array(Parallel(n_jobs=NUM_CORES
)(delayed(calc_information_for_epoch)
(i, interval_information_display, ws[i], bins, unique_inverse_x, unique_inverse_y,
label,
b, b1, len(unique_a), pys,
pxs, p_y_given_x, pys1, model.save_file, x.shape[1], layerSize)
for i in range(len(ws))))
else:
params = np.array([calc_information_for_epoch
(i, interval_information_display, ws[i], bins, unique_inverse_x, unique_inverse_y,
label, b, b1, len(unique_a), pys,
pxs, p_y_given_x, pys1, model.save_file, x.shape[1], layerSize)
for i in range(len(ws))])
return params
def _erpac(xp, xa, n_perm, n_jobs):
"""Sub erpac function
[xp] = [xa] = (npts, ntrials)
"""
npts, ntrials = xp.shape
# Compute ERPAC
xerpac = np.zeros((npts,))
for t in range(npts):
xerpac[t] = circ_corrcc(xp[t, :], xa[t, :])[0]
# Compute surrogates:
data = Parallel(n_jobs=n_jobs)(delayed(_erpacSuro)(
xp, xa, npts, ntrials) for pe in range(n_perm))
suro = np.array(data)
# Normalize erpac:
xerpac = (xerpac - suro.mean(0))/suro.std(0)
# Get p-value:
pvalue = norm.cdf(-np.abs(xerpac))*2
return xerpac, pvalue
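The normalization step above is a standard permutation z-score: the observed statistic is centred and scaled by the surrogate distribution, then converted to a two-tailed p-value. Below is a small self-contained sketch of just that step, using a trivial shuffled-correlation surrogate; circ_corrcc and the trial structure of the original are not reproduced, and the helper names are made up.

import numpy as np
from scipy.stats import norm
from joblib import Parallel, delayed

def _shuffled_corr(x, y, seed):
    # One surrogate: correlate x with a permuted copy of y.
    rng = np.random.default_rng(seed)
    return np.corrcoef(x, rng.permutation(y))[0, 1]

def perm_zscore(x, y, n_perm=200, n_jobs=2):
    observed = np.corrcoef(x, y)[0, 1]
    suro = np.array(Parallel(n_jobs=n_jobs)(
        delayed(_shuffled_corr)(x, y, p) for p in range(n_perm)))
    z = (observed - suro.mean()) / suro.std()
    pvalue = 2 * norm.cdf(-abs(z))  # two-tailed, as in _erpac above
    return z, pvalue

if __name__ == '__main__':
    rng = np.random.default_rng(0)
    x = rng.normal(size=500)
    y = 0.3 * x + rng.normal(size=500)
    print(perm_zscore(x, y))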
def _fit(x, y, clf, cv, mf, grp, center, n_jobs):
"""Sub function for fitting
"""
# Check the inputs size :
x, y = checkXY(x, y, mf, grp, center)
rep, nfeat = len(cv), len(x)
# Tricks : construct a list of tuple containing the index of
# (repetitions,features) & loop on it. Optimal for parallel computing :
claIdx, listRep, listFeat = list2index(rep, nfeat)
# Run the classification :
cvs = Parallel(n_jobs=n_jobs)(delayed(_cvscore)(
x[k[1]], y, clone(clf), cv[k[0]]) for k in claIdx)
da, y_true, y_pred = zip(*cvs)
# Reconstruct elements :
da = np.array(groupInList(da, listFeat))
y_true = groupInList(y_true, listFeat)
y_pred = groupInList(y_pred, listFeat)
return da, x, y, y_true, y_pred
def _fit(x, y, train, test, self, n_jobs):
"""Sub fit function
"""
nsuj, nfeat = x.shape
iteract = product(range(nfeat), zip(train, test))
ya = Parallel(n_jobs=n_jobs)(delayed(_subfit)(
np.concatenate(tuple(x[i].iloc[k[0]])),
np.concatenate(tuple(x[i].iloc[k[1]])),
np.concatenate(tuple(y[0].iloc[k[0]])),
np.concatenate(tuple(y[0].iloc[k[1]])),
self) for i, k in iteract)
# Re-arrange ypred and ytrue:
ypred, ytrue = zip(*ya)
ypred = [np.concatenate(tuple(k)) for k in np.split(np.array(ypred), nfeat)]
ytrue = [np.concatenate(tuple(k)) for k in np.split(np.array(ytrue), nfeat)]
da = np.ravel([100*accuracy_score(ytrue[k], ypred[k]) for k in range(nfeat)])
return da, ytrue, ypred
def jobmap(func, INPUT_ITR, FLAG_PARALLEL=False, batch_size=None,
*args, **kwargs):
n_jobs = -1 if FLAG_PARALLEL else 1
dfunc = joblib.delayed(func)
with joblib.Parallel(n_jobs=n_jobs) as MP:
# Yield the whole thing if there isn't a batch_size
if batch_size is None:
for z in MP(dfunc(x, *args, **kwargs)
for x in INPUT_ITR):
yield z
return  # end the generator here; raising StopIteration inside a generator is an error under PEP 479
ITR = iter(INPUT_ITR)
progress_bar = tqdm()
for block in grouper(ITR, batch_size):
MPITR = MP(dfunc(x, *args, **kwargs) for x in block)
for z in MPITR:
yield z
progress_bar.update(1)
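Reusing one Parallel instance via the context manager, as jobmap does, avoids re-spawning the worker pool for every batch. Here is a minimal sketch of that pattern with an explicit chunking helper; the grouper below is an assumption standing in for the project's own grouper, and square is just a toy task.

from itertools import islice
import joblib

def grouper(iterable, n):
    # Yield successive lists of up to n items (stand-in for the project's grouper).
    it = iter(iterable)
    while True:
        block = list(islice(it, n))
        if not block:
            return
        yield block

def square(x):
    return x * x

def jobmap_batched(items, batch_size=100, n_jobs=2):
    dfunc = joblib.delayed(square)
    # One pool of workers is reused across all batches.
    with joblib.Parallel(n_jobs=n_jobs) as pool:
        for block in grouper(items, batch_size):
            for z in pool(dfunc(x) for x in block):
                yield z

if __name__ == '__main__':
    print(sum(jobmap_batched(range(1000))))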
def process_and_evaluate(model, X, Y, k, n_jobs=1):
"""
Arguments:
X : query_id, query pairs
Y : dict of dicts (harvestable)
k : int how many to retrieve
"""
print("Starting query time with %d jobs" % n_jobs)
# TODO: can we unzip Y and only pass the chunk of y that each
# query actually needs to harvest?
qids_rs = Parallel(n_jobs=n_jobs)(delayed(process_query)(model, x, Y, k)
for x in X)
print("Evaluating the results:")
scores = evaluate_results(qids_rs, Y, k)
return scores
def smooth(s, lengthscale, parallel=True):
"""Smooths s vertically (along the first axis, one channel per column)."""
if len(s.shape) == 1:
s = s[..., None]
nChans = s.shape[1]
lengthscale = 2 * round(float(lengthscale) / 2)
W = np.hamming(min(lengthscale, s.shape[0]))
W /= np.sum(W)
if s.shape[1]>1:
if parallel:
njobs=JOBLIB_NCORES
else:
njobs=1
slidingMean = (Parallel(n_jobs=njobs,backend=JOBLIB_BACKEND,temp_folder=JOBLIB_TEMPFOLDER)
(delayed(smoothLine)(s[:,chan],W) for chan in range(nChans)))
return np.array(slidingMean).T
else:
return smoothLine(s[:,0],W)[...,None]
def run(self, model, x, y=None, scoring=None, max_threads=1):
# get scorers
if scoring is not None:
if isinstance(scoring, list):
scorers_fn = dict([(self.get_scorer_name(k), get_scorer(k)) for k in scoring])
else:
scorers_fn = dict([(self.get_scorer_name(scoring), get_scorer(scoring))])
else:
# By default uses the model loss function as scoring function
scorers_fn = dict([(model.get_loss_func(), get_scorer(model.get_loss_func()))])
model_cfg = model.to_json()
if y is None:
args = [(model_cfg['model'], train, test, x, scorers_fn) for train, test in self.cv.split(x, y)]
cv_fn = self._do_unsupervised_cv
else:
args = [(model_cfg['model'], train, test, x, y, scorers_fn) for train, test in self.cv.split(x, y)]
cv_fn = self._do_supervised_cv
with Parallel(n_jobs=min(max_threads, len(args))) as parallel:
cv_results = parallel(delayed(function=cv_fn, check_pickle=False)(*a) for a in args)
return self._consolidate_cv_scores(cv_results)
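The same idea — one joblib task per cross-validation split, each returning a dict of scores that is consolidated afterwards — can be shown with plain scikit-learn primitives. This is a hedged sketch under assumed names (_fit_score, parallel_cv), not the framework's actual _do_supervised_cv.

import numpy as np
from joblib import Parallel, delayed
from sklearn.base import clone
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score

def _fit_score(model, X, y, train, test):
    # Fit a fresh clone on the training fold and score it on the test fold.
    m = clone(model).fit(X[train], y[train])
    return {'accuracy': accuracy_score(y[test], m.predict(X[test]))}

def parallel_cv(model, X, y, n_splits=5, n_jobs=2):
    splits = list(KFold(n_splits=n_splits, shuffle=True, random_state=0).split(X, y))
    # One delayed call per fold; results come back in fold order.
    folds = Parallel(n_jobs=min(n_jobs, len(splits)))(
        delayed(_fit_score)(model, X, y, tr, te) for tr, te in splits)
    return {k: np.mean([f[k] for f in folds]) for k in folds[0]}

if __name__ == '__main__':
    X, y = load_iris(return_X_y=True)
    print(parallel_cv(LogisticRegression(max_iter=1000), X, y))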
def testParallel(parallel=True):
inputs = range(0, 1000, 1)
param = 1000
if parallel:
# parallel stuff
# This is reference code for parallel implementation
inputs = range(10)
num_cores = multiprocessing.cpu_count()
results = joblib.Parallel(n_jobs=num_cores)(joblib.delayed(childFunc)(i, param) for i in inputs)
else:
results = [childFunc(i, param) for i in inputs]
print(results)
def count_reads_in_windows(bed_file, args):
# type: (str, Namespace) -> List[pd.DataFrame]
chromosome_size_dict = args.chromosome_sizes
chromosomes = natsorted(list(chromosome_size_dict.keys()))
parallel_count_reads = partial(_count_reads_in_windows, bed_file, args)
info("Binning chromosomes {}".format(", ".join([c.replace("chr", "")
for c in chromosomes])))
chromosome_dfs = Parallel(n_jobs=args.number_cores)(
delayed(parallel_count_reads)(chromosome_size_dict[chromosome],
chromosome, strand)
for chromosome, strand in product(chromosomes, ["+", "-"]))
info("Merging the bins on both strands per chromosome.")
both_chromosome_strand_dfs = list(_pairwise(chromosome_dfs))
merged_chromosome_dfs = Parallel(
n_jobs=args.number_cores)(delayed(merge_chromosome_dfs)(df_pair)
for df_pair in both_chromosome_strand_dfs)
return merged_chromosome_dfs
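The core trick above is binding the fixed arguments with functools.partial before wrapping with delayed, then iterating over a product of chromosomes and strands so each combination becomes one task. A small hedged sketch with a dummy counting function (the _count helper and bin_size parameter are assumptions, not the real _count_reads_in_windows):

from functools import partial
from itertools import product
from joblib import Parallel, delayed

def _count(bed_file, bin_size, chrom_size, chromosome, strand):
    # Dummy stand-in for the real window-counting worker: report the call.
    n_bins = chrom_size // bin_size
    return (chromosome, strand, n_bins)

def count_all(bed_file, chromosome_sizes, bin_size=200, n_jobs=2):
    # Freeze the per-run arguments once; only chromosome/strand vary per task.
    counter = partial(_count, bed_file, bin_size)
    return Parallel(n_jobs=n_jobs)(
        delayed(counter)(chromosome_sizes[c], c, s)
        for c, s in product(sorted(chromosome_sizes), ['+', '-']))

if __name__ == '__main__':
    sizes = {'chr1': 1_000_000, 'chr2': 800_000}
    print(count_all('reads.bed', sizes))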
def count_reads_in_windows_paired_end(bed_file, args):
# type: (str, Namespace) -> List[pd.DataFrame]
chromosome_size_dict = args.chromosome_sizes
chromosomes = natsorted(list(chromosome_size_dict.keys()))
parallel_count_reads = partial(_count_reads_in_windows_paired_end,
bed_file, args)
info("Binning chromosomes {}".format(", ".join([c.replace("chr", "")
for c in chromosomes])))
chromosome_dfs = Parallel(n_jobs=args.number_cores)(
delayed(parallel_count_reads)(chromosome_size_dict[chromosome],
chromosome)
for chromosome in chromosomes)
return chromosome_dfs
def merge_same_files(sample1_dfs, sample2_dfs, nb_cpu):
# type: (List[pd.DataFrame], List[pd.DataFrame], int) -> List[pd.DataFrame]
# if one list is missing a chromosome, we might pair up the wrong dataframes
# therefore creating dicts beforehand to ensure they are paired up properly
d1, d2 = ensure_same_chromosomes_in_list(sample1_dfs,
sample2_dfs)
assert len(d1) == len(d2)
logging.info("Merging same class data.")
merged_chromosome_dfs = Parallel(n_jobs=nb_cpu)(delayed(_merge_same_files)(
d1[chromosome],
d2[chromosome]) for chromosome in d1.keys())
return merged_chromosome_dfs
def create_matrixes(chip, input, df, args):
# type: (Iterable[pd.DataFrame], Iterable[pd.DataFrame], pd.DataFrame, Namespace) -> List[pd.DataFrame]
"Creates matrixes which can be written to file as is (matrix) or as bedGraph."
genome = args.chromosome_sizes
chip = put_dfs_in_chromosome_dict(chip)
input = put_dfs_in_chromosome_dict(input)
all_chromosomes = natsorted(set(list(chip.keys()) + list(input.keys())))
# print("df1\n", df, file=sys.stderr)
islands = enriched_bins(df, args)
# print("islands1\n", islands, file=sys.stderr)
logging.info("Creating matrixes from count data.")
dfms = Parallel(n_jobs=args.number_cores)(delayed(_create_matrixes)(
chromosome, chip, input, islands, genome[chromosome],
args.window_size) for chromosome in all_chromosomes)
return dfms
def fit(self, X, y):
assert isinstance(X, list) #TODO: this should not be an assert
assert len(y) > 0
assert len(X) == len(y)
# TODO: add support for fitting again after having already performed a fit
self.n_labels_ = y.shape[1]
self.models_ = []
# Train one model per label. If no data is available for a given label, the model is set to None.
models, data = [], []
for idx in range(self.n_labels_):
d = [X[i] for i in np.where(y[:, idx] == 1)[0]]
if len(d) == 0:
model = None
else:
model = clone(self.model)
data.append(d)
models.append(model)
assert len(models) == len(data)
n_jobs = self.n_jobs if self.model.supports_parallel() else 1
self.models_ = Parallel(n_jobs=n_jobs)(delayed(_perform_fit)(models[i], data[i]) for i in range(len(models)))
assert len(self.models_) == self.n_labels_
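The fit above trains one clone of the base model per label in parallel, keeping None placeholders for labels without data. Below is a reduced, hedged sketch of that one-model-per-label pattern as a binary-classifier variant; _perform_fit, fit_per_label, and the data layout are assumptions and differ from the original's per-label data selection.

import numpy as np
from joblib import Parallel, delayed
from sklearn.base import clone
from sklearn.linear_model import LogisticRegression

def _perform_fit(model, X, y):
    # Return None when a label had no usable data, mirroring the None
    # placeholder used in the fit() method above.
    if model is None:
        return None
    return model.fit(X, y)

def fit_per_label(base_model, X, Y, n_jobs=2):
    n_labels = Y.shape[1]
    models = [clone(base_model) if Y[:, j].any() else None for j in range(n_labels)]
    return Parallel(n_jobs=n_jobs)(
        delayed(_perform_fit)(models[j], X, Y[:, j]) for j in range(n_labels))

if __name__ == '__main__':
    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 5))
    Y = (rng.random((200, 3)) > 0.5).astype(int)
    fitted = fit_per_label(LogisticRegression(), X, Y)
    print([type(m).__name__ for m in fitted])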
Source file: preprocess_images.py
Project: benchmark-keras (author: beeva-ricardoguerrero)
def preprocess_images_multiprocess(path2dataset_orig, prefix_orig, path2dataset_dest, prefix_dest, img_rows, img_cols, img_crop_rows, img_crop_cols):
# Origin path = prefix + path -> /mnt/img/img393.JPEG
# Destiny path = prefix2 + path -> /mnt/h5/img393.h5
with open(path2dataset_orig, 'rb') as fin:
paths = fin.readlines()
num_total_paths = len(paths)
num_cores = multiprocessing.cpu_count()
processed_paths = Parallel(n_jobs=num_cores)(delayed(preprocess_images_worker) \
(line, prefix_orig, prefix_dest, img_rows, img_cols, img_crop_rows, img_crop_cols) for line in paths)
processed_paths = [elem for elem in processed_paths if elem]
with open(path2dataset_dest, "wb") as fout:
fout.writelines(processed_paths)
print("Total images pre-processed: %d (remember that corrupted or not present images were discarded)" % len(processed_paths))
def _unbound_tae_starter(tae, *args, **kwargs):
"""
Unbound function to be used by joblib's Parallel, since directly passing
the TAE results in pickling problems.
Parameters
----------
tae: ExecuteTARun
tae to be used
*args, **kwargs: various
arguments to the tae
Returns
-------
tae_results: tuple
return from tae.start
"""
return tae.start(*args, **kwargs)
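The docstring above describes a common joblib workaround: bound methods or complex callables can be awkward to pickle for the process-based backend, so a plain module-level function receives the object and forwards the call. A hedged, generic sketch of that pattern follows; Runner and _unbound_starter are made-up stand-ins, not SMAC's ExecuteTARun.

from joblib import Parallel, delayed

class Runner:
    """Made-up stand-in for a heavier object such as ExecuteTARun."""
    def __init__(self, offset):
        self.offset = offset

    def start(self, x):
        return x + self.offset

def _unbound_starter(runner, *args, **kwargs):
    # Module-level trampoline: joblib pickles the object plus plain arguments
    # instead of a bound method, which tends to be more robust.
    return runner.start(*args, **kwargs)

if __name__ == '__main__':
    runner = Runner(offset=10)
    results = Parallel(n_jobs=2)(
        delayed(_unbound_starter)(runner, i) for i in range(5))
    print(results)  # [10, 11, 12, 13, 14]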
def label_by_dir(self, file_path, target_dir, dir_and_label, task_size=10):
label_dirs = dir_and_label.keys()
dirs = [d for d in os.listdir(target_dir)
if os.path.isdir(os.path.join(target_dir, d))
and d in label_dirs]
write_flg = True
for d in dirs:
self.logger.info(
"Extracting {} (labeled by {}).".format(d, dir_and_label[d]))
label = dir_and_label[d]
dir_path = os.path.join(target_dir, d)
pathes = [os.path.join(dir_path, f) for f in os.listdir(dir_path)]
pathes = [p for p in pathes if os.path.isfile(p)]
task_length = int(math.ceil(len(pathes) / task_size))
for i in xtqdm(range(task_length)):
index = i * task_size
tasks = pathes[index:(index + task_size)]
lines = Parallel(n_jobs=-1)(
delayed(self._make_pair)(label, t) for t in tasks)
mode = "w" if write_flg else "a"
with open(file_path, mode=mode, encoding="utf-8") as f:
for ln in lines:
f.write(ln)
write_flg = False
def get_graph_stats(graph_obj_handle, prop='degrees'):
# if prop == 'degrees':
num_cores = multiprocessing.cpu_count()
inputs = [int(i*len(graph_obj_handle)/num_cores) for i in range(num_cores)] + [len(graph_obj_handle)]
res = Parallel(n_jobs=num_cores)(delayed(get_values)(graph_obj_handle, inputs[i], inputs[i+1], prop) for i in range(num_cores))
stat_dict = {}
if 'degrees' in prop:
stat_dict['degrees'] = list(set([d for core_res in res for file_res in core_res for d in file_res['degrees']]))
if 'edge_labels' in prop:
stat_dict['edge_labels'] = list(set([d for core_res in res for file_res in core_res for d in file_res['edge_labels']]))
if 'target_mean' in prop or 'target_std' in prop:
param = np.array([file_res['params'] for core_res in res for file_res in core_res])
if 'target_mean' in prop:
stat_dict['target_mean'] = np.mean(param, axis=0)
if 'target_std' in prop:
stat_dict['target_std'] = np.std(param, axis=0)
return stat_dict
def get_Ddiff_row_deprecated(self, i):
from ..cython import utils_cy
if self.M is None:
m_i = utils_cy.get_M_row(i, self.evals, self.rbasis, self.lbasis)
else:
m_i = self.M[i]
len_chunk = np.ceil(self.X.shape[0] / self.n_jobs).astype(int)
n_chunks = np.ceil(self.X.shape[0] / len_chunk).astype(int)
chunks = [np.arange(start, min(start + len_chunk, self.X.shape[0]))
for start in range(0, n_chunks * len_chunk, len_chunk)]
if self.n_jobs >= 4: # problems with high memory calculations, we skip computing M above
# here backend threading is not necessary, and seems to slow
# down everything considerably
result_lst = Parallel(n_jobs=self.n_jobs)(
delayed(self._get_Ddiff_row_chunk)(m_i, chunk)
for chunk in chunks)
d_i = np.zeros(self.X.shape[0])
for i_chunk, chunk in enumerate(chunks):
if self.n_jobs >= 4: d_i_chunk = result_lst[i_chunk]
else: d_i_chunk = self._get_Ddiff_row_chunk(m_i, chunk)
d_i[chunk] = d_i_chunk
return d_i
Source file: twitter_account_checker.py
Project: anomalous-vertices-detection (author: Kagandi)
def batch_url_extractor(input_path, output_path):
last_id = False
if os.path.isfile(output_path):
last_id = get_last_written_id(output_path)
f = read_file(input_path)
for line_count, link in enumerate(f):
user_id = link[0].strip()
if last_id == user_id:
last_id = False
break
if last_id is False:
processes = Parallel(n_jobs=4)(
delayed(get_twitter_account_state)(user_id) for user_id in f)
processes = [x for x in processes if x is not None]
# if line_count % 10000 == 0:
append_list_to_csv(output_path, processes)
# write_to_file(output_path, two_dimensional_list_to_string(result))
def runStat(dataI, minPts, cut, cpu, fout, hichip=0):
"""
Calling p-values of interactions for all chromosomes.
"""
logger.info("Starting estimate significance for interactions")
ds = Parallel(n_jobs=cpu)(delayed(getIntSig)(
dataI[key]["f"], dataI[key]["records"], minPts, cut)
for key in dataI.keys())
ds = [d for d in ds if d is not None]
if len(ds) == 0:
logger.error("Something wrong, no loops found, sorry, bye.")
return 1
ds = pd.concat(ds)
try:
if hichip:
ds = markIntSigHic(ds)
else:
ds = markIntSig(ds)
ds.to_csv(fout + ".loop", sep="\t", index_label="loopId")
except:
logger.warning(
"Something went wrong during significance estimation; only outputting the called loops"
)
ds.to_csv(fout + "_raw.loop", sep="\t", index_label="loopId")
return 0
def h0_opt_test_stresstest(IP=None, stress_count=2000, stress_threads=50, **kwargs):
'''
Throw {stress_count} domains at the pihole via {stress_threads} threads
'''
from joblib import Parallel, delayed
top_array = open('topsites.txt').read().splitlines()
random.shuffle(top_array)
results = Parallel(n_jobs=stress_threads, backend='threading' )(delayed(dns_stress)(IP, site) for site in top_array[:stress_count])
good = sum( 1 for (a,b) in results if a == 'good' )
numbers = [ b for (a,b) in results if a == 'good' ]
bad = sum( 1 for (a,b) in results if a == 'bad' )
vmin = min(numbers)*1000
vmax = max(numbers)*1000
vavg = sum(numbers)*1000//len(numbers)
vstd = (sum(((n*1000) - vavg) ** 2 for n in numbers) / len(numbers)) ** .5
return not bad or (good/bad)>0.05, "{good}/{bad} min {vmin:.2f}ms avg {vavg:.2f}ms max {vmax:.2f}ms std {vstd:.2f}ms".format(**locals())
def get_X_y():
start = time.time()
X = []
y = []
for j in range(10):
print('Load folder c{}'.format(j))
path = os.path.join('imgs/train', 'c' + str(j), '*.jpg')
files = glob.glob(path)
X.extend(Parallel(n_jobs=-1)(delayed(process_image)(im_file) for im_file in files))
y.extend([j]*len(files))
end = time.time() - start
print("Time: %.2f seconds" % end)
return np.array(X), np.array(y)