def GetDomainName():
try:
tok = win32security.OpenThreadToken(win32api.GetCurrentThread(),
TOKEN_QUERY, 1)
except win32api.error, details:
if details[0] != winerror.ERROR_NO_TOKEN:
raise
# attempt to open the process token, since no thread token
# exists
tok = win32security.OpenProcessToken(win32api.GetCurrentProcess(),
TOKEN_QUERY)
sid, attr = win32security.GetTokenInformation(tok, TokenUser)
win32api.CloseHandle(tok)
name, dom, typ = win32security.LookupAccountSid(None, sid)
return dom
python类CloseHandle()的实例源码
def run_elevated(command, args, wait=True):
"""Run the given command as an elevated user and wait for it to return"""
ret = 1
if command.find(' ') > -1:
command = '"' + command + '"'
if platform.system() == 'Windows':
import win32api
import win32con
import win32event
import win32process
from win32com.shell.shell import ShellExecuteEx
from win32com.shell import shellcon
logging.debug(command + ' ' + args)
process_info = ShellExecuteEx(nShow=win32con.SW_HIDE,
fMask=shellcon.SEE_MASK_NOCLOSEPROCESS,
lpVerb='runas',
lpFile=command,
lpParameters=args)
if wait:
win32event.WaitForSingleObject(process_info['hProcess'], 600000)
ret = win32process.GetExitCodeProcess(process_info['hProcess'])
win32api.CloseHandle(process_info['hProcess'])
else:
ret = process_info
else:
logging.debug('sudo ' + command + ' ' + args)
ret = subprocess.call('sudo ' + command + ' ' + args, shell=True)
return ret
def subprocess_terminate( proc ) :
try:
proc.terminate()
except AttributeError:
print " no terminate method to Popen.."
try:
import signal
os.kill( proc.pid , signal.SIGTERM)
except AttributeError:
print " no os.kill, using win32api.."
try:
import win32api
PROCESS_TERMINATE = 1
handle = win32api.OpenProcess( PROCESS_TERMINATE, False, proc.pid)
win32api.TerminateProcess(handle,-1)
win32api.CloseHandle(handle)
except ImportError:
print " ERROR: could not terminate process."
def testCleanup1(self):
# We used to clobber all outstanding exceptions.
def f1(invalidate):
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
if invalidate:
win32api.CloseHandle(int(h))
1/0
# If we invalidated, then the object destruction code will attempt
# to close an invalid handle. We don't wan't an exception in
# this case
def f2(invalidate):
""" This function should throw an IOError. """
try:
f1(invalidate)
except ZeroDivisionError, exc:
raise IOError("raise 2")
self.assertRaises(IOError, f2, False)
# Now do it again, but so the auto object destruction
# actually fails.
self.assertRaises(IOError, f2, True)
def GetDomainName():
try:
tok = win32security.OpenThreadToken(win32api.GetCurrentThread(),
TOKEN_QUERY, 1)
except win32api.error as details:
if details[0] != winerror.ERROR_NO_TOKEN:
raise
# attempt to open the process token, since no thread token
# exists
tok = win32security.OpenProcessToken(win32api.GetCurrentProcess(),
TOKEN_QUERY)
sid, attr = win32security.GetTokenInformation(tok, TokenUser)
win32api.CloseHandle(tok)
name, dom, typ = win32security.LookupAccountSid(None, sid)
return dom
def testCleanup1(self):
# We used to clobber all outstanding exceptions.
def f1(invalidate):
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
if invalidate:
win32api.CloseHandle(int(h))
1/0
# If we invalidated, then the object destruction code will attempt
# to close an invalid handle. We don't wan't an exception in
# this case
def f2(invalidate):
""" This function should throw an IOError. """
try:
f1(invalidate)
except ZeroDivisionError as exc:
raise IOError("raise 2")
self.assertRaises(IOError, f2, False)
# Now do it again, but so the auto object destruction
# actually fails.
self.assertRaises(IOError, f2, True)
def _closeStdin(self):
if hasattr(self, "hChildStdinWr"):
win32file.CloseHandle(self.hChildStdinWr)
del self.hChildStdinWr
self.closingStdin = False
self.closedStdin = True
def closeStderr(self):
if hasattr(self, "hChildStderrRd"):
win32file.CloseHandle(self.hChildStderrRd)
del self.hChildStderrRd
self.closedStderr = True
self.connectionLostNotify()
def closeStdout(self):
if hasattr(self, "hChildStdoutRd"):
win32file.CloseHandle(self.hChildStdoutRd)
del self.hChildStdoutRd
self.closedStdout = True
self.connectionLostNotify()
def close(self):
try:
win32api.CloseHandle(self.pipe)
except pywintypes.error:
# You can't close std handles...?
pass
def writeConnectionLost(self):
self.deactivate()
try:
win32api.CloseHandle(self.writePipe)
except pywintypes.error:
# OMG what
pass
self.lostCallback()
def _closeStdin(self):
if hasattr(self, "hChildStdinWr"):
win32file.CloseHandle(self.hChildStdinWr)
del self.hChildStdinWr
self.closingStdin = False
self.closedStdin = True
def closeStderr(self):
if hasattr(self, "hChildStderrRd"):
win32file.CloseHandle(self.hChildStderrRd)
del self.hChildStderrRd
self.closedStderr = True
self.connectionLostNotify()
def closeStdout(self):
if hasattr(self, "hChildStdoutRd"):
win32file.CloseHandle(self.hChildStdoutRd)
del self.hChildStdoutRd
self.closedStdout = True
self.connectionLostNotify()
def close(self):
try:
win32api.CloseHandle(self.pipe)
except pywintypes.error:
# You can't close std handles...?
pass
def writeConnectionLost(self):
self.deactivate()
try:
win32api.CloseHandle(self.writePipe)
except pywintypes.error:
# OMG what
pass
self.lostCallback()
def wait_for_elevated_process(process_info):
if platform.system() == 'Windows' and 'hProcess' in process_info:
import win32api
import win32con
import win32event
import win32process
win32event.WaitForSingleObject(process_info['hProcess'], 600000)
ret = win32process.GetExitCodeProcess(process_info['hProcess'])
win32api.CloseHandle(process_info['hProcess'])
return ret
# pylint: enable=E0611,E0401
# pylint: disable=E1101
def __del__(self):
if self.locked:
self.release()
win32api.CloseHandle(self.handle)
def killProcName(procname):
# Change suggested by Dan Knierim, who found that this performed a
# "refresh", allowing us to kill processes created since this was run
# for the first time.
try:
win32pdhutil.GetPerformanceAttributes('Process','ID Process',procname)
except:
pass
pids = win32pdhutil.FindPerformanceAttributesByName(procname)
# If _my_ pid in there, remove it!
try:
pids.remove(win32api.GetCurrentProcessId())
except ValueError:
pass
if len(pids)==0:
result = "Can't find %s" % procname
elif len(pids)>1:
result = "Found too many %s's - pids=`%s`" % (procname,pids)
else:
handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0,pids[0])
win32api.TerminateProcess(handle,0)
win32api.CloseHandle(handle)
result = ""
return result
def testCleanup2(self):
# Cause an exception during object destruction.
# The worst this does is cause an ".XXX undetected error (why=3)"
# So avoiding that is the goal
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
# Close the handle underneath the object.
win32api.CloseHandle(int(h))
# Object destructor runs with the implicit close failing
h = None
def testCleanup3(self):
# And again with a class - no __del__
import win32event
class Test:
def __init__(self):
self.h = win32event.CreateEvent(None, 0, 0, None)
win32api.CloseHandle(int(self.h))
t=Test()
t = None
def _getInvalidHandleException(self):
try:
win32api.CloseHandle(1)
except win32api.error, exc:
return exc
self.fail("Didn't get invalid-handle exception.")
def testSimple(self):
self.assertRaises(pywintypes.error, win32api.CloseHandle, 1)
def testFuncIndex(self):
exc = self._getInvalidHandleException()
self._testExceptionIndex(exc, 1, "CloseHandle")
def testUnpack(self):
try:
win32api.CloseHandle(1)
self.fail("expected exception!")
except win32api.error, exc:
self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE)
self.failUnlessEqual(exc.funcname, "CloseHandle")
expected_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
self.failUnlessEqual(exc.strerror, expected_msg)
def testAsStr(self):
exc = self._getInvalidHandleException()
err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
# early on the result actually *was* a tuple - it must always look like one
err_tuple = (winerror.ERROR_INVALID_HANDLE, 'CloseHandle', err_msg)
self.failUnlessEqual(str(exc), str(err_tuple))
def testAttributes(self):
exc = self._getInvalidHandleException()
err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE)
self.failUnlessEqual(exc.strerror, err_msg)
self.failUnlessEqual(exc.funcname, 'CloseHandle')
# some tests for 'insane' args.
def killProcName(procname):
# Change suggested by Dan Knierim, who found that this performed a
# "refresh", allowing us to kill processes created since this was run
# for the first time.
try:
win32pdhutil.GetPerformanceAttributes('Process','ID Process',procname)
except:
pass
pids = win32pdhutil.FindPerformanceAttributesByName(procname)
# If _my_ pid in there, remove it!
try:
pids.remove(win32api.GetCurrentProcessId())
except ValueError:
pass
if len(pids)==0:
result = "Can't find %s" % procname
elif len(pids)>1:
result = "Found too many %s's - pids=`%s`" % (procname,pids)
else:
handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0,pids[0])
win32api.TerminateProcess(handle,0)
win32api.CloseHandle(handle)
result = ""
return result
def testCleanup2(self):
# Cause an exception during object destruction.
# The worst this does is cause an ".XXX undetected error (why=3)"
# So avoiding that is the goal
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
# Close the handle underneath the object.
win32api.CloseHandle(int(h))
# Object destructor runs with the implicit close failing
h = None
def testCleanup3(self):
# And again with a class - no __del__
import win32event
class Test:
def __init__(self):
self.h = win32event.CreateEvent(None, 0, 0, None)
win32api.CloseHandle(int(self.h))
t=Test()
t = None