def common_mode_range_error_chans(self):
    """
    List[str]: Names of the virtual channels in the task for which
        the device(s) detected a common mode range violation. Read
        **common_mode_range_error_chans_exist** before reading this
        property; otherwise an error is reported.
    """
    getter = lib_importer.windll.DAQmxGetReadCommonModeRangeErrorChans
    # Declare the C signature on first use only; the assignment is
    # re-checked while holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # A single call with a fixed-size buffer (no size query first).
    buffer_len = 2048
    name_buffer = ctypes.create_string_buffer(buffer_len)
    status = getter(self._handle, name_buffer, buffer_len)
    check_for_error(status)

    flattened = name_buffer.value.decode('ascii')
    return unflatten_channel_string(flattened)
python类c_char_p()的实例源码
def gcsCommand(self, commandAsString):
    """Send a GCS command and return the accumulated answer text.

    The command is passed to PI_GcsCommandset, then
    _trickToCheckForSyntaxError() is invoked. Afterwards the answer is
    collected piece by piece: PI_GcsGetAnswerSize reports the size of
    the next piece and PI_GcsGetAnswer reads it, until the reported
    size is zero. Every library status code is routed through
    _convertErrorToException.
    """
    self._lib.PI_GcsCommandset.argtypes = [c_int, c_char_p]
    self._lib.PI_GcsGetAnswer.argtypes = [c_int, c_char_p, c_int]

    sendStatus = self._lib.PI_GcsCommandset(self._id, commandAsString)
    self._convertErrorToException(sendStatus)
    self._trickToCheckForSyntaxError()

    pendingSize = c_int()
    answer = ''
    while True:
        # Ask how large the next piece of the answer is.
        sizeStatus = self._lib.PI_GcsGetAnswerSize(
            self._id, ctypes.byref(pendingSize))
        self._convertErrorToException(sizeStatus)
        if pendingSize.value == 0:
            return answer

        piece = ctypes.create_string_buffer('\000', pendingSize.value)
        readStatus = self._lib.PI_GcsGetAnswer(
            self._id, piece, pendingSize.value)
        self._convertErrorToException(readStatus)
        answer = answer + piece.value
def getVolatileMemoryParameters(self, itemId, parameterId):
    """Query one parameter of one item through PI_qSPA.

    itemId is passed to the library as str(itemId) and parameterId as
    a one-element CUnsignedIntArray. The numeric result is written by
    the library into a one-element CDoubleArray, which is returned via
    toNumpyArray(). The 256-byte string buffer handed to the library
    is not used for the return value.
    """
    self._lib.PI_qSPA.argtypes = [
        c_int, c_char_p, CUnsignedIntArray, CDoubleArray, c_char_p, c_int]

    numericResult = CDoubleArray([0.])
    textCapacity = 256
    textResult = ctypes.create_string_buffer('\000', textCapacity)

    status = self._lib.PI_qSPA(
        self._id,
        str(itemId),
        CUnsignedIntArray([parameterId]),
        numericResult,
        textResult,
        textCapacity)
    self._convertErrorToException(status)
    return numericResult.toNumpyArray()
def getDataRecorderConfiguration(self):
    """Read the data recorder setup via PI_qDRC.

    Queries tables 1..N, where N is getNumberOfRecorderTables(), and
    returns a DataRecorderConfiguration filled with one
    (table, source, option) entry per table. Source names come from
    the newline-separated text the library writes into a 256-byte
    buffer; each name is stripped of surrounding whitespace.
    """
    tableCount = self.getNumberOfRecorderTables()

    textCapacity = 256
    sourceText = ctypes.create_string_buffer('\000', textCapacity)
    optionArray = CIntArray(np.zeros(tableCount, dtype=np.int32))
    tableArray = CIntArray(np.arange(1, tableCount + 1))

    self._lib.PI_qDRC.argtypes = [
        c_int, CIntArray, c_char_p, CIntArray, c_int, c_int]
    status = self._lib.PI_qDRC(
        self._id, tableArray, sourceText,
        optionArray, textCapacity, tableCount)
    self._convertErrorToException(status)

    sourceNames = []
    for rawName in sourceText.value.split('\n'):
        sourceNames.append(rawName.strip())

    configuration = DataRecorderConfiguration()
    for index in range(tableCount):
        configuration.setTable(
            tableArray.toNumpyArray()[index],
            sourceNames[index],
            optionArray.toNumpyArray()[index])
    return configuration
