Python dtype() example source code

neuralynxio.py (project: NeoAnalysis, author: neoanalysis)
def __mmap_nev_file(self, filename):
        """ Memory map the Neuralynx .nev file """
        nev_dtype = np.dtype([
            ('reserved', '<i2'),
            ('system_id', '<i2'),
            ('data_size', '<i2'),
            ('timestamp', '<u8'),
            ('event_id', '<i2'),
            ('ttl_input', '<i2'),
            ('crc_check', '<i2'),
            ('dummy1', '<i2'),
            ('dummy2', '<i2'),
            ('extra', '<i4', (8,)),
            ('event_string', 'a128'),
        ])

        if getsize(self.sessiondir + sep + filename) > 16384:
            return np.memmap(self.sessiondir + sep + filename,
                             dtype=nev_dtype, mode='r', offset=16384)
        else:
            return None
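
The structured np.dtype above describes one fixed-size .nev record, and np.memmap then exposes the whole file as an array of such records. Below is a minimal, self-contained sketch of the same pattern using a made-up two-field record and a temporary file (not the actual Neuralynx layout):

import os
import tempfile
import numpy as np

# hypothetical record: a little-endian uint32 id followed by a float64 value
rec_dtype = np.dtype([('id', '<u4'), ('value', '<f8')])

# write a few records to a temporary file
records = np.zeros(3, dtype=rec_dtype)
records['id'] = [1, 2, 3]
records['value'] = [0.5, 1.5, 2.5]
path = os.path.join(tempfile.mkdtemp(), 'records.bin')
records.tofile(path)

# memory-map the file read-only; each element is one structured record
mapped = np.memmap(path, dtype=rec_dtype, mode='r')
print(rec_dtype.itemsize)  # 12 bytes per record
print(mapped['id'])        # [1 2 3]
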
blackrockio.py (project: NeoAnalysis, author: neoanalysis)
def __extract_nev_file_spec(self):
        """
        Extract the file specification from an .nev file
        """
        filename = '.'.join([self._filenames['nsx'], 'nev'])
        # Header structure for file specification 2.2 and higher. For files 2.1
        # and lower, the entries ver_major and ver_minor are not supported.
        dt0 = [
            ('file_id', 'S8'),
            ('ver_major', 'uint8'),
            ('ver_minor', 'uint8')]

        nev_file_id = np.fromfile(filename, count=1, dtype=dt0)[0]
        if nev_file_id['file_id'].decode() == 'NEURALEV':
            spec = '{0}.{1}'.format(
                nev_file_id['ver_major'], nev_file_id['ver_minor'])
        else:
            raise IOError('NEV file type {0} is not supported'.format(
                nev_file_id['file_id']))

        return spec
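
np.fromfile with count=1 reads only the first record of a structured dtype, which is what makes this header probe cheap. A small sketch with a fabricated header (file name and version numbers are invented for illustration):

import os
import tempfile
import numpy as np

dt0 = np.dtype([('file_id', 'S8'), ('ver_major', 'uint8'), ('ver_minor', 'uint8')])

# write a fake 10-byte header followed by arbitrary payload
path = os.path.join(tempfile.mkdtemp(), 'fake.nev')
with open(path, 'wb') as f:
    f.write(b'NEURALEV' + b'\x02\x03' + b'\x00' * 100)

# count=1 reads just the first record, regardless of the file length
header = np.fromfile(path, count=1, dtype=dt0)[0]
spec = '{0}.{1}'.format(header['ver_major'], header['ver_minor'])
print(header['file_id'].decode(), spec)  # NEURALEV 2.3
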
blackrockio.py (project: NeoAnalysis, author: neoanalysis)
def __read_nsx_data_variant_a(self, nsx_nb):
        """
        Extract nsx data from a 2.1 .nsx file
        """
        filename = '.'.join([self._filenames['nsx'], 'ns%i' % nsx_nb])

        # get shape of data
        shape = (
            self.__nsx_databl_param['2.1']('nb_data_points', nsx_nb),
            self.__nsx_basic_header[nsx_nb]['channel_count'])
        offset = self.__nsx_params['2.1']('bytes_in_headers', nsx_nb)

        # read nsx data
        # store as dict for compatibility with higher file specs
        data = {1: np.memmap(
            filename, dtype='int16', shape=shape, offset=offset)}

        return data
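
The shape and offset arguments are what let np.memmap skip the headers and view the rest of the file as a (samples, channels) matrix. A standalone sketch with a fabricated 32-byte header (in the code above the real header size comes from 'bytes_in_headers'):

import os
import tempfile
import numpy as np

# write a fake file: a 32-byte header followed by a 4x2 int16 data block
path = os.path.join(tempfile.mkdtemp(), 'fake.ns2')
payload = np.arange(8, dtype='int16').reshape(4, 2)
with open(path, 'wb') as f:
    f.write(b'\x00' * 32)
    f.write(payload.tobytes())

# shape fixes (samples, channels), offset skips the header bytes
data = np.memmap(path, dtype='int16', mode='r', shape=(4, 2), offset=32)
print(np.array_equal(data, payload))  # True
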
blackrockio.py (project: NeoAnalysis, author: neoanalysis)
def __read_nev_data(self, nev_data_masks, nev_data_types):
        """
        Extract nev data from a 2.1 or 2.2 .nev file
        """
        filename = '.'.join([self._filenames['nev'], 'nev'])
        data_size = self.__nev_basic_header['bytes_in_data_packets']
        header_size = self.__nev_basic_header['bytes_in_headers']

        # read all raw data packets and markers
        dt0 = [
            ('timestamp', 'uint32'),
            ('packet_id', 'uint16'),
            ('value', 'S{0}'.format(data_size - 6))]

        raw_data = np.memmap(filename, offset=header_size, dtype=dt0)

        masks = self.__nev_data_masks(raw_data['packet_id'])
        types = self.__nev_data_types(data_size)

        data = {}
        for k, v in nev_data_masks.items():
            data[k] = raw_data.view(types[k][nev_data_types[k]])[masks[k][v]]

        return data
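
The key trick here is ndarray.view(): packets are first read with a generic dtype whose 'value' field is an opaque byte string, then reinterpreted per packet type without copying. A minimal sketch with invented field names:

import numpy as np

# generic packet layout: a 4-byte id followed by 4 opaque payload bytes
generic = np.dtype([('id', '<u4'), ('payload', 'S4')])
# specialised layout of the same size: payload split into two uint16 fields
special = np.dtype([('id', '<u4'), ('code', '<u2'), ('flags', '<u2')])

packets = np.zeros(4, dtype=generic)
packets['id'] = [10, 11, 12, 13]
packets['payload'] = [np.array([1, 2], dtype='<u2').tobytes()] * 4

# view() reinterprets the same bytes in place, as raw_data.view(...) does above
decoded = packets.view(special)
print(decoded['code'])       # [1 1 1 1]

# boolean masks then pick out the packets belonging to one data type
mask = decoded['id'] % 2 == 0
print(decoded[mask]['id'])   # [10 12]
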
blackrockio.py (project: NeoAnalysis, author: neoanalysis)
def __get_nev_rec_times(self):
        """
        Extracts minimum and maximum time points from a nev file.
        """
        filename = '.'.join([self._filenames['nev'], 'nev'])

        dt = [('timestamp', 'uint32')]
        offset = \
            self.__get_file_size(filename) - \
            self.__nev_params('bytes_in_data_packets')
        last_data_packet = np.memmap(filename, offset=offset, dtype=dt)[0]

        n_starts = [0 * self.__nev_params('event_unit')]
        n_stops = [
            last_data_packet['timestamp'] * self.__nev_params('event_unit')]

        return n_starts, n_stops
blackrockio.py (project: NeoAnalysis, author: neoanalysis)
def __get_waveforms_dtype(self):
        """
        Extracts the actual waveform dtype set for each channel.
        """
        # Blackrock code giving the appropriate dtype
        conv = {0: 'int8', 1: 'int8', 2: 'int16', 4: 'int32'}

        # get all electrode ids from nev ext header
        all_el_ids = self.__nev_ext_header[b'NEUEVWAV']['electrode_id']

        # get the dtype of waveform (this is stupidly complicated)
        if self.__is_set(
                np.array(self.__nev_basic_header['additionnal_flags']), 0):
            dtype_waveforms = dict((k, 'int16') for k in all_el_ids)
        else:
            # extract bytes per waveform
            waveform_bytes = \
                self.__nev_ext_header[b'NEUEVWAV']['bytes_per_waveform']
            # extract the dtype of the waveforms for each electrode
            dtype_waveforms = dict(
                (k, conv[int(b)]) for k, b in zip(all_el_ids, waveform_bytes))

        return dtype_waveforms
blackrockio.py (project: NeoAnalysis, author: neoanalysis)
def __read_comment(self, n_start, n_stop, data, lazy=False):
        event_unit = self.__nev_params('event_unit')
        if lazy:
            times = []
            labels = np.array([], dtype='S')
        else:
            times = data['timestamp'] * event_unit
            labels = data['comment'].astype(str)
        # mask for the given time interval
        mask = (times >= n_start) & (times < n_stop)

        if np.sum(mask) > 0:
            ev = Event(
                times=times[mask].astype(float),
                labels=labels[mask],
                name='comment')
            if lazy:
                ev.lazy_shape = np.sum(mask)
        else:
            ev = None
        return ev
    # --------------end------added by zhangbo 20170926--------
axonio.py (project: NeoAnalysis, author: neoanalysis)
def reformat_integer_v1(data, nbchannel, header):
    """
    reformat when dtype is int16 for ABF version 1
    """
    chans = [chan_num for chan_num in
             header['nADCSamplingSeq'] if chan_num >= 0]
    for n, i in enumerate(chans[:nbchannel]):  # respect SamplingSeq
        data[:, n] /= header['fInstrumentScaleFactor'][i]
        data[:, n] /= header['fSignalGain'][i]
        data[:, n] /= header['fADCProgrammableGain'][i]
        if header['nTelegraphEnable'][i]:
            data[:, n] /= header['fTelegraphAdditGain'][i]
        data[:, n] *= header['fADCRange']
        data[:, n] /= header['lADCResolution']
        data[:, n] += header['fInstrumentOffset'][i]
        data[:, n] -= header['fSignalOffset'][i]
functions.py (project: NeoAnalysis, author: neoanalysis)
def solve3DTransform(points1, points2):
    """
    Find a 3D transformation matrix that maps points1 onto points2.
    Points must be specified as either lists of 4 Vectors or 
    (4, 3) arrays.
    """
    import numpy.linalg
    pts = []
    for inp in (points1, points2):
        if isinstance(inp, np.ndarray):
            A = np.empty((4,4), dtype=float)
            A[:,:3] = inp[:,:3]
            A[:,3] = 1.0
        else:
            A = np.array([[inp[i].x(), inp[i].y(), inp[i].z(), 1] for i in range(4)])
        pts.append(A)

    ## solve 3 sets of linear equations to determine transformation matrix elements
    matrix = np.zeros((4,4))
    for i in range(3):
        ## solve Ax = B; x is one row of the desired transformation matrix
        matrix[i] = numpy.linalg.solve(pts[0], pts[1][:,i])  

    return matrix
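
A quick sanity check of solve3DTransform (hypothetical points; assumes the function above is in scope): four affinely independent source points are mapped by a known scale-and-shift, and the recovered matrix reproduces the targets.

import numpy as np

# four non-coplanar source points and their images under a known affine map
src = np.array([[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]], dtype=float)
dst = src * 3.0 + np.array([2.0, -1.0, 0.5])    # scale by 3, then translate

m = solve3DTransform(src, dst)                  # rows found via np.linalg.solve
hom = np.hstack([src, np.ones((4, 1))])         # homogeneous source coordinates
print(np.allclose(np.dot(hom, m[:3].T), dst))   # True
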
channelindex.py (project: NeoAnalysis, author: neoanalysis)
def __init__(self, index, channel_names=None, channel_ids=None,
                 name=None, description=None, file_origin=None,
                 coordinates=None, **annotations):
        '''
        Initialize a new :class:`ChannelIndex` instance.
        '''
        # Inherited initialization
        # Sets universally recommended attributes, and places all others
        # in annotations
        super(ChannelIndex, self).__init__(name=name,
                                           description=description,
                                           file_origin=file_origin,
                                           **annotations)

        # Defaults
        if channel_names is None:
            channel_names = np.array([], dtype='S')
        if channel_ids is None:
            channel_ids = np.array([], dtype='i')

        # Store recommended attributes
        self.channel_names = np.array(channel_names)
        self.channel_ids = np.array(channel_ids)
        self.index = np.array(index)
        self.coordinates = coordinates
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_bytes(self, data_blocks, dtype='<i1', start=None, end=None, expected_size=None):
        """
        Return a numpy array of the bytes contained
        in the specified set of blocks.

        NB: all data are loaded at once, as Elphy files cannot exceed 4 GB;
            more memory-sparing strategies could be added later.
        """
        chunks = list()
        # keep only data blocks having
        # a size greater than zero
        blocks = [k for k in data_blocks if k.size > 0]
        for data_block in blocks:
            self.file.seek(data_block.start)
            raw = self.file.read(data_block.size)[0:expected_size]
            databytes = np.frombuffer(raw, dtype=dtype)
            chunks.append(databytes)
        # concatenate all chunks and return
        # the specified slice
        if len(chunks) > 0:
            databytes = np.concatenate(chunks)
            return databytes[start:end]
        else:
            return np.array([])
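
np.frombuffer is what turns the raw bytes returned by file.read() into numbers; the dtype string (here '<i1' by default, '<i4' for event times) controls both the item width and the byte order. A tiny sketch:

import numpy as np

# raw little-endian bytes, as they would come back from file.read()
raw = bytes([1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0])

# '<i4' interprets every 4 bytes as one little-endian 32-bit integer
values = np.frombuffer(raw, dtype='<i4')
print(values)          # [1 2 3]

# concatenating chunks and slicing mirrors the start/end handling above
chunks = np.concatenate([values, values])
print(chunks[2:5])     # [3 1 2]
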
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_channel_data(self, ep, ch):
        """
        Return a numpy array containing the
        samples corresponding to the
        specified episode and channel.
        """
        #memorise the sample size and symbol
        sample_size = self.sample_size(ep, ch)
        sample_symbol = self.sample_symbol(ep, ch)

        #create a bit mask to define which
        #sample to keep from the file
        bit_mask = self.create_bit_mask(ep, ch)

        #load all bytes contained in an episode
        data_blocks = self.get_data_blocks(ep)
        databytes = self.load_bytes(data_blocks)
        raw = self.filter_bytes(databytes, bit_mask)

        #reshape bytes from the sample size
        dt = np.dtype(numpy_map[sample_symbol])
        # newbyteorder() returns a new dtype, so the result must be reassigned
        dt = dt.newbyteorder('<')
        return np.frombuffer(raw.reshape([len(raw) // sample_size, sample_size]), dt)
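
Note that dtype.newbyteorder() returns a new dtype object rather than modifying the existing one, which is why the result is reassigned above. A short illustration:

import numpy as np

dt = np.dtype('int16')
# newbyteorder() leaves dt untouched and returns a new little-endian dtype
dt = dt.newbyteorder('<')

# reinterpret a flat little-endian byte buffer with that dtype
raw = bytes([1, 0, 2, 0, 3, 0])
print(np.frombuffer(raw, dtype=dt))  # [1 2 3]
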
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_signal_data(self, ep, ch):
        """
        Return a numpy recarray containing all samples of a
        signal acquired on an Elphy analog channel, stored
        as (time, value) pairs in its 'x' and 'y' fields.
        """
        #get data from the file
        y_data = self.load_encoded_data(ep, ch)
        x_data = np.arange(0, len(y_data))

        #create a recarray
        data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_float)])

        #put in the recarray the scaled data
        x_factors = self.x_scale_factors(ep, ch)
        y_factors = self.y_scale_factors(ep, ch)
        data['x'] = x_factors.scale(x_data)
        data['y'] = y_factors.scale(y_data)

        return data
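
np.recarray builds an array with named fields that can be filled column by column, which is how the scaled x and y samples are stored here. A small self-contained example with invented values:

import numpy as np

# a record array with two named float fields, analogous to ('x', 'y') above
data = np.recarray(4, dtype=[('x', 'f8'), ('y', 'f8')])
data['x'] = np.arange(4) * 0.1        # e.g. sample times
data['y'] = [0.0, 1.0, 4.0, 9.0]      # e.g. scaled amplitudes

# fields are reachable both as keys and as attributes
print(data['y'][2])   # 4.0
print(data.x)         # [0.  0.1 0.2 0.3]
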
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_tag_data(self, ep, tag_ch):
        """
        Return a numpy recarray containing all samples of a
        signal acquired on an Elphy tag channel, stored
        as (time, value) pairs in its 'x' and 'y' fields.
        """
        #get data from the file
        y_data = self.load_encoded_tags(ep, tag_ch)
        x_data = np.arange(0, len(y_data))

        #create a recarray
        data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_int)])

        #put in the recarray the scaled data
        factors = self.x_tag_scale_factors(ep)
        data['x'] = factors.scale(x_data)
        data['y'] = y_data
        return data
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_encoded_events(self, episode, evt_channel, identifier):
        """
        Return times stored as 4-byte integers
        in the specified event channel.
        """
        data_blocks = self.group_blocks_of_type(episode, identifier)
        ep_blocks = self.get_blocks_stored_in_episode(episode)
        evt_blocks = [k for k in ep_blocks if k.identifier == identifier]

        #compute events on each channel
        n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
        pre_events = np.sum(n_events[0:evt_channel - 1], dtype=int)
        start = pre_events
        end = start + n_events[evt_channel - 1]
        expected_size = 4 * np.sum(n_events, dtype=int)
        return self.load_bytes(data_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_encoded_spikes(self, episode, evt_channel, identifier):
        """
        Return times stored as 4-byte integers
        in the specified spike channel.
        NB: this is meant for Blackrock-type data, which carry an additional byte per event time as a spike-sorting label.
            These additional bytes are appended after the times.
        """
        # to load the requested spikes for the specified episode and event channel:
        # get all the elphy blocks having as identifier 'RSPK' (or whatever)
        all_rspk_blocks = [k for k in self.blocks if k.identifier == identifier]
        rspk_block = all_rspk_blocks[episode-1]
        # RDATA(h?dI) REVT(NbVeV:I, NbEv:256I ... spike data are 4byte integers
        rspk_header = 4*( rspk_block.size - rspk_block.data_size-2 + len(rspk_block.n_events))
        pre_events = np.sum(rspk_block.n_events[0:evt_channel-1], dtype=int, axis=0)
        # the real start is after the header, the preceding events (4 bytes each) and the preceding labels (1 byte each)
        start = rspk_header + (4*pre_events) + pre_events
        end = start + 4*rspk_block.n_events[evt_channel-1]
        raw = self.load_bytes( [rspk_block], dtype='<i1', start=start, end=end, expected_size=rspk_block.size )
        # re-encoding after reading byte by byte
        res = np.frombuffer(raw[0:(4*rspk_block.n_events[evt_channel-1])], dtype='<i4')
        res.sort() # sometimes timings are not sorted
        #print "load_encoded_data() - spikes:",res
        return res
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_spiketrain(self, episode, electrode_id):
        """
        Return an :class:`ElphySpikeTrain` describing
        the specified spike channel.
        """
        assert episode in range(1, self.n_episodes + 1)
        assert electrode_id in range(1, self.n_spiketrains(episode) + 1)
        # get some properties stored in the episode sub-block
        block = self.episode_block(episode)
        x_unit = block.ep_block.x_unit
        x_unit_wf = getattr(block.ep_block, 'x_unit_wf', None)
        y_unit_wf = getattr(block.ep_block, 'y_unit_wf', None)
        # number of spikes in the entire episode
        spk_blocks = [k for k in self.blocks if k.identifier == 'RSPK']
        n_events = np.sum([k.n_events[electrode_id - 1] for k in spk_blocks], dtype=int)
        # number of samples in a waveform
        wf_sampling_frequency = 1.0 / block.ep_block.dX
        wf_blocks = [k for k in self.blocks if k.identifier == 'RspkWave']
        if wf_blocks :
            wf_samples = wf_blocks[0].wavelength
            t_start = wf_blocks[0].pre_trigger * block.ep_block.dX
        else:
            wf_samples = 0
            t_start = 0
        return ElphySpikeTrain(self, episode, electrode_id, x_unit, n_events, wf_sampling_frequency, wf_samples, x_unit_wf, y_unit_wf, t_start)
elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_rspk_data(self, spk_channel):
        """
        Return times stored as 4-byte integers
        in the specified spike channel.
        """
        evt_blocks = self.get_blocks_of_type('RSPK')
        #compute events on each channel
        n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
        pre_events = np.sum(n_events[0:spk_channel], dtype=int)  # sum of event counts up to spk_channel - 1
        start = pre_events + (7 + len(n_events))  # RSPK header
        end = start + n_events[spk_channel]
        expected_size = 4 * np.sum(n_events, dtype=int) # constant
        return self.load_bytes(evt_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)


# ---------------------------------------------------------
# factories.py
neuralynxio.py (project: NeoAnalysis, author: neoanalysis)
def __mmap_ncs_packet_headers(self, filename):
        """
        Memory map the Neuralynx .ncs file, optimized for extraction of
        data packet headers.
        Reading with a uniform dtype improves speed, but the timestamps need
        to be reconstructed.
        """
        filesize = getsize(self.sessiondir + sep + filename)  # in bytes
        if filesize > 16384:
            data = np.memmap(self.sessiondir + sep + filename,
                             dtype='<u4',
                             shape=((filesize - 16384) // 4 // 261, 261),
                             mode='r', offset=16384)

            ts = data[:, 0:2]
            multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                              axis=0)
            timestamps = np.sum(ts * multi, axis=1)
            # timestamps = data[:,0] + (data[:,1] *2**32)
            header_u4 = data[:, 2:5]

            return timestamps, header_u4
        else:
            return None
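
The timestamp reconstruction works because each packet stores a 64-bit timestamp as two uint32 words, low word first; weighting the high word by 2**32 and summing restores the full value. A standalone sketch of just that step:

import numpy as np

# two little-endian uint32 words per packet: low word first, then high word
ts_words = np.array([[1, 0],      # timestamp 1
                     [0, 1],      # timestamp 2**32
                     [5, 2]],     # timestamp 5 + 2 * 2**32
                    dtype='<u4')

# weight the high word by 2**32 and sum, as done above
multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(ts_words), axis=0)
timestamps = np.sum(ts_words * multi, axis=1)
print(timestamps)  # [1 4294967296 8589934597]
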

