def hash(obj, hash_name='md5', coerce_mmap=False):
""" Quick calculation of a hash to identify uniquely Python objects
containing numpy arrays.
Parameters
-----------
hash_name: 'md5' or 'sha1'
Hashing algorithm used. sha1 is supposedly safer, but md5 is
faster.
coerce_mmap: boolean
Make no difference between np.memmap and np.ndarray
"""
if 'numpy' in sys.modules:
hasher = NumpyHasher(hash_name=hash_name, coerce_mmap=coerce_mmap)
else:
hasher = Hasher(hash_name=hash_name)
return hasher.hash(obj)
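# Usage sketch for the helper above (the same function is exposed publicly as
# joblib.hash). With coerce_mmap=True a memmap and an equal ndarray hash
# identically; the file name below is illustrative.
import numpy as np

a = np.arange(10)
m = np.memmap('hash_demo.dat', dtype=a.dtype, mode='w+', shape=a.shape)
m[:] = a
assert hash(a, coerce_mmap=True) == hash(m, coerce_mmap=True)  # types coerced
assert hash(a) != hash(m)  # without coercion, a memmap hashes differently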
Example source code for Python's memmap() class
def get_mmap(X):
"""
converts a numpy array to
a numpy memmory mapped array
"""
#TODO: use tempfile.NamedTemporaryFile
    # Already memory-mapped: return as-is.
    if isinstance(X, np.memmap):
        return X
    # Find the first unused file name under mmap_base (a module-level
    # directory setting), trying up to max_mmap_files candidates.
    fid = 0
    filename = mmap_base + "data" + str(fid) + ".dat"
    for _ in range(max_mmap_files):
        if os.path.isfile(filename):
            fid += 1
            filename = mmap_base + "data" + str(fid) + ".dat"
        else:
            break
    _X = np.memmap(filename, dtype='float64', mode='w+', shape=X.shape)
    _X[:] = X[:]
    # Drop the local reference and collect; the in-memory copy is only
    # freed once the caller's reference goes away too.
    del X
    import gc
    gc.collect()
    return _X
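# Hypothetical usage of get_mmap, assuming the module-level settings it
# relies on (mmap_base, max_mmap_files) point at a writable directory:
import numpy as np

mmap_base = "/tmp/"      # assumed setting
max_mmap_files = 100     # assumed setting

X = np.random.rand(500, 4)
Xm = get_mmap(X)         # a copy of X backed by /tmp/data<fid>.dat
print(type(Xm), Xm.shape)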
def append(self, array):
"""Append data from `array` to self."""
    if self.closed:
        raise ValueError('Array is closed.')
if not self.initialized:
self.init_from_array(array)
if array.shape[1:] != self.shape[1:]:
raise ValueError("Appended array is of different shape.")
elif array.dtype != self.dtype:
raise ValueError("Appended array is of different dtype.")
# Append new data
pos = self.header_length + self.size * self.itemsize
self.fs.seek(pos)
self.fs.write(array.tobytes('C'))
self.shape = (self.shape[0] + len(array), ) + self.shape[1:]
# Only prepare the header bytes, need to be flushed to take effect
self._prepare_header_data()
# Invalidate the memmap
self._memmap = None
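# A minimal self-contained sketch of the same append-then-remap idea, using a
# raw binary file instead of the class's NPY header handling (append_raw is a
# hypothetical helper, not part of the class above):
import numpy as np

def append_raw(path, array, existing_rows=0):
    """Append C-ordered rows to a raw file and return a fresh read-only memmap."""
    with open(path, 'ab') as fs:
        fs.write(array.tobytes('C'))
    rows = existing_rows + len(array)
    return np.memmap(path, dtype=array.dtype, mode='r',
                     shape=(rows,) + array.shape[1:])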
def shmem_client_send_env_id(self):
"""
Multiplayer Scene can support multiple kinds of environments (robots, actors).
For example, Stadium supports Hopper and Ant.
On server side, environment of the same type should be created. To do
that, we send env_id over pipe.
Obervations, actions must have size matching that on server. So we open shared memory
files at this point, after server created those files based on knowledge it now has,
and sent "accepted" back here.
"""
os.write(self.sh_pipe_actready, (self.spec.id + "\n").encode("ascii"))
check = self.sh_pipe_obsready.readline()[:-1]
    assert check == "accepted"
self.sh_obs = np.memmap(self.prefix + "_obs", mode="r+", shape=self.observation_space.shape, dtype=np.float32)
self.sh_act = np.memmap(self.prefix + "_act", mode="r+", shape=self.action_space.shape, dtype=np.float32)
self.sh_rew = np.memmap(self.prefix + "_rew", mode="r+", shape=(1,), dtype=np.float32)
    self.sh_rgb = np.memmap(self.prefix + "_rgb", mode="r+", shape=(self.VIDEO_H, self.VIDEO_W, 3), dtype=np.uint8)
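# Minimal sketch of the memmap-as-shared-memory idea used above; the file
# name and shape here are illustrative, not the real handshake protocol:
import numpy as np

writer = np.memmap("demo_obs", mode="w+", shape=(4,), dtype=np.float32)
writer[:] = [1, 2, 3, 4]
writer.flush()
# In another process, the same file can be mapped and read back:
reader = np.memmap("demo_obs", mode="r+", shape=(4,), dtype=np.float32)
print(reader)  # sees the writer's values through the shared file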
def read_env_id_and_create_env(self):
self.sh_pipe_actready = open(self.sh_pipe_actready_filename, "rt")
self.sh_pipe_obsready = os.open(self.sh_pipe_obsready_filename, os.O_WRONLY)
env_id = self.sh_pipe_actready.readline()[:-1]
    if "-v" not in env_id:
        raise ValueError("multiplayer client %s sent an invalid environment id '%s'" % (self.prefix, env_id))
#
# And at this point we know env_id.
#
print("Player %i connected, wants to operate %s in this scene" % (self.player_n, env_id))
    self.env = gym.make(env_id)  # gym.make() adds at least a time-limit wrapper, which we need
self.env.unwrapped.scene = self.scene
self.env.unwrapped.player_n = self.player_n
assert isinstance(self.env.observation_space, gym.spaces.Box)
assert isinstance(self.env.action_space, gym.spaces.Box)
self.sh_obs = np.memmap(self.prefix + "_obs", mode="w+", shape=self.env.observation_space.shape, dtype=np.float32)
self.sh_act = np.memmap(self.prefix + "_act", mode="w+", shape=self.env.action_space.shape, dtype=np.float32)
self.sh_rew = np.memmap(self.prefix + "_rew", mode="w+", shape=(1,), dtype=np.float32)
    self.sh_rgb = np.memmap(self.prefix + "_rgb", mode="w+", shape=(self.env.unwrapped.VIDEO_H, self.env.unwrapped.VIDEO_W, 3), dtype=np.uint8)
os.write(self.sh_pipe_obsready, b'accepted\n')
def flush(self):
"""
Write any changes in the array to the file on disk.
For further information, see `memmap`.
Parameters
----------
None
See Also
--------
memmap
"""
if self.base is not None and hasattr(self.base, 'flush'):
self.base.flush()
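# Usage sketch: writes through a 'w+' memmap reach the file only after
# flush() or deallocation; the file name is illustrative.
import numpy as np

m = np.memmap('flush_demo.dat', dtype=np.float64, mode='w+', shape=(3,))
m[:] = [1.0, 2.0, 3.0]
m.flush()  # bytes are now on disk, safe for another process to read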
def __init__(self, hash_name='md5', coerce_mmap=False):
"""
Parameters
----------
hash_name: string
The hash algorithm to be used
coerce_mmap: boolean
Make no difference between np.memmap and np.ndarray
objects.
"""
self.coerce_mmap = coerce_mmap
Hasher.__init__(self, hash_name=hash_name)
# delayed import of numpy, to avoid tight coupling
import numpy as np
self.np = np
if hasattr(np, 'getbuffer'):
self._getbuffer = np.getbuffer
else:
self._getbuffer = memoryview
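# Sketch of why the class keeps a buffer getter: a contiguous array's raw
# bytes can be fed to hashlib without copying. np.getbuffer existed only on
# Python 2; memoryview is the modern equivalent.
import hashlib
import numpy as np

arr = np.arange(5)
digest = hashlib.md5(memoryview(arr)).hexdigest()
print(digest)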
def setSavePathFile(self, save=False, path_result=None):
    # Placeholder path-file array, used when path saving is disabled.
    a = np.zeros((max(1, self.zones), 1, 2), dtype=np.int32)
if save:
        if path_result is None:
            warnings.warn("Path file not set properly. An output file must also be specified")
else:
            if path_result[-3:].lower() != 'aep':
                dictio_name = path_result + '.aed'
                path_result += '.aep'
            else:
                dictio_name = path_result[:-3] + 'aed'
            if self.nodes > 0 and self.zones > 0:
                a = np.memmap(path_result, dtype=np.int32, mode='w+',
                              shape=(self.zones, self.nodes, 2))
                saveDataFileDictionary(self.__graph_id__, 'path file',
                                       [int(x) for x in a.shape], dictio_name)
    self.path_file = {'save': save,
                      'results': a}
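# Hedged sketch of reading back a path file written by the method above. The
# (zones, nodes) geometry is stored in the companion .aed dictionary; the
# values below are illustrative and the .aep file must already exist:
import numpy as np

zones, nodes = 10, 250
paths = np.memmap('assignment.aep', dtype=np.int32, mode='r',
                  shape=(zones, nodes, 2))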
def save_portion(pars):
    big_mov, d, tot_frames, fnames, idx_start, idx_end = pars
    big_mov = np.memmap(big_mov, mode='r+', dtype=np.float32,
                        shape=(d, tot_frames), order='C')
    Ttot = 0
    Yr_tot = np.zeros((idx_end - idx_start, tot_frames))
    print(Yr_tot.shape)
    for f in fnames:
        print(f)
        Yr, dims, T = load_memmap(f)
        print(idx_start, idx_end)
        Yr_tot[:, Ttot:Ttot + T] = np.array(Yr[idx_start:idx_end])
        Ttot = Ttot + T
        del Yr
    big_mov[idx_start:idx_end, :] = Yr_tot
    del Yr_tot
    print('done')
    del big_mov
    return Ttot
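# Hedged stand-in for the load_memmap helper assumed above: the real project
# encodes d and T in the file name, while this sketch simply takes them as
# arguments.
import numpy as np

def load_memmap_sketch(filename, d, T):
    """Return (Yr, dims, T) for a C-ordered (d, T) float32 memmap."""
    Yr = np.memmap(filename, mode='r', dtype=np.float32, shape=(d, T), order='C')
    return Yr, (d,), T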
#%%
def __init__(self, maxlen, input_shape, action_size):
self.maxlen = maxlen
dirname = tempfile.mkdtemp()
    # Use memory maps so we won't have to worry about eating up lots of RAM.
    get_path = lambda name: os.path.join(dirname, name)
    self.screens = np.memmap(get_path('screens'), dtype=np.float32, mode='w+',
                             shape=tuple([self.maxlen] + input_shape))
    self.actions = np.memmap(get_path('actions'), dtype=np.float32, mode='w+',
                             shape=(self.maxlen, action_size))
    self.rewards = np.memmap(get_path('rewards'), dtype=np.float32, mode='w+',
                             shape=(self.maxlen,))
    # np.bool was removed in NumPy 1.24; np.bool_ is the supported spelling.
    self.is_terminal = np.memmap(get_path('terminals'), dtype=np.bool_, mode='w+',
                                 shape=(self.maxlen,))
self.position = 0
self.full = False
# def _get_states(batch):
# s = list()
# for i in xrange(-3, 2):
# s.append(self.screens[batch+i])
# return np.vstack(s[:-1]), np.vstack(s[1:])
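# Hypothetical usage, assuming the enclosing class is named ReplayBuffer:
import numpy as np

buf = ReplayBuffer(maxlen=10000, input_shape=[4, 84, 84], action_size=6)
i = buf.position
buf.screens[i] = np.zeros((4, 84, 84), dtype=np.float32)
buf.rewards[i] = 1.0
buf.is_terminal[i] = False
buf.position = (buf.position + 1) % buf.maxlen
buf.full = buf.full or buf.position == 0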
def _strided_from_memmap(filename, dtype, mode, offset, order, shape, strides,
total_buffer_len):
"""Reconstruct an array view on a memory mapped file."""
if mode == 'w+':
# Do not zero the original data when unpickling
mode = 'r+'
if strides is None:
# Simple, contiguous memmap
return make_memmap(filename, dtype=dtype, shape=shape, mode=mode,
offset=offset, order=order)
else:
# For non-contiguous data, memmap the total enclosing buffer and then
# extract the non-contiguous view with the stride-tricks API
base = make_memmap(filename, dtype=dtype, shape=total_buffer_len,
mode=mode, offset=offset, order=order)
return as_strided(base, shape=shape, strides=strides)
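# Sketch of the "map the whole buffer, then re-stride" trick used above,
# reconstructing a transposed (non-contiguous) float64 view; the file name
# is illustrative:
import numpy as np
from numpy.lib.stride_tricks import as_strided

base = np.memmap('strided_demo.dat', dtype=np.float64, mode='w+', shape=(12,))
base[:] = np.arange(12)
view = as_strided(base, shape=(3, 4), strides=(8, 24))
# view[i, j] reads element i + 3*j of base, i.e. the transpose of a
# C-ordered (4, 3) array, without copying any data.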
def _gen_prediction_array(self, task, job, threading):
"""Generate prediction array either in-memory or persist to disk."""
shape = task.shape(job)
if threading:
self.job.predict_out = np.empty(shape, dtype=_dtype(task))
else:
f = os.path.join(self.job.dir, '%s_out_array.mmap' % task.name)
try:
self.job.predict_out = np.memmap(
filename=f, dtype=_dtype(task), mode='w+', shape=shape)
        except Exception as exc:
            raise OSError(
                "Cannot create prediction matrix of shape ("
                "%i, %i), size %i MBs, for %s.\n Details:\n%r" %
                (shape[0], shape[1],
                 8 * shape[0] * shape[1] / (1024 ** 2),  # size estimate assumes an 8-byte dtype
                 task.name, exc))
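# Sketch of the allocation policy above: threads share an address space, so a
# plain np.empty suffices; process-based backends get an on-disk memmap
# instead (alloc_out is a hypothetical helper, paths are illustrative):
import os
import tempfile
import numpy as np

def alloc_out(shape, threading, job_dir=None):
    if threading:
        return np.empty(shape, dtype=np.float64)
    path = os.path.join(job_dir or tempfile.gettempdir(), 'out_array.mmap')
    return np.memmap(path, dtype=np.float64, mode='w+', shape=shape)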
def load_raw(path, prompt_info=None):
try:
info = load_info(path)
    except FileNotFoundError:
        if prompt_info is None:
            raise  # re-raise with the original traceback
else:
result = prompt_info()
if result is None:
return
else:
info, save = result
info = [info]
if save:
base, ext = _ospath.splitext(path)
info_path = base + '.yaml'
save_info(info_path, info)
dtype = _np.dtype(info[0]['Data Type'])
shape = (info[0]['Frames'], info[0]['Height'], info[0]['Width'])
movie = _np.memmap(path, dtype, 'r', shape=shape)
    if info[0]['Byte Order'] != '<':
        # byteswap() returns an in-memory copy, so `movie` is no longer
        # memory-mapped after this point.
        movie = movie.byteswap()
        info[0]['Byte Order'] = '<'
return movie, info
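# Self-contained sketch of the raw-movie layout load_raw expects: frames are
# stored as a flat little-endian binary dump, frame-major (the fields mirror
# the YAML keys above; values are illustrative):
import numpy as np

frames = np.arange(10 * 64 * 64, dtype='<u2').reshape(10, 64, 64)
frames.tofile('movie.raw')
movie = np.memmap('movie.raw', dtype='<u2', mode='r', shape=(10, 64, 64))
assert (movie[0] == frames[0]).all()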
def _init_in_memory_chunks(self, size):
available_mem = psutil.virtual_memory().available
required_mem = self._calculate_required_memory(size)
if required_mem <= available_mem:
self._in_memory_chunks = np.empty(shape=(size, self.data_producer.dimension()),
order='C', dtype=np.float32)
else:
if self.oom_strategy == 'raise':
self.logger.warning('K-means failed to load all the data (%s required, %s available) into memory. '
'Consider using a larger stride or set the oom_strategy to \'memmap\' which works '
'with a memmapped temporary file.'
% (bytes_to_string(required_mem), bytes_to_string(available_mem)))
raise MemoryError()
else:
self.logger.warning('K-means failed to load all the data (%s required, %s available) into memory '
'and now uses a memmapped temporary file which is comparably slow. '
'Consider using a larger stride.'
% (bytes_to_string(required_mem), bytes_to_string(available_mem)))
            # Note: mkstemp() also returns an open file descriptor, which is
            # leaked here; only the temporary file name is used by np.memmap.
            self._in_memory_chunks = np.memmap(tempfile.mkstemp()[1], mode="w+",
                                               shape=(size, self.data_producer.dimension()),
                                               order='C', dtype=np.float32)
def __init__(self, n_clusters, max_iter=5, metric='euclidean', tolerance=1e-5, init_strategy='kmeans++',
batch_size=0.2, oom_strategy='memmap', fixed_seed=False, stride=None, n_jobs=None, skip=0):
if stride is not None:
raise ValueError("stride is a dummy value in MiniBatch Kmeans")
    if batch_size > 1:
        raise ValueError("batch_size should be less than or equal to 1, but was %s" % batch_size)
self._cluster_centers_iter = None
self._centers_iter_list = []
super(MiniBatchKmeansClustering, self).__init__(n_clusters, max_iter, metric,
tolerance, init_strategy, False,
oom_strategy, stride=stride, n_jobs=n_jobs, skip=skip)
self.set_params(batch_size=batch_size)