def pcpfastExtractValues(result_p, vsetidx, vlistidx, dtype):
    """ quicker implementation of pmExtractValue than the default provided with the pcp python bindings
        this version saves converting the C indexes to python and back again

        Calls LIBPCPFAST.pcpfastExtractValues with result_p and the two
        indexes, letting it fill an instance id and a pmAtomValue.

        Returns a tuple (outAtom.dref(dtype), instance id value).
        Raises pmapi.pmErr(status) when the library returns a negative status.
    """
    inst = c_int()
    outAtom = pmapi.pmAtomValue()
    status = LIBPCPFAST.pcpfastExtractValues(result_p, byref(inst), byref(outAtom), vsetidx, vlistidx, dtype)
    if status < 0:
        raise pmapi.pmErr(status)

    if dtype == c_api.PM_TYPE_STRING:
        # Get pointer to C string: copy the raw pointer bytes stored in the
        # atom's cp field into a standalone c_char_p so it can be freed later.
        c_str = c_char_p()
        memmove(byref(c_str), addressof(outAtom) + pmapi.pmAtomValue.cp.offset, sizeof(c_char_p))
        # Convert to a python string and have result point to it.
        # NOTE(review): relies on ctypes c_char_p field semantics (read gives
        # a Python value, assignment stores a pointer to it) - confirm before
        # touching; this must happen before the free below.
        outAtom.cp = outAtom.cp
        # Free the C string (the pointer saved in c_str above)
        LIBC.free(c_str)

    return outAtom.dref(dtype), inst.value
def have_compatible_glibc(major, minimum_minor):
    """Check the running glibc against a required version.

    Returns False when the process exposes no gnu_get_libc_version
    symbol (i.e. it is not linked to glibc); otherwise returns the
    result of check_glibc_version(version, major, minimum_minor).
    """
    # ctypes.CDLL(None) calls dlopen(NULL), whose handle refers to the
    # main program, so the linker tells us which libc is really in use.
    main_program = ctypes.CDLL(None)
    try:
        query_version = main_program.gnu_get_libc_version
    except AttributeError:
        # No such symbol, hence not glibc.
        return False

    # The C function returns a string such as "2.5".
    query_version.restype = ctypes.c_char_p
    version = query_version()
    # py2 / py3 compatibility: py3 hands back bytes.
    if not isinstance(version, str):
        version = version.decode("ascii")

    return check_glibc_version(version, major, minimum_minor)
def have_compatible_glibc(major, minimum_minor):
    """Check the running glibc against a required version.

    Returns False when the process exposes no gnu_get_libc_version
    symbol (i.e. it is not linked to glibc); otherwise returns the
    result of check_glibc_version(version, major, minimum_minor).
    """
    # ctypes.CDLL(None) calls dlopen(NULL), whose handle refers to the
    # main program, so the linker tells us which libc is really in use.
    main_program = ctypes.CDLL(None)
    try:
        query_version = main_program.gnu_get_libc_version
    except AttributeError:
        # No such symbol, hence not glibc.
        return False

    # The C function returns a string such as "2.5".
    query_version.restype = ctypes.c_char_p
    version = query_version()
    # py2 / py3 compatibility: py3 hands back bytes.
    if not isinstance(version, str):
        version = version.decode("ascii")

    return check_glibc_version(version, major, minimum_minor)
def have_compatible_glibc(major, minimum_minor):
    """Check the running glibc against a required version.

    Returns False when the process exposes no gnu_get_libc_version
    symbol (i.e. it is not linked to glibc); otherwise returns the
    result of check_glibc_version(version, major, minimum_minor).
    """
    # ctypes.CDLL(None) calls dlopen(NULL), whose handle refers to the
    # main program, so the linker tells us which libc is really in use.
    main_program = ctypes.CDLL(None)
    try:
        query_version = main_program.gnu_get_libc_version
    except AttributeError:
        # No such symbol, hence not glibc.
        return False

    # The C function returns a string such as "2.5".
    query_version.restype = ctypes.c_char_p
    version = query_version()
    # py2 / py3 compatibility: py3 hands back bytes.
    if not isinstance(version, str):
        version = version.decode("ascii")

    return check_glibc_version(version, major, minimum_minor)
