def handle(self, *args, **options):
commit_msg_path = os.path.join(self.HOOK_PATH, 'commit-msg')
hook_exists = os.path.exists(commit_msg_path)
if hook_exists:
with open(commit_msg_path, 'r') as fp:
hook_content = fp.read()
else:
hook_content = '#!/usr/bin/env bash\n\n'
if 'ZERODOWNTIME_COMMIT_MSG_HOOK' not in hook_content:
hook_content += COMMIT_MSG_HOOK
with open(commit_msg_path, 'w') as fp:
fp.write(hook_content)
st = os.stat(commit_msg_path)
os.chmod(commit_msg_path, st.st_mode | stat.S_IEXEC)
python类S_IEXEC的实例源码
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
if binary_ok:
import stat
# save the permissions from the current binary
old_stat = os.stat(binary_path)
# rename the current binary in order to replace it with the latest
os.rename(binary_path, path + "/old")
os.rename(temp_path, binary_path)
# set the same permissions that had the previous binary
os.chmod(binary_path, old_stat.st_mode | stat.S_IEXEC)
# delete the old binary
os.remove(path + "/old")
print("mongoaudit updated, restarting...")
os.execl(binary_path, binary_path, *sys.argv)
else:
os.remove(temp_path)
print("couldn't download the latest binary")
def download_executable(self):
"""
Downloads the executable file
:return: None
"""
# noinspection PyBroadException
try:
self.logger.info("Downloading executable file")
with open(self.executable_file, 'wb') as destination:
data = requests.get(self.define_executable_file_url()).content
destination.write(data)
self.logger.info("Download Complete")
# Set executable permissions
st = os.stat(self.executable_file)
os.chmod(self.executable_file, st.st_mode | stat.S_IEXEC)
except Exception as e:
self.logger.error(
"Could not download executable. Disabling Service.")
self.logger.debug("Cause of download failure: " + str(e))
if os.path.isfile(self.executable_file):
os.remove(self.executable_file)
def run(last_run_version):
if config.get(["modules", "launcher", "auto_start"], 0):
import autorun
autorun.enable()
if os.path.isdir(os.path.join(top_path, 'launcher')):
shutil.rmtree(os.path.join(top_path, 'launcher')) # launcher is for auto-update from 2.X
if older_or_equal(last_run_version, '3.0.4'):
xlog.info("migrating to 3.x.x")
for filename in os.listdir(top_path):
filepath = os.path.join(top_path, filename)
if os.path.isfile(filepath):
if sys.platform != 'win32' and filename == 'start':
st = os.stat(filepath)
os.chmod(filepath, st.st_mode | stat.S_IEXEC)
if filename in ['start.sh', 'start.command', 'start.lnk', 'LICENSE.txt', 'download.md', 'version.txt', 'xxnet', 'xxnet.bat', 'xxnet.vbs', 'xx_net.sh']:
os.remove(filepath)
else:
if filename in ['goagent', 'python27', 'gae_proxy', 'php_proxy', 'x_tunnel', 'python3', 'Python3', 'lib', 'SwitchySharp']:
shutil.rmtree(filepath)
def run(last_run_version):
if config.get(["modules", "launcher", "auto_start"], 0):
import autorun
autorun.enable()
if os.path.isdir(os.path.join(top_path, 'launcher')):
shutil.rmtree(os.path.join(top_path, 'launcher')) # launcher is for auto-update from 2.X
if older_or_equal(last_run_version, '3.0.4'):
xlog.info("migrating to 3.x.x")
for filename in os.listdir(top_path):
filepath = os.path.join(top_path, filename)
if os.path.isfile(filepath):
if sys.platform != 'win32' and filename == 'start':
st = os.stat(filepath)
os.chmod(filepath, st.st_mode | stat.S_IEXEC)
if filename in ['start.sh', 'start.command', 'start.lnk', 'LICENSE.txt', 'download.md', 'version.txt', 'xxnet', 'xxnet.bat', 'xxnet.vbs', 'xx_net.sh']:
os.remove(filepath)
else:
if filename in ['goagent', 'python27', 'gae_proxy', 'php_proxy', 'x_tunnel', 'python3', 'Python3', 'lib', 'SwitchySharp']:
shutil.rmtree(filepath)
def get(env, platform=None):
if not platform:
platform = env.platform
# Only try to download/extract if it doesn't already exist
if not os.path.exists(env.love_binary(platform=platform)):
# Only try to download if the .zip doesn't already exist
if not os.path.exists(env.download_file(platform=platform)):
print('Downloading LOVE (%s; %s)...' %
(env.conf.love_version, platform))
download(env.download_url(platform=platform),
env.download_file(platform=platform))
print('Download complete!')
print('Extracting LOVE (%s; %s)...' % (env.conf.love_version,
platform))
extract(platform, env.download_file(platform=platform))
st = os.stat(env.love_binary(platform=platform))
os.chmod(env.love_binary(platform=platform),
st.st_mode | stat.S_IEXEC)
print('Extraction complete!')
def to_disk(self, path=None, name=None):
"""read from ram, overwrite disk"""
if not self._inram:
raise Exception('Not in ram. Cant write to disk.')
if path is None:
path = self.path
if name is None:
name = self.name
if not os.path.isdir(path):
os.makedirs(path)
with cd(path):
with open(name, 'wb') as fil_out:
fil_out.write(self.data)
if self.executable:
os.chmod(self.name, stat.S_IEXEC)
self._ondisk = True
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
if binary_ok:
import stat
# save the permissions from the current binary
old_stat = os.stat(binary_path)
# rename the current binary in order to replace it with the latest
os.rename(binary_path, path + "/old")
os.rename(temp_path, binary_path)
# set the same permissions that had the previous binary
os.chmod(binary_path, old_stat.st_mode | stat.S_IEXEC)
# delete the old binary
os.remove(path + "/old")
print("mongoaudit updated, restarting...")
os.execl(binary_path, binary_path, *sys.argv)
else:
os.remove(temp_path)
print("couldn't download the latest binary")
def copy_menu():
"""Copies interactive menu to the executable directory."""
menu_source_path = source_directory + "/menu.py"
menu_bin_path = executable_directory + "/tm"
try:
if not os.path.exists(executable_directory):
os.makedirs(executable_directory)
# Create a new file 'tm' in the executable directory.
copyfile(menu_source_path, menu_bin_path)
# Make the file executable.
st = os.stat(menu_bin_path)
os.chmod(menu_bin_path, st.st_mode | stat.S_IEXEC)
print("Menu successfully copied to {}.".format(executable_directory))
return True
except IOError as detail:
print("Menu file could not be copied to {}. ".
format(executable_directory) + str(detail))
return False
def save(filename):
try:
os.mkdir('saves/')
except OSError:
pass
try:
data = unicorn.get_pixels()
print(filename, data)
file = open('saves/' + filename + '.py', 'w')
file.write('#!/usr/bin/env python\n')
file.write('import unicornhat, signal\n')
file.write('pixels = ' + str(unicorn.get_pixels()) + '\n')
file.write('unicornhat.set_pixels(pixels)\n')
file.write('unicornhat.show()\n')
file.write('signal.pause()')
file.close()
os.chmod('saves/' + filename + '.py', 0o777 | stat.S_IEXEC)
return("ok" + str(unicorn.get_pixels()))
except AttributeError:
print("Unable to save, please update")
print("unicornhat library!")
return("fail")
def run(last_run_version):
if config.get(["modules", "launcher", "auto_start"], 0):
import autorun
autorun.enable()
if os.path.isdir(os.path.join(top_path, 'launcher')):
shutil.rmtree(os.path.join(top_path, 'launcher')) # launcher is for auto-update from 2.X
if older_or_equal(last_run_version, '3.0.4'):
xlog.info("migrating to 3.x.x")
for filename in os.listdir(top_path):
filepath = os.path.join(top_path, filename)
if os.path.isfile(filepath):
if sys.platform != 'win32' and filename == 'start':
st = os.stat(filepath)
os.chmod(filepath, st.st_mode | stat.S_IEXEC)
if filename in ['start.sh', 'start.command', 'start.lnk', 'LICENSE.txt', 'download.md', 'version.txt', 'xxnet', 'xxnet.bat', 'xxnet.vbs', 'xx_net.sh']:
os.remove(filepath)
else:
if filename in ['goagent', 'python27', 'gae_proxy', 'php_proxy', 'x_tunnel', 'python3', 'Python3', 'lib', 'SwitchySharp']:
shutil.rmtree(filepath)
def CopyTool(source_path):
"""Copies the given tool to the current directory, including a warning not
to edit it."""
with open(source_path) as source_file:
tool_source = source_file.readlines()
# Add header and write it out to the current directory (which should be the
# root build dir).
out_path = 'gyp-mac-tool'
with open(out_path, 'w') as tool_file:
tool_file.write(''.join([tool_source[0],
'# Generated by setup_toolchain.py do not edit.\n']
+ tool_source[1:]))
st = os.stat(out_path)
os.chmod(out_path, st.st_mode | stat.S_IEXEC)
# Find the tool source, it's the first argument, and copy it.
def execute_installscript(options, files_to_archive, module_use):
current_perm = os.stat(options.install_script.name)
if not options.csgteam: # too often this fail for csgteam
os.chmod(options.install_script.name, current_perm.st_mode | stat.S_IEXEC)
print term.bold_green("Running ./" + options.install_script.name + "...")
stop_logging_current_session() # log the output of the script in a different dir
log = "hpci." + os.path.basename(options.install_script.name) + "-" + str(
datetime.datetime.now().isoformat().split(".")[0].replace("-", "").replace(":", "")) + ".log" # 20161116T114145
start_logging_current_session(files_to_archive, log=log)
p = subcall(module_use + "./" + options.install_script.name, use_popen=True, debug=options.debug)
process_output = " "
while process_output != "": # continue while the process is running, it'll be "" when EOF is reached
process_output = p.stdout.readline() # needs to do this instead of using subprocess.call to allow
print process_output, # 'tee' to log the process output
p.wait()
stop_logging_current_session()
files_to_archive.append(log)
start_logging_current_session(files_to_archive, continuation=True)
print term.bold_green("Done running ./" + options.install_script.name + " - exited with code " + str(p.returncode))
if p.returncode != 0:
ask_confirmation_for(True, "Running " + options.install_script.name + " failed. Archive logs anyway? ")
files_to_archive.append(options.install_script.name)
def setUp(self):
content = """
#!/usr/bin/env bash
OUTPUT_FILENAME=/tmp/SultanRunScript/lorum.txt
mkdir -p /tmp/SultanRunScript
echo 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.\n' > $OUTPUT_FILENAME
echo 'Nunc in enim dictum, consectetur ex vehicula, fermentum orci.\n' > $OUTPUT_FILENAME
echo 'Donec sapien turpis, mattis vel urna sed, iaculis aliquam purus.\n' > $OUTPUT_FILENAME
"""
self.dir_path = '/tmp/SultanRunScript'
self.script_filepath = '/tmp/SultanRunScript/myscript'
self.output_filepath = '/tmp/SultanRunScript/lorum.txt'
if os.path.exists(self.dir_path): shutil.rmtree(self.dir_path)
s = Sultan()
os.mkdir(self.dir_path)
with open(self.script_filepath, 'w') as f:
f.write(content)
st = os.stat(self.script_filepath)
os.chmod(self.script_filepath, st.st_mode | stat.S_IEXEC)
def run(last_run_version):
if config.get(["modules", "launcher", "auto_start"], 0):
import autorun
autorun.enable()
if os.path.isdir(os.path.join(top_path, 'launcher')):
shutil.rmtree(os.path.join(top_path, 'launcher')) # launcher is for auto-update from 2.X
if older_or_equal(last_run_version, '3.0.4'):
xlog.info("migrating to 3.x.x")
for filename in os.listdir(top_path):
filepath = os.path.join(top_path, filename)
if os.path.isfile(filepath):
if sys.platform != 'win32' and filename == 'start':
st = os.stat(filepath)
os.chmod(filepath, st.st_mode | stat.S_IEXEC)
if not filename.startswith('.') and filename not in ['README.md', 'start', 'start.bat', 'start.vbs']:
os.remove(filepath)
else:
if not filename.startswith('.') and filename not in ['code', 'data']:
shutil.rmtree(filepath)
def add_jupyter_here():
if not os.path.exists(NPATH):
print("Nothing done. Currently only Gnome with Nautilus as file ",
"manager is supported.")
return
if not os.path.exists(SPATH):
os.makedirs(SPATH)
logo_path = os.path.expandvars(os.path.join(
os.path.dirname(__file__), 'icons'))
logos = {'qtconsole': os.path.join(logo_path, 'jupyter-qtconsole.png'),
'notebook': os.path.join(logo_path, 'jupyter.png')}
for terminal in ["qtconsole", "notebook"]:
script_path = os.path.join(SPATH, "Jupyter %s here" % terminal)
if not os.path.exists(script_path):
with open(script_path, "w") as f:
f.write(script % (terminal, terminal))
st = os.stat(script_path)
os.chmod(script_path, st.st_mode | stat.S_IEXEC)
call(['gio', 'set', '-t', 'string', '%s' % script_path,
'metadata::custom-icon', 'file://%s' % logos[terminal]])
print('Jupyter %s here created.' % terminal)
def render_template(context, template, path=None, executable=False):
"""
Renders the ``template`` template with the given ``context``. The result is
written to ``path``. If ``path`` is not specified, the result is
returned as a string
"""
env = _init_template_env()
data = env.get_template(template).render(context)
if not path:
return data
with open(path, 'w') as f:
f.write(data)
if executable:
st = os.stat(path)
os.chmod(path, st.st_mode | stat.S_IEXEC)
def _download_repo(repo_path):
"""
Download Google's repo.
"""
logger.info('Fetching repo')
repo_url = CONSTANTS['repo']['url']
response = requests.get(repo_url)
if response.status_code != 200:
raise CommandError('Unable to download repo from %s' % repo_url)
with open(repo_path, 'wb') as f:
f.write(response.content)
logger.success('Fetched repo')
# Ensure that the repo binary is executable
st = os.stat(repo_path)
os.chmod(repo_path, st.st_mode | stat.S_IEXEC)
def test_in_shebang(self, remove_shell_envs, capfd):
"""Test that conda-shell works from within a shebang line."""
tempfd = tempfile.NamedTemporaryFile(mode='w', delete=False)
tempfd.write('''#!/usr/bin/env conda-shell
#!conda-shell -i python python=3.6 numpy=1.12
import numpy as np
print(f\'np.arange(10): {np.arange(10)}\')
''')
tempfd.flush()
tempfd.close()
stats = os.stat(tempfd.name)
os.chmod(tempfd.name, stats.st_mode | stat.S_IEXEC)
subprocess.check_call([tempfd.name],
universal_newlines=True,
env=remove_shell_envs)
out, err = capfd.readouterr()
assert 'np.arange(10): [0 1 2 3 4 5 6 7 8 9]' in out+err
def test_in_shebang_multiline(self, remove_shell_envs, capfd):
"""Test that conda-shell works from within a shebang line."""
tempfd = tempfile.NamedTemporaryFile(mode='w', delete=False)
tempfd.write('''#!/usr/bin/env conda-shell
#!conda-shell -i python python=3.6 numpy=1.12
#!conda-shell -c conda-forge pandas pydap
import numpy as np
import pandas as pd
import pydap
print(f\'np.arange(10): {np.arange(10)}\')
''')
tempfd.flush()
tempfd.close()
stats = os.stat(tempfd.name)
os.chmod(tempfd.name, stats.st_mode | stat.S_IEXEC)
subprocess.check_call([tempfd.name],
universal_newlines=True,
env=remove_shell_envs)
out, err = capfd.readouterr()
assert 'np.arange(10): [0 1 2 3 4 5 6 7 8 9]' in out+err
def chmodX (thefile):
"""
Convert script file to executable or add extract shebang to cmd line
@params:
`thefile`: the script file
@returns:
A list with or without the path of the interpreter as the first element and the script file as the last element
"""
thefile = path.realpath(thefile)
ret = [thefile]
try:
st = stat (thefile)
chmod (thefile, st.st_mode | S_IEXEC)
except Exception as e1:
try:
shebang = open (thefile).read().strip().splitlines()[0]
if not shebang.startswith("#!"): # pragma: no cover
raise
ret = shebang[2:].strip().split() + [thefile] # pragma: no cover
except Exception as e2:
raise Exception("Cannot change %s as executable or read the shebang from it:\n%s\n%s" % (thefile, e1, e2))
return ret
def submit(self):
# `chmod +x' first, because we will execute the script locally
os.chmod(self._script_filename,
os.stat(self._script_filename).st_mode | stat.S_IEXEC)
# Run from the absolute path
self._f_stdout = open(self.stdout, 'w+')
self._f_stderr = open(self.stderr, 'w+')
# The new process starts also a new session (session leader), so that
# we can later kill any other processes that this might spawn by just
# killing this one.
self._proc = os_ext.run_command_async(
os.path.abspath(self._script_filename),
stdout=self._f_stdout,
stderr=self._f_stderr,
start_new_session=True)
# Update job info
self._jobid = self._proc.pid
def execute(self, name, options, parameters):
try:
self._cmdLock.acquire()
self._log.debug("execute(%s, %s, %s)", name, options, parameters)
if not name.startswith("/"):
raise CF.InvalidFileName(CF.CF_EINVAL, "Filename must be absolute")
if self.isLocked(): raise CF.Device.InvalidState("System is locked down")
if self.isDisabled(): raise CF.Device.InvalidState("System is disabled")
priority = 0
stack_size = 4096
invalidOptions = []
for option in options:
val = option.value.value()
if option.id == CF.ExecutableDevice.PRIORITY_ID:
if ((not isinstance(val, int)) and (not isinstance(val, long))):
invalidOptions.append(option)
else:
priority = val
elif option.id == CF.ExecutableDevice.STACK_SIZE_ID:
if ((not isinstance(val, int)) and (not isinstance(val, long))):
invalidOptions.append(option)
else:
stack_size = val
if len(invalidOptions) > 0:
self._log.error("execute() received invalid options %s", invalidOptions)
raise CF.ExecutableDevice.InvalidOptions(invalidOptions)
command = name[1:] # This is relative to our CWD
self._log.debug("Running %s %s", command, os.getcwd())
if not os.path.isfile(command):
raise CF.InvalidFileName(CF.CF_EINVAL, "File could not be found %s" % command)
os.chmod(command, os.stat(command)[0] | stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
finally:
self._cmdLock.release()
return self._execute(command, options, parameters)
def create_file(filename):
print(filename)
template = getattr(templates, filename)
target_file = os.path.join(target_directory, '{}.bsh'.format(filename))
with open(target_file, 'w') as f:
f.write(template.format(**arguments[filename]))
st = os.stat(target_file)
os.chmod(target_file, st.st_mode | stat.S_IEXEC)
def test_find_protoc(self):
'''
Can we can find protoc?
'''
binary = os.path.join(self.tempdir, 'protoc')
with open(binary, 'wb+') as f:
pass
mode = os.stat(binary)
os.chmod(binary, mode.st_mode | stat.S_IEXEC)
found_binary = pbimport.find_protoc(self.tempdir)
self.assertEqual(found_binary, binary)
def find_all_executables(folder):
"""
:param folder:
:return:
"""
executable_flag = stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH
return [f for f in os.listdir(folder) if os.stat(os.path.join(folder, f)).st_mode & executable_flag]
def find_download_exe(self):
exe_file = join(self.rt.paths.user_config, self.exe_name)
if exe_file:
return exe_file
try:
if call(self.exe_name + ' < /dev/null', shell=True) == 0:
return self.exe_name
except OSError:
pass
import platform
import stat
def snd_msg(cmd):
"""Send message to faceplate"""
Popen('echo "' + cmd + '" > /dev/ttyAMA0', shell=True)
arch = platform.machine()
exe_file = expanduser('~/.mycroft/precise/' + self.exe_name)
url = self.url_base + 'dist/' + arch + '/' + self.exe_name
snd_msg('mouth.text=Updating Listener...')
self.download(url, exe_file)
snd_msg('mouth.reset')
os.chmod(exe_file, os.stat(exe_file).st_mode | stat.S_IEXEC)
Popen('echo "mouth.reset" > /dev/ttyAMA0', shell=True)
return exe_file
def chmod_plus_x(executable_path):
current_st = os.stat(executable_path)
os.chmod(executable_path, current_st.st_mode | stat.S_IEXEC)
def chmod_plus_x(executable_path):
current_st = os.stat(executable_path)
os.chmod(executable_path, current_st.st_mode | stat.S_IEXEC)
def build(service_directory):
"""
Builds a Service using the service configuration
inside the service directory.
:param service_directory: The location of the service directory
:return: The path to the generated executable file
"""
service_name = os.path.basename(service_directory)
current_dir = os.getcwd()
os.chdir(service_directory)
with open("service.json", 'r') as f:
config = json.load(f)
try:
Popen(config["build_commands"]).wait()
except BaseException as e:
print(e)
os.chdir(current_dir)
return
output = config["output_file"]
st = os.stat(output)
os.chmod(output, st.st_mode | stat.S_IEXEC) # Make executable
if os.path.basename(output) != service_name:
new_output = os.path.join(os.path.dirname(output), service_name)
os.rename(output, new_output)
output = new_output
os.chdir(current_dir)
return os.path.join(service_directory, output)