def getuser():
    """Return the user name, preferring the environment over the password database.

    The variables LOGNAME, USER, LNAME and USERNAME are consulted in that
    order and the first one holding a non-empty value is returned.  This
    works on Windows as long as USERNAME is set.  When none of them is
    usable, the name is read from the password database entry of the
    current uid.
    """
    import os

    env_values = (os.environ.get(key)
                  for key in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'))
    from_env = next((value for value in env_values if value), None)
    if from_env:
        return from_env

    # Nothing usable in the environment.  Any exception raised below is
    # deliberately left to propagate: it "explains" why the lookup failed.
    import pwd
    entry = pwd.getpwuid(os.getuid())
    return entry[0]
# Bind the name getpass to the appropriate function
# Example source code using Python's getpwuid()
def check_environ ():
    """Ensure that 'os.environ' has all the environment variables we
    guarantee that users can use in config files, command-line options,
    etc.  Currently this includes:
      HOME - user's home directory (Unix only)
      PLAT - description of the current platform, including hardware
             and OS (see 'get_platform()')

    The check runs at most once: the module-level flag '_environ_checked'
    is set on completion and later calls return immediately.  Variables
    that are already present in 'os.environ' are never overwritten.
    """
    global _environ_checked
    if _environ_checked:
        return

    if os.name == 'posix' and 'HOME' not in os.environ:
        try:
            import pwd
            os.environ['HOME'] = pwd.getpwuid(os.getuid())[5]
        except (ImportError, KeyError):
            # The pwd module is unavailable, or the current uid has no
            # entry in the password database: leave HOME unset instead
            # of crashing the caller.
            pass

    if 'PLAT' not in os.environ:
        os.environ['PLAT'] = get_platform()

    _environ_checked = 1
def _find_grail_rc(self):
import glob
import pwd
import socket
import tempfile
tempdir = os.path.join(tempfile.gettempdir(),
".grail-unix")
user = pwd.getpwuid(os.getuid())[0]
filename = os.path.join(tempdir, user + "-*")
maybes = glob.glob(filename)
if not maybes:
return None
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for fn in maybes:
# need to PING each one until we find one that's live
try:
s.connect(fn)
except socket.error:
# no good; attempt to clean it out, but don't fail:
try:
os.unlink(fn)
except IOError:
pass
else:
return s
def getuser():
    """Return the user name, preferring the environment over the password database.

    The variables LOGNAME, USER, LNAME and USERNAME are consulted in that
    order and the first one holding a non-empty value is returned.  This
    works on Windows as long as USERNAME is set.  When none of them is
    usable, the name is read from the password database entry of the
    current uid.
    """
    import os

    env_values = (os.environ.get(key)
                  for key in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'))
    from_env = next((value for value in env_values if value), None)
    if from_env:
        return from_env

    # Nothing usable in the environment.  Any exception raised below is
    # deliberately left to propagate: it "explains" why the lookup failed.
    import pwd
    entry = pwd.getpwuid(os.getuid())
    return entry[0]
# Bind the name getpass to the appropriate function
def test_make_archive_owner_group(self):
    """make_archive() creates an archive for every owner/group combination.

    Exercises the 'zip' and 'tar' formats with resolved owner/group names,
    without any, and with names that are not expected to exist.  This
    works even if there is no gid/uid support.
    """
    if UID_GID_SUPPORT:
        # Use whatever names the system gives to gid 0 and uid 0.
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
    else:
        group = owner = 'root'

    base_dir, root_dir, _ = self._create_files()
    archive_base = os.path.join(self.mkdtemp(), 'archive')

    # (format, ownership keyword arguments), in the order they are tried.
    scenarios = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for fmt, ownership in scenarios:
        created = make_archive(archive_base, fmt, root_dir, base_dir,
                               **ownership)
        self.assertTrue(os.path.exists(created))
def test_tarfile_root_owner(self):
    """make_tarball() stores uid 0 / gid 0 on every member when asked to.

    The owner and group are passed by name (the names the system maps to
    uid 0 and gid 0) and the resulting tar archive is inspected member by
    member.
    """
    tmpdir, _, base_name = self._create_files()
    # Resolve the names before changing directory, so that a failed lookup
    # cannot leave the process stranded in tmpdir.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    old_dir = os.getcwd()
    os.chdir(tmpdir)
    try:
        archive_name = make_tarball(base_name, 'dist', compress=None,
                                    owner=owner, group=group)
    finally:
        os.chdir(old_dir)

    # check if the tarball was created (compress=None: no compression)
    self.assertTrue(os.path.exists(archive_name))

    # now checks the rights
    archive = tarfile.open(archive_name)
    try:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
    finally:
        archive.close()
def compat_expanduser(path):
    """Expand ~ and ~user constructions.  If user or $HOME is unknown,
    do nothing.

    A leading '~' is replaced by $HOME or, when HOME is not set, by the
    home directory of the current uid from the password database.  A
    leading '~user' is replaced by that user's home directory.  The path
    is returned unchanged when it does not start with '~' or when the
    required password database entry does not exist.
    """
    if not path.startswith('~'):
        return path
    # i marks the end of the '~' or '~user' prefix.
    i = path.find('/', 1)
    if i < 0:
        i = len(path)
    if i == 1:
        if 'HOME' not in os.environ:
            import pwd
            try:
                userhome = pwd.getpwuid(os.getuid()).pw_dir
            except KeyError:
                # The current uid has no password database entry: honour
                # the "do nothing" contract instead of raising.
                return path
        else:
            userhome = compat_getenv('HOME')
    else:
        import pwd
        try:
            pwent = pwd.getpwnam(path[1:i])
        except KeyError:
            return path
        userhome = pwent.pw_dir
    userhome = userhome.rstrip('/')
    # An empty result means the home directory is the root directory.
    return (userhome + path[i:]) or '/'
def get_current_os_user():
    """
    Find the real user who runs the current process.

    The user name is taken from the SUDO_USER environment variable, then
    from USER.  When neither is set, or the name has no entry in the
    password database, the real uid of the process is used instead.

    :return: a tuple of uid, username, homedir, gid.
    :rtype: (int, str, str, int)
    :raises KeyError: if the real uid has no password database entry.
    """
    user_name = os.getenv('SUDO_USER') or os.getenv('USER')
    pw = None
    if user_name:
        try:
            pw = getpwnam(user_name)
        except KeyError:
            # Stale or made-up name in the environment; fall back below.
            pw = None
    if pw is None:
        # If cannot find the user, use ruid instead.
        pw = getpwuid(os.getuid())
        user_name = pw.pw_name
    return pw.pw_uid, user_name, pw.pw_dir, pw.pw_gid
def username(self):
    """Return the name of the user that owns the process.

    On UNIX the name is derived from the *real* process uid; when that
    uid cannot be resolved, its decimal string form is returned instead.
    Elsewhere the answer comes from the underlying platform object.
    """
    if not POSIX:
        return self._proc.username()

    if pwd is None:
        # might happen if python was installed from sources
        raise ImportError(
            "requires pwd module shipped with standard python")

    real_uid = self.uids().real
    try:
        entry = pwd.getpwuid(real_uid)
    except KeyError:
        # the uid can't be resolved by the system
        return str(real_uid)
    return entry.pw_name
def username(self):
    """Return the name of the user that owns the process.

    On UNIX the name is derived from the *real* process uid; when that
    uid cannot be resolved, its decimal string form is returned instead.
    Elsewhere the answer comes from the underlying platform object.
    """
    if not POSIX:
        return self._proc.username()

    if pwd is None:
        # might happen if python was installed from sources
        raise ImportError(
            "requires pwd module shipped with standard python")

    real_uid = self.uids().real
    try:
        entry = pwd.getpwuid(real_uid)
    except KeyError:
        # the uid can't be resolved by the system
        return str(real_uid)
    return entry.pw_name
def chown(self, cpioinfo, cpiogetpath):
    """Set owner of cpiogetpath according to cpioinfo.

    Nothing is done unless PWD is truthy and the effective uid is 0.
    Raises ExtractError when changing the owner fails with an
    EnvironmentError.
    """
    # We have to be root to do so.
    is_root = PWD and hasattr(os, "geteuid") and os.geteuid() == 0
    if not is_root:
        return

    # Element 2 of the looked-up entries is used as the numeric id; ids
    # without an entry fall back to those of the current process.
    try:
        group_id = GRP.getgrgid(cpioinfo.gid)[2]
    except KeyError:
        group_id = os.getgid()
    try:
        user_id = PWD.getpwuid(cpioinfo.uid)[2]
    except KeyError:
        user_id = os.getuid()

    try:
        if cpioinfo.issym() and hasattr(os, "lchown"):
            os.lchown(cpiogetpath, user_id, group_id)
        elif sys.platform != "os2emx":
            os.chown(cpiogetpath, user_id, group_id)
    except EnvironmentError:
        raise ExtractError("could not change owner")
def compat_expanduser(path):
    """Expand ~ and ~user constructions.  If user or $HOME is unknown,
    do nothing.

    A leading '~' is replaced by $HOME or, when HOME is not set, by the
    home directory of the current uid from the password database.  A
    leading '~user' is replaced by that user's home directory.  The path
    is returned unchanged when it does not start with '~' or when the
    required password database entry does not exist.
    """
    if not path.startswith('~'):
        return path
    # i marks the end of the '~' or '~user' prefix.
    i = path.find('/', 1)
    if i < 0:
        i = len(path)
    if i == 1:
        if 'HOME' not in os.environ:
            import pwd
            try:
                userhome = pwd.getpwuid(os.getuid()).pw_dir
            except KeyError:
                # The current uid has no password database entry: honour
                # the "do nothing" contract instead of raising.
                return path
        else:
            userhome = compat_getenv('HOME')
    else:
        import pwd
        try:
            pwent = pwd.getpwnam(path[1:i])
        except KeyError:
            return path
        userhome = pwent.pw_dir
    userhome = userhome.rstrip('/')
    # An empty result means the home directory is the root directory.
    return (userhome + path[i:]) or '/'
def getuser():
    """Return the user name, preferring the environment over the password database.

    The variables LOGNAME, USER, LNAME and USERNAME are consulted in that
    order and the first one holding a non-empty value is returned.  This
    works on Windows as long as USERNAME is set.  When none of them is
    usable, the name is read from the password database entry of the
    current uid.
    """
    import os

    env_values = (os.environ.get(key)
                  for key in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'))
    from_env = next((value for value in env_values if value), None)
    if from_env:
        return from_env

    # Nothing usable in the environment.  Any exception raised below is
    # deliberately left to propagate: it "explains" why the lookup failed.
    import pwd
    entry = pwd.getpwuid(os.getuid())
    return entry[0]
# Bind the name getpass to the appropriate function
def _find_grail_rc(self):
import glob
import pwd
import socket
import tempfile
tempdir = os.path.join(tempfile.gettempdir(),
".grail-unix")
user = pwd.getpwuid(os.getuid())[0]
filename = os.path.join(tempdir, user + "-*")
maybes = glob.glob(filename)
if not maybes:
return None
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for fn in maybes:
# need to PING each one until we find one that's live
try:
s.connect(fn)
except socket.error:
# no good; attempt to clean it out, but don't fail:
try:
os.unlink(fn)
except IOError:
pass
else:
return s
def update_job_statuses(db, jobs):
    """Update the tracked status of *jobs* from the scheduler's accounting data.

    Runs ``sacct`` for the current user over the last 30 days, asking for
    the pipe-separated columns jobid, state, exitcode and jobname.  For
    every reported job whose name, minus its first five characters (the
    "gasp_" prefix), equals the ``id`` of one of *jobs*, the state and
    exit code are passed to ``Tracker.update_job_status``.

    :param db: passed through unchanged to ``Tracker.update_job_status``.
    :param jobs: iterable of objects with an ``id`` attribute.
    """
    since = (datetime.date.today()
             - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
    cmd = ["/usr/cluster/bin/sacct",
           "-u", pwd.getpwuid(os.getuid())[0],
           "--format", "jobid,state,exitcode,jobname",
           "--noheader", "-P",
           "-S", since]
    # universal_newlines=True makes communicate() return text rather than
    # bytes, so the string splitting below also works on Python 3.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    sacct_out, _ = p.communicate()

    # only keep last record for jobs that were re-run
    slurm_jobs_found = dict()
    for line in sacct_out.rstrip().split("\n"):
        if not line:
            continue
        slurm_job = line.strip().split("|")
        if len(slurm_job) < 4:
            # Not a complete jobid|state|exitcode|jobname record.
            continue
        # strip off "gasp_"
        slurm_jobs_found[slurm_job[3][5:]] = slurm_job

    # Index the jobs once; setdefault keeps the first job for a given id,
    # which is the one a linear scan would have stopped at.
    jobs_by_id = {}
    for j in jobs:
        jobs_by_id.setdefault(j.id, j)

    for job_id, slurm_job in slurm_jobs_found.items():
        j = jobs_by_id.get(job_id)
        if j is not None:
            Tracker.update_job_status(db, j, slurm_job[1], slurm_job[2])
def get_queue():
    """Return the current user's running and pending jobs in the "encore" partition.

    Runs ``squeue`` with a pipe-separated output format and parses each
    line into a dict with the keys job_id, job_name, state, time and
    reason.  The first five characters of every job name are dropped
    (presumably a fixed "gasp_" prefix -- confirm against the submitter).

    :return: ``{"running": [...], "queued": [...]}`` where rows in state
        "R" are listed under "running" and rows in state "PD" under
        "queued"; rows in any other state are ignored.
    """
    cols = [["job_id", "%i"], ["job_name", "%j"], ["state", "%t"],
            ["time", "%M"], ["reason", "%R"]]
    col_names = [name for name, _ in cols]
    col_formats = [fmt for _, fmt in cols]
    cmd = ["/usr/cluster/bin/squeue",
           "-u", pwd.getpwuid(os.getuid())[0],
           "-p", "encore",
           "-o", "|".join(col_formats),
           "--noheader"]
    # universal_newlines=True makes communicate() return text rather than
    # bytes, so the string splitting below also works on Python 3.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    squeue_out, _ = p.communicate()

    queue = {"running": [], "queued": []}
    for line in squeue_out.split("\n"):
        values = line.split("|")
        if len(values) != len(col_names):
            # Blank or malformed line.
            continue
        row = dict(zip(col_names, values))
        row["job_name"] = row["job_name"][5:]
        if row["state"] == "R":
            queue["running"].append(row)
        elif row["state"] == "PD":
            queue["queued"].append(row)
    return queue
def getuser():
    """Return the user name, preferring the environment over the password database.

    The variables LOGNAME, USER, LNAME and USERNAME are consulted in that
    order and the first one holding a non-empty value is returned.  This
    works on Windows as long as USERNAME is set.  When none of them is
    usable, the name is read from the password database entry of the
    current uid.
    """
    import os

    env_values = (os.environ.get(key)
                  for key in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'))
    from_env = next((value for value in env_values if value), None)
    if from_env:
        return from_env

    # Nothing usable in the environment.  Any exception raised below is
    # deliberately left to propagate: it "explains" why the lookup failed.
    import pwd
    entry = pwd.getpwuid(os.getuid())
    return entry[0]
# Bind the name getpass to the appropriate function
def _find_grail_rc(self):
import glob
import pwd
import socket
import tempfile
tempdir = os.path.join(tempfile.gettempdir(),
".grail-unix")
user = pwd.getpwuid(os.getuid())[0]
filename = os.path.join(tempdir, user + "-*")
maybes = glob.glob(filename)
if not maybes:
return None
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for fn in maybes:
# need to PING each one until we find one that's live
try:
s.connect(fn)
except socket.error:
# no good; attempt to clean it out, but don't fail:
try:
os.unlink(fn)
except IOError:
pass
else:
return s
def username(self):
    """Return the name of the user that owns the process.

    On UNIX the name is derived from the *real* process uid; when that
    uid cannot be resolved, its decimal string form is returned instead.
    Elsewhere the answer comes from the underlying platform object.
    """
    if not POSIX:
        return self._proc.username()

    if pwd is None:
        # might happen if python was installed from sources
        raise ImportError(
            "requires pwd module shipped with standard python")

    real_uid = self.uids().real
    try:
        entry = pwd.getpwuid(real_uid)
    except KeyError:
        # the uid can't be resolved by the system
        return str(real_uid)
    return entry.pw_name
def username(self):
    """Return the name of the user that owns the process.

    On UNIX the name is derived from the *real* process uid; when that
    uid cannot be resolved, its decimal string form is returned instead.
    Elsewhere the answer comes from the underlying platform object.
    """
    if not POSIX:
        return self._proc.username()

    if pwd is None:
        # might happen if python was installed from sources
        raise ImportError(
            "requires pwd module shipped with standard python")

    real_uid = self.uids().real
    try:
        entry = pwd.getpwuid(real_uid)
    except KeyError:
        # the uid can't be resolved by the system
        return str(real_uid)
    return entry.pw_name
def compat_expanduser(path):
    """Expand ~ and ~user constructions.  If user or $HOME is unknown,
    do nothing.

    A leading '~' is replaced by $HOME or, when HOME is not set, by the
    home directory of the current uid from the password database.  A
    leading '~user' is replaced by that user's home directory.  The path
    is returned unchanged when it does not start with '~' or when the
    required password database entry does not exist.
    """
    if not path.startswith('~'):
        return path
    # i marks the end of the '~' or '~user' prefix.
    i = path.find('/', 1)
    if i < 0:
        i = len(path)
    if i == 1:
        if 'HOME' not in os.environ:
            import pwd
            try:
                userhome = pwd.getpwuid(os.getuid()).pw_dir
            except KeyError:
                # The current uid has no password database entry: honour
                # the "do nothing" contract instead of raising.
                return path
        else:
            userhome = compat_getenv('HOME')
    else:
        import pwd
        try:
            pwent = pwd.getpwnam(path[1:i])
        except KeyError:
            return path
        userhome = pwent.pw_dir
    userhome = userhome.rstrip('/')
    # An empty result means the home directory is the root directory.
    return (userhome + path[i:]) or '/'
def compat_expanduser(path):
    """Expand ~ and ~user constructions.  If user or $HOME is unknown,
    do nothing.

    A leading '~' is replaced by $HOME or, when HOME is not set, by the
    home directory of the current uid from the password database.  A
    leading '~user' is replaced by that user's home directory.  The path
    is returned unchanged when it does not start with '~' or when the
    required password database entry does not exist.
    """
    if not path.startswith('~'):
        return path
    # i marks the end of the '~' or '~user' prefix.
    i = path.find('/', 1)
    if i < 0:
        i = len(path)
    if i == 1:
        if 'HOME' not in os.environ:
            import pwd
            try:
                userhome = pwd.getpwuid(os.getuid()).pw_dir
            except KeyError:
                # The current uid has no password database entry: honour
                # the "do nothing" contract instead of raising.
                return path
        else:
            userhome = compat_getenv('HOME')
    else:
        import pwd
        try:
            pwent = pwd.getpwnam(path[1:i])
        except KeyError:
            return path
        userhome = pwent.pw_dir
    userhome = userhome.rstrip('/')
    # An empty result means the home directory is the root directory.
    return (userhome + path[i:]) or '/'
def getuser():
    """Return the user name, preferring the environment over the password database.

    The variables LOGNAME, USER, LNAME and USERNAME are consulted in that
    order and the first one holding a non-empty value is returned.  This
    works on Windows as long as USERNAME is set.  When none of them is
    usable, the name is read from the password database entry of the
    current uid.
    """
    import os

    env_values = (os.environ.get(key)
                  for key in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'))
    from_env = next((value for value in env_values if value), None)
    if from_env:
        return from_env

    # Nothing usable in the environment.  Any exception raised below is
    # deliberately left to propagate: it "explains" why the lookup failed.
    import pwd
    entry = pwd.getpwuid(os.getuid())
    return entry[0]
# Bind the name getpass to the appropriate function
def test_make_archive_owner_group(self):
    """make_archive() creates an archive for every owner/group combination.

    Exercises the 'zip' and 'tar' formats with resolved owner/group names,
    without any, and with names that are not expected to exist.  This
    works even if there is no gid/uid support.
    """
    if UID_GID_SUPPORT:
        # Use whatever names the system gives to gid 0 and uid 0.
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
    else:
        group = owner = 'root'

    base_dir, root_dir, _ = self._create_files()
    archive_base = os.path.join(self.mkdtemp(), 'archive')

    # (format, ownership keyword arguments), in the order they are tried.
    scenarios = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for fmt, ownership in scenarios:
        created = make_archive(archive_base, fmt, root_dir, base_dir,
                               **ownership)
        self.assertTrue(os.path.exists(created))
def test_tarfile_root_owner(self):
    """_make_tarball() stores uid 0 / gid 0 on every member when asked to.

    The owner and group are passed by name (the names the system maps to
    uid 0 and gid 0) and the resulting tar archive is inspected member by
    member.
    """
    tmpdir, _, base_name = self._create_files()
    # Resolve the names before changing directory, so that a failed lookup
    # cannot leave the process stranded in tmpdir.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    old_dir = os.getcwd()
    os.chdir(tmpdir)
    try:
        archive_name = _make_tarball(base_name, 'dist', compress=None,
                                     owner=owner, group=group)
    finally:
        os.chdir(old_dir)

    # check if the tarball was created (compress=None: no compression)
    self.assertTrue(os.path.exists(archive_name))

    # now checks the rights
    archive = tarfile.open(archive_name)
    try:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
    finally:
        archive.close()
def test_initgroups(self):
    """posix.initgroups() validates its arguments and needs privileges.

    Wrong argument lists must raise TypeError, and a non-privileged
    caller must get OSError with errno EPERM.
    """
    # It takes a string and an integer; check that it raises a TypeError
    # for other argument lists.
    bad_argument_lists = (
        (),
        (None,),
        (3, "foo"),
        ("foo", 3, object()),
    )
    for bad_args in bad_argument_lists:
        self.assertRaises(TypeError, posix.initgroups, *bad_args)

    # The permission check below only applies to non-privileged users.
    if os.getuid() == 0:
        return

    # If a non-privileged user invokes it, it should fail with OSError
    # EPERM.
    name = pwd.getpwuid(posix.getuid()).pw_name
    try:
        posix.initgroups(name, 13)
    except OSError as e:
        self.assertEqual(e.errno, errno.EPERM)
    else:
        self.fail("Expected OSError to be raised by initgroups")
def _find_grail_rc(self):
import glob
import pwd
import socket
import tempfile
tempdir = os.path.join(tempfile.gettempdir(),
".grail-unix")
user = pwd.getpwuid(os.getuid())[0]
filename = os.path.join(tempdir, user + "-*")
maybes = glob.glob(filename)
if not maybes:
return None
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for fn in maybes:
# need to PING each one until we find one that's live
try:
s.connect(fn)
except socket.error:
# no good; attempt to clean it out, but don't fail:
try:
os.unlink(fn)
except IOError:
pass
else:
return s
def _find_grail_rc(self):
import glob
import pwd
import socket
import tempfile
tempdir = os.path.join(tempfile.gettempdir(),
".grail-unix")
user = pwd.getpwuid(os.getuid())[0]
filename = os.path.join(tempdir, user + "-*")
maybes = glob.glob(filename)
if not maybes:
return None
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for fn in maybes:
# need to PING each one until we find one that's live
try:
s.connect(fn)
except socket.error:
# no good; attempt to clean it out, but don't fail:
try:
os.unlink(fn)
except IOError:
pass
else:
return s
def test_add_graph(self):
    """
    Dispatching a "custom-graph-add" message records the graph in the
    store, together with the path of its script file and the user name.
    """
    current_user = pwd.getpwuid(os.getuid()).pw_name
    add_message = {"type": "custom-graph-add",
                   "interpreter": "/bin/sh",
                   "code": "echo hi!",
                   "username": current_user,
                   "graph-id": 123}
    self.manager.dispatch_message(add_message)

    stored_graphs = self.store.get_graphs()
    script_path = os.path.join(self.data_path, "custom-graph-scripts",
                               "graph-123")
    self.assertEqual(stored_graphs, [(123, script_path, current_user)])
def test_send_message_add_stored_graph(self):
    """
    C{send_message} send the graph with no data, to notify the server of
    the existence of the script, even if the script hasn't been run yet.
    """
    current_user = pwd.getpwuid(os.getuid()).pw_name
    add_message = {"type": "custom-graph-add",
                   "interpreter": "/bin/sh",
                   "code": "echo hi!",
                   "username": current_user,
                   "graph-id": 123}
    self.manager.dispatch_message(add_message)
    self.graph_manager.exchange()

    pending = self.broker_service.message_store.get_pending_messages()
    # The graph is reported with an empty error and no values yet.
    graph_report = {"error": u"",
                    "script-hash": b"e00a2f44dbc7b6710ce32af2348aec9b",
                    "values": []}
    expected = [{"api": b"3.2",
                 "data": {123: graph_report},
                 "timestamp": 0,
                 "type": "custom-graph"}]
    self.assertMessages(pending, expected)