def have_compatible_glibc(major, minimum_minor):
    """Check the running glibc against a required version.

    Returns False when the process exposes no gnu_get_libc_version
    symbol (i.e. it is not linked to glibc); otherwise returns the
    result of check_glibc_version(version, major, minimum_minor).
    """
    # ctypes.CDLL(None) calls dlopen(NULL), whose handle refers to the
    # main program, so the linker tells us which libc is really in use.
    main_program = ctypes.CDLL(None)
    try:
        query_version = main_program.gnu_get_libc_version
    except AttributeError:
        # No such symbol, hence not glibc.
        return False

    # The C function returns a string such as "2.5".
    query_version.restype = ctypes.c_char_p
    version = query_version()
    # py2 / py3 compatibility: py3 hands back bytes.
    if not isinstance(version, str):
        version = version.decode("ascii")

    return check_glibc_version(version, major, minimum_minor)
def have_compatible_glibc(major, minimum_minor):
    """Check the running glibc against a required version.

    Returns False when the process exposes no gnu_get_libc_version
    symbol (i.e. it is not linked to glibc); otherwise returns the
    result of check_glibc_version(version, major, minimum_minor).
    """
    # ctypes.CDLL(None) calls dlopen(NULL), whose handle refers to the
    # main program, so the linker tells us which libc is really in use.
    main_program = ctypes.CDLL(None)
    try:
        query_version = main_program.gnu_get_libc_version
    except AttributeError:
        # No such symbol, hence not glibc.
        return False

    # The C function returns a string such as "2.5".
    query_version.restype = ctypes.c_char_p
    version = query_version()
    # py2 / py3 compatibility: py3 hands back bytes.
    if not isinstance(version, str):
        version = version.decode("ascii")

    return check_glibc_version(version, major, minimum_minor)
def hook_callback(self, *args):
    """Forward a hooked call to self.callback.

    Each incoming argument is paired with the matching entry of
    self.original_types[1:]; arguments typed c_wchar_p or c_char_p are
    wrapped in that ctypes type, all others pass through unchanged.
    The callback also receives a ``real_function`` keyword: calling it
    with no arguments invokes self.realfunction with the adapted
    arguments, calling it with arguments forwards those instead.
    """
    def adapt(value, declared_type):
        if declared_type == ctypes.c_wchar_p:
            return ctypes.c_wchar_p(value)
        if declared_type == ctypes.c_char_p:
            return ctypes.c_char_p(value)
        return value

    adapted_args = [
        adapt(value, declared_type)
        for value, declared_type in zip(args, self.original_types[1:])
    ]

    def real_function(*call_args):
        # No explicit arguments means "call through with what we were given".
        chosen = adapted_args if call_args == () else call_args
        return self.realfunction(*chosen)

    return self.callback(*adapted_args, real_function=real_function)
# Use this trick to prevent garbage collection of the hook?
#def __del__(self):
# pass
## New simple hook API based on winproxy
def libvlc_media_new_location(p_instance, psz_mrl):
    '''Create a media object for a given media resource location,
    for example a valid URL.
    @note: A local file must be referred to with the file://... URI
    syntax when using this function (see IETF RFC3986).
    L{libvlc_media_new_path}() is the recommended alternative for
    local files.
    See L{libvlc_media_release}.
    @param p_instance: the instance.
    @param psz_mrl: the media location.
    @return: the newly created media or None on error.
    '''
    # Reuse the cached binding when present; otherwise build it.
    f = _Cfunctions.get('libvlc_media_new_location', None)
    if not f:
        f = _Cfunction(
            'libvlc_media_new_location', ((1,), (1,),),
            class_result(Media),
            ctypes.c_void_p, Instance, ctypes.c_char_p)
    return f(p_instance, psz_mrl)
def libvlc_media_add_option(p_md, psz_options):
    '''Add an option to the media.
    The option determines how the media_player reads the media, which
    makes VLC's advanced reading/streaming options usable on a
    per-media basis.
    @note: Options are listed by 'vlc --long-help' on the command line,
    e.g. "-sout-all". Available options and their semantics vary
    across LibVLC versions and builds.
    @warning: Not every option affects L{Media} objects:
    due to architectural issues most audio and video options, such as
    text renderer options, have no effect on an individual media.
    Such options must be set through L{libvlc_new}() instead.
    @param p_md: the media descriptor.
    @param psz_options: the options (as a string).
    '''
    # Reuse the cached binding when present; otherwise build it.
    f = _Cfunctions.get('libvlc_media_add_option', None)
    if not f:
        f = _Cfunction(
            'libvlc_media_add_option', ((1,), (1,),), None,
            None, Media, ctypes.c_char_p)
    return f(p_md, psz_options)
