def __mmap_nev_file(self, filename):
""" Memory map the Neuralynx .nev file """
nev_dtype = np.dtype([
('reserved', '<i2'),
('system_id', '<i2'),
('data_size', '<i2'),
('timestamp', '<u8'),
('event_id', '<i2'),
('ttl_input', '<i2'),
('crc_check', '<i2'),
('dummy1', '<i2'),
('dummy2', '<i2'),
('extra', '<i4', (8,)),
('event_string', 'a128'),
])
if getsize(self.sessiondir + sep + filename) > 16384:
return np.memmap(self.sessiondir + sep + filename,
dtype=nev_dtype, mode='r', offset=16384)
else:
return None
python类dtype()的实例源码
def __extract_nev_file_spec(self):
"""
Extract file specification from an .nsx file
"""
filename = '.'.join([self._filenames['nsx'], 'nev'])
# Header structure of files specification 2.2 and higher. For files 2.1
# and lower, the entries ver_major and ver_minor are not supported.
dt0 = [
('file_id', 'S8'),
('ver_major', 'uint8'),
('ver_minor', 'uint8')]
nev_file_id = np.fromfile(filename, count=1, dtype=dt0)[0]
if nev_file_id['file_id'].decode() == 'NEURALEV':
spec = '{0}.{1}'.format(
nev_file_id['ver_major'], nev_file_id['ver_minor'])
else:
raise IOError('NEV file type {0} is not supported'.format(
nev_file_id['file_id']))
return spec
def __read_nsx_data_variant_a(self, nsx_nb):
"""
Extract nsx data from a 2.1 .nsx file
"""
filename = '.'.join([self._filenames['nsx'], 'ns%i' % nsx_nb])
# get shape of data
shape = (
self.__nsx_databl_param['2.1']('nb_data_points', nsx_nb),
self.__nsx_basic_header[nsx_nb]['channel_count'])
offset = self.__nsx_params['2.1']('bytes_in_headers', nsx_nb)
# read nsx data
# store as dict for compatibility with higher file specs
data = {1: np.memmap(
filename, dtype='int16', shape=shape, offset=offset)}
return data
def __read_nev_data(self, nev_data_masks, nev_data_types):
"""
Extract nev data from a 2.1 or 2.2 .nev file
"""
filename = '.'.join([self._filenames['nev'], 'nev'])
data_size = self.__nev_basic_header['bytes_in_data_packets']
header_size = self.__nev_basic_header['bytes_in_headers']
# read all raw data packets and markers
dt0 = [
('timestamp', 'uint32'),
('packet_id', 'uint16'),
('value', 'S{0}'.format(data_size - 6))]
raw_data = np.memmap(filename, offset=header_size, dtype=dt0)
masks = self.__nev_data_masks(raw_data['packet_id'])
types = self.__nev_data_types(data_size)
data = {}
for k, v in nev_data_masks.items():
data[k] = raw_data.view(types[k][nev_data_types[k]])[masks[k][v]]
return data
def __get_nev_rec_times(self):
"""
Extracts minimum and maximum time points from a nev file.
"""
filename = '.'.join([self._filenames['nev'], 'nev'])
dt = [('timestamp', 'uint32')]
offset = \
self.__get_file_size(filename) - \
self.__nev_params('bytes_in_data_packets')
last_data_packet = np.memmap(filename, offset=offset, dtype=dt)[0]
n_starts = [0 * self.__nev_params('event_unit')]
n_stops = [
last_data_packet['timestamp'] * self.__nev_params('event_unit')]
return n_starts, n_stops
def __get_waveforms_dtype(self):
"""
Extracts the actual waveform dtype set for each channel.
"""
# Blackrock code giving the approiate dtype
conv = {0: 'int8', 1: 'int8', 2: 'int16', 4: 'int32'}
# get all electrode ids from nev ext header
all_el_ids = self.__nev_ext_header[b'NEUEVWAV']['electrode_id']
# get the dtype of waveform (this is stupidly complicated)
if self.__is_set(
np.array(self.__nev_basic_header['additionnal_flags']), 0):
dtype_waveforms = dict((k, 'int16') for k in all_el_ids)
else:
# extract bytes per waveform
waveform_bytes = \
self.__nev_ext_header[b'NEUEVWAV']['bytes_per_waveform']
# extract dtype for waveforms fro each electrode
dtype_waveforms = dict(zip(all_el_ids, conv[waveform_bytes]))
return dtype_waveforms
def __read_comment(self,n_start,n_stop,data,lazy=False):
event_unit = self.__nev_params('event_unit')
if lazy:
times = []
labels = np.array([],dtype='s')
else:
times = data['timestamp']*event_unit
labels = data['comment'].astype(str)
# mask for given time interval
mask = (times >= n_start) & (times < n_stop)
if np.sum(mask)>0:
ev = Event(
times = times[mask].astype(float),
labels = labels[mask],
name = 'comment')
if lazy:
ev.lazy_shape = np.sum(mask)
else:
ev = None
return ev
# --------------end------added by zhangbo 20170926--------
def reformat_integer_v1(data, nbchannel, header):
"""
reformat when dtype is int16 for ABF version 1
"""
chans = [chan_num for chan_num in
header['nADCSamplingSeq'] if chan_num >= 0]
for n, i in enumerate(chans[:nbchannel]): # respect SamplingSeq
data[:, n] /= header['fInstrumentScaleFactor'][i]
data[:, n] /= header['fSignalGain'][i]
data[:, n] /= header['fADCProgrammableGain'][i]
if header['nTelegraphEnable'][i]:
data[:, n] /= header['fTelegraphAdditGain'][i]
data[:, n] *= header['fADCRange']
data[:, n] /= header['lADCResolution']
data[:, n] += header['fInstrumentOffset'][i]
data[:, n] -= header['fSignalOffset'][i]
def solve3DTransform(points1, points2):
"""
Find a 3D transformation matrix that maps points1 onto points2.
Points must be specified as either lists of 4 Vectors or
(4, 3) arrays.
"""
import numpy.linalg
pts = []
for inp in (points1, points2):
if isinstance(inp, np.ndarray):
A = np.empty((4,4), dtype=float)
A[:,:3] = inp[:,:3]
A[:,3] = 1.0
else:
A = np.array([[inp[i].x(), inp[i].y(), inp[i].z(), 1] for i in range(4)])
pts.append(A)
## solve 3 sets of linear equations to determine transformation matrix elements
matrix = np.zeros((4,4))
for i in range(3):
## solve Ax = B; x is one row of the desired transformation matrix
matrix[i] = numpy.linalg.solve(pts[0], pts[1][:,i])
return matrix
def __init__(self, index, channel_names=None, channel_ids=None,
name=None, description=None, file_origin=None,
coordinates=None, **annotations):
'''
Initialize a new :class:`ChannelIndex` instance.
'''
# Inherited initialization
# Sets universally recommended attributes, and places all others
# in annotations
super(ChannelIndex, self).__init__(name=name,
description=description,
file_origin=file_origin,
**annotations)
# Defaults
if channel_names is None:
channel_names = np.array([], dtype='S')
if channel_ids is None:
channel_ids = np.array([], dtype='i')
# Store recommended attributes
self.channel_names = np.array(channel_names)
self.channel_ids = np.array(channel_ids)
self.index = np.array(index)
self.coordinates = coordinates
def load_bytes(self, data_blocks, dtype='<i1', start=None, end=None, expected_size=None):
"""
Return list of bytes contained
in the specified set of blocks.
NB : load all data as files cannot exceed 4Gb
find later other solutions to spare memory.
"""
chunks = list()
raw = ''
# keep only data blocks having
# a size greater than zero
blocks = [k for k in data_blocks if k.size > 0]
for data_block in blocks :
self.file.seek(data_block.start)
raw = self.file.read(data_block.size)[0:expected_size]
databytes = np.frombuffer(raw, dtype=dtype)
chunks.append(databytes)
# concatenate all chunks and return
# the specified slice
if len(chunks)>0 :
databytes = np.concatenate(chunks)
return databytes[start:end]
else :
return np.array([])
def load_channel_data(self, ep, ch):
"""
Return a numpy array containing the
list of bytes corresponding to the
specified episode and channel.
"""
#memorise the sample size and symbol
sample_size = self.sample_size(ep, ch)
sample_symbol = self.sample_symbol(ep, ch)
#create a bit mask to define which
#sample to keep from the file
bit_mask = self.create_bit_mask(ep, ch)
#load all bytes contained in an episode
data_blocks = self.get_data_blocks(ep)
databytes = self.load_bytes(data_blocks)
raw = self.filter_bytes(databytes, bit_mask)
#reshape bytes from the sample size
dt = np.dtype(numpy_map[sample_symbol])
dt.newbyteorder('<')
return np.frombuffer(raw.reshape([len(raw) / sample_size, sample_size]), dt)
def get_signal_data(self, ep, ch):
"""
Return a numpy array containing all samples of a
signal, acquired on an Elphy analog channel, formatted
as a list of (time, value) tuples.
"""
#get data from the file
y_data = self.load_encoded_data(ep, ch)
x_data = np.arange(0, len(y_data))
#create a recarray
data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_float)])
#put in the recarray the scaled data
x_factors = self.x_scale_factors(ep, ch)
y_factors = self.y_scale_factors(ep, ch)
data['x'] = x_factors.scale(x_data)
data['y'] = y_factors.scale(y_data)
return data
def get_tag_data(self, ep, tag_ch):
"""
Return a numpy array containing all samples of a
signal, acquired on an Elphy tag channel, formatted
as a list of (time, value) tuples.
"""
#get data from the file
y_data = self.load_encoded_tags(ep, tag_ch)
x_data = np.arange(0, len(y_data))
#create a recarray
data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_int)])
#put in the recarray the scaled data
factors = self.x_tag_scale_factors(ep)
data['x'] = factors.scale(x_data)
data['y'] = y_data
return data
def load_encoded_events(self, episode, evt_channel, identifier):
"""
Return times stored as a 4-bytes integer
in the specified event channel.
"""
data_blocks = self.group_blocks_of_type(episode, identifier)
ep_blocks = self.get_blocks_stored_in_episode(episode)
evt_blocks = [k for k in ep_blocks if k.identifier == identifier]
#compute events on each channel
n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
pre_events = np.sum(n_events[0:evt_channel - 1], dtype=int)
start = pre_events
end = start + n_events[evt_channel - 1]
expected_size = 4 * np.sum(n_events, dtype=int)
return self.load_bytes(data_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)
def load_encoded_spikes(self, episode, evt_channel, identifier):
"""
Return times stored as a 4-bytes integer
in the specified spike channel.
NB: it is meant for Blackrock-type, having an additional byte for each event time as spike sorting label.
These additiona bytes are appended trailing the times.
"""
# to load the requested spikes for the specified episode and event channel:
# get all the elphy blocks having as identifier 'RSPK' (or whatever)
all_rspk_blocks = [k for k in self.blocks if k.identifier == identifier]
rspk_block = all_rspk_blocks[episode-1]
# RDATA(h?dI) REVT(NbVeV:I, NbEv:256I ... spike data are 4byte integers
rspk_header = 4*( rspk_block.size - rspk_block.data_size-2 + len(rspk_block.n_events))
pre_events = np.sum(rspk_block.n_events[0:evt_channel-1], dtype=int, axis=0)
# the real start is after header, preceeding events (which are 4byte) and preceeding labels (1byte)
start = rspk_header + (4*pre_events) + pre_events
end = start + 4*rspk_block.n_events[evt_channel-1]
raw = self.load_bytes( [rspk_block], dtype='<i1', start=start, end=end, expected_size=rspk_block.size )
# re-encoding after reading byte by byte
res = np.frombuffer(raw[0:(4*rspk_block.n_events[evt_channel-1])], dtype='<i4')
res.sort() # sometimes timings are not sorted
#print "load_encoded_data() - spikes:",res
return res
def get_spiketrain(self, episode, electrode_id):
"""
Return a :class:`Spike` which is a
descriptor of the specified spike channel.
"""
assert episode in range(1, self.n_episodes + 1)
assert electrode_id in range(1, self.n_spiketrains(episode) + 1)
# get some properties stored in the episode sub-block
block = self.episode_block(episode)
x_unit = block.ep_block.x_unit
x_unit_wf = getattr(block.ep_block, 'x_unit_wf', None)
y_unit_wf = getattr(block.ep_block, 'y_unit_wf', None)
# number of spikes in the entire episode
spk_blocks = [k for k in self.blocks if k.identifier == 'RSPK']
n_events = np.sum([k.n_events[electrode_id - 1] for k in spk_blocks], dtype=int)
# number of samples in a waveform
wf_sampling_frequency = 1.0 / block.ep_block.dX
wf_blocks = [k for k in self.blocks if k.identifier == 'RspkWave']
if wf_blocks :
wf_samples = wf_blocks[0].wavelength
t_start = wf_blocks[0].pre_trigger * block.ep_block.dX
else:
wf_samples = 0
t_start = 0
return ElphySpikeTrain(self, episode, electrode_id, x_unit, n_events, wf_sampling_frequency, wf_samples, x_unit_wf, y_unit_wf, t_start)
def get_rspk_data(self, spk_channel):
"""
Return times stored as a 4-bytes integer
in the specified event channel.
"""
evt_blocks = self.get_blocks_of_type('RSPK')
#compute events on each channel
n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
pre_events = np.sum(n_events[0:spk_channel], dtype=int) # sum of array values up to spk_channel-1!!!!
start = pre_events + (7 + len(n_events))# rspk header
end = start + n_events[spk_channel]
expected_size = 4 * np.sum(n_events, dtype=int) # constant
return self.load_bytes(evt_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)
# ---------------------------------------------------------
# factories.py
def __mmap_ncs_packet_headers(self, filename):
"""
Memory map of the Neuralynx .ncs file optimized for extraction of
data packet headers
Reading standard dtype improves speed, but timestamps need to be
reconstructed
"""
filesize = getsize(self.sessiondir + sep + filename) # in byte
if filesize > 16384:
data = np.memmap(self.sessiondir + sep + filename,
dtype='<u4',
shape=((filesize - 16384) / 4 / 261, 261),
mode='r', offset=16384)
ts = data[:, 0:2]
multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
axis=0)
timestamps = np.sum(ts * multi, axis=1)
# timestamps = data[:,0] + (data[:,1] *2**32)
header_u4 = data[:, 2:5]
return timestamps, header_u4
else:
return None
def __mmap_nev_file(self, filename):
""" Memory map the Neuralynx .nev file """
nev_dtype = np.dtype([
('reserved', '<i2'),
('system_id', '<i2'),
('data_size', '<i2'),
('timestamp', '<u8'),
('event_id', '<i2'),
('ttl_input', '<i2'),
('crc_check', '<i2'),
('dummy1', '<i2'),
('dummy2', '<i2'),
('extra', '<i4', (8,)),
('event_string', 'a128'),
])
if getsize(self.sessiondir + sep + filename) > 16384:
return np.memmap(self.sessiondir + sep + filename,
dtype=nev_dtype, mode='r', offset=16384)
else:
return None