Python dtype() usage examples from open-source projects

Source file: input_data.py (project: IntroToDeepLearning, author: robb-brown)
def extract_images(filename):
  """Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
  print('Extracting', filename)
  with gzip.open(filename) as bytestream:
    magic = _read32(bytestream)
    if magic != 2051:
      raise ValueError(
          'Invalid magic number %d in MNIST image file: %s' %
          (magic, filename))
    num_images = _read32(bytestream)
    rows = _read32(bytestream)
    cols = _read32(bytestream)
    buf = bytestream.read(rows * cols * num_images)
    data = numpy.frombuffer(buf, dtype=numpy.uint8)
    data = data.reshape(num_images, rows, cols, 1)
    return data
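
The snippet relies on a `_read32` helper defined elsewhere in the file. A minimal sketch, assuming it reads a single big-endian unsigned 32-bit integer from the stream (MNIST header fields are stored big-endian):

import numpy

def _read32(bytestream):
  # MNIST header fields are big-endian uint32 values.
  dt = numpy.dtype(numpy.uint32).newbyteorder('>')
  return numpy.frombuffer(bytestream.read(4), dtype=dt)[0]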
Source file: mpibase.py (project: mpiFFT4py, author: spectralDNS)
def __keytransform__(self, key):
        if isinstance(key[0], np.ndarray):
            shape = key[0].shape
            dtype = key[0].dtype
            i = key[1]
            zero = True if len(key) == 2 else key[2]

        elif isinstance(key[0], tuple):
            if len(key) == 3:
                shape, dtype, i = key
                zero = True

            elif len(key) == 4:
                shape, dtype, i, zero = key

        else:
            raise TypeError("Wrong type of key for work array")

        assert isinstance(zero, bool)
        assert isinstance(i, int)
        self.fillzero = zero
        return (shape, np.dtype(dtype), i)
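
A usage sketch for the two accepted key forms, with a hypothetical `work` instance of the dict-like work-array cache this `__keytransform__` belongs to:

import numpy as np

a = np.zeros((8, 8), dtype=np.complex128)
w0 = work[(a, 0)]                          # (array, index): shape/dtype taken from a, zero-filled
w1 = work[((8, 8), np.float64, 0, False)]  # (shape, dtype, index, zero): explicit, not zero-filled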
Source file: metric_specs.py (project: seq2seq, author: google)
def accumulate_strings(values, name="strings"):
  """Accumulates strings into a vector.

  Args:
    values: A 1-d string tensor that contains values to add to the accumulator.
    name: Optional name for the accumulator variable.

  Returns:
    A tuple (value_tensor, update_op).
  """
  tf.assert_type(values, tf.string)
  strings = tf.Variable(
      name=name,
      initial_value=[],
      dtype=tf.string,
      trainable=False,
      collections=[],
      validate_shape=True)
  value_tensor = tf.identity(strings)
  update_op = tf.assign(
      ref=strings, value=tf.concat([strings, values], 0), validate_shape=False)
  return value_tensor, update_op
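
A graph-construction sketch (TensorFlow 1.x assumed): run `update_op` once per batch, then fetch `value_tensor` to read back everything accumulated. Note the variable is created with `collections=[]`, so it is not picked up by the usual global initializers.

import tensorflow as tf

values = tf.placeholder(tf.string, shape=[None])
value_tensor, update_op = accumulate_strings(values, name="predictions")
# sess.run(update_op, {values: batch}) appends a batch;
# sess.run(value_tensor) returns all strings accumulated so far.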
Source file: test_preprocess.py (project: zipline-chinese, author: zhanghan1990)
def test_expect_dtypes_with_tuple(self):

        allowed_dtypes = (dtype('datetime64[ns]'), dtype('float'))

        @expect_dtypes(a=allowed_dtypes)
        def foo(a, b):
            return a, b

        for d in allowed_dtypes:
            good_a = arange(3).astype(d)
            good_b = object()
            ret_a, ret_b = foo(good_a, good_b)
            self.assertIs(good_a, ret_a)
            self.assertIs(good_b, ret_b)

        with self.assertRaises(TypeError) as e:
            foo(arange(3, dtype='uint32'), object())

        expected_message = (
            "{qualname}() expected a value with dtype 'datetime64[ns]' "
            "or 'float64' for argument 'a', but got 'uint32' instead."
        ).format(qualname=qualname(foo))
        self.assertEqual(e.exception.args[0], expected_message)
Source file: multigenome.py (project: cellranger, author: 10XGenomics)
def _classify_gems(counts0, counts1):
        """ Infer number of distinct transcriptomes present in each GEM (1 or 2) and
            report cr_constants.GEM_CLASS_GENOME0 for a single cell w/ transcriptome 0,
            report cr_constants.GEM_CLASS_GENOME1 for a single cell w/ transcriptome 1,
            report cr_constants.GEM_CLASS_MULTIPLET for multiple transcriptomes """
        # Assumes that most of the GEMs are single-cell; model counts independently
        thresh0, thresh1 = [cr_constants.DEFAULT_MULTIPLET_THRESHOLD] * 2
        if sum(counts0 > counts1) >= 1 and sum(counts1 > counts0) >= 1:
            thresh0 = np.percentile(counts0[counts0 > counts1], cr_constants.MULTIPLET_PROB_THRESHOLD)
            thresh1 = np.percentile(counts1[counts1 > counts0], cr_constants.MULTIPLET_PROB_THRESHOLD)

        doublet = np.logical_and(counts0 >= thresh0, counts1 >= thresh1)
        dtype = np.dtype('|S%d' % max(len(cls) for cls in cr_constants.GEM_CLASSES))
        result = np.where(doublet, cr_constants.GEM_CLASS_MULTIPLET, cr_constants.GEM_CLASS_GENOME0).astype(dtype)
        result[np.logical_and(np.logical_not(result == cr_constants.GEM_CLASS_MULTIPLET), counts1 > counts0)] = cr_constants.GEM_CLASS_GENOME1

        return result
Source file: hdf5.py (project: cellranger, author: 10XGenomics)
def widen_cat_column(old_ds, new_type):
    name = old_ds.name
    tmp_name = "__tmp_" + old_ds.name
    grp = old_ds.parent

    ds = grp.create_dataset(tmp_name,
            data = old_ds[:],
            shape = old_ds.shape,
            maxshape = (None,),
            dtype = new_type,
            compression = COMPRESSION,
            shuffle = True,
            chunks = (CHUNK_SIZE,))

    del grp[name]
    grp.move(tmp_name, name)
    return ds
Source file: hdf5.py (project: cellranger, author: 10XGenomics)
def create_levels(ds, levels):
    # Create a dataset in the LEVEL_GROUP
    # and store as native numpy / h5py types
    level_grp = ds.file.get(LEVEL_GROUP)
    if level_grp is None:
        # Create a LEVEL_GROUP
        level_grp = ds.file.create_group(LEVEL_GROUP)
    ds_name = ds.name.split("/")[-1]
    dt = h5py.special_dtype(vlen=str)

    level_grp.create_dataset(ds_name,
                       shape = [len(levels)],
                       maxshape = (None,),
                       dtype = dt,
                       data  = levels,
                       compression = COMPRESSION,
                       chunks = (CHUNK_SIZE,))
Source file: hdf5.py (project: cellranger, author: 10XGenomics)
def reg2bin_vector(begin, end):
    '''Vectorized tabix reg2bin -- much faster than reg2bin'''
    result = np.zeros(begin.shape)

    # Entries filled
    done = np.zeros(begin.shape, dtype=np.bool)

    for (bits, bins) in rev_bit_bins:
        begin_shift = begin >> bits
        new_done = (begin >> bits) == (end >> bits)
        mask = np.logical_and(new_done, np.logical_not(done))
        offset = ((1 << (29 - bits)) - 1) / 7
        result[mask] = offset + begin_shift[mask]

        done = new_done

    return result.astype(np.int32)
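
The loop iterates over a module-level `rev_bit_bins` that is not shown. A plausible reconstruction from the tabix/BAI binning scheme (finest level first: 16 kb bins at shift 14, up to one 512 Mb bin at shift 29); only the shift value is used above, the bin count is informational:

# (shift_bits, bins_at_level), finest level first, per the tabix spec
rev_bit_bins = [(14, 32768), (17, 4096), (20, 512), (23, 64), (26, 8), (29, 1)]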
Source file: gdal_array.py (project: gee-bridge, author: francbartoli)
def flip_code(code):
    if isinstance(code, (numpy.dtype,type)):
        # since several things map to complex64 we must carefully select
        # the opposite that is an exact match (ticket 1518)
        if code == numpy.int8:
            return gdalconst.GDT_Byte
        if code == numpy.complex64:
            return gdalconst.GDT_CFloat32

        for key, value in codes.items():
            if value == code:
                return key
        return None
    else:
        try:
            return codes[code]
        except KeyError:
            return None
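
`codes` is the module-level GDAL-to-numpy type table that `flip_code` searches in both directions. A sketch of the mapping as it commonly appears in `gdal_array` (the installed table may carry more entries):

import numpy
from osgeo import gdalconst

codes = {gdalconst.GDT_Byte: numpy.uint8,
         gdalconst.GDT_UInt16: numpy.uint16,
         gdalconst.GDT_Int16: numpy.int16,
         gdalconst.GDT_UInt32: numpy.uint32,
         gdalconst.GDT_Int32: numpy.int32,
         gdalconst.GDT_Float32: numpy.float32,
         gdalconst.GDT_Float64: numpy.float64,
         gdalconst.GDT_CInt16: numpy.complex64,
         gdalconst.GDT_CInt32: numpy.complex64,
         gdalconst.GDT_CFloat32: numpy.complex64,
         gdalconst.GDT_CFloat64: numpy.complex128}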
Source file: shp1.py (project: j3dview, author: blank63)
def gl_init(self,array_table):
        self.gl_hide = False

        self.gl_vertex_array = gl.VertexArray()
        glBindVertexArray(self.gl_vertex_array)

        self.gl_vertex_buffer = gl.Buffer()
        glBindBuffer(GL_ARRAY_BUFFER,self.gl_vertex_buffer)

        self.gl_element_count = 3*gl_count_triangles(self)
        self.gl_element_buffer = gl.Buffer()
        glBindBuffer(GL_ELEMENT_ARRAY_BUFFER,self.gl_element_buffer)

        vertex_type =  numpy.dtype([array_table[attribute].field() for attribute in self.attributes])
        vertex_count = sum(len(primitive.vertices) for primitive in self.primitives)
        vertex_array = numpy.empty(vertex_count,vertex_type)

        for attribute in self.attributes:
            array_table[attribute].load(self,vertex_array)

        vertex_array,element_map = numpy.unique(vertex_array,return_inverse=True)
        element_array = gl_create_element_array(self,element_map,self.gl_element_count)

        glBufferData(GL_ARRAY_BUFFER,vertex_array.nbytes,vertex_array,GL_STATIC_DRAW)
        glBufferData(GL_ELEMENT_ARRAY_BUFFER,element_array.nbytes,element_array,GL_STATIC_DRAW)
Source file: plyfile.py (project: pointnet, author: charlesq34)
def make2d(array, cols=None, dtype=None):
    '''
    Make a 2D array from an array of arrays.  The `cols' and `dtype'
    arguments can be omitted if the array is not empty.

    '''
    if (cols is None or dtype is None) and not len(array):
        raise RuntimeError("cols and dtype must be specified for empty "
                           "array")

    if cols is None:
        cols = len(array[0])

    if dtype is None:
        dtype = array[0].dtype

    return _np.fromiter(array, [('_', dtype, (cols,))],
                        count=len(array))['_']
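
A quick usage sketch: every element of `array` must be a fixed-length sequence, and the structured-dtype trick lets `fromiter` assemble the 2D result in a single pass:

import numpy as _np

rows = [_np.array([1.0, 2.0, 3.0]), _np.array([4.0, 5.0, 6.0])]
full = make2d(rows)                            # shape (2, 3), dtype float64
empty = make2d([], cols=3, dtype=_np.float64)  # shape (0, 3)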
Source file: plyfile.py (project: pointnet, author: charlesq34)
def _read(self, stream, text, byte_order):
        '''
        Read the actual data from a PLY file.

        '''
        if text:
            self._read_txt(stream)
        else:
            if self._have_list:
                # There are list properties, so a simple load is
                # impossible.
                self._read_bin(stream, byte_order)
            else:
                # There are no list properties, so loading the data is
                # much more straightforward.
                self._data = _np.fromfile(stream,
                                          self.dtype(byte_order),
                                          self.count)

        if len(self._data) < self.count:
            k = len(self._data)
            del self._data
            raise PlyParseError("early end-of-file", self, k)

        self._check_sanity()
Source file: plyfile.py (project: pointnet, author: charlesq34)
def _read_bin(self, stream, byte_order):
        '''
        Load a PLY element from a binary PLY file.  The element may
        contain list properties.

        '''
        self._data = _np.empty(self.count, dtype=self.dtype(byte_order))

        for k in _range(self.count):
            for prop in self.properties:
                try:
                    self._data[prop.name][k] = \
                        prop._read_bin(stream, byte_order)
                except StopIteration:
                    raise PlyParseError("early end-of-file",
                                        self, k, prop)
Source file: mapped_struct.py (project: sharedbuffers, author: jampp)
def _merge_all(parts, dtype):
    if len(parts) == 1:
        return parts[0]
    else:
        nparts = []
        for i in xrange(0, len(parts), 2):
            if i+1 < len(parts):
                npart = numpy.empty((len(parts[i])+len(parts[i+1]), 2), dtype)
                merge_elements = index_merge(parts[i], parts[i+1], npart)
                if merge_elements != len(npart):
                    npart = npart[:merge_elements]
                nparts.append(npart)
            else:
                nparts.append(parts[i])
        del parts
        return _merge_all(nparts, dtype)
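
`index_merge` is provided elsewhere in the module (typically as an accelerated routine). A hypothetical pure-numpy stand-in showing the assumed contract: merge two key-sorted (n, 2) index arrays into a preallocated output and return the number of rows written (the caller trims the output when fewer rows come back, e.g. if the real routine deduplicates):

import numpy

def index_merge(a, b, out):
    # Hypothetical stand-in: merge two arrays sorted by their first
    # column into `out`, preserving key order. A real implementation
    # may also deduplicate, returning fewer rows than len(a) + len(b).
    merged = numpy.concatenate((a, b))
    merged = merged[numpy.argsort(merged[:, 0], kind='mergesort')]
    out[:len(merged)] = merged
    return len(merged)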
Source file: mapped_struct.py (project: sharedbuffers, author: jampp)
def __init__(self, buf, offset = 0):
        # Accelerate class attributes
        self._encode = self.encode
        self._dtype = self.dtype
        self._xxh = self.xxh

        # Initialize buffer
        if offset:
            self._buf = self._likebuf = buffer(buf, offset)
        else:
            self._buf = buf
            self._likebuf = _likebuffer(buf)

        # Parse header and map index
        self.index_elements, self.index_offset = self._Header.unpack_from(self._buf, 0)

        self.index = numpy.ndarray(buffer = self._buf, 
            offset = self.index_offset, 
            dtype = self.dtype, 
            shape = (self.index_elements, 3))
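
`self._Header` is defined elsewhere on the class; a minimal sketch of the assumption the parsing above relies on (two little-endian unsigned 64-bit fields, element count then index offset; the exact format string is my guess):

import struct

_Header = struct.Struct('=QQ')  # (index_elements, index_offset), both uint64
index_elements, index_offset = _Header.unpack_from(buf, 0)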
Source file: test_functions.py (project: NeoAnalysis, author: neoanalysis)
def test_rescaleData():
    dtypes = map(np.dtype, ('ubyte', 'uint16', 'byte', 'int16', 'int', 'float'))
    for dtype1 in dtypes:
        for dtype2 in dtypes:
            data = (np.random.random(size=10) * 2**32 - 2**31).astype(dtype1)
            for scale, offset in [(10, 0), (10., 0.), (1, -50), (0.2, 0.5), (0.001, 0)]:
                if dtype2.kind in 'iu':
                    lim = np.iinfo(dtype2)
                    lim = lim.min, lim.max
                else:
                    lim = (-np.inf, np.inf)
                s1 = np.clip(float(scale) * (data-float(offset)), *lim).astype(dtype2)
                s2 = pg.rescaleData(data, scale, offset, dtype2)
                assert s1.dtype == s2.dtype
                if dtype2.kind in 'iu':
                    assert np.all(s1 == s2)
                else:
                    assert np.allclose(s1, s2)
Source file: functions.py (project: NeoAnalysis, author: neoanalysis)
def solve3DTransform(points1, points2):
    """
    Find a 3D transformation matrix that maps points1 onto points2.
    Points must be specified as either lists of 4 Vectors or 
    (4, 3) arrays.
    """
    import numpy.linalg
    pts = []
    for inp in (points1, points2):
        if isinstance(inp, np.ndarray):
            A = np.empty((4,4), dtype=float)
            A[:,:3] = inp[:,:3]
            A[:,3] = 1.0
        else:
            A = np.array([[inp[i].x(), inp[i].y(), inp[i].z(), 1] for i in range(4)])
        pts.append(A)

    ## solve 3 sets of linear equations to determine transformation matrix elements
    matrix = np.zeros((4,4))
    for i in range(3):
        ## solve Ax = B; x is one row of the desired transformation matrix
        matrix[i] = numpy.linalg.solve(pts[0], pts[1][:,i])  

    return matrix
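
A worked sketch with (4, 3) arrays: mapping a point set onto a pure translation of itself should recover an affine matrix with the offsets in the last column (note the bottom row of the returned 4x4 matrix is left as zeros):

import numpy as np

pts1 = np.array([[0., 0., 0.],
                 [1., 0., 0.],
                 [0., 1., 0.],
                 [0., 0., 1.]])
pts2 = pts1 + np.array([1., 2., 3.])
m = solve3DTransform(pts1, pts2)
# m[:3, :3] ~ identity, m[:3, 3] ~ [1, 2, 3]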
Source file: channelindex.py (project: NeoAnalysis, author: neoanalysis)
def __init__(self, index, channel_names=None, channel_ids=None,
                 name=None, description=None, file_origin=None,
                 coordinates=None, **annotations):
        '''
        Initialize a new :class:`ChannelIndex` instance.
        '''
        # Inherited initialization
        # Sets universally recommended attributes, and places all others
        # in annotations
        super(ChannelIndex, self).__init__(name=name,
                                           description=description,
                                           file_origin=file_origin,
                                           **annotations)

        # Defaults
        if channel_names is None:
            channel_names = np.array([], dtype='S')
        if channel_ids is None:
            channel_ids = np.array([], dtype='i')

        # Store recommended attributes
        self.channel_names = np.array(channel_names)
        self.channel_ids = np.array(channel_ids)
        self.index = np.array(index)
        self.coordinates = coordinates
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_bytes(self, data_blocks, dtype='<i1', start=None, end=None, expected_size=None):
        """
        Return the list of bytes contained
        in the specified set of blocks.

        NB: loads all data at once, as files cannot exceed 4 GB;
            a more memory-sparing approach could be substituted later.
        """
        chunks = list()
        raw = ''
        # keep only data blocks having
        # a size greater than zero
        blocks = [k for k in data_blocks if k.size > 0]
        for data_block in blocks :
            self.file.seek(data_block.start)
            raw = self.file.read(data_block.size)[0:expected_size]
            databytes = np.frombuffer(raw, dtype=dtype)
            chunks.append(databytes)
        # concatenate all chunks and return
        # the specified slice
        if len(chunks)>0 :
            databytes = np.concatenate(chunks)
            return databytes[start:end]
        else :
            return np.array([])
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_channel_data(self, ep, ch):
        """
        Return a numpy array containing the
        list of bytes corresponding to the
        specified episode and channel.
        """
        #memorise the sample size and symbol
        sample_size = self.sample_size(ep, ch)
        sample_symbol = self.sample_symbol(ep, ch)

        #create a bit mask to define which
        #sample to keep from the file
        bit_mask = self.create_bit_mask(ep, ch)

        #load all bytes contained in an episode
        data_blocks = self.get_data_blocks(ep)
        databytes = self.load_bytes(data_blocks)
        raw = self.filter_bytes(databytes, bit_mask)

        #reshape bytes from the sample size
        dt = np.dtype(numpy_map[sample_symbol])
        dt = dt.newbyteorder('<')  # newbyteorder() returns a new dtype; it does not modify in place
        return np.frombuffer(raw.reshape([len(raw) / sample_size, sample_size]), dt)
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_signal_data(self, ep, ch):
        """
        Return a numpy recarray containing all samples of a
        signal acquired on an Elphy analog channel, stored
        as (x, y) pairs of scaled time and value.
        """
        #get data from the file
        y_data = self.load_encoded_data(ep, ch)
        x_data = np.arange(0, len(y_data))

        #create a recarray
        data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_float)])

        #put in the recarray the scaled data
        x_factors = self.x_scale_factors(ep, ch)
        y_factors = self.y_scale_factors(ep, ch)
        data['x'] = x_factors.scale(x_data)
        data['y'] = y_factors.scale(y_data)

        return data
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_tag_data(self, ep, tag_ch):
        """
        Return a numpy recarray containing all samples of a
        signal acquired on an Elphy tag channel, stored
        as (x, y) pairs of scaled time and value.
        """
        #get data from the file
        y_data = self.load_encoded_tags(ep, tag_ch)
        x_data = np.arange(0, len(y_data))

        #create a recarray
        data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_int)])

        #put in the recarray the scaled data
        factors = self.x_tag_scale_factors(ep)
        data['x'] = factors.scale(x_data)
        data['y'] = y_data
        return data
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_event(self, ep, ch, marked_ks):
        """
        Return a :class:`ElphyEvent` which is a
        descriptor of the specified event channel.
        """
        assert ep in range(1, self.n_episodes + 1)
        assert ch in range(1, self.n_channels + 1)

        # find the event channel number
        evt_channel = np.where(marked_ks == -1)[0][0]
        assert evt_channel in range(1, self.n_events(ep) + 1)

        block = self.episode_block(ep)
        ep_blocks = self.get_blocks_stored_in_episode(ep)
        evt_blocks = [k for k in ep_blocks if k.identifier == 'REVT']
        n_events = np.sum([k.n_events[evt_channel - 1] for k in evt_blocks], dtype=int)
        x_unit = block.ep_block.x_unit

        return ElphyEvent(self, ep, evt_channel, x_unit, n_events, ch_number=ch)
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_encoded_events(self, episode, evt_channel, identifier):
        """
        Return times stored as 4-byte integers
        in the specified event channel.
        """
        data_blocks = self.group_blocks_of_type(episode, identifier)
        ep_blocks = self.get_blocks_stored_in_episode(episode)
        evt_blocks = [k for k in ep_blocks if k.identifier == identifier]

        #compute events on each channel
        n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
        pre_events = np.sum(n_events[0:evt_channel - 1], dtype=int)
        start = pre_events
        end = start + n_events[evt_channel - 1]
        expected_size = 4 * np.sum(n_events, dtype=int)
        return self.load_bytes(data_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def load_encoded_spikes(self, episode, evt_channel, identifier):
        """
        Return times stored as 4-byte integers
        in the specified spike channel.
        NB: meant for Blackrock-type data, which carry an additional byte
            per event time as a spike-sorting label; these additional
            bytes are appended after the times.
        """
        # to load the requested spikes for the specified episode and event channel:
        # get all the elphy blocks having as identifier 'RSPK' (or whatever)
        all_rspk_blocks = [k for k in self.blocks if k.identifier == identifier]
        rspk_block = all_rspk_blocks[episode-1]
        # RDATA(h?dI) REVT(NbVeV:I, NbEv:256I ... spike data are 4byte integers
        rspk_header = 4*( rspk_block.size - rspk_block.data_size-2 + len(rspk_block.n_events))
        pre_events = np.sum(rspk_block.n_events[0:evt_channel-1], dtype=int, axis=0)
        # the real start is after the header, the preceding events (4 bytes each) and the preceding labels (1 byte each)
        start = rspk_header + (4*pre_events) + pre_events
        end = start + 4*rspk_block.n_events[evt_channel-1]
        raw = self.load_bytes( [rspk_block], dtype='<i1', start=start, end=end, expected_size=rspk_block.size )
        # re-encoding after reading byte by byte
        res = np.frombuffer(raw[0:(4*rspk_block.n_events[evt_channel-1])], dtype='<i4')
        res.sort() # sometimes timings are not sorted
        #print "load_encoded_data() - spikes:",res
        return res
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_waveform_data(self, episode, electrode_id):
        """
        Return waveforms corresponding to the specified
        spike channel. This function is triggered when the
        ``waveforms`` property of an :class:`Spike` descriptor
        instance is accessed.
        """
        block = self.episode_block(episode)
        times, databytes = self.load_encoded_waveforms(episode, electrode_id)
        n_events, = databytes.shape
        wf_samples = databytes['waveform'].shape[1]
        dtype = [
            ('time', float),
            ('electrode_id', int),
            ('unit_id', int),
            ('waveform', float, (wf_samples, 2))
        ]
        data = np.empty(n_events, dtype=dtype)
        data['electrode_id'] = databytes['channel_id'][:, 0]
        data['unit_id'] = databytes['unit_id'][:, 0]
        data['time'] = databytes['elphy_time'][:, 0] * block.ep_block.dX
        data['waveform'][:, :, 0] = times * block.ep_block.dX
        data['waveform'][:, :, 1] = databytes['waveform'] * block.ep_block.dY_wf + block.ep_block.Y0_wf
        return data
Source file: elphyio.py (project: NeoAnalysis, author: neoanalysis)
def get_rspk_data(self, spk_channel):
        """
        Return times stored as 4-byte integers
        in the specified spike channel.
        """
        evt_blocks = self.get_blocks_of_type('RSPK')
        #compute events on each channel
        n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
        pre_events = np.sum(n_events[0:spk_channel], dtype=int)  # events on channels preceding spk_channel
        start = pre_events + (7 + len(n_events))  # skip the RSPK header
        end = start + n_events[spk_channel]
        expected_size = 4 * np.sum(n_events, dtype=int)  # constant
        return self.load_bytes(evt_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)


Source file: neuralynxio.py (project: NeoAnalysis, author: neoanalysis)
def __mmap_ncs_packet_headers(self, filename):
        """
        Memory map the Neuralynx .ncs file, optimized for extraction of
        data packet headers. Reading a standard dtype improves speed,
        but timestamps need to be reconstructed.
        """
        filesize = getsize(self.sessiondir + sep + filename)  # in byte
        if filesize > 16384:
            data = np.memmap(self.sessiondir + sep + filename,
                             dtype='<u4',
                             shape=(int((filesize - 16384) / 4 / 261), 261),
                             mode='r', offset=16384)

            ts = data[:, 0:2]
            multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                              axis=0)
            timestamps = np.sum(ts * multi, axis=1)
            # timestamps = data[:,0] + (data[:,1] *2**32)
            header_u4 = data[:, 2:5]

            return timestamps, header_u4
        else:
            return None
Source file: neuralynxio.py (project: NeoAnalysis, author: neoanalysis)
def __mmap_ncs_packet_timestamps(self, filename):
        """
        Memory map the Neuralynx .ncs file, optimized for extraction of
        data packet timestamps. Reading a standard dtype improves speed,
        but timestamps need to be reconstructed.
        """
        filesize = getsize(self.sessiondir + sep + filename)  # in byte
        if filesize > 16384:
            data = np.memmap(self.sessiondir + sep + filename,
                             dtype='<u4',
                             shape=(int((filesize - 16384) / 4 / 261), 261),
                             mode='r', offset=16384)

            ts = data[:, 0:2]
            multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                              axis=0)
            timestamps = np.sum(ts * multi, axis=1)
            # timestamps = data[:,0] + data[:,1]*2**32

            return timestamps
        else:
            return None
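
The magic numbers above follow from the Neuralynx NCS layout: a 16384-byte text header, then fixed 1044-byte records, i.e. 261 little-endian uint32 words each. A sketch of one record as a structured dtype (field names are my own labels for the commonly documented layout):

import numpy as np

ncs_record = np.dtype([
    ('timestamp', '<u8'),            # microseconds; split across data[:, 0:2] above
    ('channel_number', '<u4'),
    ('sample_freq', '<u4'),
    ('num_valid_samples', '<u4'),
    ('samples', '<i2', 512),         # 512 x int16 samples
])
assert ncs_record.itemsize == 1044   # == 261 * 4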

