def execute (cmd, timeout = 0):
if timeout == 0:
timeout = win32event.INFINITE
info = win32process.CreateProcess(None, cmd, None, None, 0, 0, None, None, win32process.STARTUPINFO())
subprocess = info [0]
rc = win32event.WaitForSingleObject (subprocess, timeout)
if rc == win32event.WAIT_FAILED:
return -1
if rc == win32event.WAIT_TIMEOUT:
try:
win32process.TerminateProcess (subprocess, 0)
except pywintypes.error:
return -3
return -2
if rc == win32event.WAIT_OBJECT_0:
return win32process.GetExitCodeProcess(subprocess)
python类error()的实例源码
def checkWork(self):
finished = 0
fullDataRead = []
while 1:
try:
buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
# finished = (result == -1)
if not bytesToRead:
break
hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
fullDataRead.append(data)
except win32api.error:
finished = 1
break
dataBuf = ''.join(fullDataRead)
if dataBuf:
self.receivedCallback(dataBuf)
if finished:
self.cleanup()
return len(dataBuf)
def _create_named_pipe(template, sids=None):
"""INTERNAL: create a named pipe."""
if sids is None:
sattrs = None
else:
sattrs = _create_security_attributes(*sids)
for i in range(100):
name = template % random.randint(0, 999999)
try:
pipe = CreateNamedPipe(name, PIPE_ACCESS_DUPLEX,
0, 1, 1, 1, 100000, sattrs)
SetHandleInformation(pipe, HANDLE_FLAG_INHERIT, 0)
except WindowsError, e:
if e.winerror != ERROR_PIPE_BUSY:
raise
else:
return pipe, name
raise ExceptionPexpect, 'Could not create pipe after 100 attempts.'
def run(self, *args, **kwargs):
try:
self.spectate = VRSpectate()
self.spectate.run()
except ProcessException as e:
logging.exception(e)
if "get pid from name" in str(e):
self.error.emit("LoL client was not found")
elif "ReadProcessMemory" in str(e):
self.error.emit("Either the LoL client or this program is outdated")
else:
self.error.emit(str(e))
except pywintypes.error as e:
logging.exception(e)
if "SetSecurityInfo" in str(e):
self.error.emit("Unable to access the League of Legends client, you have to run LoLVRSpectate at the "
"same privilege level as the LoL client. \nEX. if the LoL client is running as admin "
"LoLVRSpectate also has to run as admin")
except Exception as e:
logging.exception(e)
self.error.emit("Unknown error, please submit a bug report at https://github.com/Fire-Proof/LoLVRSpectate "
"(please include the LoLVRSpectate.log file)")
def __init__(self, writePipe, lostCallback):
self.disconnecting = False
self.producer = None
self.producerPaused = 0
self.streamingProducer = 0
self.outQueue = []
self.writePipe = writePipe
self.lostCallback = lostCallback
try:
win32pipe.SetNamedPipeHandleState(writePipe,
win32pipe.PIPE_NOWAIT,
None,
None)
except pywintypes.error:
# Maybe it's an invalid handle. Who knows.
pass
def supported(self):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
# assuming stderr
# isatty() returns False when SSHd into Win32 machine
if 'CYGWIN' in os.environ:
return True
if not sys.stderr.isatty():
return False # auto color only on TTYs
try:
import curses
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
# guess false in case of error
return False
def supported(self):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUTPUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
def loadTestsFromName(self, name, module=None):
test = unittest.TestLoader.loadTestsFromName(self, name, module)
if isinstance(test, unittest.TestSuite):
pass # hmmm? print "Don't wrap suites yet!", test._tests
elif isinstance(test, unittest.TestCase):
test = self._getTestWrapper(test)
else:
print "XXX - what is", test
return test
# Lots of classes necessary to support one simple feature: we want a 3rd
# test result state - "SKIPPED" - to indicate that the test wasn't able
# to be executed for various reasons. Inspired by bzr's tests, but it
# has other concepts, such as "Expected Failure", which we don't bother
# with.
# win32 error codes that probably mean we need to be elevated (ie, if we
# aren't elevated, we treat these error codes as 'skipped')
def _GetServiceShortName(longName):
# looks up a services name
# from the display name
# Thanks to Andy McKay for this code.
access = win32con.KEY_READ | win32con.KEY_ENUMERATE_SUB_KEYS | win32con.KEY_QUERY_VALUE
hkey = win32api.RegOpenKey(win32con.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services", 0, access)
num = win32api.RegQueryInfoKey(hkey)[0]
longName = longName.lower()
# loop through number of subkeys
for x in range(0, num):
# find service name, open subkey
svc = win32api.RegEnumKey(hkey, x)
skey = win32api.RegOpenKey(hkey, svc, 0, access)
try:
# find display name
thisName = str(win32api.RegQueryValueEx(skey, "DisplayName")[0])
if thisName.lower() == longName:
return svc
except win32api.error:
# in case there is no key called DisplayName
pass
return None
# Open a service given either it's long or short name.
def RemoveService(serviceName):
try:
import perfmon
perfmon.UnloadPerfCounterTextStrings("python.exe "+serviceName)
except (ImportError, win32api.error):
pass
hscm = win32service.OpenSCManager(None,None,win32service.SC_MANAGER_ALL_ACCESS)
try:
hs = SmartOpenService(hscm, serviceName, win32service.SERVICE_ALL_ACCESS)
win32service.DeleteService(hs)
win32service.CloseServiceHandle(hs)
finally:
win32service.CloseServiceHandle(hscm)
import win32evtlogutil
try:
win32evtlogutil.RemoveSourceFromRegistry(serviceName)
except win32api.error:
pass
def __FindSvcDeps(findName):
if type(findName) is pywintypes.UnicodeType: findName = str(findName)
dict = {}
k = win32api.RegOpenKey(win32con.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services")
num = 0
while 1:
try:
svc = win32api.RegEnumKey(k, num)
except win32api.error:
break
num = num + 1
sk = win32api.RegOpenKey(k, svc)
try:
deps, typ = win32api.RegQueryValueEx(sk, "DependOnService")
except win32api.error:
deps = ()
for dep in deps:
dep = dep.lower()
dep_on = dict.get(dep, [])
dep_on.append(svc)
dict[dep]=dep_on
return __ResolveDeps(findName, dict)
def RestartService(serviceName, args = None, waitSeconds = 30, machine = None):
"Stop the service, and then start it again (with some tolerance for allowing it to stop.)"
try:
StopService(serviceName, machine)
except pywintypes.error, exc:
# Allow only "service not running" error
if exc.winerror!=winerror.ERROR_SERVICE_NOT_ACTIVE:
raise
# Give it a few goes, as the service may take time to stop
for i in range(waitSeconds):
try:
StartService(serviceName, args, machine)
break
except pywintypes.error, exc:
if exc.winerror!=winerror.ERROR_SERVICE_ALREADY_RUNNING:
raise
win32api.Sleep(1000)
else:
print "Gave up waiting for the old service to stop!"
def GetServiceClassString(cls, argv = None):
if argv is None:
argv = sys.argv
import pickle
modName = pickle.whichmodule(cls, cls.__name__)
if modName == '__main__':
try:
fname = win32api.GetFullPathName(argv[0])
path = os.path.split(fname)[0]
# Eaaaahhhh - sometimes this will be a short filename, which causes
# problems with 1.5.1 and the silly filename case rule.
# Get the long name
fname = os.path.join(path, win32api.FindFiles(fname)[0][8])
except win32api.error:
raise error("Could not resolve the path name '%s' to a full path" % (argv[0]))
modName = os.path.splitext(fname)[0]
return modName + "." + cls.__name__
def timeout_execute (cmd, timeout = 0):
if timeout == 0:
timeout = win32event.INFINITE
info = win32process.CreateProcess(None, cmd, None, None, 0, 0, None, None, win32process.STARTUPINFO())
subprocess = info [0]
rc = win32event.WaitForSingleObject (subprocess, timeout)
if rc == win32event.WAIT_FAILED:
return -1
if rc == win32event.WAIT_TIMEOUT:
try:
win32process.TerminateProcess (subprocess, 0)
except pywintypes.error:
return -3
return -2
if rc == win32event.WAIT_OBJECT_0:
return win32process.GetExitCodeProcess(subprocess)
def attach(self, device):
if self.device is not None:
print "Warning: already attached to a device."
if device is not self.device:
self.detach()
handle = device.hwnd
def callback(hwnd, extra):
extra.add(hwnd)
return True
self.watched_hwnds.add(handle)
try:
# EnumChildWindows may crash for windows have no any child.
# refs: https://mail.python.org/pipermail/python-win32/2005-March/003042.html
win32gui.EnumChildWindows(handle, callback, self.watched_hwnds)
except pywintypes.error:
pass
self.device = device
print "attach to device", device
def CheckAdminRun():
if not appconstants.AppUACAdmin or UserIsAdmin():
# if admin not needed or already admin ... go ahead
return True, None
if not appconstants.appisfrozen() or sys.platform != 'win32':
# Manifest cannot be used if not frozen or in unix-like systems
return RunAsAdmin()
elif not appconstants.AppUACManifest:
# frozen and win32 ... and no manifest
return RunAsAdmin()
else:
# frozen win32 ... and manifest
# we "must" be running as Admin, becasue if not the application
# would not have started
# (or else ... a bug in the manifest or embedding process has acted)
return False, ('Unknown error. Manifest must request Administrator'
' permissions before launch')
def RunAsAdminWin32():
# FIXME ... what happens if "frozen"
script = os.path.abspath(sys.argv[0])
if sys.executable != script:
params = ' '.join([script] + sys.argv[1:])
else:
params = ' '.join(sys.argv[1:])
# fMask = 0
fMask = win32com.shell.shellcon.SEE_MASK_NO_CONSOLE
# fMask=win32com.shell.shellcon.SEE_MASK_NOCLOSEPROCESS
try:
win32com.shell.shell.ShellExecuteEx(
nShow=win32con.SW_SHOWNORMAL,
fMask=fMask,
lpVerb='runas',
lpFile=sys.executable,
lpParameters=params)
except pywintypes.error, e:
return False, e[2]
# If ShellExecuteEx was ok ... this will never be reached
# return True, None
# In any case exit to avoid a "single instance check" failure
sys.exit(0)
def terminate(self):
"""Terminate the child process. This also closes all the file
descriptors."""
if self.child_handle is None or self.terminated:
return
try:
TerminateProcess(self.child_handle, 1)
except WindowsError, e:
# ERROR_ACCESS_DENIED (also) happens when the child has already
# exited.
if e.winerror == ERROR_ACCESS_DENIED and not self.isalive():
pass
else:
raise
self.close()
self.wait()
self.terminated = True
def mem_used():
counter=r'\Memory\Committed Bytes'
machine, object, instance, parentInstance, index, counter = win32pdh.ParseCounterPath(counter)
instance = None
inum=-1
format = win32pdh.PDH_FMT_DOUBLE
machine=None
path = win32pdh.MakeCounterPath( (machine,object, instance, None, inum,counter) )
hq = win32pdh.OpenQuery()
try:
hc = win32pdh.AddCounter(hq, path)
try:
win32pdh.CollectQueryData(hq)
type, val = win32pdh.GetFormattedCounterValue(hc, format)
return int(val / 1024)
except pywintypes.error:
return 0
finally:
win32pdh.RemoveCounter(hc)
finally:
win32pdh.CloseQuery(hq)
def checkWork(self):
finished = 0
fullDataRead = []
while 1:
try:
buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
# finished = (result == -1)
if not bytesToRead:
break
hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
fullDataRead.append(data)
except win32api.error:
finished = 1
break
dataBuf = ''.join(fullDataRead)
if dataBuf:
self.receivedCallback(dataBuf)
if finished:
self.cleanup()
return len(dataBuf)
def __init__(self, writePipe, lostCallback):
self.disconnecting = False
self.producer = None
self.producerPaused = 0
self.streamingProducer = 0
self.outQueue = []
self.writePipe = writePipe
self.lostCallback = lostCallback
try:
win32pipe.SetNamedPipeHandleState(writePipe,
win32pipe.PIPE_NOWAIT,
None,
None)
except pywintypes.error:
# Maybe it's an invalid handle. Who knows.
pass
def supported(self):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
# assuming stderr
# isatty() returns False when SSHd into Win32 machine
if 'CYGWIN' in os.environ:
return True
if not sys.stderr.isatty():
return False # auto color only on TTYs
try:
import curses
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
# guess false in case of error
return False
def supported(self):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUTPUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
# guess false in case of error
return False
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
def loadTestsFromName(self, name, module=None):
test = unittest.TestLoader.loadTestsFromName(self, name, module)
if isinstance(test, unittest.TestSuite):
pass # hmmm? print "Don't wrap suites yet!", test._tests
elif isinstance(test, unittest.TestCase):
test = self._getTestWrapper(test)
else:
print "XXX - what is", test
return test
# Lots of classes necessary to support one simple feature: we want a 3rd
# test result state - "SKIPPED" - to indicate that the test wasn't able
# to be executed for various reasons. Inspired by bzr's tests, but it
# has other concepts, such as "Expected Failure", which we don't bother
# with.
# win32 error codes that probably mean we need to be elevated (ie, if we
# aren't elevated, we treat these error codes as 'skipped')
def _GetServiceShortName(longName):
# looks up a services name
# from the display name
# Thanks to Andy McKay for this code.
access = win32con.KEY_READ | win32con.KEY_ENUMERATE_SUB_KEYS | win32con.KEY_QUERY_VALUE
hkey = win32api.RegOpenKey(win32con.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services", 0, access)
num = win32api.RegQueryInfoKey(hkey)[0]
longName = longName.lower()
# loop through number of subkeys
for x in range(0, num):
# find service name, open subkey
svc = win32api.RegEnumKey(hkey, x)
skey = win32api.RegOpenKey(hkey, svc, 0, access)
try:
# find display name
thisName = str(win32api.RegQueryValueEx(skey, "DisplayName")[0])
if thisName.lower() == longName:
return svc
except win32api.error:
# in case there is no key called DisplayName
pass
return None
# Open a service given either it's long or short name.
def RemoveService(serviceName):
try:
import perfmon
perfmon.UnloadPerfCounterTextStrings("python.exe "+serviceName)
except (ImportError, win32api.error):
pass
hscm = win32service.OpenSCManager(None,None,win32service.SC_MANAGER_ALL_ACCESS)
try:
hs = SmartOpenService(hscm, serviceName, win32service.SERVICE_ALL_ACCESS)
win32service.DeleteService(hs)
win32service.CloseServiceHandle(hs)
finally:
win32service.CloseServiceHandle(hscm)
import win32evtlogutil
try:
win32evtlogutil.RemoveSourceFromRegistry(serviceName)
except win32api.error:
pass
def __FindSvcDeps(findName):
if type(findName) is pywintypes.UnicodeType: findName = str(findName)
dict = {}
k = win32api.RegOpenKey(win32con.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services")
num = 0
while 1:
try:
svc = win32api.RegEnumKey(k, num)
except win32api.error:
break
num = num + 1
sk = win32api.RegOpenKey(k, svc)
try:
deps, typ = win32api.RegQueryValueEx(sk, "DependOnService")
except win32api.error:
deps = ()
for dep in deps:
dep = dep.lower()
dep_on = dict.get(dep, [])
dep_on.append(svc)
dict[dep]=dep_on
return __ResolveDeps(findName, dict)
def GetServiceClassString(cls, argv = None):
if argv is None:
argv = sys.argv
import pickle
modName = pickle.whichmodule(cls, cls.__name__)
if modName == '__main__':
try:
fname = win32api.GetFullPathName(argv[0])
path = os.path.split(fname)[0]
# Eaaaahhhh - sometimes this will be a short filename, which causes
# problems with 1.5.1 and the silly filename case rule.
# Get the long name
fname = os.path.join(path, win32api.FindFiles(fname)[0][8])
except win32api.error:
raise error("Could not resolve the path name '%s' to a full path" % (argv[0]))
modName = os.path.splitext(fname)[0]
return modName + "." + cls.__name__