def _read_binary_i_16(
task_handle, read_array, num_samps_per_chan, timeout,
fill_mode=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadBinaryI16
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.int16, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, fill_mode.value,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
python类c_uint()的实例源码
def _read_binary_u_16(
task_handle, read_array, num_samps_per_chan, timeout,
fill_mode=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadBinaryU16
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.uint16, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, fill_mode.value,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_binary_i_32(
task_handle, read_array, num_samps_per_chan, timeout,
fill_mode=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadBinaryI32
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.int32, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, fill_mode.value,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_binary_u_32(
task_handle, read_array, num_samps_per_chan, timeout,
fill_mode=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadBinaryU32
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.uint32, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, fill_mode.value,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_digital_u_8(
task_handle, read_array, num_samps_per_chan, timeout,
fill_mode=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadDigitalU8
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.uint8, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, fill_mode.value,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_digital_u_32(
task_handle, read_array, num_samps_per_chan, timeout,
fill_mode=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadDigitalU32
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.uint32, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, fill_mode.value,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_digital_scalar_u_32(task_handle, timeout):
value = ctypes.c_uint()
cfunc = lib_importer.windll.DAQmxReadDigitalScalarU32
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_double,
ctypes.POINTER(ctypes.c_uint), ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, timeout, ctypes.byref(value), None)
check_for_error(error_code)
return value.value
def _read_counter_f_64(task_handle, read_array, num_samps_per_chan, timeout):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadCounterF64
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
wrapped_ndpointer(dtype=numpy.float64, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_counter_u_32(task_handle, read_array, num_samps_per_chan, timeout):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadCounterU32
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
wrapped_ndpointer(dtype=numpy.uint32, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_counter_u_32_ex(
task_handle, read_array, num_samps_per_chan, timeout,
fill_mode=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadCounterU32Ex
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.uint32, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, fill_mode.value,
read_array, numpy.prod(read_array.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_counter_scalar_u_32(task_handle, timeout):
value = ctypes.c_uint()
cfunc = lib_importer.windll.DAQmxReadCounterScalarU32
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_double,
ctypes.POINTER(ctypes.c_uint), ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, timeout, ctypes.byref(value), None)
check_for_error(error_code)
return value.value
def _read_ctr_freq(
task_handle, freq, duty_cycle, num_samps_per_chan, timeout,
interleaved=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadCtrFreq
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.float64, flags=('C', 'W')),
wrapped_ndpointer(dtype=numpy.float64, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, interleaved.value,
freq, duty_cycle, numpy.prod(freq.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_ctr_time(
task_handle, high_time, low_time, num_samps_per_chan, timeout,
interleaved=FillMode.GROUP_BY_CHANNEL):
samps_per_chan_read = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadCtrTime
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
ctypes.c_int,
wrapped_ndpointer(dtype=numpy.float64, flags=('C', 'W')),
wrapped_ndpointer(dtype=numpy.float64, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, interleaved.value,
high_time, low_time, numpy.prod(high_time.shape),
ctypes.byref(samps_per_chan_read), None)
check_for_error(error_code)
return samps_per_chan_read.value
def _read_raw(task_handle, read_array, num_samps_per_chan, timeout):
samples_read = ctypes.c_int()
number_of_bytes_per_sample = ctypes.c_int()
cfunc = lib_importer.windll.DAQmxReadRaw
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
wrapped_ndpointer(dtype=read_array.dtype, flags=('C', 'W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(ctypes.c_int), ctypes.POINTER(c_bool32)]
error_code = cfunc(
task_handle, num_samps_per_chan, timeout, read_array,
read_array.nbytes, ctypes.byref(samples_read),
ctypes.byref(number_of_bytes_per_sample), None)
check_for_error(error_code)
return samples_read.value, number_of_bytes_per_sample.value
def avail_samp_per_chan(self):
"""
int: Indicates the number of samples available to read per
channel. This value is the same for all channels in the
task.
"""
val = ctypes.c_uint()
cfunc = lib_importer.windll.DAQmxGetReadAvailSampPerChan
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle,
ctypes.POINTER(ctypes.c_uint)]
error_code = cfunc(
self._handle, ctypes.byref(val))
check_for_error(error_code)
return val.value
def common_mode_range_error_chans(self):
"""
List[str]: Indicates a list of names of any virtual channels in
the task for which the device(s) detected a common mode
range violation. You must read
**common_mode_range_error_chans_exist** before you read this
property. Otherwise, you will receive an error.
"""
cfunc = lib_importer.windll.DAQmxGetReadCommonModeRangeErrorChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_char_p,
ctypes.c_uint]
temp_size = 2048
val = ctypes.create_string_buffer(temp_size)
error_code = cfunc(
self._handle, val, temp_size)
check_for_error(error_code)
return unflatten_channel_string(val.value.decode('ascii'))
def input_buf_size(self):
"""
int: Specifies the number of samples the input buffer can hold
for each channel in the task. Zero indicates to allocate no
buffer. Use a buffer size of 0 to perform a hardware-timed
operation without using a buffer. Setting this property
overrides the automatic input buffer allocation that NI-
DAQmx performs.
"""
val = ctypes.c_uint()
cfunc = lib_importer.windll.DAQmxGetBufInputBufSize
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle,
ctypes.POINTER(ctypes.c_uint)]
error_code = cfunc(
self._handle, ctypes.byref(val))
check_for_error(error_code)
return val.value
def input_onbrd_buf_size(self):
"""
int: Indicates in samples per channel the size of the onboard
input buffer of the device.
"""
val = ctypes.c_uint()
cfunc = lib_importer.windll.DAQmxGetBufInputOnbrdBufSize
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle,
ctypes.POINTER(ctypes.c_uint)]
error_code = cfunc(
self._handle, ctypes.byref(val))
check_for_error(error_code)
return val.value
def num_chans(self):
"""
int: Indicates the number of channels that DAQmx Read reads from
the task. This value is the number of channels in the task
or the number of channels you specify with
**channels_to_read**.
"""
val = ctypes.c_uint()
cfunc = lib_importer.windll.DAQmxGetReadNumChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle,
ctypes.POINTER(ctypes.c_uint)]
error_code = cfunc(
self._handle, ctypes.byref(val))
check_for_error(error_code)
return val.value
def raw_data_width(self):
"""
int: Indicates in bytes the size of a raw sample from the task.
"""
val = ctypes.c_uint()
cfunc = lib_importer.windll.DAQmxGetReadRawDataWidth
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle,
ctypes.POINTER(ctypes.c_uint)]
error_code = cfunc(
self._handle, ctypes.byref(val))
check_for_error(error_code)
return val.value