def pandas_parallel(df, func, nthreads, mp_type, split, *opts):
    """Apply ``func`` to pieces of ``df`` in parallel via multiprocessing.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to process.
    func : callable
        Applied to each sub-frame. When ``*opts`` is given, each work item
        is a ``(sub_frame, extra)`` tuple that ``func`` must unpack itself.
        Must return a DataFrame; results are concatenated row-wise.
    nthreads : int
        Number of worker processes in the pool.
    mp_type : str
        Pool method to use: ``"map"``, ``"map_async"``, ``"imap"`` or
        ``"imap_unordered"``.
    split : str
        Empty string to split ``df`` evenly across workers; otherwise the
        name of a column whose unique values define the groups.
    *opts
        Optional extra arguments forwarded to ``func`` (a single value if
        one opt is given, the whole tuple if more than one).

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation of the per-piece results.

    Raises
    ------
    ValueError
        If ``mp_type`` is unsupported, or (chained, with the original
        traceback) when a worker raises.
    KeyboardInterrupt
        Re-raised after terminating the pool.
    """
    _SUPPORTED = ("map", "map_async", "imap", "imap_unordered")
    if mp_type not in _SUPPORTED:
        # Fail fast: previously an unknown mp_type fell through both result
        # branches and crashed with UnboundLocalError on `out` at return time.
        raise ValueError("mp_type must be one of %s, got %r"
                         % (", ".join(_SUPPORTED), mp_type))

    def init_worker():
        # Workers ignore SIGINT so a Ctrl-C is handled once, by the parent.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    logger.info("Begin multiprocessing of function %s in a pool of %s workers using %s protocol" % (str(func.__name__), nthreads, mp_type))
    if split == '':
        logger.debug("*The dataframe will be split evenly across the %s workers" % nthreads)
        # Never request more chunks than there are rows.
        df_split = np.array_split(df, min(nthreads, len(df.index)))
    else:
        logger.debug('*The dataframe will be split on the column %s' % split)
        # Lazy: one sub-frame per unique value of the split column.
        df_split = (df.loc[df[split] == sub, :] for sub in df[split].unique())

    # Pack work items: bare sub-frames, or (sub-frame, extra) tuples.
    if not opts:
        params = df_split
    elif len(opts) == 1:
        params = zip(df_split, repeat(opts[0]))
    else:
        params = zip(df_split, repeat(opts))

    init = time.time()
    pool = None  # so except/finally are safe even if Pool() itself raises
    try:
        logger.debug("*Initializing a %s pool with %s workers" % (mp_type, nthreads))
        pool = mp.Pool(nthreads, init_worker)  # Initialize the pool
        pool_func = getattr(pool, mp_type)  # set the function (map, imap, etc)
        if mp_type == "map_async":
            these_res = pool_func(func, params).get()
        else:
            these_res = pool_func(func, params)
        # pd.concat consumes the (possibly lazy imap) results before close()
        out = pd.concat(these_res, axis=0)
        pool.close()
    except KeyboardInterrupt as e:
        logger.error("Error: Keyboard interrupt")
        if pool is not None:
            pool.terminate()
        raise e
    except Exception as e:
        logger.error("Exception: " + str(e))
        if pool is not None:
            pool.terminate()
        # Re-raise as ValueError with the original traceback attached —
        # native Python 3 replacement for the py2-compat raise_() shim.
        raise ValueError(str(e)).with_traceback(sys.exc_info()[2]) from e
    finally:
        if pool is not None:
            pool.join()
        logger.debug("*Time to run pandas_parallel on " + str(func.__name__) + " took %g seconds" % (time.time() - init))
    return out
# NOTE(review): the two lines below are web-scrape page residue
# ("comment list" / "article directory"), not Python — commented out
# so the module remains importable.
# 评论列表
# 文章目录