def dump(self, target):
"""Serializes MPArray to :code:`h5py.Group`. Recover using
:func:`~load`.
:param target: :code:`h5py.Group` the instance should be saved to or
path to h5 file (it's then serialized to /)
"""
if isinstance(target, str):
import h5py
with h5py.File(target, 'w') as outfile:
return self.dump(outfile)
for prop in ('ranks', 'shape'):
# these are only saved for convenience
target.attrs[prop] = str(getattr(self, prop))
# these are actually used in MPArray.load
target.attrs['len'] = len(self)
target.attrs['canonical_form'] = self.canonical_form
for site, lten in enumerate(self._lt):
target[str(site)] = lten
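
A minimal round-trip sketch for the pair above (hedged: `mpa` stands for an existing MPArray instance, which is an assumption; the h5py calls are standard):

import h5py

mpa.dump('state.h5')                       # string path: serialized under /
with h5py.File('state.h5', 'r') as f:
    restored = MPArray.load(f)             # or simply MPArray.load('state.h5')
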
def group_sites(self, sites_per_group):
"""Group several MPA sites into one site.
The resulting MPA has length ``len(self) // sites_per_group`` and
``sites_per_group * self.ndims[i]`` physical legs on site ``i``. The
physical legs on each sites are in local form.
:param int sites_per_group: Number of sites to be grouped into one
:returns: An MPA with ``sites_per_group`` fewer sites and more ndims
"""
if (len(self) % sites_per_group) != 0:
raise ValueError('Cannot group: {} not a multiple of {}'
.format(len(self), sites_per_group))
if sites_per_group == 1:
return self
ltens = [_ltens_to_array(self._lt[i:i + sites_per_group])
for i in range(0, len(self), sites_per_group)]
return MPArray(ltens)
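
For illustration, a hedged sketch of the length/leg bookkeeping (assumes an mpnum-style random-MPA factory; the factory name and signature are assumptions):

import mpnum.factory as factory

mpa = factory.random_mpa(sites=4, ldim=2, rank=3)  # 4 sites, 1 physical leg each
grouped = mpa.group_sites(2)
assert len(grouped) == 4 // 2       # sites_per_group times fewer sites
assert grouped.ndims[0] == 2 * 1    # sites_per_group * self.ndims[i] legs per site
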
def __getitem__(self, key):
if type(key) is int:
raise NotImplementedError('Iteration not supported.')
with h5py.File(self.hdf5_dbase_root, 'r') as hf:
grp = hf[self.top_level_path]
if key not in grp:
raise IndexError('{} not found in {} filetype'.format(key, self.top_level_path))
ds = grp[key]
if isinstance(ds, h5py.Group):
data = DataIndexer(self.hdf5_dbase_root, '/'.join([self.top_level_path, key]))
else:
if ds.attrs['unit'] == 'SKIP' or ds.dtype == 'object':
data = np.array(ds, dtype=ds.dtype)
else:
data = u.Quantity(np.array(ds), ds.attrs['unit'], dtype=ds.dtype)
if '|S' in data.dtype.str:
data = data.astype(str)
return data
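
Access is dictionary-like; a hedged usage sketch (the database path and keys below are hypothetical):

idx = DataIndexer('/path/to/database.h5', 'fe_5/elvlc')
energies = idx['E_obs']        # u.Quantity when a 'unit' attribute is present
nested = idx['some_group']     # groups come back as another DataIndexer
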
def save_hdf5(filename, obj, compression=4):
"""Saves an object to the file in HDF5 format.
This is a short-cut function to save only one object into an HDF5 file. If
you want to save multiple objects to one HDF5 file, use
:class:`HDF5Serializer` directly by passing appropriate :class:`h5py.Group`
objects.
Args:
filename (str): Target file name.
obj: Object to be serialized. It must support serialization protocol.
compression (int): Gzip compression level.
"""
_check_available()
with h5py.File(filename, 'w') as f:
s = HDF5Serializer(f, compression=compression)
s.save(obj)
def load_hdf5(filename, obj):
"""Loads an object from the file in HDF5 format.
This is a short-cut function to load from an HDF5 file that contains only
one object. If you want to load multiple objects from one HDF5 file, use
:class:`HDF5Deserializer` directly by passing appropriate
:class:`h5py.Group` objects.
Args:
filename (str): Name of the file to be loaded.
obj: Object to be deserialized. It must support serialization protocol.
"""
_check_available()
with h5py.File(filename, 'r') as f:
d = HDF5Deserializer(f)
d.load(obj)
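
A hedged usage sketch of the save/load pair (assumes Chainer is installed; any object supporting the serialization protocol works the same way):

import chainer.links as L

model = L.Linear(3, 2)                      # one such serializable object
save_hdf5('model.h5', model, compression=4)
load_hdf5('model.h5', model)                # parameters restored in place
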
def _visitfunc(self, name, node):
level = len(name.split('/'))
indent = ' '*4*(level-1)
#indent = '<span style="color:blue;">'.format(level*4)
localname = name.split('/')[-1]
#search_text = self.settings['search_text'].lower()
search_text = self.search_text
if search_text and (search_text in localname.lower()):
localname = """<span style="color: red;">{}</span>""".format(localname)
if isinstance(node, h5py.Group):
self.tree_str += indent +"|> <b>{}/</b><br/>".format(localname)
elif isinstance(node, h5py.Dataset):
self.tree_str += indent +"|D <b>{}</b>: {} {}<br/>".format(localname, node.shape, node.dtype)
for key, val in node.attrs.items():
if search_text:
if search_text in str(key).lower():
key = """<span style="color: red;">{}</span>""".format(key)
if search_text in str(val).lower():
val = """<span style="color: red;">{}</span>""".format(val)
self.tree_str += indent+" |- <i>{}</i> = {}<br/>".format(key, val)
def setup_openpmd_species_record( self, grp, quantity ) :
"""
Set the attributes that are specific to a species record

    Parameters
    ----------
    grp : an h5py.Group object or h5py.Dataset
        The group that corresponds to `quantity`
(in particular, its path must end with "/<quantity>")
quantity : string
The name of the record being setup
e.g. "position", "momentum"
"""
# Generic setup
self.setup_openpmd_record( grp, quantity )
# Weighting information
grp.attrs["macroWeighted"] = macro_weighted_dict[quantity]
grp.attrs["weightingPower"] = weighting_power_dict[quantity]
def h5py_dataset_iterator(self, g, prefix=''):
    for key in g.keys():
        item = g[key]
        path = '{}/{}'.format(prefix, key)
        keys = list(item.keys())
        if isinstance(item[keys[0]], h5py.Dataset):  # test for dataset
            data = {'path': path}
            for k in keys:
                if not isinstance(item[k], h5py.Group):
                    dataset = np.array(item[k][()])  # .value is deprecated in h5py >= 2.9
if type(dataset) is np.ndarray:
if dataset.size != 0:
if type(dataset[0]) is np.bytes_:
dataset = [a.decode('ascii') for a in dataset]
data.update({k:dataset})
yield data
else: # test for group (go down)
yield from self.h5py_dataset_iterator(item, path)
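
A hedged driver sketch (`walker` stands for any instance exposing the method above; the file name and layout are hypothetical):

import h5py

with h5py.File('data.h5', 'r') as f:
    for record in walker.h5py_dataset_iterator(f):
        print(record['path'], sorted(record))  # one dict per leaf group
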
def get_data(self, path, prefix=''):
item = self.store[path]
path = '{}/{}'.format(prefix, path)
    keys = list(item.keys())
    data = {'path': path}
    for k in keys:
        if not isinstance(item[k], h5py.Group):
            dataset = np.array(item[k][()])  # .value is deprecated in h5py >= 2.9
if type(dataset) is np.ndarray:
if dataset.size != 0:
if type(dataset[0]) is np.bytes_:
dataset = [a.decode('ascii') for a in dataset]
data.update({k: dataset})
return data
def test_cache(self):
# create main test file
filename = self.getFileName("create_group_cache")
print("filename:", filename)
f = h5py.File(filename, 'w', use_cache=True)
self.assertTrue('/' in f)
r = f['/']
self.assertEqual(len(r), 0)
self.assertTrue(isinstance(r, h5py.Group))
    self.assertEqual(r.name, '/')  # assertTrue here would treat '/' as a message
self.assertEqual(len(r.attrs.keys()), 0)
self.assertFalse('g1' in r)
g1 = r.create_group('g1')
self.assertEqual(len(r), 1)
file = g1.file
f.close()
def __cal_firing_rate(self, data, channel, bin_size, overlap, pre_time, post_time):
    # Build overlapping bins of width bin_size, stepping by (bin_size - overlap).
    bins_left = [pre_time]
    while bins_left[-1] < post_time:
        bins_left.append(bins_left[-1] + bin_size - overlap)
    bins_left = np.array(bins_left)
    bins_right = bins_left + bin_size
    bins_mean = (bins_left + bins_right) / 2.0
    # Shift all bins so that one bin center falls exactly on time zero.
    zero_offset = bins_mean[bins_mean > 0][0]
    bins_left = bins_left - zero_offset
    bins_right = bins_right - zero_offset
    bins_mean = bins_mean - zero_offset
    # Discard bins that stick out of the [pre_time, post_time] window.
    bins_left = bins_left[bins_right <= post_time]
    bins_mean = bins_mean[bins_right <= post_time]
    bins_right = bins_right[bins_right <= post_time]
    bins_mean = bins_mean[bins_left >= pre_time]
    bins_right = bins_right[bins_left >= pre_time]
    bins_left = bins_left[bins_left >= pre_time]
    def cal_fr(ite_spike):
        # Count spikes per bin, then convert counts to rates in Hz
        # (bin_size is taken to be in milliseconds).
        ite_fr = list()
        for i in range(bins_left.shape[0]):
            ite_fr_i = ite_spike[(ite_spike >= bins_left[i]) & (ite_spike < bins_right[i])].shape[0]
            ite_fr.append(ite_fr_i)
        ite_fr = np.array(ite_fr)
        ite_fr = ite_fr * 1000.0 / bin_size
        return ite_fr
    firing_rate = data[channel].apply(cal_fr)
    return firing_rate, bins_mean
def reclaim_space(self,file_name):
    '''
    Args
        file_name (string):
            the name of the workspace file
    Return
        -
    '''
    # HDF5 does not return the space of deleted datasets to the filesystem,
    # so copy every remaining dataset into a fresh file and swap the files.
    f = hp.File(file_name, 'r')
    f2 = hp.File(file_name.split('.h5')[0] + '_reclaim.h5', 'w')
    used_keys = list()
    def valid_key(name):
        # visit() callback: collect dataset paths, skip groups
        if isinstance(f[name], hp.Group):
            pass
        else:
            used_keys.append(name)
    f.visit(valid_key)
    for key in used_keys:
        f2[key] = f[key][()]  # .value is deprecated in h5py >= 2.9
f.flush()
f2.flush()
f.close()
f2.close()
os.remove(file_name)
os.rename(file_name.split('.h5')[0]+'_reclaim.h5',file_name)
print('Space is reclaimed now')
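
Background for the copy-and-swap above: deleting a dataset only unlinks it, and the HDF5 file keeps its size, so the space is reclaimed by rewriting the live data into a new file. A hedged demonstration (`ws` is a hypothetical instance exposing reclaim_space):

import os
import h5py
import numpy as np

with h5py.File('work.h5', 'w') as f:
    f['big'] = np.zeros(10**6)
    f['keep'] = np.arange(10)
with h5py.File('work.h5', 'a') as f:
    del f['big']                        # unlink only; file size is unchanged
print(os.path.getsize('work.h5'))       # still roughly 8 MB
ws.reclaim_space('work.h5')             # rewrites 'keep'; the file shrinks
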
def load(cls, source):
"""Deserializes MPArray from :code:`h5py.Group`. Serialize using
:func:`~dump`.
:param target: :code:`h5py.Group` containing serialized MPArray or
path to a single h5 File containing serialized MPArray under /
"""
if isinstance(source, str):
import h5py
with h5py.File(source, 'r') as infile:
return cls.load(infile)
    ltens = [source[str(i)][()] for i in range(source.attrs['len'])]  # .value is deprecated in h5py >= 2.9
return cls(LocalTensors(ltens, cform=source.attrs['canonical_form']))
def _populate_data(self, ret_dict, obj, name):
"""Read data recursively from an HDF5 value and add it to `ret_dict`.
If `obj` is a dataset, it is added to `ret_dict`. If `obj` is a group,
a sub-dictionary is created in `ret_dict` for `obj` and populated
recursively by calling this function on all of the items in the `obj`
group.
Parameters
----------
ret_dict : OrderedDict
Dictionary to which metadata will be added.
obj : h5py.Dataset | h5py.Group
HDF5 value from which to read metadata.
name : valid dictionary key
Dictionary key in `ret_dict` under which to store the data from
`obj`.
"""
if isinstance(obj, h5py.Dataset):
# [()] casts a Dataset as a numpy array
ret_dict[name] = obj[()]
else:
# create a dictionary for this group
ret_dict[name] = {}
for key, value in obj.items():
self._populate_data(ret_dict[name], value, key)
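
A hedged driver sketch that turns a whole file into nested dictionaries (`reader` is a hypothetical instance exposing _populate_data):

from collections import OrderedDict
import h5py

ret = OrderedDict()
with h5py.File('meta.h5', 'r') as f:
    for key, value in f.items():
        reader._populate_data(ret, value, key)  # datasets and sub-dicts alike
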
def to_hdf5(self, file_or_path):
"""
Write data to an HDF5 file.
Parameters
----------
file_or_path : str, `h5py.File`, `h5py.Group`
"""
import h5py
if isinstance(file_or_path, str):
f = h5py.File(file_or_path, 'w')
close = True
else:
f = file_or_path
close = False
d = f.create_dataset('mjd', data=self.t.tcb.mjd)
d.attrs['format'] = 'mjd'
d.attrs['scale'] = 'tcb'
d = f.create_dataset('rv', data=self.rv.value)
d.attrs['unit'] = str(self.rv.unit)
d = f.create_dataset('rv_err', data=self.stddev.value)
d.attrs['unit'] = str(self.stddev.unit)
if close:
f.close()
def from_hdf5(cls, file_or_path):
"""
Read data to an HDF5 file.
Parameters
----------
file_or_path : str, `h5py.File`, `h5py.Group`
"""
import h5py
if isinstance(file_or_path, str):
f = h5py.File(file_or_path, 'r')
close = True
else:
f = file_or_path
close = False
    t = f['mjd'][:]  # read eagerly; the dataset is unusable once the file is closed
rv = f['rv'][:] * u.Unit(f['rv'].attrs['unit'])
stddev = f['rv_err'][:] * u.Unit(f['rv_err'].attrs['unit'])
if close:
f.close()
return cls(t=t, rv=rv, stddev=stddev)
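
A hedged round-trip sketch (`data` stands for an instance of the class owning these methods, which is an assumption; the file name is arbitrary):

data.to_hdf5('rv.h5')                         # writes mjd, rv, rv_err
restored = data.__class__.from_hdf5('rv.h5')  # same t, rv, stddev back
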
def Group(self):
if self._err:
raise self._err
if self._Group is None:
try:
from h5py import Group
except ImportError:
Group = NotAModule(self._name)
self._Group = Group
return self._Group
def __traverse_add(self, item, filename):
if isinstance(item, h5py.Dataset):
self.add_dataset(item, filename + item.name)
elif isinstance(item, h5py.Group):
for k in item:
self.__traverse_add(item[k], filename)
else:
print("Skipping " + item.name)
def _ls(item, recursive=False, groups=False, level=0):
keys = []
if isinstance(item, h5.Group):
if groups and level > 0:
keys.append(item.name)
if level == 0 or recursive:
for key in list(item.keys()):
keys.extend(_ls(item[key], recursive, groups, level + 1))
elif not groups:
keys.append(item.name)
return keys
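
A hedged usage sketch for _ls (file layout hypothetical; note the `h5` alias used above):

import h5py as h5

with h5.File('tree.h5', 'r') as f:
    print(_ls(f, recursive=True))                # every dataset path
    print(_ls(f, recursive=True, groups=True))   # every group path instead
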
def loadDataHDF5(data):
    if isinstance(data, (h5py.File, h5py.Group)):
        return {k: loadDataHDF5(v) for k, v in data.items()}
    elif isinstance(data, h5py.Dataset):
        return data[()]  # .value is deprecated in h5py >= 2.9
    else:
        print('unhandled datatype: %s' % type(data))
def _visitfunc(self, name, node):
level = len(name.split('/'))
indent = ' '*level
localname = name.split('/')[-1]
if isinstance(node, h5py.Group):
self.tree_str += indent +"|> {}\n".format(localname)
elif isinstance(node, h5py.Dataset):
self.tree_str += indent +"|D {}: {} {}\n".format(localname, node.shape, node.dtype)
for key, val in node.attrs.items():
self.tree_str += indent+" |- {} = {}\n".format(key, val)
def _get_dset_array(self, dspath):
"""returns a pickle-safe array for the branch specified by dspath"""
branch = self._my_ds_from_path(dspath)
if isinstance(branch, h5py.Group):
return 'group'
else:
return (H5Array(branch), dict(branch.attrs))
def setup_openpmd_species_component( self, grp, quantity ) :
"""
Set the attributes that are specific to a species component

    Parameters
    ----------
grp : an h5py.Group object or h5py.Dataset
quantity : string
The name of the component
"""
self.setup_openpmd_component( grp )
def setup_openpmd_record( self, dset, quantity ) :
"""
    Sets the attributes of a record so that they comply with OpenPMD

    Parameters
    ----------
dset : an h5py.Dataset or h5py.Group object
quantity : string
The name of the record considered
"""
dset.attrs["unitDimension"] = unit_dimension_dict[quantity]
# No time offset (approximation)
dset.attrs["timeOffset"] = 0.
def __getitem__(self, key):
h5py_item = self.h5py_group[key]
if isinstance(h5py_item, h5py.Group):
if 'h5sparse_format' in h5py_item.attrs:
# detect the sparse matrix
return Dataset(h5py_item)
else:
return Group(h5py_item)
elif isinstance(h5py_item, h5py.Dataset):
return h5py_item
else:
raise ValueError("Unexpected item type.")
def groups(self, path='/'):
"""Return the list of groups under a given node."""
return [key for key in self.children(path)
if isinstance(self._h5py_file[path + '/' + key],
h5py.Group)]
def _print_node_info(self, name, node):
"""Print node information."""
info = ('/' + name).ljust(50)
if isinstance(node, h5py.Group):
pass
elif isinstance(node, h5py.Dataset):
info += str(node.shape).ljust(20)
info += str(node.dtype).ljust(8)
print(info)
def __init__(self, filename, mode=0):
self.filename = filename
h5file = h5py.File(self.filename, 'r')
var_list = []
for var, g in h5file.items():
if not isinstance(g, h5py.Group): continue
uids = g.get('uids')[()].tolist()
var_list.append((var, uids))
super(FileInputProcessor, self).__init__(var_list, mode)
h5file.close()