def user(pid):
    """
    Provides the user a process is running under.

    The uid reported by stem.util.proc is resolved through the **pwd** module
    first. If that isn't possible we fall back to asking **ps**.

    :param int pid: process id of the process to be queried

    :returns: **str** with the username a process is running under, **None** if
      it can't be determined
    """

    if not isinstance(pid, int) or pid < 0:
        return None

    if stem.util.proc.is_available():
        try:
            import pwd  # only available on unix platforms

            uid = stem.util.proc.uid(pid)

            if uid and uid.isdigit():
                return pwd.getpwuid(int(uid)).pw_name
        except Exception:
            # Best-effort lookup: any failure here just means we try ps instead.
            # Unlike a bare 'except' this lets KeyboardInterrupt and SystemExit
            # propagate.
            pass

    if is_available('ps'):
        results = call('ps -o user %s' % pid, [])

        # the first line is taken to be the column header, the second the value
        if len(results) >= 2:
            return results[1].strip()

    return None
python类util()的实例源码
def start_time(pid):
    """
    Provides the unix timestamp when the given process started.

    The start time reported by stem.util.proc is preferred. If that isn't
    available the elapsed time reported by **ps** is subtracted from the
    current time instead.

    :param int pid: process id of the process to be queried

    :returns: **float** for the unix timestamp when the process began, **None**
      if it can't be determined
    """

    if not isinstance(pid, int) or pid < 0:
        return None

    if stem.util.proc.is_available():
        try:
            return float(stem.util.proc.stats(pid, stem.util.proc.Stat.START_TIME)[0])
        except IOError:
            pass

    try:
        ps_results = call('ps -p %s -o etime' % pid, [])

        if len(ps_results) >= 2:
            etime = ps_results[1].strip()
            return time.time() - stem.util.str_tools.parse_short_time_label(etime)
    except Exception:
        # Best-effort fallback: report None on any failure. Unlike a bare
        # 'except' this lets KeyboardInterrupt and SystemExit propagate.
        pass

    return None
def set_process_name(process_name):
    """
    Renames our current process from "python <args>" to a custom name. This is
    best-effort, not necessarily working on all platforms.

    **Note:** This might have issues on FreeBSD (:trac:`9804`).

    :param str process_name: new name for our process
    """

    # This is mostly based on...
    #
    # http://www.rhinocerus.net/forum/lang-python/569677-setting-program-name-like-0-perl.html#post2272369
    #
    # ... and an adaptation by Jake...
    #
    # https://github.com/ioerror/chameleon
    #
    # A cleaner implementation is available at...
    #
    # https://github.com/cream/libs/blob/b38970e2a6f6d2620724c828808235be0445b799/cream/util/procname.py
    #
    # but I'm not quite clear on their implementation, and it only does targeted
    # argument replacement (ie, replace argv[0], argv[1], etc but with a string
    # the same size).

    # The argv rewrite is attempted everywhere, the rest is platform specific.
    _set_argv(process_name)

    current_system = platform.system()

    if current_system == 'Linux':
        _set_prctl_name(process_name)
        return

    if current_system in ('Darwin', 'FreeBSD', 'OpenBSD'):
        _set_proc_title(process_name)
def _set_prctl_name(process_name):
    """
    Sets the prctl name, which is used by top and killall. This appears to be
    Linux specific and has the max of 15 characters.

    This is from...
    http://stackoverflow.com/questions/564695/is-there-a-way-to-change-effective-process-name-in-python/923034#923034

    :param str process_name: new name for our process
    """

    libc = ctypes.CDLL(ctypes.util.find_library('c'))

    # Size the buffer from the encoded bytes rather than from the str. A name
    # with multi-byte characters encodes to more bytes than len(process_name),
    # and assigning a value larger than the buffer raises a ValueError.
    name_bytes = stem.util.str_tools._to_bytes(process_name)
    name_buffer = ctypes.create_string_buffer(len(name_bytes) + 1)  # +1 for the trailing NUL
    name_buffer.value = name_bytes

    libc.prctl(PR_SET_NAME, ctypes.byref(name_buffer), 0, 0, 0)
def mprotect_libc(addr, size, flags):
    """Call libc's mprotect on the pages that contain [addr, addr + size).

    The start address is rounded down and the end address rounded up to a
    multiple of PAGE_SIZE before the call.

    :param addr: start address of the region
    :param size: length of the region in bytes
    :param flags: protection flags handed to mprotect unchanged
    :raises OSError: when mprotect returns -1; carries the errno recorded by
        ctypes and its message
    """
    # find_library() expects the name without the 'lib' prefix.
    libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
    libc.mprotect.argtypes = [c_void_p, c_size_t, c_int]
    libc.mprotect.restype = c_int

    page_mask = ~(PAGE_SIZE - 1)
    addr_align = addr & page_mask
    mem_end = (addr + size) & page_mask
    if (addr + size) > mem_end:
        # the region ends part-way into a page, so cover that page as well
        mem_end += PAGE_SIZE

    memlen = mem_end - addr_align
    ret = libc.mprotect(addr_align, memlen, flags)
    if ret == -1:
        e = ctypes.get_errno()
        # The errno module has no 'errorcodes' attribute, so looking the code
        # up there would mask the real failure with an AttributeError.
        raise OSError(e, os.strerror(e))
