def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
python类geteuid()的实例源码
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, then
# we'll just assume that we own the directory.
if not hasattr(os, "geteuid"):
return True
previous = None
while path != previous:
if os.path.lexists(path):
# Check if path is writable by current user.
if os.geteuid() == 0:
# Special handling for root user in order to handle properly
# cases where users use sudo without -H flag.
try:
path_uid = get_path_uid(path)
except OSError:
return False
return path_uid == 0
else:
return os.access(path, os.W_OK)
else:
previous, path = path, os.path.dirname(path)
def getPrivateKeys(self):
ks = {}
euid,egid = os.geteuid(), os.getegid()
os.setegid(0) # gain priviledges
os.seteuid(0)
for file in os.listdir(self.dataRoot):
if file[:9] == 'ssh_host_' and file[-4:]=='_key':
try:
k = keys.getPrivateKeyObject(self.dataRoot+'/'+file)
t = keys.objectType(k)
ks[t] = k
except Exception, e:
log.msg('bad private key file %s: %s' % (file, e))
os.setegid(egid) # drop them just as quickily
os.seteuid(euid)
return ks
def callIntoPAM(service, user, conv):
"""A testing hook.
"""
pam = PAM.pam()
pam.start(service)
pam.set_item(PAM.PAM_USER, user)
pam.set_item(PAM.PAM_CONV, conv)
gid = os.getegid()
uid = os.geteuid()
os.setegid(0)
os.seteuid(0)
try:
pam.authenticate() # these will raise
pam.acct_mgmt()
return 1
finally:
os.setegid(gid)
os.seteuid(uid)
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError, e:
raise ExtractError("could not change owner")
def check_access(filename, write_required=True):
"""
Checks if user has read and optionaly write access to specified file.
Uses acl first and possix file permisions if acl cannot be used.
Returns true only if user has both required access rights.
"""
if HAVE_POSIX1E:
for pset in posix1e.ACL(file=filename):
if pset.tag_type == posix1e.ACL_USER and pset.qualifier == os.geteuid():
if pset.permset.test(posix1e.ACL_READ) and (not write_required or pset.permset.test(posix1e.ACL_WRITE)):
return True
if pset.tag_type == posix1e.ACL_GROUP and pset.qualifier in os.getgroups():
if pset.permset.test(posix1e.ACL_READ) and (not write_required or pset.permset.test(posix1e.ACL_WRITE)):
return True
if write_required:
return os.access(filename, os.R_OK | os.W_OK)
return os.access(filename, os.R_OK)
def check_enableusersite():
"""Check if user site directory is safe for inclusion
The function tests for the command line flag (including environment var),
process uid/gid equal to effective uid/gid.
None: Disabled for security reasons
False: Disabled by user (command line option)
True: Safe and enabled
"""
if hasattr(sys, 'flags') and getattr(sys.flags, 'no_user_site', False):
return False
if hasattr(os, "getuid") and hasattr(os, "geteuid"):
# check process uid == effective uid
if os.geteuid() != os.getuid():
return None
if hasattr(os, "getgid") and hasattr(os, "getegid"):
# check process gid == effective gid
if os.getegid() != os.getgid():
return None
return True
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, then
# we'll just assume that we own the directory.
if not hasattr(os, "geteuid"):
return True
previous = None
while path != previous:
if os.path.lexists(path):
# Check if path is writable by current user.
if os.geteuid() == 0:
# Special handling for root user in order to handle properly
# cases where users use sudo without -H flag.
try:
path_uid = get_path_uid(path)
except OSError:
return False
return path_uid == 0
else:
return os.access(path, os.W_OK)
else:
previous, path = path, os.path.dirname(path)
def check_enableusersite():
"""Check if user site directory is safe for inclusion
The function tests for the command line flag (including environment var),
process uid/gid equal to effective uid/gid.
None: Disabled for security reasons
False: Disabled by user (command line option)
True: Safe and enabled
"""
if hasattr(sys, 'flags') and getattr(sys.flags, 'no_user_site', False):
return False
if hasattr(os, "getuid") and hasattr(os, "geteuid"):
# check process uid == effective uid
if os.geteuid() != os.getuid():
return None
if hasattr(os, "getgid") and hasattr(os, "getegid"):
# check process gid == effective gid
if os.getegid() != os.getgid():
return None
return True
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def Update():
# Check for ROOT
# If "uls --update" is not run as ROOT, notify user & exit.
print('Checking for ROOT...')
if os.geteuid() != 0:
print("ERR_1005: 'uls --update' must be run as ROOT. Use 'sudo uls --update' instead.")
exit(1005)
# Check for Internet connection before update
print('Checking for Internet connection...')
rPing = os.popen("ping -c 3 raw.githubusercontent.com | grep '0 received' | wc -l")
strPing = rPing.read().strip('\n')
rPing.close()
# If Internet is unavailable, exit.
if strPing == '1':
print('ERR_1003: Internet is unavailable. Check your connection before running update.')
exit(1003)
# Now, do the update
os.system("wget --no-check-certificate -O /usr/share/uls/uls_update.sh https://raw.githubusercontent.com/CYRO4S/Universal-Linux-Script/master/uls_update.sh && bash /usr/share/uls/uls_update.sh")
exit(0)
# Echo
def main(argv):
"""
Used to fetch the snabb instance information from the JET app.
:param argv: Arguments for the command
:return: Dictionary of instances state information
"""
try:
server = xmlrpclib.ServerProxy('http://127.0.0.1:9191', verbose=False)
output = server.lwaftr()
except Exception as e:
output = "Failed to connect to jetapp " + e.message+ " user: "+ str(os.geteuid())
print "<snabb><instance><rpc_error>"+output+"</rpc_error></instance></snabb>"
return
if (output != None):
snabb_state(output)
else:
print "<snabb><instance><rpc_error>"+output+"</rpc_error></instance></snabb>"
def main(argv):
"""
Used to fetch the snabb instance information from the JET app.
:param argv: Arguments for the command
:return: Dictionary of instances state information
"""
try:
server = xmlrpclib.ServerProxy('http://127.0.0.1:9191', verbose=False)
output = server.lwaftr()
except Exception as e:
output = "Failed to connect to jetapp " + e.message+ " user: "+ str(os.geteuid())
print "<snabb><instance><rpc_error>"+output+"</rpc_error></instance></snabb>"
return
if (output != None):
snabb_state(output)
else:
print "<snabb><instance><rpc_error>"+output+"</rpc_error></instance></snabb>"
def effectivelyReadable(self):
uid = os.getuid()
euid = os.geteuid()
gid = os.getgid()
egid = os.getegid()
# This is probably true most of the time, so just let os.access()
# handle it. Avoids potential bugs in the rest of this function.
if uid == euid and gid == egid:
return os.access(self.name, os.R_OK)
st = os.stat(self.name)
# This may be wrong depending on the semantics of your OS.
# i.e. if the file is -------r--, does the owner have access or not?
if st.st_uid == euid:
return st.st_mode & stat.S_IRUSR != 0
# See comment for UID check above.
groups = os.getgroups()
if st.st_gid == egid or st.st_gid in groups:
return st.st_mode & stat.S_IRGRP != 0
return st.st_mode & stat.S_IROTH != 0
def effectivelyReadable(self):
uid = os.getuid()
euid = os.geteuid()
gid = os.getgid()
egid = os.getegid()
# This is probably true most of the time, so just let os.access()
# handle it. Avoids potential bugs in the rest of this function.
if uid == euid and gid == egid:
return os.access(self.name, os.R_OK)
st = os.stat(self.name)
# This may be wrong depending on the semantics of your OS.
# i.e. if the file is -------r--, does the owner have access or not?
if st.st_uid == euid:
return st.st_mode & stat.S_IRUSR != 0
# See comment for UID check above.
groups = os.getgroups()
if st.st_gid == egid or st.st_gid in groups:
return st.st_mode & stat.S_IRGRP != 0
return st.st_mode & stat.S_IROTH != 0
def check_enableusersite():
"""Check if user site directory is safe for inclusion
The function tests for the command line flag (including environment var),
process uid/gid equal to effective uid/gid.
None: Disabled for security reasons
False: Disabled by user (command line option)
True: Safe and enabled
"""
if hasattr(sys, 'flags') and getattr(sys.flags, 'no_user_site', False):
return False
if hasattr(os, "getuid") and hasattr(os, "geteuid"):
# check process uid == effective uid
if os.geteuid() != os.getuid():
return None
if hasattr(os, "getgid") and hasattr(os, "getegid"):
# check process gid == effective gid
if os.getegid() != os.getgid():
return None
return True
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
"""
if pwd and hasattr(os, "geteuid") and os.geteuid() == 0:
# We have to be root to do so.
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
else:
if sys.platform != "os2emx":
os.chown(targetpath, u, g)
except EnvironmentError as e:
raise ExtractError("could not change owner")
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, then
# we'll just assume that we own the directory.
if not hasattr(os, "geteuid"):
return True
previous = None
while path != previous:
if os.path.lexists(path):
# Check if path is writable by current user.
if os.geteuid() == 0:
# Special handling for root user in order to handle properly
# cases where users use sudo without -H flag.
try:
path_uid = get_path_uid(path)
except OSError:
return False
return path_uid == 0
else:
return os.access(path, os.W_OK)
else:
previous, path = path, os.path.dirname(path)