def test_send_message_check_not_present_graph(self):
    """C{send_message} checks the presence of the custom-graph script.

    A custom graph is registered for the current user, the file returned
    by the store for that graph is deleted, and an exchange is run.  The
    resulting pending message must be a "custom-graph" message carrying
    empty data.
    """
    current_user = pwd.getpwuid(os.getuid()).pw_name
    add_graph_message = {
        "type": "custom-graph-add",
        "interpreter": "/bin/sh",
        "code": "echo hi!",
        "username": current_user,
        "graph-id": 123,
    }
    self.manager.dispatch_message(add_graph_message)

    # Delete the stored file before the exchange so it is missing when
    # the plugin looks for it.
    script_path = self.store.get_graph(123)[1]
    os.unlink(script_path)

    self.graph_manager.exchange()

    pending = self.broker_service.message_store.get_pending_messages()
    expected = [
        {"api": b"3.2",
         "data": {},
         "timestamp": 0,
         "type": "custom-graph"},
    ]
    self.assertMessages(pending, expected)
# python类getpwuid()的实例源码 (example source code for Python's pwd.getpwuid())
def test_run_not_accepted_types(self):
    """
    If "custom-graph" is not an accepted message-type anymore,
    C{CustomGraphPlugin.run} shouldn't even run the graph scripts.

    The accepted types are cleared first, a graph is registered for the
    current user, and the run is expected to spawn no process and to
    produce C{None}.
    """
    self.broker_service.message_store.set_accepted_types([])

    current_user = pwd.getpwuid(os.getuid()).pw_name
    add_graph_message = {
        "type": "custom-graph-add",
        "interpreter": "/bin/sh",
        "code": "echo 1.0",
        "username": current_user,
        "graph-id": 123,
    }
    self.manager.dispatch_message(add_graph_message)

    # Swap in a stub factory so any attempted spawn is recorded.
    stub_factory = StubProcessFactory()
    self.graph_manager.process_factory = stub_factory

    deferred = self.graph_manager.run()
    self.assertEqual(len(stub_factory.spawns), 0)
    return deferred.addCallback(self.assertIdentical, None)
def uid_exists(uid):
    """Return True when ``uid`` has a password-database entry, else False.

    The lookup is delegated to ``pwd.getpwuid``; a ``KeyError`` from that
    call is treated as "no such uid".
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        return False
    return True
def owner(path):
    """Look up the names of the user and group that own ``path``.

    :param str path: the filesystem path whose ownership is queried
    :return tuple(str, str): a ``(username, groupname)`` pair for the
                             owning user and owning group
    :raises OSError: if the specified path does not exist
    """
    info = os.stat(path)
    user_entry = pwd.getpwuid(info.st_uid)
    group_entry = grp.getgrgid(info.st_gid)
    # Field 0 of each entry is the name.
    return user_entry[0], group_entry[0]
def uid_exists(uid):
    """Tell whether the numeric user id ``uid`` is known to the system.

    Returns True if ``pwd.getpwuid`` finds an entry, and False if it
    raises ``KeyError``.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        return False
    else:
        return True
def chown(self, tarinfo, targetpath):
    """Set owner of targetpath according to tarinfo.

    Nothing is done unless ``pwd`` is truthy, ``os.geteuid`` exists and
    the effective uid is 0.

    The group id is resolved from ``tarinfo.gname``, then from
    ``tarinfo.gid``, and finally falls back to the current process gid.
    The user id is resolved the same way from ``tarinfo.uname``,
    ``tarinfo.uid`` and the current process uid.

    :param tarinfo: object providing ``uname``, ``gname``, ``uid``,
                    ``gid`` and an ``issym()`` method
    :param targetpath: path whose ownership is changed
    :raises ExtractError: if changing the owner fails with an
                          ``EnvironmentError``
    """
    # NOTE(review): the truthiness test on ``pwd`` suggests the module
    # may be bound to None when unavailable -- confirm against the
    # file's import section.
    if not (pwd and hasattr(os, "geteuid") and os.geteuid() == 0):
        # We have to be root to change ownership.
        return

    try:
        g = grp.getgrnam(tarinfo.gname)[2]
    except KeyError:
        try:
            g = grp.getgrgid(tarinfo.gid)[2]
        except KeyError:
            g = os.getgid()

    try:
        u = pwd.getpwnam(tarinfo.uname)[2]
    except KeyError:
        try:
            u = pwd.getpwuid(tarinfo.uid)[2]
        except KeyError:
            u = os.getuid()

    try:
        if tarinfo.issym() and hasattr(os, "lchown"):
            # Change the link itself rather than what it points to.
            os.lchown(targetpath, u, g)
        elif sys.platform != "os2emx":
            os.chown(targetpath, u, g)
    except EnvironmentError:
        # The Python-2-only ``except X, e`` form was a SyntaxError on
        # Python 3, and the bound exception was never used.
        raise ExtractError("could not change owner")
def expanduser(path):
    """Expand ~ and ~user constructions.  If user or $HOME is unknown,
    do nothing.

    A leading ``~`` is replaced by ``$HOME`` when set, otherwise by the
    home directory of the current uid from the password database.  A
    leading ``~user`` is replaced by that user's home directory.  When
    the required password-database entry is missing, ``path`` is
    returned unchanged.
    """
    if not path.startswith('~'):
        return path

    # ``i`` is the index just past the "~" or "~user" prefix.
    i = path.find('/', 1)
    if i < 0:
        i = len(path)

    if i == 1:
        if 'HOME' in os.environ:
            userhome = os.environ['HOME']
        else:
            import pwd
            try:
                userhome = pwd.getpwuid(os.getuid()).pw_dir
            except KeyError:
                # The current uid has no passwd entry: honour the
                # documented "do nothing" contract instead of raising.
                return path
    else:
        import pwd
        try:
            pwent = pwd.getpwnam(path[1:i])
        except KeyError:
            return path
        userhome = pwent.pw_dir

    # Strip trailing slashes so a root home does not yield "//rest";
    # fall back to '/' when nothing is left.
    userhome = userhome.rstrip('/')
    return (userhome + path[i:]) or '/'
