def client_context(dask_client=None, dask_scheduler=None):
'''client_context creates a dask distributed or threadpool client or None
Parameters:
dask_client: str from choices ("DISTRIBUTED", 'THREAD_POOL', 'SERIAL')
or None to take DASK_CLIENT from environment
dask_scheduler: Distributed scheduler url or None to take
DASK_SCHEDULER from environment
'''
env = parse_env_vars()
dask_client = dask_client or env.get('DASK_CLIENT', 'DISTRIBUTED')
dask_scheduler = dask_scheduler or env.get('DASK_SCHEDULER')
if dask_client == 'DISTRIBUTED':
client = Executor(dask_scheduler) if dask_scheduler else Executor()
elif dask_client == 'THREAD_POOL':
client = ThreadPool(env.get('DASK_THREADS'))
elif dask_client == 'SERIAL':
client = None
else:
raise ValueError('Did not expect DASK_CLIENT to be {}'.format(dask_client))
get_func = _find_get_func_for_client(client)
with da.set_options(pool=dask_client):
yield client
评论列表
文章目录