def common_mode_range_error_chans(self):
"""
List[str]: Indicates a list of names of any virtual channels in
the task for which the device(s) detected a common mode
range violation. You must read
**common_mode_range_error_chans_exist** before you read this
property. Otherwise, you will receive an error.
"""
cfunc = lib_importer.windll.DAQmxGetReadCommonModeRangeErrorChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_char_p,
ctypes.c_uint]
temp_size = 2048
val = ctypes.create_string_buffer(temp_size)
error_code = cfunc(
self._handle, val, temp_size)
check_for_error(error_code)
return unflatten_channel_string(val.value.decode('ascii'))
python类create_string_buffer()的实例源码
def gcsCommand(self, commandAsString):
self._lib.PI_GcsCommandset.argtypes= [c_int, c_char_p]
self._lib.PI_GcsGetAnswer.argtypes= [c_int, c_char_p, c_int]
self._convertErrorToException(
self._lib.PI_GcsCommandset(self._id, commandAsString))
self._trickToCheckForSyntaxError()
retSize= c_int()
res= ''
self._convertErrorToException(
self._lib.PI_GcsGetAnswerSize(self._id, ctypes.byref(retSize)))
while retSize.value != 0:
buf= ctypes.create_string_buffer('\000', retSize.value)
self._convertErrorToException(
self._lib.PI_GcsGetAnswer(self._id, buf, retSize.value))
res+= buf.value
self._convertErrorToException(
self._lib.PI_GcsGetAnswerSize(self._id, ctypes.byref(retSize)))
return res
def getVolatileMemoryParameters(self, itemId, parameterId):
self._lib.PI_qSPA.argtypes= [c_int,
c_char_p,
CUnsignedIntArray,
CDoubleArray,
c_char_p,
c_int]
retValue= CDoubleArray([0.])
bufSize= 256
retString= ctypes.create_string_buffer('\000', bufSize)
self._convertErrorToException(
self._lib.PI_qSPA(
self._id, str(itemId),
CUnsignedIntArray([parameterId]),
retValue,
retString,
bufSize))
return retValue.toNumpyArray()
def to_rain(cls, val):
if val is None:
return cls.new(typi.null, 0, 0, cls.null)
elif val is False:
return cls.new(typi.bool, 0, 0, cls.null)
elif val is True:
return cls.new(typi.bool, 0, 1, cls.null)
elif isinstance(val, int):
return cls.new(typi.int, 0, val, cls.null)
elif isinstance(val, float):
raw = struct.pack('d', val)
intrep = struct.unpack('Q', raw)[0]
return cls.new(typi.float, 0, intrep, cls.null)
elif isinstance(val, str):
str_p = ct.create_string_buffer(val.encode('utf-8'))
cls._saves_.append(str_p)
return cls.new(typi.str, len(val), ct.cast(str_p, ct.c_void_p).value, cls.null)
raise Exception("Can't convert value {!r} to Rain".format(val))
def _ipconfig_getnode():
"""Get the hardware address on Windows by running ipconfig.exe."""
import os, re
dirs = ['', r'c:\windows\system32', r'c:\winnt\system32']
try:
import ctypes
buffer = ctypes.create_string_buffer(300)
ctypes.windll.kernel32.GetSystemDirectoryA(buffer, 300)
dirs.insert(0, buffer.value.decode('mbcs'))
except:
pass
for dir in dirs:
try:
pipe = os.popen(os.path.join(dir, 'ipconfig') + ' /all')
except IOError:
continue
else:
for line in pipe:
value = line.split(':')[-1].strip().lower()
if re.match('([0-9a-f][0-9a-f]-){5}[0-9a-f][0-9a-f]', value):
return int(value.replace('-', ''), 16)
finally:
pipe.close()
def uuid4():
"""Generate a random UUID."""
# When the system provides a version-4 UUID generator, use it.
if _uuid_generate_random:
_buffer = ctypes.create_string_buffer(16)
_uuid_generate_random(_buffer)
return UUID(bytes=_buffer.raw)
# Otherwise, get randomness from urandom or the 'random' module.
try:
import os
return UUID(bytes=os.urandom(16), version=4)
except:
import random
bytes = [chr(random.randrange(256)) for i in range(16)]
return UUID(bytes=bytes, version=4)
def link(self):
# link the program
glLinkProgram(self.handle)
temp = c_int(0)
# retrieve the link status
glGetProgramiv(self.handle, GL_LINK_STATUS, byref(temp))
# if linking failed, print the log
if not temp:
# retrieve the log length
glGetProgramiv(self.handle, GL_INFO_LOG_LENGTH, byref(temp))
# create a buffer for the log
buffer = create_string_buffer(temp.value)
# retrieve the log text
glGetProgramInfoLog(self.handle, temp, None, buffer)
# print the log to the console
print(buffer.value)
else:
# all is well, so we are linked
self.linked = True
def _get_terminal_size(fd):
handle = windll.kernel32.GetStdHandle(_handle_ids[fd])
if handle == 0:
raise OSError('handle cannot be retrieved')
if handle == -1:
raise WinError()
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi)
if res:
res = struct.unpack("hhhhHhhhhhh", csbi.raw)
left, top, right, bottom = res[5:9]
columns = right - left + 1
lines = bottom - top + 1
return terminal_size(columns, lines)
else:
raise WinError()
def __init__(self, shape, typechar, itemsize):
import ctypes
ndim = len(shape)
self.ndim = ndim
self.shape = tuple(shape)
array_len = 1
for d in shape:
array_len *= d
self.size = itemsize * array_len
self.parent = ctypes.create_string_buffer(self.size)
self.itemsize = itemsize
strides = [itemsize] * ndim
for i in range(ndim - 1, 0, -1):
strides[i - 1] = strides[i] * shape[i]
self.strides = tuple(strides)
self.data = ctypes.addressof(self.parent), False
if self.itemsize == 1:
byteorder = '|'
elif sys.byteorder == 'big':
byteorder = '>'
else:
byteorder = '<'
self.typestr = byteorder + typechar + str(self.itemsize)
def oni_call(func):
@functools.wraps(func)
def wrapper(*args):
res = func(*args)
if res != OniStatus.ONI_STATUS_OK:
msg = oniGetExtendedError()
if not msg:
msg = ''
buf = ctypes.create_string_buffer(1024)
rc = _oniGetLogFileName(buf, ctypes.sizeof(buf))
if rc == OniStatus.ONI_STATUS_OK:
logfile = buf.value
else:
logfile = None
raise OpenNIError(res, msg.strip(), logfile)
return res
return wrapper
def test_api_call(self, mock_get_info):
ng.get_data('v-sweep')
# mock_get_info.assert_called_once_with(create_string_buffer(b'v-sweep'))
assert mock_get_info.called
ng.reset()
def check_for_error(error_code):
from nidaqmx._lib import lib_importer
if error_code < 0:
error_buffer = ctypes.create_string_buffer(2048)
cfunc = lib_importer.windll.DAQmxGetExtendedErrorInfo
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [ctypes.c_char_p, ctypes.c_uint]
cfunc(error_buffer, 2048)
raise DaqError(error_buffer.value.decode("utf-8"), error_code)
elif error_code > 0:
error_buffer = ctypes.create_string_buffer(2048)
cfunc = lib_importer.windll.DAQmxGetErrorString
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [ctypes.c_int, ctypes.c_char_p,
ctypes.c_uint]
cfunc(error_code, error_buffer, 2048)
warnings.warn(DaqWarning(
error_buffer.value.decode("utf-8"), error_code))
def ai_input_srcs(self):
"""
List[str]: Indicates the list of input sources supported by the
channel. Channels may support using the signal from the I/O
connector or one of several calibration signals.
"""
cfunc = lib_importer.windll.DAQmxGetPhysicalChanAIInputSrcs
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def teds_version_letter(self):
"""
str: Indicates the version letter of the sensor.
"""
cfunc = lib_importer.windll.DAQmxGetPhysicalChanTEDSVersionLetter
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return val.value.decode('ascii')
def expir_trig_dig_edge_src(self):
"""
str: Specifies the name of a terminal where a digital signal
exists to use as the source of the Expiration Trigger.
"""
cfunc = lib_importer.windll.DAQmxGetDigEdgeWatchdogExpirTrigSrc
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_char_p,
ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._handle, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return val.value.decode('ascii')
def name(self):
"""
str: Indicates the name of the task.
"""
cfunc = lib_importer.windll.DAQmxGetTaskName
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
lib_importer.task_handle, ctypes.c_char_p,
ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._handle, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return val.value.decode('ascii')
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevAOPhysicalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevCIPhysicalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevCOPhysicalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevDILines
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevDOLines
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevDOPorts
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def device_names(self):
"""
List[str]: Indicates the names of all devices on this device
collection.
"""
cfunc = lib_importer.windll.DAQmxGetSysDevNames
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def global_channel_names(self):
"""
List[str]: The names of all the global channels on this
collection.
"""
cfunc = lib_importer.windll.DAQmxGetSysGlobalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def scale_names(self):
"""
List[str]: Indicates the names of all the custom scales on this
collection.
"""
cfunc = lib_importer.windll.DAQmxGetSysScales
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def task_names(self):
"""
List[str]: Indicates the names of all the tasks on this collection.
"""
cfunc = lib_importer.windll.DAQmxGetSysTasks
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
def chassis_module_devices(self):
"""
List[:class:`nidaqmx.system.device.Device`]: Indicates a list
containing the names of the modules in the chassis.
"""
cfunc = lib_importer.windll.DAQmxGetDevChassisModuleDevNames
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return [Device(v)
for v in unflatten_channel_string(val.value.decode('ascii'))]
def compact_daq_chassis_device(self):
"""
:class:`nidaqmx.system.device.Device`: Indicates the name of the
CompactDAQ chassis that contains this module.
"""
cfunc = lib_importer.windll.DAQmxGetDevCompactDAQChassisDevName
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return Device(val.value.decode('ascii'))
def product_type(self):
"""
str: Indicates the product name of the device.
"""
cfunc = lib_importer.windll.DAQmxGetDevProductType
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return val.value.decode('ascii')
def tcpip_ethernet_ip(self):
"""
str: Indicates the IPv4 address of the Ethernet interface in
dotted decimal format. This property returns 0.0.0.0 if the
Ethernet interface cannot acquire an address.
"""
cfunc = lib_importer.windll.DAQmxGetDevTCPIPEthernetIP
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return val.value.decode('ascii')