def libvlc_media_discoverer_new(p_inst, psz_name):
    '''Create a media discoverer object by name.
    Once the object exists, attach to its events to be notified of the
    discoverer state, and attach to media_list events to be notified
    of newly discovered items.
    Discovery only starts after L{libvlc_media_discoverer_start}() is
    called.
    See L{libvlc_media_discoverer_media_list}
    See L{libvlc_media_discoverer_event_manager}
    See L{libvlc_media_discoverer_start}.
    @param p_inst: libvlc instance.
    @param psz_name: service name; use L{libvlc_media_discoverer_list_get}() to get a list of the discoverer names available in this libVLC instance.
    @return: media discover object or None in case of error.
    @version: LibVLC 3.0.0 or later.
    '''
    # Reuse the cached binding when present; otherwise build it.
    f = _Cfunctions.get('libvlc_media_discoverer_new', None)
    if not f:
        f = _Cfunction(
            'libvlc_media_discoverer_new', ((1,), (1,),),
            class_result(MediaDiscoverer),
            ctypes.c_void_p, Instance, ctypes.c_char_p)
    return f(p_inst, psz_name)
def libvlc_video_set_format(mp, chroma, width, height, pitch):
    '''Set decoded video chroma and dimensions.
    Only works together with L{libvlc_video_set_callbacks}(), and is
    mutually exclusive with L{libvlc_video_set_format_callbacks}().
    @param mp: the media player.
    @param chroma: a four-characters string identifying the chroma (e.g. "RV32" or "YUYV").
    @param width: pixel width.
    @param height: pixel height.
    @param pitch: line pitch (in bytes).
    @version: LibVLC 1.1.1 or later.
    @bug: All pixel planes are expected to have the same pitch. To use the YCbCr color space with chrominance subsampling, consider using L{libvlc_video_set_format_callbacks}() instead.
    '''
    # Reuse the cached binding when present; otherwise build it.
    f = _Cfunctions.get('libvlc_video_set_format', None)
    if not f:
        f = _Cfunction(
            'libvlc_video_set_format',
            ((1,), (1,), (1,), (1,), (1,),), None,
            None, MediaPlayer, ctypes.c_char_p,
            ctypes.c_uint, ctypes.c_uint, ctypes.c_uint)
    return f(mp, chroma, width, height, pitch)
def glibc_version_string():
    "Return the glibc version string, or None when glibc is not in use."
    # ctypes.CDLL(None) calls dlopen(NULL), whose handle refers to the
    # main program, so the linker tells us which libc is really in use.
    main_program = ctypes.CDLL(None)
    try:
        query_version = main_program.gnu_get_libc_version
    except AttributeError:
        # No such symbol, hence not glibc.
        return None

    # The C function returns a string such as "2.5".
    query_version.restype = ctypes.c_char_p
    raw_version = query_version()
    # py2 / py3 compatibility: py3 hands back bytes.
    if isinstance(raw_version, str):
        return raw_version
    return raw_version.decode("ascii")
# Separated out from have_compatible_glibc for easier unit testing
def check_for_error(error_code):
    """Turn a DAQmx status code into an exception or a warning.

    A negative code raises DaqError carrying the text obtained from
    DAQmxGetExtendedErrorInfo. A positive code issues a DaqWarning
    (through warnings.warn) carrying the text obtained from
    DAQmxGetErrorString. Zero does nothing.
    """
    from nidaqmx._lib import lib_importer

    if error_code < 0:
        message = ctypes.create_string_buffer(2048)
        fetch_details = lib_importer.windll.DAQmxGetExtendedErrorInfo
        # Declare the C signature on first use only, re-checking
        # while holding the function's arglock.
        if fetch_details.argtypes is None:
            with fetch_details.arglock:
                if fetch_details.argtypes is None:
                    fetch_details.argtypes = [
                        ctypes.c_char_p, ctypes.c_uint]
        fetch_details(message, 2048)
        raise DaqError(message.value.decode("utf-8"), error_code)

    if error_code > 0:
        message = ctypes.create_string_buffer(2048)
        fetch_text = lib_importer.windll.DAQmxGetErrorString
        if fetch_text.argtypes is None:
            with fetch_text.arglock:
                if fetch_text.argtypes is None:
                    fetch_text.argtypes = [
                        ctypes.c_int, ctypes.c_char_p, ctypes.c_uint]
        fetch_text(error_code, message, 2048)
        warning = DaqWarning(message.value.decode("utf-8"), error_code)
        warnings.warn(warning)
