async def update_and_restart(self, ctx):
    """Pull the latest code, upgrade dependencies, then re-exec the bot.

    A "loading" reaction is placed on the invoking message while the two
    update commands run.  It is then swapped for a confirmation reaction and
    the current process image is replaced through ``os.execve``, so this
    coroutine does not return when the exec succeeds.

    This must be a coroutine (``async def``) because the body awaits the
    reaction and subprocess helpers.

    :param ctx: invocation context; only ``ctx.message`` and ``ctx.me`` are
        used here.
    """
    loading_emoji = "a:loading:393852367751086090"

    await ctx.message.add_reaction(loading_emoji)
    # NOTE(review): the results of run_subprocess are not inspected here, so
    # a failed pull/install still proceeds to the restart -- confirm intended.
    await utils.run_subprocess("git pull --rebase")
    await utils.run_subprocess("python -m pip install --upgrade -r requirements.txt")
    await ctx.message.remove_reaction(loading_emoji, ctx.me)
    await ctx.message.add_reaction(":helYea:236243426662678528")

    log.info("Restarting due to update_and_restart")
    # Replace this process with a fresh interpreter running the package,
    # inheriting the current environment unchanged.
    os.execve(sys.executable, ['python', '-m', 'dango'], os.environ)
python类execve()的实例源码
def activate(self):
    """Activate the virtual environment.

    This actually restarts the pipless script using `os.execve()` to
    replace itself with a subprocess that uses the correct python binary
    from the venv/bin directory with the correct environment variables.

    When ``self.no_venv`` is set nothing is replaced and the method simply
    returns ``None``.  Otherwise the call does not return on success, since
    the current process image is replaced.
    """
    if self.no_venv:
        self._debug("no_venv was set, not activating")
        return

    venv_bin = os.path.join(self.venv_home, "bin")
    venv_python_path = os.path.join(venv_bin, "python")

    new_environ = dict(os.environ)
    # Prepend the venv's bin directory.  PATH may legitimately be unset or
    # empty; in that case use the bin directory alone rather than raising
    # KeyError or leaving a dangling separator (an empty PATH entry is
    # treated as the current directory on POSIX).
    inherited_path = new_environ.get("PATH")
    if inherited_path:
        new_environ["PATH"] = venv_bin + os.pathsep + inherited_path
    else:
        new_environ["PATH"] = venv_bin
    new_environ["VIRTUAL_ENV"] = os.path.abspath(self.venv_home)
    new_environ["_"] = venv_python_path

    self._debug("replacing current process with new python in new env from venv")
    self._debug("venv found at {!r}".format(self.venv_home))

    new_args = [
        venv_python_path,
        "-m", "pipless",
        "--no-venv",
        # even though we say to not use the venv, this is still
        # used to determine where to save the requirements.txt file
        "--venv", self.venv_home,
    ]

    # Forward each boolean option that is enabled on this instance.
    boolean_switches = (
        (self.debug, "--debug"),
        (self.quiet, "--quiet"),
        (self.no_requirements, "--no-requirements"),
        (self.venv_clear, "--clear"),
        (self._should_color(), "--color"),
        (self.venv_system_site_packages, "--system-site-packages"),
    )
    new_args.extend(switch for enabled, switch in boolean_switches if enabled)

    if self.venv_python is not None:
        new_args.extend(["--python", self.venv_python])

    # Forward the interpreter's own -m / -c options when they were given.
    module_name = self.python_opts.get("module", None)
    if module_name is not None:
        new_args.extend(["-m", module_name])
    command = self.python_opts.get("cmd", None)
    if command is not None:
        new_args.extend(["-c", command])

    # The original command line (sys.argv) is appended after the options.
    os.execve(
        venv_python_path,
        new_args + sys.argv,
        new_environ
    )