# Expand paths containing shell variable substitutions.
# This expands the forms $variable and ${variable} only.
# Non-existent variables are left unchanged.
def does_uid_exist(uid):  # type: (int) -> bool
    """
    Report whether the given OS user id exists.

    Returns True when ``pwd.getpwuid`` finds an entry for ``uid`` and
    False when it raises ``KeyError``.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        return False
    else:
        return True
def __get_username():
    """Return the username of the current process.

    On ``win32`` the name comes from ``getpass.getuser()``.  On every
    other platform it is the password-database name for the effective
    uid.
    """
    if sys.platform != 'win32':
        import pwd
        effective_uid = os.geteuid()
        return pwd.getpwuid(effective_uid).pw_name
    return getpass.getuser()
def _get_userdir(user=None):
    """Return the home directory for ``user``, or None if it is unknown.

    With ``user`` left as None the current user's directory is looked
    up.  A non-None ``user`` that is not an ``fsnative`` instance raises
    ``TypeError``.
    """
    if not (user is None or isinstance(user, fsnative)):
        raise TypeError

    if not is_win:
        import pwd

        if user is not None:
            try:
                return path2fsn(pwd.getpwnam(user).pw_dir)
            except KeyError:
                return None

        if "HOME" in environ:
            return environ["HOME"]
        try:
            return path2fsn(pwd.getpwuid(os.getuid()).pw_dir)
        except KeyError:
            return None

    # Windows: take the first usable location from the environment.
    if "HOME" in environ:
        base = environ["HOME"]
    elif "USERPROFILE" in environ:
        base = environ["USERPROFILE"]
    elif "HOMEPATH" in environ and "HOMEDRIVE" in environ:
        base = os.path.join(environ["HOMEDRIVE"], environ["HOMEPATH"])
    else:
        return None

    if user is None:
        return base
    # Another user's directory is taken to be a sibling of the current one.
    return os.path.join(os.path.dirname(base), user)
def uid_exists(uid):
    """Check whether ``uid`` resolves to a password-database entry.

    Returns False when ``pwd.getpwuid`` raises ``KeyError`` and True
    otherwise.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        found = False
    else:
        found = True
    return found
def owner(path):
    """Return the ``(username, groupname)`` pair that owns ``path``.

    :param str path: the path whose owner and group are looked up
    :return tuple(str, str): the owning user's name followed by the
                             owning group's name
    :raises OSError: if the specified path does not exist
    """
    path_stat = os.stat(path)
    # Index 0 of the passwd and group entries is the name field.
    return (
        pwd.getpwuid(path_stat.st_uid)[0],
        grp.getgrgid(path_stat.st_gid)[0],
    )
def uid_exists(uid):
    """Return whether a user with numeric id ``uid`` exists.

    A ``KeyError`` raised by ``pwd.getpwuid`` means the uid is unknown.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        # No password-database entry for this uid.
        return False
    return True
def uid_exists(uid):
    """Check for the existence of a uid.

    Returns True if ``pwd.getpwuid(uid)`` succeeds and False if it
    raises ``KeyError``.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        return False
    else:
        # The lookup succeeded, so an entry exists.
        return True
def owner(path):
    """Resolve the owning user and group of ``path`` to their names.

    :param str path: the path to inspect
    :return tuple(str, str): a ``(username, groupname)`` tuple naming the
                             user and the group that own the path
    :raises OSError: if the specified path does not exist
    """
    status = os.stat(path)
    owner_uid, owner_gid = status.st_uid, status.st_gid
    user_name = pwd.getpwuid(owner_uid)[0]
    group_name = grp.getgrgid(owner_gid)[0]
    return user_name, group_name
def uid_exists(uid):
    """Return True if the password database knows ``uid``, else False.

    ``pwd.getpwuid`` performs the lookup; its ``KeyError`` is mapped to
    False.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        known = False
    else:
        known = True
    return known
def uid_exists(uid):
    """Determine whether the user id ``uid`` exists.

    Returns False when ``pwd.getpwuid`` raises ``KeyError`` for the uid,
    and True when the lookup succeeds.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        return False
    return True
def owner(path):
    """Return who owns ``path`` as a pair of names.

    :param str path: the path whose ownership is retrieved
    :return tuple(str, str): ``(username, groupname)`` for the user and
                             group that own the path
    :raises OSError: if the specified path does not exist
    """
    st = os.stat(path)
    passwd_entry = pwd.getpwuid(st.st_uid)
    group_entry = grp.getgrgid(st.st_gid)
    # The name is the first field of both entry types.
    names = (passwd_entry[0], group_entry[0])
    return names
def uid_exists(uid):
    """Check if a uid exists in the password database.

    Returns True on a successful ``pwd.getpwuid`` lookup and False when
    that lookup raises ``KeyError``.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        return False
    else:
        return True
def uid_exists(uid):
    """Report whether ``uid`` belongs to an existing user.

    The uid is looked up with ``pwd.getpwuid``; a ``KeyError`` means
    there is no such entry.
    """
    try:
        pwd.getpwuid(uid)
    except KeyError:
        # Unknown uid.
        return False
    return True