def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
python类execvpe()的实例源码
external_search_command.py 文件源码
项目:splunk_ta_ps4_f1_2016
作者: jonathanvarley
项目源码
文件源码
阅读 34
收藏 0
点赞 0
评论 0
external_search_command.py 文件源码
项目:splunk_ta_ps4_f1_2016
作者: jonathanvarley
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def reexec(self):
    """Fork a child that re-executes the original command line.

    The request is ignored (with a warning) when ``self.reexec_pid`` or
    ``self.master_pid`` is non-zero.  Otherwise the process forks: the
    parent records the child's pid in ``self.reexec_pid`` and returns,
    while the child execs ``self.START_CTX[0]`` with
    ``self.START_CTX['args']`` from the directory ``self.START_CTX['cwd']``.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Capture our pid before forking so the child can advertise it.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork: nothing more to do.
        return

    # Child side of the fork from here on.
    self.cfg.pre_exec(self)

    # Start from a copy of the original environment and add the listener
    # file descriptors plus the pid of the process that forked us.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    # Exec the process using the original environment.
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
external_search_command.py 文件源码
项目:cb-defense-splunk-app
作者: carbonblack
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
external_search_command.py 文件源码
项目:cb-defense-splunk-app
作者: carbonblack
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def _execute(path, argv, environ):
if environ is None:
os.execvp(path, argv)
else:
os.execvpe(path, argv, environ)
return
# endregion
def test_execvpe_with_bad_arglist(self):
    """Check that ``os.execvpe`` raises ValueError for an empty argument list."""
    program, empty_argv, env = 'notepad', [], None
    self.assertRaises(ValueError, os.execvpe, program, empty_argv, env)
def test_execvpe_with_bad_program(self):
    """Check that ``os.execvpe`` raises OSError for a program name that does not exist."""
    bogus = 'no such app-'
    self.assertRaises(OSError, os.execvpe, bogus, [bogus], None)
def test_execvpe_with_bad_arglist(self):
    """Check that ``os.execvpe`` raises ValueError for an empty argument list."""
    program, empty_argv, env = 'notepad', [], None
    self.assertRaises(ValueError, os.execvpe, program, empty_argv, env)
def test_execvpe_with_bad_arglist(self):
    """Check that ``os.execvpe`` raises ValueError for an empty argument list."""
    program, empty_argv, env = 'notepad', [], None
    self.assertRaises(ValueError, os.execvpe, program, empty_argv, env)
def reexec(self):
    """Fork a child that re-executes the original command line.

    The request is ignored (with a warning) when ``self.reexec_pid`` or
    ``self.master_pid`` is non-zero.  Otherwise the process forks: the
    parent records the child's pid in ``self.reexec_pid`` and returns,
    while the child execs ``self.START_CTX[0]`` with
    ``self.START_CTX['args']`` from the directory ``self.START_CTX['cwd']``.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Capture our pid before forking so the child can advertise it.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork: nothing more to do.
        return

    # Child side of the fork from here on.
    self.cfg.pre_exec(self)

    # Start from a copy of the original environment and add the listener
    # file descriptors plus the pid of the process that forked us.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    # Exec the process using the original environment.
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Fork a child that re-executes the original command line.

    The request is ignored (with a warning) when ``self.reexec_pid`` or
    ``self.master_pid`` is non-zero.  Otherwise the process forks: the
    parent records the child's pid in ``self.reexec_pid`` and returns,
    while the child execs ``self.START_CTX[0]`` with
    ``self.START_CTX['args']`` from the directory ``self.START_CTX['cwd']``.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Capture our pid before forking so the child can advertise it.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork: nothing more to do.
        return

    # Child side of the fork from here on.
    self.cfg.pre_exec(self)

    # Start from a copy of the original environment and add the listener
    # file descriptors plus the pid of the process that forked us.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    # Exec the process using the original environment.
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Fork a child that re-executes the original command line.

    The request is ignored (with a warning) when ``self.reexec_pid`` or
    ``self.master_pid`` is non-zero.  Otherwise the process forks: the
    parent records the child's pid in ``self.reexec_pid`` and returns,
    while the child execs ``self.START_CTX[0]`` with
    ``self.START_CTX['args']`` from the directory ``self.START_CTX['cwd']``.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Capture our pid before forking so the child can advertise it.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork: nothing more to do.
        return

    # Child side of the fork from here on.
    self.cfg.pre_exec(self)

    # Start from a copy of the original environment and add the listener
    # file descriptors plus the pid of the process that forked us.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    # Exec the process using the original environment.
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def reexec(self):
    """Fork a child that re-executes the original command line.

    The request is ignored (with a warning) when ``self.reexec_pid`` or
    ``self.master_pid`` is non-zero.  Otherwise the process forks: the
    parent records the child's pid in ``self.reexec_pid`` and returns,
    while the child execs ``self.START_CTX[0]`` with
    ``self.START_CTX['args']`` from the directory ``self.START_CTX['cwd']``.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists")
        return

    # Capture our pid before forking so the child can advertise it.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork: nothing more to do.
        return

    # Child side of the fork from here on.
    self.cfg.pre_exec(self)

    # Start from a copy of the original environment and add the listener
    # file descriptors plus the pid of the process that forked us.
    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_FD'] = ",".join(
        str(sock.fileno()) for sock in self.LISTENERS)
    child_env['GUNICORN_PID'] = str(parent_pid)

    os.chdir(self.START_CTX['cwd'])

    # Exec the process using the original environment.
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def xexec(args, stdin=None):
    """Replace the current process with the command given in *args*.

    The command is executed with the detyped form of
    ``builtins.__xonsh_env__`` as its environment.

    :param args: Command and arguments; ``args[0]`` is the program name.
    :param stdin: Unused by this function.
    :returns: An error message string when *args* is empty or the program
        cannot be found.
    """
    detyped_env = builtins.__xonsh_env__.detype()

    if len(args) == 0:
        return 'xonsh: exec: no args specified\n'

    command = args[0]
    try:
        os.execvpe(command, args, detyped_env)
    except FileNotFoundError as err:
        # err.args[1] is the OS error text for the failed lookup.
        return 'xonsh: ' + err.args[1] + ': ' + command + '\n'
def setup_lib_paths(fbdir, libdir):
    """Make sure *libdir* is listed in ``LD_LIBRARY_PATH`` for this process.

    On non-Windows platforms, *libdir* is prepended to the
    ``LD_LIBRARY_PATH`` entry of ``os.environ`` unless it is already one of
    the colon-separated entries.  Nothing is changed on ``win32``.

    :param fbdir: Unused by the active code (only referenced by the
        disabled re-exec logic noted below).
    :param libdir: Directory to add to ``LD_LIBRARY_PATH``.
    """
    if sys.platform == "win32":
        return

    libpath = os.environ.get('LD_LIBRARY_PATH', "")

    # Compare whole path entries, not substrings: "/opt/lib" must not be
    # considered present just because "/opt/lib64" is on the path.
    if libdir in libpath.split(":"):
        return

    # Avoid producing "libdir:" when the variable was unset or empty; the
    # trailing empty entry would make the loader search the current directory.
    if libpath:
        os.environ['LD_LIBRARY_PATH'] = "%s:%s" % (libdir, libpath)
    else:
        os.environ['LD_LIBRARY_PATH'] = libdir

    # NOTE: re-executing the interpreter so that the already-running process
    # picks up the new LD_LIBRARY_PATH is currently disabled:
    #path = os.path.abspath(fbdir)
    #args = ['"' + path + '"'] + sys.argv[1:]
    #os.execvpe( 'python', ['python']+sys.argv, os.environ)
def reexec(self):
    """Fork a child that re-executes the original command line.

    The request is ignored (with a warning) when ``self.reexec_pid`` or
    ``self.master_pid`` is non-zero.  Otherwise the process forks: the
    parent records the child's pid in ``self.reexec_pid`` and returns,
    while the child execs ``self.START_CTX[0]`` with
    ``self.START_CTX['args']`` from the directory ``self.START_CTX['cwd']``.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    if self.master_pid != 0:
        self.log.warning("USR2 signal ignored. Parent exists.")
        return

    # Capture our pid before forking so the child can advertise it.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        # Parent side of the fork: nothing more to do.
        return

    # Child side of the fork from here on.
    self.cfg.pre_exec(self)

    child_env = self.cfg.env_orig.copy()
    child_env['GUNICORN_PID'] = str(parent_pid)

    if not self.systemd:
        # Pass the listener file descriptors as a comma-separated list.
        listener_fds = (str(sock.fileno()) for sock in self.LISTENERS)
        child_env['GUNICORN_FD'] = ','.join(listener_fds)
    else:
        # Under systemd, advertise this (child) pid and the listener count.
        child_env['LISTEN_PID'] = str(os.getpid())
        child_env['LISTEN_FDS'] = str(len(self.LISTENERS))

    os.chdir(self.START_CTX['cwd'])

    # Exec the process using the original environment.
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], child_env)
def test_errorDuringExec(self):
    """
    When L{os.execvpe} raises an exception, it will format that exception
    on stderr as UTF-8, regardless of system encoding information.
    """
    def execvpe(*args, **kw):
        # Stand-in for os.execvpe that always fails.  The failing code is
        # compiled under a file name containing a non-ASCII character so
        # that the formatted traceback is guaranteed to include non-ASCII
        # text.
        snowman_name = u"<\N{SNOWMAN}>"
        if not isinstance(snowman_name, str):
            snowman_name = snowman_name.encode("utf-8")
        eval(compile("1/0", snowman_name, "single"))

    self.patch(os, "execvpe", execvpe)
    # Report an ASCII filesystem encoding for the duration of the test.
    self.patch(sys, "getfilesystemencoding", lambda: "ascii")

    reactor = self.buildReactor()
    captured_stderr = io.BytesIO()

    def spawnChild():
        class TracebackCatcher(ProcessProtocol, object):
            # Every chunk delivered to errReceived is appended to the buffer.
            errReceived = captured_stderr.write

            def processEnded(self, reason):
                reactor.stop()

        reactor.spawnProcess(TracebackCatcher(), pyExe,
                             [pyExe, b"-c", b""])

    reactor.callWhenRunning(spawnChild)

    self.runReactor(reactor, timeout=30)
    self.assertIn(u"\N{SNOWMAN}".encode("utf-8"), captured_stderr.getvalue())