def __call__(self, a):
    """Return a pickle-reduction result for the array ``a``.

    Three strategies are tried, in this order:

    1. ``a`` is already backed by a memmap file: delegate to
       ``_reduce_memmap_backed`` so that the existing file is reused.
    2. ``a`` holds no Python objects, ``self._max_nbytes`` is set and
       ``a.nbytes`` exceeds it: dump ``a`` to a file inside
       ``self._temp_folder`` (skipped when that file already exists) and
       return ``(load, (filename, self._mmap_mode))`` so that the
       receiving side memmaps the file instead of copying the data.
    3. Anything else: return ``(loads, (pickled_bytes,))``, i.e. a regular
       in-memory pickle of ``a`` using ``HIGHEST_PROTOCOL``.

    Parameters
    ----------
    a : array-like exposing ``dtype``, ``nbytes`` and ``shape``
        The array to reduce.

    Raises
    ------
    OSError
        If the temporary folder cannot be created for any reason other
        than it already existing.
    """
    backing_memmap = _get_backing_memmap(a)
    if backing_memmap is not None:
        # a is already backed by a memmap file, let's reuse it directly
        return _reduce_memmap_backed(a, backing_memmap)

    should_memmap = (not a.dtype.hasobject
                     and self._max_nbytes is not None
                     and a.nbytes > self._max_nbytes)
    if not should_memmap:
        # do not convert a into memmap, let pickler do its usual copy with
        # the default system pickler
        if self.verbose > 1:
            print("Pickling array (shape=%r, dtype=%s)." % (
                a.shape, a.dtype))
        return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))

    # check that the folder exists (lazily create the pool temp folder
    # if required)
    try:
        os.makedirs(self._temp_folder)
        os.chmod(self._temp_folder, FOLDER_PERMISSIONS)
    except OSError as e:
        if e.errno != errno.EEXIST:
            # Bare raise keeps the original traceback intact (``raise e``
            # would reset it on Python 2).
            raise

    # Find a unique, concurrent safe filename for writing the
    # content of this array only once.
    # NOTE(review): ``hash`` is presumably a content-hashing helper imported
    # by this module rather than the builtin -- confirm against the imports.
    basename = "%d-%d-%s.pkl" % (
        os.getpid(), id(threading.current_thread()), hash(a))
    filename = os.path.join(self._temp_folder, basename)

    # In case the same array with the same content is passed several
    # times to the pool subprocess children, serialize it only once

    # XXX: implement an explicit reference counting scheme to make it
    # possible to delete temporary files as soon as the workers are
    # done processing this data.
    if not os.path.exists(filename):
        if self.verbose > 0:
            print("Memmaping (shape=%r, dtype=%s) to new file %s" % (
                a.shape, a.dtype, filename))
        # dump() yields the path of every file it wrote; apply the
        # configured permissions to each of them.
        for dumped_filename in dump(a, filename):
            os.chmod(dumped_filename, FILE_PERMISSIONS)

        if self._prewarm:
            # Warm up the data to avoid concurrent disk access in
            # multiple children processes
            load(filename, mmap_mode=self._mmap_mode).max()
    elif self.verbose > 1:
        print("Memmaping (shape=%s, dtype=%s) to old file %s" % (
            a.shape, a.dtype, filename))

    # The worker process will use joblib.load to memmap the data
    return (load, (filename, self._mmap_mode))
###############################################################################
# Enable custom pickling in Pool queues
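# [scraped web-page navigation text, not part of this module: "Comment list"]
# [scraped web-page navigation text, not part of this module: "Article table of contents"]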