def from_param(self, param):
    """Wrap param in a ctypes.c_char_p.

    Text (six.text_type) input is ASCII-encoded first; any other
    value is handed to c_char_p unchanged.
    """
    raw = param.encode('ascii') if isinstance(param, six.text_type) else param
    return ctypes.c_char_p(raw)
def ai_input_srcs(self):
    """
    List[str]: The input sources supported by the channel. A channel
        may support the signal from the I/O connector or one of
        several calibration signals.
    """
    getter = lib_importer.windll.DAQmxGetPhysicalChanAIInputSrcs
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def teds_version_letter(self):
    """
    str: The version letter of the sensor.
    """
    getter = lib_importer.windll.DAQmxGetPhysicalChanTEDSVersionLetter
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return buf.value.decode('ascii')
def expir_trig_dig_edge_src(self):
    """
    str: The name of a terminal carrying the digital signal used as
        the source of the Expiration Trigger.
    """
    getter = lib_importer.windll.DAQmxGetDigEdgeWatchdogExpirTrigSrc
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._handle, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return buf.value.decode('ascii')
def channel_names(self):
    """
    List[str]: The unflattened channel string that
        DAQmxGetDevAIPhysicalChans reports for ``self._name``.
    """
    getter = lib_importer.windll.DAQmxGetDevAIPhysicalChans
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def channel_names(self):
    """
    List[str]: The unflattened channel string that
        DAQmxGetDevAOPhysicalChans reports for ``self._name``.
    """
    getter = lib_importer.windll.DAQmxGetDevAOPhysicalChans
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def channel_names(self):
    """
    List[str]: The unflattened channel string that
        DAQmxGetDevCIPhysicalChans reports for ``self._name``.
    """
    getter = lib_importer.windll.DAQmxGetDevCIPhysicalChans
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def channel_names(self):
    """
    List[str]: The unflattened channel string that
        DAQmxGetDevCOPhysicalChans reports for ``self._name``.
    """
    getter = lib_importer.windll.DAQmxGetDevCOPhysicalChans
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def channel_names(self):
    """
    List[str]: The unflattened channel string that
        DAQmxGetDevDILines reports for ``self._name``.
    """
    getter = lib_importer.windll.DAQmxGetDevDILines
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def channel_names(self):
    """
    List[str]: The unflattened channel string that
        DAQmxGetDevDIPorts reports for ``self._name``.
    """
    getter = lib_importer.windll.DAQmxGetDevDIPorts
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def channel_names(self):
    """
    List[str]: The unflattened channel string that
        DAQmxGetDevDOPorts reports for ``self._name``.
    """
    getter = lib_importer.windll.DAQmxGetDevDOPorts
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(self._name, buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def device_names(self):
    """
    List[str]: The names of all devices on this device collection.
    """
    getter = lib_importer.windll.DAQmxGetSysDevNames
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
def global_channel_names(self):
    """
    List[str]: The names of all global channels on this collection.
    """
    getter = lib_importer.windll.DAQmxGetSysGlobalChans
    # Declare the C signature on first use only, re-checking while
    # holding the function's arglock.
    if getter.argtypes is None:
        with getter.arglock:
            if getter.argtypes is None:
                getter.argtypes = [
                    ctypes.c_char_p, ctypes.c_uint]

    # A zero-length call yields the required size; the next call
    # fetches the data into a buffer of that size.
    buffer_len = 0
    while True:
        buf = ctypes.create_string_buffer(buffer_len)
        status = getter(buf, buffer_len)
        if is_string_buffer_too_small(status):
            # Required size changed between calls; query it again.
            buffer_len = 0
            continue
        if status <= 0 or buffer_len != 0:
            break
        buffer_len = status

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))