def patch_environ(nogui=True):
    """
    Prepare the process environment for Chimera and re-execute the interpreter.

    If ``CHIMERA`` is already present in the environment the function returns
    immediately.  Otherwise it sets ``CHIMERA`` and several Tcl/Tk/Python
    variables (saving any previous value under a ``CHIMERA_``-prefixed name),
    applies the platform-specific patches and finally replaces the running
    process via ``os.execve``.  Calling this function therefore WILL restart
    your interpreter; the restart is what lets the new process pick up the
    modified variables.

    Parameters
    ----------
    nogui : bool, optional, default=True
        If the GUI is not going to be launched, try to locate a headless
        Chimera build (a candidate path containing ``'headless'``); the first
        candidate path is used when none is found.
    """
    environ = os.environ
    if 'CHIMERA' in environ:
        return

    candidates = guess_chimera_path(search_all=nogui)
    chimera_base = candidates[0]
    if nogui:
        # Prefer a headless build when one is among the candidates.
        chimera_base = next(
            (path for path in candidates if 'headless' in path), chimera_base)
    environ['CHIMERA'] = chimera_base
    chimera_lib = os.path.join(chimera_base, 'lib')

    def _stash(name, backup_name):
        """Copy ``name`` to ``backup_name`` if set; return whether it was set."""
        was_set = name in environ
        if was_set:
            environ[backup_name] = environ[name]
        return was_set

    # Set Tcl/Tk for gui mode
    _stash('TCL_LIBRARY', 'CHIMERA_TCL_LIBRARY')
    environ['TCL_LIBRARY'] = os.path.join(chimera_lib, 'tcl8.6')

    _stash('TCLLIBPATH', 'CHIMERA_TCLLIBPATH')
    environ['TCLLIBPATH'] = '{' + chimera_lib + '}'

    # Tk and Tix locations are saved and then removed rather than overridden.
    if _stash('TK_LIBRARY', 'CHIMERA_TK_LIBRARY'):
        del environ['TK_LIBRARY']
    if _stash('TIX_LIBRARY', 'CHIMERA_TIX_LIBRARY'):
        del environ['TIX_LIBRARY']

    _stash('PYTHONNOUSERSITE', 'CHIMERA_PYTHONNOUSERSITE')
    environ['PYTHONNOUSERSITE'] = '1'

    # Check interactive and IPython
    interactive_ipython = in_ipython() and hasattr(sys, 'ps1')
    if interactive_ipython and not sys.argv[0].endswith('ipython'):
        sys.argv.insert(1, 'ipython')

    # Platform-specific patches
    patch_environ_for_platform(chimera_base, chimera_lib, nogui=nogui)

    os.execve(sys.executable, [sys.executable] + sys.argv, os.environ)
def _test_internal_execvpe(self, test_type):
    """Check which exec primitive ``os._execvpe`` calls, and with what.

    ``test_type`` selects whether the program name is given as ``bytes`` or
    as ``str``.  Calls are recorded through the ``_execvpe_mockup`` context
    manager and compared against the expected
    ``(function name, path, arguments)`` tuples.
    """
    program_path = os.sep + 'absolutepath'
    use_bytes = test_type is bytes

    if use_bytes:
        program = b'executable'
        arguments = [b'progname', 'arg1', 'arg2']
        fullpath = os.path.join(os.fsencode(program_path), program)
        native_fullpath = fullpath
    else:
        program = 'executable'
        arguments = ['progname', 'arg1', 'arg2']
        fullpath = os.path.join(program_path, program)
        # Off Windows the recorded path is expected in its encoded form.
        native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)

    env = {'spam': 'beans'}

    # test os._execvpe() with an absolute path
    with _execvpe_mockup() as calls:
        with self.assertRaises(RuntimeError):
            os._execvpe(fullpath, arguments)
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() returns defpath
    with _execvpe_mockup(defpath=program_path) as calls:
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() reads the 'PATH' variable
    with _execvpe_mockup() as calls:
        path_key = b'PATH' if use_bytes else 'PATH'
        env_path = env.copy()
        env_path[path_key] = program_path
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env_path)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env_path)))
def patch_new_process_functions():
    """Register replacement factories for the process-creation entry points.

    Every ``os.exec*`` and ``os.spawn*`` variant is passed to
    ``monkey_patch_os`` together with the matching ``create_*`` factory.
    On non-Windows platforms ``os.fork`` and, when importable,
    ``_posixsubprocess.fork_exec`` are handled as well; on Windows the
    ``CreateProcess`` function of ``_subprocess`` (or ``_winapi`` when the
    former cannot be imported) is handled instead.
    """
    os_patches = (
        # os.execl(path, arg0, arg1, ...)
        ('execl', create_execl),
        # os.execle(path, arg0, arg1, ..., env)
        ('execle', create_execl),
        # os.execlp(file, arg0, arg1, ...)
        ('execlp', create_execl),
        # os.execlpe(file, arg0, arg1, ..., env)
        ('execlpe', create_execl),
        # os.execv(path, args)
        ('execv', create_execv),
        # os.execve(path, args, env)
        ('execve', create_execve),
        # os.execvp(file, args)
        ('execvp', create_execv),
        # os.execvpe(file, args, env)
        ('execvpe', create_execve),
        # os.spawnl(mode, path, ...)
        ('spawnl', create_spawnl),
        # os.spawnle(mode, path, ..., env)
        ('spawnle', create_spawnl),
        # os.spawnlp(mode, file, ...)
        ('spawnlp', create_spawnl),
        # os.spawnlpe(mode, file, ..., env)
        ('spawnlpe', create_spawnl),
        # os.spawnv(mode, path, args)
        ('spawnv', create_spawnv),
        # os.spawnve(mode, path, args, env)
        ('spawnve', create_spawnve),
        # os.spawnvp(mode, file, args)
        ('spawnvp', create_spawnv),
        # os.spawnvpe(mode, file, args, env)
        ('spawnvpe', create_spawnve),
    )
    for function_name, factory in os_patches:
        monkey_patch_os(function_name, factory)

    if sys.platform == 'win32':
        # Windows
        try:
            import _subprocess
        except ImportError:
            import _winapi as _subprocess
        monkey_patch_module(_subprocess, 'CreateProcess', create_CreateProcess)
        return

    monkey_patch_os('fork', create_fork)
    try:
        import _posixsubprocess
    except ImportError:
        # The module is not available; nothing further to register.
        pass
    else:
        monkey_patch_module(_posixsubprocess, 'fork_exec', create_fork_exec)
