我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.dtype()。
def extract_images(filename): """Extract the images into a 4D uint8 numpy array [index, y, x, depth].""" print('Extracting', filename) with gzip.open(filename) as bytestream: magic = _read32(bytestream) if magic != 2051: raise ValueError( 'Invalid magic number %d in MNIST image file: %s' % (magic, filename)) num_images = _read32(bytestream) rows = _read32(bytestream) cols = _read32(bytestream) buf = bytestream.read(rows * cols * num_images) data = numpy.frombuffer(buf, dtype=numpy.uint8) data = data.reshape(num_images, rows, cols, 1) return data
def __keytransform__(self, key): if isinstance(key[0], np.ndarray): shape = key[0].shape dtype = key[0].dtype i = key[1] zero = True if len(key) == 2 else key[2] elif isinstance(key[0], tuple): if len(key) == 3: shape, dtype, i = key zero = True elif len(key) == 4: shape, dtype, i, zero = key else: raise TypeError("Wrong type of key for work array") assert isinstance(zero, bool) assert isinstance(i, int) self.fillzero = zero return (shape, np.dtype(dtype), i)
def accumulate_strings(values, name="strings"): """Accumulates strings into a vector. Args: values: A 1-d string tensor that contains values to add to the accumulator. Returns: A tuple (value_tensor, update_op). """ tf.assert_type(values, tf.string) strings = tf.Variable( name=name, initial_value=[], dtype=tf.string, trainable=False, collections=[], validate_shape=True) value_tensor = tf.identity(strings) update_op = tf.assign( ref=strings, value=tf.concat([strings, values], 0), validate_shape=False) return value_tensor, update_op
def test_expect_dtypes_with_tuple(self): allowed_dtypes = (dtype('datetime64[ns]'), dtype('float')) @expect_dtypes(a=allowed_dtypes) def foo(a, b): return a, b for d in allowed_dtypes: good_a = arange(3).astype(d) good_b = object() ret_a, ret_b = foo(good_a, good_b) self.assertIs(good_a, ret_a) self.assertIs(good_b, ret_b) with self.assertRaises(TypeError) as e: foo(arange(3, dtype='uint32'), object()) expected_message = ( "{qualname}() expected a value with dtype 'datetime64[ns]' " "or 'float64' for argument 'a', but got 'uint32' instead." ).format(qualname=qualname(foo)) self.assertEqual(e.exception.args[0], expected_message)
def _classify_gems(counts0, counts1): """ Infer number of distinct transcriptomes present in each GEM (1 or 2) and report cr_constants.GEM_CLASS_GENOME0 for a single cell w/ transcriptome 0, report cr_constants.GEM_CLASS_GENOME1 for a single cell w/ transcriptome 1, report cr_constants.GEM_CLASS_MULTIPLET for multiple transcriptomes """ # Assumes that most of the GEMs are single-cell; model counts independently thresh0, thresh1 = [cr_constants.DEFAULT_MULTIPLET_THRESHOLD] * 2 if sum(counts0 > counts1) >= 1 and sum(counts1 > counts0) >= 1: thresh0 = np.percentile(counts0[counts0 > counts1], cr_constants.MULTIPLET_PROB_THRESHOLD) thresh1 = np.percentile(counts1[counts1 > counts0], cr_constants.MULTIPLET_PROB_THRESHOLD) doublet = np.logical_and(counts0 >= thresh0, counts1 >= thresh1) dtype = np.dtype('|S%d' % max(len(cls) for cls in cr_constants.GEM_CLASSES)) result = np.where(doublet, cr_constants.GEM_CLASS_MULTIPLET, cr_constants.GEM_CLASS_GENOME0).astype(dtype) result[np.logical_and(np.logical_not(result == cr_constants.GEM_CLASS_MULTIPLET), counts1 > counts0)] = cr_constants.GEM_CLASS_GENOME1 return result
def widen_cat_column(old_ds, new_type): name = old_ds.name tmp_name = "__tmp_" + old_ds.name grp = old_ds.parent ds = grp.create_dataset(tmp_name, data = old_ds[:], shape = old_ds.shape, maxshape = (None,), dtype = new_type, compression = COMPRESSION, shuffle = True, chunks = (CHUNK_SIZE,)) del grp[name] grp.move(tmp_name, name) return ds
def create_levels(ds, levels): # Create a dataset in the LEVEL_GROUP # and store as native numpy / h5py types level_grp = ds.file.get(LEVEL_GROUP) if level_grp is None: # Create a LEVEL_GROUP level_grp = ds.file.create_group(LEVEL_GROUP) ds_name = ds.name.split("/")[-1] dt = h5py.special_dtype(vlen=str) level_grp.create_dataset(ds_name, shape = [len(levels)], maxshape = (None,), dtype = dt, data = levels, compression = COMPRESSION, chunks = (CHUNK_SIZE,))
def reg2bin_vector(begin, end): '''Vectorized tabix reg2bin -- much faster than reg2bin''' result = np.zeros(begin.shape) # Entries filled done = np.zeros(begin.shape, dtype=np.bool) for (bits, bins) in rev_bit_bins: begin_shift = begin >> bits new_done = (begin >> bits) == (end >> bits) mask = np.logical_and(new_done, np.logical_not(done)) offset = ((1 << (29 - bits)) - 1) / 7 result[mask] = offset + begin_shift[mask] done = new_done return result.astype(np.int32)
def flip_code(code): if isinstance(code, (numpy.dtype,type)): # since several things map to complex64 we must carefully select # the opposite that is an exact match (ticket 1518) if code == numpy.int8: return gdalconst.GDT_Byte if code == numpy.complex64: return gdalconst.GDT_CFloat32 for key, value in codes.items(): if value == code: return key return None else: try: return codes[code] except KeyError: return None
def gl_init(self,array_table): self.gl_hide = False self.gl_vertex_array = gl.VertexArray() glBindVertexArray(self.gl_vertex_array) self.gl_vertex_buffer = gl.Buffer() glBindBuffer(GL_ARRAY_BUFFER,self.gl_vertex_buffer) self.gl_element_count = 3*gl_count_triangles(self) self.gl_element_buffer = gl.Buffer() glBindBuffer(GL_ELEMENT_ARRAY_BUFFER,self.gl_element_buffer) vertex_type = numpy.dtype([array_table[attribute].field() for attribute in self.attributes]) vertex_count = sum(len(primitive.vertices) for primitive in self.primitives) vertex_array = numpy.empty(vertex_count,vertex_type) for attribute in self.attributes: array_table[attribute].load(self,vertex_array) vertex_array,element_map = numpy.unique(vertex_array,return_inverse=True) element_array = gl_create_element_array(self,element_map,self.gl_element_count) glBufferData(GL_ARRAY_BUFFER,vertex_array.nbytes,vertex_array,GL_STATIC_DRAW) glBufferData(GL_ELEMENT_ARRAY_BUFFER,element_array.nbytes,element_array,GL_STATIC_DRAW)
def make2d(array, cols=None, dtype=None): ''' Make a 2D array from an array of arrays. The `cols' and `dtype' arguments can be omitted if the array is not empty. ''' if (cols is None or dtype is None) and not len(array): raise RuntimeError("cols and dtype must be specified for empty " "array") if cols is None: cols = len(array[0]) if dtype is None: dtype = array[0].dtype return _np.fromiter(array, [('_', dtype, (cols,))], count=len(array))['_']
def _read(self, stream, text, byte_order): ''' Read the actual data from a PLY file. ''' if text: self._read_txt(stream) else: if self._have_list: # There are list properties, so a simple load is # impossible. self._read_bin(stream, byte_order) else: # There are no list properties, so loading the data is # much more straightforward. self._data = _np.fromfile(stream, self.dtype(byte_order), self.count) if len(self._data) < self.count: k = len(self._data) del self._data raise PlyParseError("early end-of-file", self, k) self._check_sanity()
def _read_bin(self, stream, byte_order): ''' Load a PLY element from a binary PLY file. The element may contain list properties. ''' self._data = _np.empty(self.count, dtype=self.dtype(byte_order)) for k in _range(self.count): for prop in self.properties: try: self._data[prop.name][k] = \ prop._read_bin(stream, byte_order) except StopIteration: raise PlyParseError("early end-of-file", self, k, prop)
def _merge_all(parts, dtype): if len(parts) == 1: return parts[0] else: nparts = [] for i in xrange(0, len(parts), 2): if i+1 < len(parts): npart = numpy.empty((len(parts[i])+len(parts[i+1]), 2), dtype) merge_elements = index_merge(parts[i], parts[i+1], npart) if merge_elements != len(npart): npart = npart[:merge_elements] nparts.append(npart) else: nparts.append(parts[i]) del parts return _merge_all(nparts, dtype)
def __init__(self, buf, offset = 0): # Accelerate class attributes self._encode = self.encode self._dtype = self.dtype self._xxh = self.xxh # Initialize buffer if offset: self._buf = self._likebuf = buffer(buf, offset) else: self._buf = buf self._likebuf = _likebuffer(buf) # Parse header and map index self.index_elements, self.index_offset = self._Header.unpack_from(self._buf, 0) self.index = numpy.ndarray(buffer = self._buf, offset = self.index_offset, dtype = self.dtype, shape = (self.index_elements, 3))
def test_rescaleData(): dtypes = map(np.dtype, ('ubyte', 'uint16', 'byte', 'int16', 'int', 'float')) for dtype1 in dtypes: for dtype2 in dtypes: data = (np.random.random(size=10) * 2**32 - 2**31).astype(dtype1) for scale, offset in [(10, 0), (10., 0.), (1, -50), (0.2, 0.5), (0.001, 0)]: if dtype2.kind in 'iu': lim = np.iinfo(dtype2) lim = lim.min, lim.max else: lim = (-np.inf, np.inf) s1 = np.clip(float(scale) * (data-float(offset)), *lim).astype(dtype2) s2 = pg.rescaleData(data, scale, offset, dtype2) assert s1.dtype == s2.dtype if dtype2.kind in 'iu': assert np.all(s1 == s2) else: assert np.allclose(s1, s2)
def solve3DTransform(points1, points2): """ Find a 3D transformation matrix that maps points1 onto points2. Points must be specified as either lists of 4 Vectors or (4, 3) arrays. """ import numpy.linalg pts = [] for inp in (points1, points2): if isinstance(inp, np.ndarray): A = np.empty((4,4), dtype=float) A[:,:3] = inp[:,:3] A[:,3] = 1.0 else: A = np.array([[inp[i].x(), inp[i].y(), inp[i].z(), 1] for i in range(4)]) pts.append(A) ## solve 3 sets of linear equations to determine transformation matrix elements matrix = np.zeros((4,4)) for i in range(3): ## solve Ax = B; x is one row of the desired transformation matrix matrix[i] = numpy.linalg.solve(pts[0], pts[1][:,i]) return matrix
def __init__(self, index, channel_names=None, channel_ids=None, name=None, description=None, file_origin=None, coordinates=None, **annotations): ''' Initialize a new :class:`ChannelIndex` instance. ''' # Inherited initialization # Sets universally recommended attributes, and places all others # in annotations super(ChannelIndex, self).__init__(name=name, description=description, file_origin=file_origin, **annotations) # Defaults if channel_names is None: channel_names = np.array([], dtype='S') if channel_ids is None: channel_ids = np.array([], dtype='i') # Store recommended attributes self.channel_names = np.array(channel_names) self.channel_ids = np.array(channel_ids) self.index = np.array(index) self.coordinates = coordinates
def load_bytes(self, data_blocks, dtype='<i1', start=None, end=None, expected_size=None): """ Return list of bytes contained in the specified set of blocks. NB : load all data as files cannot exceed 4Gb find later other solutions to spare memory. """ chunks = list() raw = '' # keep only data blocks having # a size greater than zero blocks = [k for k in data_blocks if k.size > 0] for data_block in blocks : self.file.seek(data_block.start) raw = self.file.read(data_block.size)[0:expected_size] databytes = np.frombuffer(raw, dtype=dtype) chunks.append(databytes) # concatenate all chunks and return # the specified slice if len(chunks)>0 : databytes = np.concatenate(chunks) return databytes[start:end] else : return np.array([])
def load_channel_data(self, ep, ch): """ Return a numpy array containing the list of bytes corresponding to the specified episode and channel. """ #memorise the sample size and symbol sample_size = self.sample_size(ep, ch) sample_symbol = self.sample_symbol(ep, ch) #create a bit mask to define which #sample to keep from the file bit_mask = self.create_bit_mask(ep, ch) #load all bytes contained in an episode data_blocks = self.get_data_blocks(ep) databytes = self.load_bytes(data_blocks) raw = self.filter_bytes(databytes, bit_mask) #reshape bytes from the sample size dt = np.dtype(numpy_map[sample_symbol]) dt.newbyteorder('<') return np.frombuffer(raw.reshape([len(raw) / sample_size, sample_size]), dt)
def get_signal_data(self, ep, ch): """ Return a numpy array containing all samples of a signal, acquired on an Elphy analog channel, formatted as a list of (time, value) tuples. """ #get data from the file y_data = self.load_encoded_data(ep, ch) x_data = np.arange(0, len(y_data)) #create a recarray data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_float)]) #put in the recarray the scaled data x_factors = self.x_scale_factors(ep, ch) y_factors = self.y_scale_factors(ep, ch) data['x'] = x_factors.scale(x_data) data['y'] = y_factors.scale(y_data) return data
def get_tag_data(self, ep, tag_ch): """ Return a numpy array containing all samples of a signal, acquired on an Elphy tag channel, formatted as a list of (time, value) tuples. """ #get data from the file y_data = self.load_encoded_tags(ep, tag_ch) x_data = np.arange(0, len(y_data)) #create a recarray data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_int)]) #put in the recarray the scaled data factors = self.x_tag_scale_factors(ep) data['x'] = factors.scale(x_data) data['y'] = y_data return data
def get_event(self, ep, ch, marked_ks): """ Return a :class:`ElphyEvent` which is a descriptor of the specified event channel. """ assert ep in range(1, self.n_episodes + 1) assert ch in range(1, self.n_channels + 1) # find the event channel number evt_channel = np.where(marked_ks == -1)[0][0] assert evt_channel in range(1, self.n_events(ep) + 1) block = self.episode_block(ep) ep_blocks = self.get_blocks_stored_in_episode(ep) evt_blocks = [k for k in ep_blocks if k.identifier == 'REVT'] n_events = np.sum([k.n_events[evt_channel - 1] for k in evt_blocks], dtype=int) x_unit = block.ep_block.x_unit return ElphyEvent(self, ep, evt_channel, x_unit, n_events, ch_number=ch)
def load_encoded_events(self, episode, evt_channel, identifier): """ Return times stored as a 4-bytes integer in the specified event channel. """ data_blocks = self.group_blocks_of_type(episode, identifier) ep_blocks = self.get_blocks_stored_in_episode(episode) evt_blocks = [k for k in ep_blocks if k.identifier == identifier] #compute events on each channel n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0) pre_events = np.sum(n_events[0:evt_channel - 1], dtype=int) start = pre_events end = start + n_events[evt_channel - 1] expected_size = 4 * np.sum(n_events, dtype=int) return self.load_bytes(data_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)
def load_encoded_spikes(self, episode, evt_channel, identifier): """ Return times stored as a 4-bytes integer in the specified spike channel. NB: it is meant for Blackrock-type, having an additional byte for each event time as spike sorting label. These additiona bytes are appended trailing the times. """ # to load the requested spikes for the specified episode and event channel: # get all the elphy blocks having as identifier 'RSPK' (or whatever) all_rspk_blocks = [k for k in self.blocks if k.identifier == identifier] rspk_block = all_rspk_blocks[episode-1] # RDATA(h?dI) REVT(NbVeV:I, NbEv:256I ... spike data are 4byte integers rspk_header = 4*( rspk_block.size - rspk_block.data_size-2 + len(rspk_block.n_events)) pre_events = np.sum(rspk_block.n_events[0:evt_channel-1], dtype=int, axis=0) # the real start is after header, preceeding events (which are 4byte) and preceeding labels (1byte) start = rspk_header + (4*pre_events) + pre_events end = start + 4*rspk_block.n_events[evt_channel-1] raw = self.load_bytes( [rspk_block], dtype='<i1', start=start, end=end, expected_size=rspk_block.size ) # re-encoding after reading byte by byte res = np.frombuffer(raw[0:(4*rspk_block.n_events[evt_channel-1])], dtype='<i4') res.sort() # sometimes timings are not sorted #print "load_encoded_data() - spikes:",res return res
def get_waveform_data(self, episode, electrode_id): """ Return waveforms corresponding to the specified spike channel. This function is triggered when the ``waveforms`` property of an :class:`Spike` descriptor instance is accessed. """ block = self.episode_block(episode) times, databytes = self.load_encoded_waveforms(episode, electrode_id) n_events, = databytes.shape wf_samples = databytes['waveform'].shape[1] dtype = [ ('time', float), ('electrode_id', int), ('unit_id', int), ('waveform', float, (wf_samples, 2)) ] data = np.empty(n_events, dtype=dtype) data['electrode_id'] = databytes['channel_id'][:, 0] data['unit_id'] = databytes['unit_id'][:, 0] data['time'] = databytes['elphy_time'][:, 0] * block.ep_block.dX data['waveform'][:, :, 0] = times * block.ep_block.dX data['waveform'][:, :, 1] = databytes['waveform'] * block.ep_block.dY_wf + block.ep_block.Y0_wf return data
def get_rspk_data(self, spk_channel): """ Return times stored as a 4-bytes integer in the specified event channel. """ evt_blocks = self.get_blocks_of_type('RSPK') #compute events on each channel n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0) pre_events = np.sum(n_events[0:spk_channel], dtype=int) # sum of array values up to spk_channel-1!!!! start = pre_events + (7 + len(n_events))# rspk header end = start + n_events[spk_channel] expected_size = 4 * np.sum(n_events, dtype=int) # constant return self.load_bytes(evt_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size) # --------------------------------------------------------- # factories.py
def __mmap_ncs_packet_headers(self, filename): """ Memory map of the Neuralynx .ncs file optimized for extraction of data packet headers Reading standard dtype improves speed, but timestamps need to be reconstructed """ filesize = getsize(self.sessiondir + sep + filename) # in byte if filesize > 16384: data = np.memmap(self.sessiondir + sep + filename, dtype='<u4', shape=((filesize - 16384) / 4 / 261, 261), mode='r', offset=16384) ts = data[:, 0:2] multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0) timestamps = np.sum(ts * multi, axis=1) # timestamps = data[:,0] + (data[:,1] *2**32) header_u4 = data[:, 2:5] return timestamps, header_u4 else: return None
def __mmap_ncs_packet_timestamps(self, filename): """ Memory map of the Neuralynx .ncs file optimized for extraction of data packet headers Reading standard dtype improves speed, but timestamps need to be reconstructed """ filesize = getsize(self.sessiondir + sep + filename) # in byte if filesize > 16384: data = np.memmap(self.sessiondir + sep + filename, dtype='<u4', shape=(int((filesize - 16384) / 4 / 261), 261), mode='r', offset=16384) ts = data[:, 0:2] multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0) timestamps = np.sum(ts * multi, axis=1) # timestamps = data[:,0] + data[:,1]*2**32 return timestamps else: return None
def __mmap_nev_file(self, filename): """ Memory map the Neuralynx .nev file """ nev_dtype = np.dtype([ ('reserved', '<i2'), ('system_id', '<i2'), ('data_size', '<i2'), ('timestamp', '<u8'), ('event_id', '<i2'), ('ttl_input', '<i2'), ('crc_check', '<i2'), ('dummy1', '<i2'), ('dummy2', '<i2'), ('extra', '<i4', (8,)), ('event_string', 'a128'), ]) if getsize(self.sessiondir + sep + filename) > 16384: return np.memmap(self.sessiondir + sep + filename, dtype=nev_dtype, mode='r', offset=16384) else: return None
def __extract_nev_file_spec(self): """ Extract file specification from an .nsx file """ filename = '.'.join([self._filenames['nsx'], 'nev']) # Header structure of files specification 2.2 and higher. For files 2.1 # and lower, the entries ver_major and ver_minor are not supported. dt0 = [ ('file_id', 'S8'), ('ver_major', 'uint8'), ('ver_minor', 'uint8')] nev_file_id = np.fromfile(filename, count=1, dtype=dt0)[0] if nev_file_id['file_id'].decode() == 'NEURALEV': spec = '{0}.{1}'.format( nev_file_id['ver_major'], nev_file_id['ver_minor']) else: raise IOError('NEV file type {0} is not supported'.format( nev_file_id['file_id'])) return spec
def __read_nsx_data_variant_a(self, nsx_nb): """ Extract nsx data from a 2.1 .nsx file """ filename = '.'.join([self._filenames['nsx'], 'ns%i' % nsx_nb]) # get shape of data shape = ( self.__nsx_databl_param['2.1']('nb_data_points', nsx_nb), self.__nsx_basic_header[nsx_nb]['channel_count']) offset = self.__nsx_params['2.1']('bytes_in_headers', nsx_nb) # read nsx data # store as dict for compatibility with higher file specs data = {1: np.memmap( filename, dtype='int16', shape=shape, offset=offset)} return data
def __read_nev_data(self, nev_data_masks, nev_data_types): """ Extract nev data from a 2.1 or 2.2 .nev file """ filename = '.'.join([self._filenames['nev'], 'nev']) data_size = self.__nev_basic_header['bytes_in_data_packets'] header_size = self.__nev_basic_header['bytes_in_headers'] # read all raw data packets and markers dt0 = [ ('timestamp', 'uint32'), ('packet_id', 'uint16'), ('value', 'S{0}'.format(data_size - 6))] raw_data = np.memmap(filename, offset=header_size, dtype=dt0) masks = self.__nev_data_masks(raw_data['packet_id']) types = self.__nev_data_types(data_size) data = {} for k, v in nev_data_masks.items(): data[k] = raw_data.view(types[k][nev_data_types[k]])[masks[k][v]] return data
def __get_nev_rec_times(self): """ Extracts minimum and maximum time points from a nev file. """ filename = '.'.join([self._filenames['nev'], 'nev']) dt = [('timestamp', 'uint32')] offset = \ self.__get_file_size(filename) - \ self.__nev_params('bytes_in_data_packets') last_data_packet = np.memmap(filename, offset=offset, dtype=dt)[0] n_starts = [0 * self.__nev_params('event_unit')] n_stops = [ last_data_packet['timestamp'] * self.__nev_params('event_unit')] return n_starts, n_stops
def __get_waveforms_dtype(self): """ Extracts the actual waveform dtype set for each channel. """ # Blackrock code giving the approiate dtype conv = {0: 'int8', 1: 'int8', 2: 'int16', 4: 'int32'} # get all electrode ids from nev ext header all_el_ids = self.__nev_ext_header[b'NEUEVWAV']['electrode_id'] # get the dtype of waveform (this is stupidly complicated) if self.__is_set( np.array(self.__nev_basic_header['additionnal_flags']), 0): dtype_waveforms = dict((k, 'int16') for k in all_el_ids) else: # extract bytes per waveform waveform_bytes = \ self.__nev_ext_header[b'NEUEVWAV']['bytes_per_waveform'] # extract dtype for waveforms fro each electrode dtype_waveforms = dict(zip(all_el_ids, conv[waveform_bytes])) return dtype_waveforms
def __read_comment(self,n_start,n_stop,data,lazy=False): event_unit = self.__nev_params('event_unit') if lazy: times = [] labels = np.array([],dtype='s') else: times = data['timestamp']*event_unit labels = data['comment'].astype(str) # mask for given time interval mask = (times >= n_start) & (times < n_stop) if np.sum(mask)>0: ev = Event( times = times[mask].astype(float), labels = labels[mask], name = 'comment') if lazy: ev.lazy_shape = np.sum(mask) else: ev = None return ev # --------------end------added by zhangbo 20170926--------
def reformat_integer_v1(data, nbchannel, header): """ reformat when dtype is int16 for ABF version 1 """ chans = [chan_num for chan_num in header['nADCSamplingSeq'] if chan_num >= 0] for n, i in enumerate(chans[:nbchannel]): # respect SamplingSeq data[:, n] /= header['fInstrumentScaleFactor'][i] data[:, n] /= header['fSignalGain'][i] data[:, n] /= header['fADCProgrammableGain'][i] if header['nTelegraphEnable'][i]: data[:, n] /= header['fTelegraphAdditGain'][i] data[:, n] *= header['fADCRange'] data[:, n] /= header['lADCResolution'] data[:, n] += header['fInstrumentOffset'][i] data[:, n] -= header['fSignalOffset'][i]
def get_spiketrain(self, episode, electrode_id): """ Return a :class:`Spike` which is a descriptor of the specified spike channel. """ assert episode in range(1, self.n_episodes + 1) assert electrode_id in range(1, self.n_spiketrains(episode) + 1) # get some properties stored in the episode sub-block block = self.episode_block(episode) x_unit = block.ep_block.x_unit x_unit_wf = getattr(block.ep_block, 'x_unit_wf', None) y_unit_wf = getattr(block.ep_block, 'y_unit_wf', None) # number of spikes in the entire episode spk_blocks = [k for k in self.blocks if k.identifier == 'RSPK'] n_events = np.sum([k.n_events[electrode_id - 1] for k in spk_blocks], dtype=int) # number of samples in a waveform wf_sampling_frequency = 1.0 / block.ep_block.dX wf_blocks = [k for k in self.blocks if k.identifier == 'RspkWave'] if wf_blocks : wf_samples = wf_blocks[0].wavelength t_start = wf_blocks[0].pre_trigger * block.ep_block.dX else: wf_samples = 0 t_start = 0 return ElphySpikeTrain(self, episode, electrode_id, x_unit, n_events, wf_sampling_frequency, wf_samples, x_unit_wf, y_unit_wf, t_start)