def patchable_builtin(klass):
# It's important to create variables here, we want those objects alive
# within this whole scope.
name = klass.__name__
target = klass.__dict__
# Hardcore introspection to find the `PyProxyDict` object that contains the
# precious `dict` attribute.
proxy_dict = SlotsProxy.from_address(id(target))
namespace = {}
# This is the way I found to `cast` this `proxy_dict.dict` into a python
# object, cause the `from_address()` function returns the `py_object`
# version
ctypes.pythonapi.PyDict_SetItem(
ctypes.py_object(namespace),
ctypes.py_object(name),
proxy_dict.dict,
)
return namespace[name]
python类pythonapi()的实例源码
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def get_pyos_inputhook(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.c_void_p."""
warn("`get_pyos_inputhook` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return ctypes.c_void_p.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def get_pyos_inputhook_as_func(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.PYFUNCYPE."""
warn("`get_pyos_inputhook_as_func` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return self.PYFUNC.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def get_pyos_inputhook(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.c_void_p."""
warn("`get_pyos_inputhook` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return ctypes.c_void_p.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def get_pyos_inputhook_as_func(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.PYFUNCYPE."""
warn("`get_pyos_inputhook_as_func` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return self.PYFUNC.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def copy_function_with_closure(old_func, updated_module,
updated_wrapped_func):
"""Copies a function, updating it's globals to point to updated_module.
This works for singly-wrapped function (ie, closure has len 1).
"""
assert old_func.__closure__ and len(old_func.__closure__) == 1
cell = make_cell()
PyCell_Set = ctypes.pythonapi.PyCell_Set
# ctypes.pythonapi functions need to have argtypes and restype set manually
PyCell_Set.argtypes = (ctypes.py_object, ctypes.py_object)
# restype actually defaults to c_int here, but we might as well be explicit
PyCell_Set.restype = ctypes.c_int
PyCell_Set(cell, updated_wrapped_func)
new_closure = (cell,)
new_func = types.FunctionType(old_func.__code__, updated_module.__dict__,
name=old_func.__name__,
argdefs=old_func.__defaults__,
closure=new_closure)
new_func.__dict__.update(old_func.__dict__)
new_func.__module__ = updated_module.__name__
return new_func
def setUp(self):
self.mox = mox.Mox()
self.mox.StubOutWithMock(ctypes.pythonapi, 'PyThreadState_SetAsyncExc')
self.request_state = request_state.RequestState('id')
def test_inject_exception(self):
ctypes.pythonapi.PyThreadState_SetAsyncExc(
CtypesComparator(ctypes.c_long(threading.current_thread().ident)),
CtypesComparator(ctypes.py_object(Exception)))
self.mox.ReplayAll()
self.request_state.inject_exception(Exception)
self.mox.VerifyAll()
def get_pyos_inputhook(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.c_void_p."""
warn("`get_pyos_inputhook` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return ctypes.c_void_p.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def get_pyos_inputhook_as_func(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.PYFUNCYPE."""
warn("`get_pyos_inputhook_as_func` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return self.PYFUNC.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def get_pyos_inputhook(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.c_void_p."""
warn("`get_pyos_inputhook` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return ctypes.c_void_p.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def get_pyos_inputhook_as_func(self):
"""DEPRECATED since IPython 5.0
Return the current PyOS_InputHook as a ctypes.PYFUNCYPE."""
warn("`get_pyos_inputhook_as_func` is deprecated since IPython 5.0 and will be removed in future versions.",
DeprecationWarning, stacklevel=2)
return self.PYFUNC.in_dll(ctypes.pythonapi,"PyOS_InputHook")
def _processInput(self, input_data):
pyos_inputhook_ptr = ctypes.c_void_p.in_dll(
ctypes.pythonapi, "PyOS_InputHook")
old_pyos_inputhook_ptr = pyos_inputhook_ptr.value
pyos_inputhook_ptr.value = ctypes.c_void_p(None).value
ret = BaseDoor._processInput(self, input_data)
pyos_inputhook_ptr.value = old_pyos_inputhook_ptr
return ret
def __init__(self, py_file):
"""
Takes a python file handle and looks up the underlying FILE *
The purpose of the CFILE class is to be able to use python
file handles when calling C functions which expect a FILE
pointer. A CFILE instance should be created based on the
Python file handle, and that should be passed to the function
expecting a FILE pointer.
The implementation is based on the ctypes object
pythonapi which is ctypes wrapping of the CPython api.
C-function:
void fprintf_hello(FILE * stream , const char * msg);
Python wrapper:
lib = clib.load( "lib.so" )
fprintf_hello = Prototype(lib, "void fprintf_hello( FILE , char* )")
Python use:
py_fileH = open("file.txt" , "w")
fprintf_hello( CFILE( py_fileH ) , "Message ...")
py_fileH.close()
If the supplied argument is not of type py_file the function
will raise a TypeException.
Examples: ecl.ecl.ecl_kw.EclKW.fprintf_grdecl()
"""
c_ptr = self._as_file(py_file)
try:
super(CFILE, self).__init__(c_ptr)
except ValueError as e:
raise TypeError("Sorry - the supplied argument is not a valid Python file handle!")
self.py_file = py_file
def _init_ugly_crap():
"""This function implements a few ugly things so that we can patch the
traceback objects. The function returned allows resetting `tb_next` on
any python traceback object. Do not attempt to use this on non cpython
interpreters
"""
import ctypes
from types import TracebackType
# figure out side of _Py_ssize_t
if hasattr(ctypes.pythonapi, 'Py_InitModule4_64'):
_Py_ssize_t = ctypes.c_int64
else:
_Py_ssize_t = ctypes.c_int
# regular python
class _PyObject(ctypes.Structure):
pass
_PyObject._fields_ = [
('ob_refcnt', _Py_ssize_t),
('ob_type', ctypes.POINTER(_PyObject))
]
# python with trace
if hasattr(sys, 'getobjects'):
class _PyObject(ctypes.Structure):
pass
_PyObject._fields_ = [
('_ob_next', ctypes.POINTER(_PyObject)),
('_ob_prev', ctypes.POINTER(_PyObject)),
('ob_refcnt', _Py_ssize_t),
('ob_type', ctypes.POINTER(_PyObject))
]
class _Traceback(_PyObject):
pass
_Traceback._fields_ = [
('tb_next', ctypes.POINTER(_Traceback)),
('tb_frame', ctypes.POINTER(_PyObject)),
('tb_lasti', ctypes.c_int),
('tb_lineno', ctypes.c_int)
]
def tb_set_next(tb, next):
"""Set the tb_next attribute of a traceback object."""
if not (isinstance(tb, TracebackType) and
(next is None or isinstance(next, TracebackType))):
raise TypeError('tb_set_next arguments must be traceback objects')
obj = _Traceback.from_address(id(tb))
if tb.tb_next is not None:
old = _Traceback.from_address(id(tb.tb_next))
old.ob_refcnt -= 1
if next is None:
obj.tb_next = ctypes.POINTER(_Traceback)()
else:
next = _Traceback.from_address(id(next))
next.ob_refcnt += 1
obj.tb_next = ctypes.pointer(next)
return tb_set_next
def test_from_format(self):
support.import_module('ctypes')
from ctypes import pythonapi, py_object, c_int
#if sys.maxunicode == 65535:
# name = "PyUnicodeUCS2_FromFormat"
#else:
# name = "PyUnicodeUCS4_FromFormat"
_PyUnicode_FromFormat = getattr(pythonapi, 'PyUnicode_FromFormat')
_PyUnicode_FromFormat.restype = py_object
def PyUnicode_FromFormat(format, *args):
cargs = tuple(
py_object(arg) if isinstance(arg, str) else arg
for arg in args)
return _PyUnicode_FromFormat(format, *cargs)
# ascii format, non-ascii argument
text = PyUnicode_FromFormat(b'ascii\x7f=%U', 'unicode\xe9')
self.assertEqual(text, 'ascii\x7f=unicode\xe9')
# non-ascii format, ascii argument: ensure that PyUnicode_FromFormatV()
# raises an error
self.assertRaisesRegex(ValueError,
'^PyUnicode_FromFormatV\(\) expects an ASCII-encoded format '
'string, got a non-ASCII byte: 0xe9$',
PyUnicode_FromFormat, b'unicode\xe9=%s', 'ascii')
self.assertEqual(PyUnicode_FromFormat(b'%c', c_int(0xabcd)), '\uabcd')
self.assertEqual(PyUnicode_FromFormat(b'%c', c_int(0x10ffff)), '\U0010ffff')
# other tests
text = PyUnicode_FromFormat(b'%%A:%A', 'abc\xe9\uabcd\U0010ffff')
self.assertEqual(text, r"%A:'abc\xe9\uabcd\U0010ffff'")
text = PyUnicode_FromFormat(b'repr=%V', 'abc', b'xyz')
self.assertEqual(text, 'repr=abc')
# Test string decode from parameter of %s using utf-8.
# b'\xe4\xba\xba\xe6\xb0\x91' is utf-8 encoded byte sequence of
# '\u4eba\u6c11'
text = PyUnicode_FromFormat(b'repr=%V', None, b'\xe4\xba\xba\xe6\xb0\x91')
self.assertEqual(text, 'repr=\u4eba\u6c11')
#Test replace error handler.
text = PyUnicode_FromFormat(b'repr=%V', None, b'abc\xff')
self.assertEqual(text, 'repr=abc\ufffd')
# Test PyUnicode_AsWideChar()
def _init_ugly_crap():
"""This function implements a few ugly things so that we can patch the
traceback objects. The function returned allows resetting `tb_next` on
any python traceback object. Do not attempt to use this on non cpython
interpreters
"""
import ctypes
from types import TracebackType
# figure out side of _Py_ssize_t
if hasattr(ctypes.pythonapi, 'Py_InitModule4_64'):
_Py_ssize_t = ctypes.c_int64
else:
_Py_ssize_t = ctypes.c_int
# regular python
class _PyObject(ctypes.Structure):
pass
_PyObject._fields_ = [
('ob_refcnt', _Py_ssize_t),
('ob_type', ctypes.POINTER(_PyObject))
]
# python with trace
if hasattr(sys, 'getobjects'):
class _PyObject(ctypes.Structure):
pass
_PyObject._fields_ = [
('_ob_next', ctypes.POINTER(_PyObject)),
('_ob_prev', ctypes.POINTER(_PyObject)),
('ob_refcnt', _Py_ssize_t),
('ob_type', ctypes.POINTER(_PyObject))
]
class _Traceback(_PyObject):
pass
_Traceback._fields_ = [
('tb_next', ctypes.POINTER(_Traceback)),
('tb_frame', ctypes.POINTER(_PyObject)),
('tb_lasti', ctypes.c_int),
('tb_lineno', ctypes.c_int)
]
def tb_set_next(tb, next):
"""Set the tb_next attribute of a traceback object."""
if not (isinstance(tb, TracebackType) and
(next is None or isinstance(next, TracebackType))):
raise TypeError('tb_set_next arguments must be traceback objects')
obj = _Traceback.from_address(id(tb))
if tb.tb_next is not None:
old = _Traceback.from_address(id(tb.tb_next))
old.ob_refcnt -= 1
if next is None:
obj.tb_next = ctypes.POINTER(_Traceback)()
else:
next = _Traceback.from_address(id(next))
next.ob_refcnt += 1
obj.tb_next = ctypes.pointer(next)
return tb_set_next