def execute (cmd, timeout = 0):
if timeout == 0:
timeout = win32event.INFINITE
info = win32process.CreateProcess(None, cmd, None, None, 0, 0, None, None, win32process.STARTUPINFO())
subprocess = info [0]
rc = win32event.WaitForSingleObject (subprocess, timeout)
if rc == win32event.WAIT_FAILED:
return -1
if rc == win32event.WAIT_TIMEOUT:
try:
win32process.TerminateProcess (subprocess, 0)
except pywintypes.error:
return -3
return -2
if rc == win32event.WAIT_OBJECT_0:
return win32process.GetExitCodeProcess(subprocess)
python类GetExitCodeProcess()的实例源码
def run_elevated(command, args, wait=True):
"""Run the given command as an elevated user and wait for it to return"""
ret = 1
if command.find(' ') > -1:
command = '"' + command + '"'
if platform.system() == 'Windows':
import win32api
import win32con
import win32event
import win32process
from win32com.shell.shell import ShellExecuteEx
from win32com.shell import shellcon
logging.debug(command + ' ' + args)
process_info = ShellExecuteEx(nShow=win32con.SW_HIDE,
fMask=shellcon.SEE_MASK_NOCLOSEPROCESS,
lpVerb='runas',
lpFile=command,
lpParameters=args)
if wait:
win32event.WaitForSingleObject(process_info['hProcess'], 600000)
ret = win32process.GetExitCodeProcess(process_info['hProcess'])
win32api.CloseHandle(process_info['hProcess'])
else:
ret = process_info
else:
logging.debug('sudo ' + command + ' ' + args)
ret = subprocess.call('sudo ' + command + ' ' + args, shell=True)
return ret
def timeout_execute (cmd, timeout = 0):
if timeout == 0:
timeout = win32event.INFINITE
info = win32process.CreateProcess(None, cmd, None, None, 0, 0, None, None, win32process.STARTUPINFO())
subprocess = info [0]
rc = win32event.WaitForSingleObject (subprocess, timeout)
if rc == win32event.WAIT_FAILED:
return -1
if rc == win32event.WAIT_TIMEOUT:
try:
win32process.TerminateProcess (subprocess, 0)
except pywintypes.error:
return -3
return -2
if rc == win32event.WAIT_OBJECT_0:
return win32process.GetExitCodeProcess(subprocess)
def eof(self):
### should be calling file.eof() here instead of file.close(), there
### may be data in the pipe or buffer after the process exits
if sys.platform == "win32":
r = win32event.WaitForMultipleObjects(self.wait_for, 1, 0)
if r == win32event.WAIT_OBJECT_0:
self.file.close()
self.file = None
return win32process.GetExitCodeProcess(self.child_pid)
return None
if self.thread and self.thread.isAlive():
return None
pid, status = os.waitpid(self.child_pid, os.WNOHANG)
if pid:
self.file.close()
self.file = None
return status
return None
def close(self):
if self.file:
self.file.close()
self.file = None
if sys.platform == "win32":
win32event.WaitForMultipleObjects(self.wait_for, 1, win32event.INFINITE)
return win32process.GetExitCodeProcess(self.child_pid)
else:
if self.thread:
self.thread.join()
if type(self.child_pid) == type([]):
for pid in self.child_pid:
exit = os.waitpid(pid, 0)[1]
return exit
else:
return os.waitpid(self.child_pid, 0)[1]
return None
def connectionLost(self, reason=None):
"""Shut down resources."""
# Get the exit status and notify the protocol
exitCode = win32process.GetExitCodeProcess(self.hProcess)
if exitCode == 0:
err = error.ProcessDone(exitCode)
else:
err = error.ProcessTerminated(exitCode)
self.protocol.processEnded(failure.Failure(err))
## IConsumer
def checkWork(self):
if win32event.WaitForSingleObject(self.proc.hProcess, 0) != win32event.WAIT_OBJECT_0:
return 0
exitCode = win32process.GetExitCodeProcess(self.proc.hProcess)
if exitCode == 0:
err = error.ProcessDone(exitCode)
else:
err = error.ProcessTerminated(exitCode)
self.deactivate()
self.proc.protocol.processEnded(failure.Failure(err))
return 0
def connectionLost(self, reason=None):
"""Shut down resources."""
# Get the exit status and notify the protocol
exitCode = win32process.GetExitCodeProcess(self.hProcess)
if exitCode == 0:
err = error.ProcessDone(exitCode)
else:
err = error.ProcessTerminated(exitCode)
self.protocol.processEnded(failure.Failure(err))
## IConsumer
def checkWork(self):
if win32event.WaitForSingleObject(self.proc.hProcess, 0) != win32event.WAIT_OBJECT_0:
return 0
exitCode = win32process.GetExitCodeProcess(self.proc.hProcess)
if exitCode == 0:
err = error.ProcessDone(exitCode)
else:
err = error.ProcessTerminated(exitCode)
self.deactivate()
self.proc.protocol.processEnded(failure.Failure(err))
return 0
def wait_for_elevated_process(process_info):
if platform.system() == 'Windows' and 'hProcess' in process_info:
import win32api
import win32con
import win32event
import win32process
win32event.WaitForSingleObject(process_info['hProcess'], 600000)
ret = win32process.GetExitCodeProcess(process_info['hProcess'])
win32api.CloseHandle(process_info['hProcess'])
return ret
# pylint: enable=E0611,E0401
# pylint: disable=E1101
def exitCode(self):
"""
Return process exit code.
"""
return win32process.GetExitCodeProcess(self.hProcess)
def join(self):
win32event.WaitForSingleObject(self.processHandle,
win32event.INFINITE)
self.exitCode = win32process.GetExitCodeProcess(self.processHandle)
def exitCode(self):
"""
Return process exit code.
"""
return win32process.GetExitCodeProcess(self.hProcess)
def join(self):
win32event.WaitForSingleObject(self.processHandle,
win32event.INFINITE)
self.exitCode = win32process.GetExitCodeProcess(self.processHandle)
def checkWork(self):
if win32event.WaitForSingleObject(self.proc.hProcess, 0) != win32event.WAIT_OBJECT_0:
return 0
exitCode = win32process.GetExitCodeProcess(self.proc.hProcess)
self.deactivate()
self.proc.processEnded(exitCode)
return 0
def test_wait(self):
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
win32con.FALSE, self.pid)
self.addCleanup(win32api.CloseHandle, handle)
p = psutil.Process(self.pid)
p.terminate()
psutil_value = p.wait()
sys_value = win32process.GetExitCodeProcess(handle)
self.assertEqual(psutil_value, sys_value)
def run_as_admin():
procInfo = ShellExecuteEx(nShow=win32con.SW_SHOWNORMAL,
fMask=shellcon.SEE_MASK_NOCLOSEPROCESS,
lpVerb='runas',
lpFile=sys.executable)
procHandle = procInfo['hProcess']
obj = win32event.WaitForSingleObject(procHandle, win32event.INFINITE)
return win32process.GetExitCodeProcess(procHandle)
def runAsAdmin(cmdLine=None, wait=True):
if os.name != 'nt':
raise RuntimeError, "This function is only implemented on Windows."
import win32api
import win32con
import win32event
import win32process
from win32com.shell.shell import ShellExecuteEx
from win32com.shell import shellcon
python_exe = sys.executable
if cmdLine is None:
cmdLine = [python_exe] + sys.argv
elif type(cmdLine) not in (types.TupleType, types.ListType):
raise ValueError, "cmdLine is not a sequence."
cmd = '"%s"' % (cmdLine[0],)
# XXX TODO: isn't there a function or something we can call to massage command line params?
params = " ".join(['"%s"' % (x,) for x in cmdLine[1:]])
cmdDir = ''
#showCmd = win32con.SW_SHOWNORMAL
showCmd = win32con.SW_HIDE
lpVerb = 'runas' # causes UAC elevation prompt.
# print "Running", cmd, params
# ShellExecute() doesn't seem to allow us to fetch the PID or handle
# of the process, so we can't get anything useful from it. Therefore
# the more complex ShellExecuteEx() must be used.
# procHandle = win32api.ShellExecute(0, lpVerb, cmd, params, cmdDir, showCmd)
procInfo = ShellExecuteEx(nShow=showCmd,
fMask=shellcon.SEE_MASK_NOCLOSEPROCESS,
lpVerb=lpVerb,
lpFile=cmd,
lpParameters=params)
if wait:
procHandle = procInfo['hProcess']
obj = win32event.WaitForSingleObject(procHandle, win32event.INFINITE)
rc = win32process.GetExitCodeProcess(procHandle)
# print "Process handle %s returned code %s" % (procHandle, rc)
else:
rc = None
return rc
def runAsAdmin(cmdLine=None, wait=True):
if os.name != 'nt':
raise RuntimeError, "This function is only implemented on Windows."
import win32api, win32con, win32event, win32process
from win32com.shell.shell import ShellExecuteEx
from win32com.shell import shellcon
python_exe = sys.executable
if cmdLine is None:
cmdLine = [python_exe] + sys.argv
elif type(cmdLine) not in (types.TupleType,types.ListType):
raise ValueError, "cmdLine is not a sequence."
cmd = '"%s"' % (cmdLine[0],)
# XXX TODO: isn't there a function or something we can call to massage command line params?
params = " ".join(['"%s"' % (x,) for x in cmdLine[1:]])
cmdDir = ''
showCmd = win32con.SW_SHOWNORMAL
#showCmd = win32con.SW_HIDE
lpVerb = 'runas' # causes UAC elevation prompt.
# print "Running", cmd, params
# ShellExecute() doesn't seem to allow us to fetch the PID or handle
# of the process, so we can't get anything useful from it. Therefore
# the more complex ShellExecuteEx() must be used.
# procHandle = win32api.ShellExecute(0, lpVerb, cmd, params, cmdDir, showCmd)
procInfo = ShellExecuteEx(nShow=showCmd,
fMask=shellcon.SEE_MASK_NOCLOSEPROCESS,
lpVerb=lpVerb,
lpFile=cmd,
lpParameters=params)
if wait:
procHandle = procInfo['hProcess']
obj = win32event.WaitForSingleObject(procHandle, win32event.INFINITE)
rc = win32process.GetExitCodeProcess(procHandle)
#print "Process handle %s returned code %s" % (procHandle, rc)
else:
rc = None
return rc
def runAsAdmin(cmdLine=None, wait=True):
if os.name != 'nt':
raise RuntimeError, "This function is only implemented on Windows."
import win32api, win32con, win32event, win32process
from win32com.shell.shell import ShellExecuteEx
from win32com.shell import shellcon
python_exe = sys.executable
if cmdLine is None:
cmdLine = [python_exe] + sys.argv
elif type(cmdLine) not in (types.TupleType,types.ListType):
raise ValueError, "cmdLine is not a sequence."
cmd = '"%s"' % (cmdLine[0],)
# XXX TODO: isn't there a function or something we can call to massage command line params?
params = " ".join(['"%s"' % (x,) for x in cmdLine[1:]])
cmdDir = ''
showCmd = win32con.SW_SHOWNORMAL
#showCmd = win32con.SW_HIDE
lpVerb = 'runas' # causes UAC elevation prompt.
# print "Running", cmd, params
# ShellExecute() doesn't seem to allow us to fetch the PID or handle
# of the process, so we can't get anything useful from it. Therefore
# the more complex ShellExecuteEx() must be used.
# procHandle = win32api.ShellExecute(0, lpVerb, cmd, params, cmdDir, showCmd)
procInfo = ShellExecuteEx(nShow=showCmd,
fMask=shellcon.SEE_MASK_NOCLOSEPROCESS,
lpVerb=lpVerb,
lpFile=cmd,
lpParameters=params)
if wait:
procHandle = procInfo['hProcess']
obj = win32event.WaitForSingleObject(procHandle, win32event.INFINITE)
rc = win32process.GetExitCodeProcess(procHandle)
#print "Process handle %s returned code %s" % (procHandle, rc)
else:
rc = None
return rc