def spawn_daemon(path_to_executable, env, *args):
    """Spawns a completely detached subprocess (i.e., a daemon).  Taken from
    http://stackoverflow.com/questions/972362/spawning-process-from-python
    which in turn was inspired by
    <http://code.activestate.com/recipes/278731/>.

    The calling process forks once and waits for that child; the child
    starts a new session, forks again and exits, and the grandchild closes
    its file descriptors, reopens descriptors 0-2 on ``os.devnull`` and
    executes the program.

    :Parameters:
      - `path_to_executable`: absolute path to the executable to be run
        detached
      - `env`: environment
      - `args`: all arguments to be passed to the subprocess; ``None``
        entries are dropped

    :type path_to_executable: str
    :type env: dict mapping str to str
    :type args: list of str

    :raises RuntimeError: if either ``os.fork()`` call fails
    """
    try:
        pid = os.fork()
    except OSError as e:
        raise RuntimeError("1st fork failed: %s [%d]" % (e.strerror, e.errno))
    if pid != 0:
        # Calling process: reap the short-lived first child, then return.
        os.waitpid(pid, 0)
        return

    # First child: start a new session before forking again.
    os.setsid()
    try:
        pid = os.fork()
    except OSError as e:
        # NOTE(review): this is raised inside the first child, not in the
        # caller's process -- confirm that is the intended failure mode.
        raise RuntimeError("2nd fork failed: %s [%d]" % (e.strerror, e.errno))
    if pid != 0:
        # First child exits immediately without running cleanup handlers.
        os._exit(0)

    # Grandchild: close every descriptor that might have been inherited.
    try:
        # str(...) yields the native string type on both Python 2 (even with
        # unicode_literals) and Python 3; os.sysconf rejects a bytes name on
        # Python 3 with a TypeError that would not be caught below.
        maxfd = os.sysconf(str("SC_OPEN_MAX"))
    except (AttributeError, ValueError):
        maxfd = 1024
    for fd in range(maxfd):
        try:
            os.close(fd)
        except OSError:
            # Descriptor was not open.
            pass

    # With everything closed, this open() gets descriptor 0; duplicate it
    # onto 1 and 2 so the standard streams all point at os.devnull.
    os.open(os.devnull, os.O_RDWR)
    os.dup2(0, 1)
    os.dup2(0, 2)

    argv = [path_to_executable] + [arg for arg in args if arg is not None]
    try:
        os.execve(path_to_executable, argv, env)
    except BaseException:
        # The grandchild must never fall back into the caller's code, so any
        # failure to exec terminates it immediately.
        os._exit(255)