def __init__(self, stream=None):
    """Initialise the handler and look up the OS handle behind its stream.

    :param stream: stream passed on to the parent class constructor
    """
    super(WindowsColorStreamHandler, self).__init__(stream)
    # get file handle for the stream
    import ctypes
    import ctypes.util
    # find_msvcrt() returns None when the runtime library's name cannot be
    # determined; fall back to a lookup by name so LoadLibrary() is not
    # handed None.
    crtname = ctypes.util.find_msvcrt()
    if not crtname:
        crtname = ctypes.util.find_library("msvcrt")
    crtlib = ctypes.cdll.LoadLibrary(crtname)
    self._outhdl = crtlib._get_osfhandle(self.stream.fileno())
def loadOpenSSL():
    """Find a loadable OpenSSL library and bind it to the global ``OpenSSL``.

    Candidates are tried in this order: files under RESOURCEPATH or
    sys._MEIPASS when ``sys.frozen`` is set, the platform's usual library
    file names, and finally the result of ``ctypes.util.find_library``. The
    first candidate that ``_OpenSSL`` accepts is kept.

    :raises Exception: if no candidate could be loaded
    """
    global OpenSSL
    from os import path, environ
    from ctypes.util import find_library

    on_darwin = 'darwin' in sys.platform
    on_windows = 'win32' in sys.platform or 'win64' in sys.platform

    libdir = []
    if getattr(sys, 'frozen', None):
        if on_darwin:
            libdir.extend([
                path.join(environ['RESOURCEPATH'], '..', 'Frameworks', 'libcrypto.dylib'),
                path.join(environ['RESOURCEPATH'], '..', 'Frameworks', 'libcrypto.1.0.0.dylib'),
            ])
        elif on_windows:
            libdir.append(path.join(sys._MEIPASS, 'libeay32.dll'))
        else:
            libdir.extend([
                path.join(sys._MEIPASS, 'libcrypto.so'),
                path.join(sys._MEIPASS, 'libssl.so'),
                path.join(sys._MEIPASS, 'libcrypto.so.1.0.0'),
                path.join(sys._MEIPASS, 'libssl.so.1.0.0'),
            ])

    if on_darwin:
        libdir.extend(['libcrypto.dylib', '/usr/local/opt/openssl/lib/libcrypto.dylib'])
    elif on_windows:
        libdir.append('libeay32.dll')
    else:
        libdir.append('libcrypto.so')
        libdir.append('libssl.so')

    if 'linux' in sys.platform or on_darwin or 'freebsd' in sys.platform:
        libdir.append(find_library('ssl'))
    elif on_windows:
        libdir.append(find_library('libeay32'))

    for library in libdir:
        try:
            OpenSSL = _OpenSSL(library)
        except Exception:
            # This candidate is missing or unusable, so move on to the next.
            # Unlike a bare 'except' this lets KeyboardInterrupt and
            # SystemExit propagate.
            continue
        else:
            return

    raise Exception("Couldn't find and load the OpenSSL library. You must install it.")
def _init_lib():
    """Load GTK 3 through ctypes, declare the prototypes used and call gtk_init.

    The configured handle is stored as an attribute on this function, so the
    setup runs once and later calls return the same object.

    :returns: the loaded library handle
    """
    if hasattr(_init_lib, '_dll'):
        return _init_lib._dll

    import ctypes.util

    lib = ctypes.cdll.LoadLibrary(ctypes.util.find_library('gtk-3'))

    lib.gtk_widget_get_toplevel.restype = LPCVOID
    lib.gtk_widget_get_toplevel.argtypes = [LPCVOID]
    lib.gtk_init.argtypes = [LPCVOID, LPCVOID]
    lib.gtk_widget_hide.argtypes = [LPCVOID]
    lib.gtk_window_iconify.argtypes = [LPCVOID]
    lib.gtk_window_maximize.argtypes = [LPCVOID]
    lib.gtk_window_present.argtypes = [LPCVOID]
    lib.gtk_window_close.argtypes = [LPCVOID]
    lib.gtk_window_get_title.argtypes = [LPCVOID]
    lib.gtk_window_set_title.argtypes = [LPCVOID, LPCSTR]
    lib.gtk_main.argtypes = []
    lib.gtk_main_quit.argtypes = []

    lib.gtk_init(None, None)

    # Cache the handle only after every step above has succeeded. Caching it
    # earlier would make a failed setup hand back a half-configured library
    # on the next call instead of failing again.
    _init_lib._dll = lib
    return lib
