def _isexecutable(cmd):
if os.path.isfile(cmd):
mode = os.stat(cmd)[stat.ST_MODE]
if mode & stat.S_IXUSR or mode & stat.S_IXGRP or mode & stat.S_IXOTH:
return True
return False
python类S_IXOTH的实例源码
def _isexecutable(cmd):
if os.path.isfile(cmd):
mode = os.stat(cmd)[stat.ST_MODE]
if mode & stat.S_IXUSR or mode & stat.S_IXGRP or mode & stat.S_IXOTH:
return True
return False
def _write_script(output_dir, name, script):
path = os.path.join(output_dir, name)
with open(path, 'w') as f:
f.write(script)
os.chmod(
path,
os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
def is_executable(file):
if not os.path.exists(file):
return False
st = os.stat(file)
return bool(st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
def initialize(self):
"""
Called inside of :meth:`TerminalApplication.initialize` shortly after the
WebSocket is instantiated. Attaches our two `terminal:authenticate` events
(to create the user's .ssh dir and send our CSS template) and ensures that
the ssh_connect.py script is executable.
"""
ssh_connect_path = os.path.join(PLUGIN_PATH, 'scripts', 'ssh_connect.py')
if os.path.exists(ssh_connect_path):
import stat
st = os.stat(ssh_connect_path)
if not bool(st.st_mode & stat.S_IXOTH):
try:
os.chmod(ssh_connect_path, 0o755)
except OSError:
ssh_log.error(_(
"Could not set %s as executable. You will need to 'chmod "
"a+x' that script manually.") % ssh_connect_path)
user_msg = _(
"Error loading SSH plugin: The ssh_connect.py script is "
"not executable. See the logs for more details.")
send_msg = partial(self.ws.send_message, user_msg)
events = ["terminal:authenticate", "terminal:new_terminal"]
self.on(events, send_msg)
self.ssh_log = go_logger("gateone.terminal.ssh", plugin='ssh')
# NOTE: Why not use the 'Events' hook for these? You can't attach two
# functions to the same event via that mechanism because it's a dict
# (one would override the other).
# An alternative would be to write a single function say, on_auth() that
# calls both of these functions then assign it to 'terminal:authenticate' in
# the 'Events' hook. I think this way is better since it is more explicit.
self.on('terminal:authenticate', bind(send_ssh_css_template, self))
self.on('terminal:authenticate', bind(create_user_ssh_dir, self))
def stats_to_str(s):
return "%s%s%s%s" % (dbpcs(s.st_mode),
rwx(s.st_mode, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR),
rwx(s.st_mode, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP),
rwx(s.st_mode, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH))
def add_exec(file_path):
"""Add execution rights for user, group and others to the given file"""
print("change permissions")
# change Permission with bitwies or and the constants from stat modul
os.chmod(file_path, stat.S_IRUSR | stat.S_IWUSR| stat.S_IXUSR | \
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
# global vars are very bad but I think the only solution for this
def _isexecutable(cmd):
if os.path.isfile(cmd):
mode = os.stat(cmd)[stat.ST_MODE]
if mode & stat.S_IXUSR or mode & stat.S_IXGRP or mode & stat.S_IXOTH:
return True
return False
def is_executable_file(path):
"""Checks that path is an executable regular file (or a symlink to a file).
This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``, but
on some platforms :func:`os.access` gives us the wrong answer, so this
checks permission bits directly.
"""
# follow symlinks,
fpath = os.path.realpath(path)
# return False for non-files (directories, fifo, etc.)
if not os.path.isfile(fpath):
return False
# On Solaris, etc., "If the process has appropriate privileges, an
# implementation may indicate success for X_OK even if none of the
# execute file permission bits are set."
#
# For this reason, it is necessary to explicitly check st_mode
# get file mode using os.stat, and check if `other',
# that is anybody, may read and execute.
mode = os.stat(fpath).st_mode
if mode & stat.S_IROTH and mode & stat.S_IXOTH:
return True
# get current user's group ids, and check if `group',
# when matching ours, may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_gid in user_gids and
mode & stat.S_IRGRP and mode & stat.S_IXGRP):
return True
# finally, if file owner matches our effective userid,
# check if `user', may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_uid == os.geteuid() and
mode & stat.S_IRUSR and mode & stat.S_IXUSR):
return True
return False
def create_executable(exec_dir, install_dir):
create_dir(exec_dir)
exec_filepath = os.path.join(exec_dir, EXECUTABLE_NAME)
with open(exec_filepath, 'w') as exec_file:
exec_file.write(AZ_DISPATCH_TEMPLATE.format(install_dir=install_dir))
cur_stat = os.stat(exec_filepath)
os.chmod(exec_filepath, cur_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
print_status("The executable is available at '{}'.".format(exec_filepath))
return exec_filepath
def dcos_install_cli(install_location=None, client_version='1.8'):
"""
Downloads the dcos command line from Mesosphere
"""
system = platform.system()
if not install_location:
raise CLIError(
"No install location specified and it could not be determined from the current platform '{}'".format(
system))
base_url = 'https://downloads.dcos.io/binaries/cli/{}/x86-64/dcos-{}/{}'
if system == 'Windows':
file_url = base_url.format('windows', client_version, 'dcos.exe')
elif system == 'Linux':
# TODO Support ARM CPU here
file_url = base_url.format('linux', client_version, 'dcos')
elif system == 'Darwin':
file_url = base_url.format('darwin', client_version, 'dcos')
else:
raise CLIError('Proxy server ({}) does not exist on the cluster.'.format(system))
logger.warning('Downloading client to %s', install_location)
try:
_urlretrieve(file_url, install_location)
os.chmod(install_location,
os.stat(install_location).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
except IOError as err:
raise CLIError('Connection error while attempting to download client ({})'.format(err))
def k8s_install_cli(client_version='latest', install_location=None):
"""Install kubectl, a command-line interface for Kubernetes clusters."""
if client_version == 'latest':
context = _ssl_context()
version = urlopen('https://storage.googleapis.com/kubernetes-release/release/stable.txt',
context=context).read()
client_version = version.decode('UTF-8').strip()
else:
client_version = "v%s" % client_version
file_url = ''
system = platform.system()
base_url = 'https://storage.googleapis.com/kubernetes-release/release/{}/bin/{}/amd64/{}'
if system == 'Windows':
file_url = base_url.format(client_version, 'windows', 'kubectl.exe')
elif system == 'Linux':
# TODO: Support ARM CPU here
file_url = base_url.format(client_version, 'linux', 'kubectl')
elif system == 'Darwin':
file_url = base_url.format(client_version, 'darwin', 'kubectl')
else:
raise CLIError('Proxy server ({}) does not exist on the cluster.'.format(system))
logger.warning('Downloading client to %s from %s', install_location, file_url)
try:
_urlretrieve(file_url, install_location)
os.chmod(install_location,
os.stat(install_location).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
except IOError as ex:
raise CLIError('Connection error while attempting to download client ({})'.format(ex))
def test_set_config_value_file_permissions(self):
with mock.patch('azure.cli.core._config.GLOBAL_CONFIG_DIR', self.config_dir), \
mock.patch('azure.cli.core._config.GLOBAL_CONFIG_PATH', self.config_path):
set_global_config_value('test_section', 'test_option', 'a_value')
file_mode = os.stat(self.config_path).st_mode
self.assertTrue(bool(file_mode & stat.S_IRUSR))
self.assertTrue(bool(file_mode & stat.S_IWUSR))
self.assertFalse(bool(file_mode & stat.S_IXUSR))
self.assertFalse(bool(file_mode & stat.S_IRGRP))
self.assertFalse(bool(file_mode & stat.S_IWGRP))
self.assertFalse(bool(file_mode & stat.S_IXGRP))
self.assertFalse(bool(file_mode & stat.S_IROTH))
self.assertFalse(bool(file_mode & stat.S_IWOTH))
self.assertFalse(bool(file_mode & stat.S_IXOTH))
def _rename_file(src, dst):
"""Rename the specified file"""
if os.name == 'nt':
# TODO: check that fs.rm_safe works on windows, and if not, fix
# fs.rm_safe.
fs.rm_safe(dst)
os.rename(src, dst)
mode = os.stat(dst).st_mode
mode |= (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
if _is_executable(dst):
mode |= (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
os.chmod(dst, mode)
def is_executable_file(path):
"""Checks that path is an executable regular file, or a symlink towards one.
This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``.
"""
# follow symlinks,
fpath = os.path.realpath(path)
if not os.path.isfile(fpath):
# non-files (directories, fifo, etc.)
return False
mode = os.stat(fpath).st_mode
if (sys.platform.startswith('sunos')
and os.getuid() == 0):
# When root on Solaris, os.X_OK is True for *all* files, irregardless
# of their executability -- instead, any permission bit of any user,
# group, or other is fine enough.
#
# (This may be true for other "Unix98" OS's such as HP-UX and AIX)
return bool(mode & (stat.S_IXUSR |
stat.S_IXGRP |
stat.S_IXOTH))
return os.access(fpath, os.X_OK)
def get_file_exec_bits(fpath):
if os.path.isfile(fpath):
import stat
f_st = os.lstat(fpath)
st_exe_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
file_exe_bits = f_st.st_mode & st_exe_bits
return file_exe_bits
return 0
def _isexecutable(cmd):
if os.path.isfile(cmd):
mode = os.stat(cmd)[stat.ST_MODE]
if mode & stat.S_IXUSR or mode & stat.S_IXGRP or mode & stat.S_IXOTH:
return True
return False
def is_executable_file(path):
"""Checks that path is an executable regular file, or a symlink towards one.
This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``.
"""
# follow symlinks,
fpath = os.path.realpath(path)
if not os.path.isfile(fpath):
# non-files (directories, fifo, etc.)
return False
mode = os.stat(fpath).st_mode
if (sys.platform.startswith('sunos')
and os.getuid() == 0):
# When root on Solaris, os.X_OK is True for *all* files, irregardless
# of their executability -- instead, any permission bit of any user,
# group, or other is fine enough.
#
# (This may be true for other "Unix98" OS's such as HP-UX and AIX)
return bool(mode & (stat.S_IXUSR |
stat.S_IXGRP |
stat.S_IXOTH))
return os.access(fpath, os.X_OK)
def _isexecutable(cmd):
if os.path.isfile(cmd):
mode = os.stat(cmd)[stat.ST_MODE]
if mode & stat.S_IXUSR or mode & stat.S_IXGRP or mode & stat.S_IXOTH:
return True
return False
def is_executable_file(path):
"""Checks that path is an executable regular file (or a symlink to a file).
This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``, but
on some platforms :func:`os.access` gives us the wrong answer, so this
checks permission bits directly.
"""
# follow symlinks,
fpath = os.path.realpath(path)
# return False for non-files (directories, fifo, etc.)
if not os.path.isfile(fpath):
return False
# On Solaris, etc., "If the process has appropriate privileges, an
# implementation may indicate success for X_OK even if none of the
# execute file permission bits are set."
#
# For this reason, it is necessary to explicitly check st_mode
# get file mode using os.stat, and check if `other',
# that is anybody, may read and execute.
mode = os.stat(fpath).st_mode
if mode & stat.S_IROTH and mode & stat.S_IXOTH:
return True
# get current user's group ids, and check if `group',
# when matching ours, may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_gid in user_gids and
mode & stat.S_IRGRP and mode & stat.S_IXGRP):
return True
# finally, if file owner matches our effective userid,
# check if `user', may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_uid == os.geteuid() and
mode & stat.S_IRUSR and mode & stat.S_IXUSR):
return True
return False