def _test_internal_execvpe(self, test_type):
    """Check which exec primitive ``os._execvpe`` calls, and with what.

    ``test_type`` selects whether the program name is given as ``bytes`` or
    as ``str``.  Calls are recorded through the ``_execvpe_mockup`` context
    manager and compared against the expected
    ``(function name, path, arguments)`` tuples.
    """
    program_path = os.sep + 'absolutepath'
    use_bytes = test_type is bytes

    if use_bytes:
        program = b'executable'
        arguments = [b'progname', 'arg1', 'arg2']
        fullpath = os.path.join(os.fsencode(program_path), program)
        native_fullpath = fullpath
    else:
        program = 'executable'
        arguments = ['progname', 'arg1', 'arg2']
        fullpath = os.path.join(program_path, program)
        # Off Windows the recorded path is expected in its encoded form.
        native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)

    env = {'spam': 'beans'}

    # test os._execvpe() with an absolute path
    with _execvpe_mockup() as calls:
        with self.assertRaises(RuntimeError):
            os._execvpe(fullpath, arguments)
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() returns defpath
    with _execvpe_mockup(defpath=program_path) as calls:
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() reads the 'PATH' variable
    with _execvpe_mockup() as calls:
        path_key = b'PATH' if use_bytes else 'PATH'
        env_path = env.copy()
        env_path[path_key] = program_path
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env_path)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env_path)))
def _test_internal_execvpe(self, test_type):
    """Check which exec primitive ``os._execvpe`` calls, and with what.

    ``test_type`` selects whether the program name is given as ``bytes`` or
    as ``str``.  Calls are recorded through the ``_execvpe_mockup`` context
    manager and compared against the expected
    ``(function name, path, arguments)`` tuples.
    """
    program_path = os.sep + 'absolutepath'
    use_bytes = test_type is bytes

    if use_bytes:
        program = b'executable'
        arguments = [b'progname', 'arg1', 'arg2']
        fullpath = os.path.join(os.fsencode(program_path), program)
        native_fullpath = fullpath
    else:
        program = 'executable'
        arguments = ['progname', 'arg1', 'arg2']
        fullpath = os.path.join(program_path, program)
        # Off Windows the recorded path is expected in its encoded form.
        native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)

    env = {'spam': 'beans'}

    # test os._execvpe() with an absolute path
    with _execvpe_mockup() as calls:
        with self.assertRaises(RuntimeError):
            os._execvpe(fullpath, arguments)
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() returns defpath
    with _execvpe_mockup(defpath=program_path) as calls:
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() reads the 'PATH' variable
    with _execvpe_mockup() as calls:
        path_key = b'PATH' if use_bytes else 'PATH'
        env_path = env.copy()
        env_path[path_key] = program_path
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env_path)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env_path)))
def _test_internal_execvpe(self, test_type):
    """Check which exec primitive ``os._execvpe`` calls, and with what.

    ``test_type`` selects whether the program name is given as ``bytes`` or
    as ``str``.  Calls are recorded through the ``_execvpe_mockup`` context
    manager and compared against the expected
    ``(function name, path, arguments)`` tuples.
    """
    program_path = os.sep + 'absolutepath'
    use_bytes = test_type is bytes

    if use_bytes:
        program = b'executable'
        arguments = [b'progname', 'arg1', 'arg2']
        fullpath = os.path.join(os.fsencode(program_path), program)
        native_fullpath = fullpath
    else:
        program = 'executable'
        arguments = ['progname', 'arg1', 'arg2']
        fullpath = os.path.join(program_path, program)
        # Off Windows the recorded path is expected in its encoded form.
        native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)

    env = {'spam': 'beans'}

    # test os._execvpe() with an absolute path
    with _execvpe_mockup() as calls:
        with self.assertRaises(RuntimeError):
            os._execvpe(fullpath, arguments)
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() returns defpath
    with _execvpe_mockup(defpath=program_path) as calls:
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env)))

    # test os._execvpe() with a relative path:
    # os.get_exec_path() reads the 'PATH' variable
    with _execvpe_mockup() as calls:
        path_key = b'PATH' if use_bytes else 'PATH'
        env_path = env.copy()
        env_path[path_key] = program_path
        with self.assertRaises(OSError):
            os._execvpe(program, arguments, env=env_path)
        self.assertEqual(len(calls), 1)
        self.assertSequenceEqual(
            calls[0], ('execve', native_fullpath, (arguments, env_path)))