#
def init(self):
    """Load the C library and declare the inotify prototypes on it.

    Stores the loaded library in ``self._libc`` and ``ctypes.get_errno`` in
    ``self._get_errno_func``.

    Returns True once the three inotify functions are configured, False if
    the loaded library lacks any of them.
    """
    assert ctypes

    if sys.platform.startswith('freebsd'):
        lookup_name = 'inotify'
    else:
        lookup_name = 'c'

    try:
        resolved_name = ctypes.util.find_library(lookup_name)
    except (OSError, IOError):
        resolved_name = None  # loading is still attempted with None

    self._libc = ctypes.CDLL(resolved_name, use_errno=True)
    self._get_errno_func = ctypes.get_errno

    # Check that the library exposes the inotify bindings we need.
    needed_symbols = ('inotify_init', 'inotify_add_watch', 'inotify_rm_watch')
    if not all(hasattr(self._libc, symbol) for symbol in needed_symbols):
        return False

    libc = self._libc

    libc.inotify_init.argtypes = []
    libc.inotify_init.restype = ctypes.c_int

    libc.inotify_add_watch.argtypes = [
        ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
    libc.inotify_add_watch.restype = ctypes.c_int

    libc.inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
    libc.inotify_rm_watch.restype = ctypes.c_int

    return True
def init(self):
    """Load the C library and set up the inotify function prototypes.

    Side effects: ``self._libc`` holds the loaded library and
    ``self._get_errno_func`` holds ``ctypes.get_errno``.

    Returns False when the library is missing any of the three inotify
    functions, True after all of them have been configured.
    """
    assert ctypes

    on_freebsd = sys.platform.startswith('freebsd')
    wanted = 'inotify' if on_freebsd else 'c'

    located = None
    try:
        located = ctypes.util.find_library(wanted)
    except (OSError, IOError):
        # Fall through: the load below is attempted with None anyway.
        pass

    self._libc = ctypes.CDLL(located, use_errno=True)
    self._get_errno_func = ctypes.get_errno

    # (function name, argument types, return type), in configuration order
    prototypes = (
        ('inotify_init', [], ctypes.c_int),
        ('inotify_add_watch',
         [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32],
         ctypes.c_int),
        ('inotify_rm_watch', [ctypes.c_int, ctypes.c_int], ctypes.c_int),
    )

    # Bail out if the library lacks any of the inotify bindings.
    for func_name, _, _ in prototypes:
        if not hasattr(self._libc, func_name):
            return False

    for func_name, arg_types, result_type in prototypes:
        func = getattr(self._libc, func_name)
        func.argtypes = arg_types
        func.restype = result_type

    return True
def test_ctypes_CDLL_c(pyi_builder):
    """Pass pyi_builder a script that loads the C library through ctypes."""
    # Make sure we are able to load the MSVCRXX.DLL resp. libc.so we are
    # currently bound. This is some of a no-brainer since the resp. dll/so
    # is collected anyway.
    script = """
import ctypes, ctypes.util
lib = ctypes.CDLL(ctypes.util.find_library('c'))
assert lib is not None
"""
    pyi_builder.test_source(script)
def _fallocate():
    """Build a Python wrapper around libc's ``fallocate``.

    :returns: a function ``fallocate(fd, offs, size, mode=FALLOC_FL_KEEP_SIZE)``
        that raises IOError, carrying the errno and its message, when the
        underlying call returns non-zero
    """
    import os

    libc_name = ctypes.util.find_library('c')
    # use_errno=True makes ctypes capture errno around each call. Without it
    # ctypes.get_errno() never reflects the failure of the call.
    libc = ctypes.CDLL(libc_name, use_errno=True)
    raw_fallocate = libc.fallocate
    raw_fallocate.restype = ctypes.c_int
    raw_fallocate.argtypes = [
        ctypes.c_int, ctypes.c_int, ctypes.c_int64, ctypes.c_int64]

    def fallocate(fd, offs, size, mode=FALLOC_FL_KEEP_SIZE):
        # note the argument order of the C function: fd, mode, offset, length
        ret = raw_fallocate(fd, mode, offs, size)
        if ret != 0:
            err = ctypes.get_errno()
            raise IOError(err, os.strerror(err))

    return fallocate
def find_library():
    """Look up the opus library by name via ctypes.util.find_library.

    Returns whatever ctypes.util.find_library('opus') reports. On Windows no
    lookup is attempted and an Exception is raised instead.
    """
    if sys.platform != 'win32':
        return ctypes.util.find_library('opus')

    raise Exception('Cannot auto-load opus on Windows, please specify full library path')
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file. The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    suffix = 'c' if __debug__ else 'o'
    # Anchor a relative source at the working directory rather than at its own
    # parent directory: joining the parent with the full relative path applied
    # the directory part twice ('pkg/mod.py' became '<cwd>/pkg/pkg/mod.pyc').
    # An absolute source makes join() discard the first argument.
    legacy_pyc = os.path.join(os.getcwd(), source + suffix)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
def locate_library(candidates, find_library=ctypes.util.find_library):
    """Return the first library from candidates that find_library() can locate.

    The result is whatever find_library() returned for that candidate, which
    may be a library name or a path to the library file.
    Returns None if no candidate is found.

    arguments:
    * candidates    -- iterable with library names
    * find_library  -- function that takes one positional arg (candidate)
                       and returns a non-empty str if a library has been found.
                       Any "false" value (None,False,empty str) is interpreted
                       as "library not found".
                       Defaults to ctypes.util.find_library if not given or
                       None.
    """
    finder = ctypes.util.find_library if find_library is None else find_library

    # Workaround for CPython 3.3 issue#16283 / pyusb #14: on Windows the
    # stock finder is given the name with '.dll' appended.
    append_dll = finder is ctypes.util.find_library and sys.platform == 'win32'

    for name in candidates:
        located = finder(name + '.dll' if append_dll else name)
        if located:
            return located

    return None
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file. The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    suffix = 'c' if __debug__ else 'o'
    # Anchor a relative source at the working directory rather than at its own
    # parent directory: joining the parent with the full relative path applied
    # the directory part twice ('pkg/mod.py' became '<cwd>/pkg/pkg/mod.pyc').
    # An absolute source makes join() discard the first argument.
    legacy_pyc = os.path.join(os.getcwd(), source + suffix)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
def __init__(self, stream=None):
    """Initialise the stream handler and look up the OS handle of its stream.

    :param stream: stream passed on to logging.StreamHandler
    """
    logging.StreamHandler.__init__(self, stream)
    # get file handle for the stream
    import ctypes.util
    # for some reason find_msvcrt() sometimes doesn't find msvcrt.dll on my
    # system, so fall back to a lookup by name when it comes back empty
    runtime_name = ctypes.util.find_msvcrt() or ctypes.util.find_library("msvcrt")
    runtime = ctypes.cdll.LoadLibrary(runtime_name)
    stream_fd = self.stream.fileno()
    self._outhdl = runtime._get_osfhandle(stream_fd)
def setup_readline():
    """Sets up the readline module and completion supression, if available.

    Does nothing when RL_COMPLETION_SUPPRESS_APPEND has already been set or
    when the readline module cannot be imported. Otherwise it updates the
    RL_* module globals, the completer delimiters, the history length and
    the key bindings.
    """
    global RL_COMPLETION_SUPPRESS_APPEND, RL_LIB, RL_CAN_RESIZE
    if RL_COMPLETION_SUPPRESS_APPEND is not None:
        return
    try:
        import readline
    except ImportError:
        return
    import ctypes
    import ctypes.util
    readline.set_completer_delims(' \t\n')
    if not readline.__file__.endswith('.py'):
        # readline is not a .py module, so load its file through ctypes to
        # reach the library's own symbols
        RL_LIB = lib = ctypes.cdll.LoadLibrary(readline.__file__)
        try:
            RL_COMPLETION_SUPPRESS_APPEND = ctypes.c_int.in_dll(
                lib, 'rl_completion_suppress_append')
        except ValueError:
            # not all versions of readline have this symbol, ie Macs sometimes
            RL_COMPLETION_SUPPRESS_APPEND = None
        RL_CAN_RESIZE = hasattr(lib, 'rl_reset_screen_size')
    env = builtins.__xonsh_env__
    # reads in history
    readline.set_history_length(-1)
    ReadlineHistoryAdder()
    # sets up IPython-like history matching with up and down
    # (raw strings: "\e" is not a valid Python escape sequence, and readline
    # must receive the backslash and the 'e' literally)
    readline.parse_and_bind(r'"\e[B": history-search-forward')
    readline.parse_and_bind(r'"\e[A": history-search-backward')
    # Setup Shift-Tab to indent
    readline.parse_and_bind(r'"\e[Z": "{0}"'.format(env.get('INDENT')))
    # handle tab completion differences found in libedit readline compatibility
    # as discussed at http://stackoverflow.com/a/7116997
    if readline.__doc__ and 'libedit' in readline.__doc__:
        readline.parse_and_bind("bind ^I rl_complete")
    else:
        readline.parse_and_bind("tab: complete")