def DoTestInterpInThread(cookie):
    """Exercise the shared Python.Interpreter object from the calling thread.

    Initialises COM on this thread, fetches the interpreter's IDispatch from
    the Global Interface Table (GIT) using ``cookie``, runs ``TestInterp``
    against it and prints this thread's ID next to the thread ID reported by
    the interpreter object.

    Exceptions are reported with ``traceback.print_exc()`` instead of being
    propagated.

    Args:
        cookie: value handed to ``GetInterfaceFromGlobal`` to look up the
            registered interface.
    """
    try:
        pythoncom.CoInitialize()
        GIT = interp = None
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop the COM references first, then balance CoInitialize() even
            # when the test body raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
# Example source code using GetCurrentThreadId() in Python.
def test(fn):
    """Run the GIT marshalling demo and wait for the worker events.

    Creates a ``Python.Interpreter`` COM object, registers its IDispatch in
    the Global Interface Table and passes the resulting cookie to ``fn``.
    The loop then waits on the handles ``fn`` returned, pumping window
    messages whenever any arrive, until as many event signals as there are
    handles have been seen or the user presses Ctrl+C.  The cookie is revoked
    on the way out, including when an exception escapes.

    Args:
        fn: callable invoked as ``fn(4, cookie)``; its return value is the
            sequence of handles given to ``MsgWaitForMultipleObjects``.
    """
    print("The main thread is %d" % (win32api.GetCurrentThreadId()))
    GIT = CreateGIT()
    interp = win32com.client.Dispatch("Python.Interpreter")
    cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
    try:
        events = fn(4, cookie)
        numFinished = 0
        while True:
            try:
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                    numFinished += 1
                    if numFinished >= len(events):
                        break
                elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else:  # Timeout
                    print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break
    finally:
        # Always remove the registration so the GIT does not keep the
        # interpreter object alive after a failure.
        GIT.RevokeInterfaceFromGlobal(cookie)
    del interp
    del GIT
def DoTestInterpInThread(cookie):
    """Exercise the shared Python.Interpreter object from the calling thread.

    Initialises COM on this thread, fetches the interpreter's IDispatch from
    the Global Interface Table (GIT) using ``cookie``, runs ``TestInterp``
    against it and prints this thread's ID next to the thread ID reported by
    the interpreter object.

    Exceptions are reported with ``traceback.print_exc()`` instead of being
    propagated.

    Args:
        cookie: value handed to ``GetInterfaceFromGlobal`` to look up the
            registered interface.
    """
    try:
        pythoncom.CoInitialize()
        GIT = interp = None
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop the COM references first, then balance CoInitialize() even
            # when the test body raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def test(fn):
    """Run the GIT marshalling demo and wait for the worker events.

    Creates a ``Python.Interpreter`` COM object, registers its IDispatch in
    the Global Interface Table and passes the resulting cookie to ``fn``.
    The loop then waits on the handles ``fn`` returned, pumping window
    messages whenever any arrive, until as many event signals as there are
    handles have been seen or the user presses Ctrl+C.  The cookie is revoked
    on the way out, including when an exception escapes.

    Args:
        fn: callable invoked as ``fn(4, cookie)``; its return value is the
            sequence of handles given to ``MsgWaitForMultipleObjects``.
    """
    print("The main thread is %d" % (win32api.GetCurrentThreadId()))
    GIT = CreateGIT()
    interp = win32com.client.Dispatch("Python.Interpreter")
    cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
    try:
        events = fn(4, cookie)
        numFinished = 0
        while True:
            try:
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                    numFinished += 1
                    if numFinished >= len(events):
                        break
                elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else:  # Timeout
                    print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break
    finally:
        # Always remove the registration so the GIT does not keep the
        # interpreter object alive after a failure.
        GIT.RevokeInterfaceFromGlobal(cookie)
    del interp
    del GIT
def ResetAXDebugging(self):
    """Restore the debugging state saved before the current nested session.

    Only traces and returns when called from a thread other than
    ``self.debuggingThread``.  When ``self.recursiveData`` is empty the frame
    and thread-state attributes are cleared; otherwise the first saved entry
    is removed from ``self.recursiveData`` and its four values are restored.
    """
    traceenter("ResetAXDebugging", self, "with refcount", len(self.recursiveData))
    if win32api.GetCurrentThreadId() != self.debuggingThread:
        trace("ResetAXDebugging called on other thread")
        return
    if not self.recursiveData:
        # Final reset: nothing left to restore, so clear the state.
        self.logicalbotframe = None
        self.debuggingThread = None
        self.currentframe = None
        self.debuggingThreadStateHandle = None
        return
    # Restore into ``logicalbotframe`` - the same attribute the final-reset
    # branch clears.  The previous code assigned to ``logbotframe``, which
    # left ``logicalbotframe`` stale after a nested session ended.
    (self.logicalbotframe,
     self.stopframe,
     self.currentframe,
     self.debuggingThreadStateHandle) = self.recursiveData[0]
    self.recursiveData = self.recursiveData[1:]
def HandleOutput(self, message):
    """Queue ``message`` and either flush it now or wake the main thread.

    The message is always placed on ``self.outputQueue``.  On the main thread
    the queue is flushed immediately when ``self.writeQueueing`` is
    ``flags.WQ_NONE``, or when it is ``flags.WQ_LINE`` and the message
    contains a newline.  In every other case a ``WM_USER`` message is posted
    to the main frame so the idle handler picks the queue up.
    """
    self.outputQueue.put(message)

    # Queueing options are only honoured on the main thread.
    if win32api.GetCurrentThreadId() == self.mainThreadId:
        if self.writeQueueing == flags.WQ_LINE:
            if "\n" in message:
                # Line queueing - a complete line forces a flush.
                self.QueueFlush()
                return
        elif self.writeQueueing == flags.WQ_NONE:
            # No queueing - flush straight away.
            self.QueueFlush()
            return

    # Let our idle handler get it - wake it up.
    try:
        main_frame = win32ui.GetMainFrame()
        main_frame.PostMessage(win32con.WM_USER)  # Kick main thread off.
    except win32ui.error:
        # This can happen as the app is shutting down, so we send it to the
        # C++ debugger instead.
        win32api.OutputDebugString(message)
# delegate certain fns to my view.
def DoTestInterpInThread(cookie):
    """Exercise the shared Python.Interpreter object from the calling thread.

    Initialises COM on this thread, fetches the interpreter's IDispatch from
    the Global Interface Table (GIT) using ``cookie``, runs ``TestInterp``
    against it and prints this thread's ID next to the thread ID reported by
    the interpreter object.

    Exceptions are reported with ``traceback.print_exc()`` instead of being
    propagated.

    Args:
        cookie: value handed to ``GetInterfaceFromGlobal`` to look up the
            registered interface.
    """
    try:
        pythoncom.CoInitialize()
        GIT = interp = None
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop the COM references first, then balance CoInitialize() even
            # when the test body raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def test(fn):
    """Run the GIT marshalling demo and wait for the worker events.

    Creates a ``Python.Interpreter`` COM object, registers its IDispatch in
    the Global Interface Table and passes the resulting cookie to ``fn``.
    The loop then waits on the handles ``fn`` returned, pumping window
    messages whenever any arrive, until as many event signals as there are
    handles have been seen or the user presses Ctrl+C.  The cookie is revoked
    on the way out, including when an exception escapes.

    Args:
        fn: callable invoked as ``fn(4, cookie)``; its return value is the
            sequence of handles given to ``MsgWaitForMultipleObjects``.
    """
    print("The main thread is %d" % (win32api.GetCurrentThreadId()))
    GIT = CreateGIT()
    interp = win32com.client.Dispatch("Python.Interpreter")
    cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
    try:
        events = fn(4, cookie)
        numFinished = 0
        while True:
            try:
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                    numFinished += 1
                    if numFinished >= len(events):
                        break
                elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else:  # Timeout
                    print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break
    finally:
        # Always remove the registration so the GIT does not keep the
        # interpreter object alive after a failure.
        GIT.RevokeInterfaceFromGlobal(cookie)
    del interp
    del GIT
def ResetAXDebugging(self):
    """Restore the debugging state saved before the current nested session.

    Only traces and returns when called from a thread other than
    ``self.debuggingThread``.  When ``self.recursiveData`` is empty the frame
    and thread-state attributes are cleared; otherwise the first saved entry
    is removed from ``self.recursiveData`` and its four values are restored.
    """
    traceenter("ResetAXDebugging", self, "with refcount", len(self.recursiveData))
    if win32api.GetCurrentThreadId() != self.debuggingThread:
        trace("ResetAXDebugging called on other thread")
        return
    if not self.recursiveData:
        # Final reset: nothing left to restore, so clear the state.
        self.logicalbotframe = None
        self.debuggingThread = None
        self.currentframe = None
        self.debuggingThreadStateHandle = None
        return
    # Restore into ``logicalbotframe`` - the same attribute the final-reset
    # branch clears.  The previous code assigned to ``logbotframe``, which
    # left ``logicalbotframe`` stale after a nested session ended.
    (self.logicalbotframe,
     self.stopframe,
     self.currentframe,
     self.debuggingThreadStateHandle) = self.recursiveData[0]
    self.recursiveData = self.recursiveData[1:]
def HandleOutput(self, message):
    """Queue ``message`` and either flush it now or wake the main thread.

    The message is always placed on ``self.outputQueue``.  On the main thread
    the queue is flushed immediately when ``self.writeQueueing`` is
    ``flags.WQ_NONE``, or when it is ``flags.WQ_LINE`` and the message
    contains a newline.  In every other case a ``WM_USER`` message is posted
    to the main frame so the idle handler picks the queue up.
    """
    self.outputQueue.put(message)

    # Queueing options are only honoured on the main thread.
    if win32api.GetCurrentThreadId() == self.mainThreadId:
        if self.writeQueueing == flags.WQ_LINE:
            if "\n" in message:
                # Line queueing - a complete line forces a flush.
                self.QueueFlush()
                return
        elif self.writeQueueing == flags.WQ_NONE:
            # No queueing - flush straight away.
            self.QueueFlush()
            return

    # Let our idle handler get it - wake it up.
    try:
        main_frame = win32ui.GetMainFrame()
        main_frame.PostMessage(win32con.WM_USER)  # Kick main thread off.
    except win32ui.error:
        # This can happen as the app is shutting down, so we send it to the
        # C++ debugger instead.
        win32api.OutputDebugString(message)
# delegate certain fns to my view.
def DoTestInterpInThread(cookie):
    """Exercise the shared Python.Interpreter object from the calling thread.

    Initialises COM on this thread, fetches the interpreter's IDispatch from
    the Global Interface Table (GIT) using ``cookie``, runs ``TestInterp``
    against it and prints this thread's ID next to the thread ID reported by
    the interpreter object.

    Exceptions are reported with ``traceback.print_exc()`` instead of being
    propagated.

    Args:
        cookie: value handed to ``GetInterfaceFromGlobal`` to look up the
            registered interface.
    """
    try:
        pythoncom.CoInitialize()
        GIT = interp = None
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop the COM references first, then balance CoInitialize() even
            # when the test body raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def test(fn):
    """Run the GIT marshalling demo and wait for the worker events.

    Creates a ``Python.Interpreter`` COM object, registers its IDispatch in
    the Global Interface Table and passes the resulting cookie to ``fn``.
    The loop then waits on the handles ``fn`` returned, pumping window
    messages whenever any arrive, until as many event signals as there are
    handles have been seen or the user presses Ctrl+C.  The cookie is revoked
    on the way out, including when an exception escapes.

    Args:
        fn: callable invoked as ``fn(4, cookie)``; its return value is the
            sequence of handles given to ``MsgWaitForMultipleObjects``.
    """
    print("The main thread is %d" % (win32api.GetCurrentThreadId()))
    GIT = CreateGIT()
    interp = win32com.client.Dispatch("Python.Interpreter")
    cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
    try:
        events = fn(4, cookie)
        numFinished = 0
        while True:
            try:
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                    numFinished += 1
                    if numFinished >= len(events):
                        break
                elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else:  # Timeout
                    print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break
    finally:
        # Always remove the registration so the GIT does not keep the
        # interpreter object alive after a failure.
        GIT.RevokeInterfaceFromGlobal(cookie)
    del interp
    del GIT
def serve(clsids):
    """Register class factories for ``clsids`` and pump messages until quit.

    After the factories are registered, quit messages are enabled for the
    current thread, the class objects are resumed and the message pump runs.
    When the pump returns - or raises - the factories are revoked and COM is
    uninitialised.

    Args:
        clsids: passed unchanged to ``factory.RegisterClassFactories``.
    """
    infos = factory.RegisterClassFactories(clsids)
    try:
        pythoncom.EnableQuitMessage(win32api.GetCurrentThreadId())
        pythoncom.CoResumeClassObjects()
        pythoncom.PumpMessages()
    finally:
        # Run the teardown on the error path too, so the registrations do not
        # outlive the pump.
        factory.RevokeClassFactories(infos)
        pythoncom.CoUninitialize()
def _doTestInThread(self, interp):
    """Call the interpreter object from this thread inside a COM init pair.

    When the module-level ``freeThreaded`` flag is set, ``interp`` is first
    converted with ``CoGetInterfaceAndReleaseStream`` and wrapped with
    ``win32com.client.Dispatch``.  The object is then asked to execute
    ``import win32api``.  ``CoUninitialize`` is called even if that fails.

    Args:
        interp: the interpreter object, or the stream to unmarshal it from
            when ``freeThreaded`` is set.
    """
    pythoncom.CoInitialize()
    try:
        if freeThreaded:
            # NOTE(review): the Dispatch() wrap is assumed to belong to this
            # branch together with the unmarshal - confirm against the
            # original indentation.
            interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
        interp.Exec("import win32api")
    finally:
        pythoncom.CoUninitialize()
def _DoTestMarshal(self, fn, bCoWait=0):
    """Start worker threads via ``fn`` and wait for their events to fire.

    ``fn(2)`` must return a ``(threads, events)`` pair.  The loop waits on
    ``events`` - with ``CoWaitForMultipleHandles`` when ``bCoWait`` is true,
    otherwise with ``MsgWaitForMultipleObjects`` plus explicit message
    pumping - until as many event signals as there are events have been seen
    or Ctrl+C is pressed.  Each thread is then joined for up to two seconds
    and asserted to have stopped.

    Args:
        fn: callable invoked as ``fn(2)``.
        bCoWait: selects the COM wait API instead of the Win32 one.
    """
    threads, events = fn(2)
    numFinished = 0
    while True:
        try:
            if bCoWait:
                rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
            else:
                # Specifying "bWaitAll" here will wait for messages *and* all events
                # (which is pretty useless)
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
            if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                numFinished += 1
                if numFinished >= len(events):
                    break
            elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                # This is critical - whole apartment model demo will hang.
                pythoncom.PumpWaitingMessages()
            else:  # Timeout
                print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
        except KeyboardInterrupt:
            break
    for t in threads:
        t.join(2)
        # assertFalse/is_alive replace the deprecated failIf/isAlive aliases.
        self.assertFalse(t.is_alive(), "thread failed to stop!?")
    threads = None  # threads hold references to args
    # The interface and gateway counts are deliberately not asserted to be
    # zero here: there seems to be a leak that has not been located.
def OnQuit(self):
    """Report which thread handled the OnQuit event and set ``self.event``."""
    thread = win32api.GetCurrentThreadId()
    print("OnQuit event processed on thread %d" % thread)
    win32event.SetEvent(self.event)
def TestExplorerEvents():
    """Drive Internet Explorer through a load/quit cycle and check its events.

    Creates an ``InternetExplorer.Application`` object wired to
    ``ExplorerEvents``, navigates to ``readme.htm`` in the parent directory
    and then quits.  After each step ``WaitWhileProcessingMessages`` is
    called on ``iexplore.event`` and a failure message is printed when it
    returns a false value.
    """
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)
    thread = win32api.GetCurrentThreadId()
    print('TestExplorerEvents created IE object on thread %d' % thread)
    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error as details:
        print("Warning - could not open the test HTML file %s" % (details,))
    # Wait for the event to be signalled while pumping messages.
    if not WaitWhileProcessingMessages(iexplore.event):
        print("Document load event FAILED to fire!!!")
    iexplore.Quit()
    #
    # Give IE a chance to shutdown, else it can get upset on fast machines.
    # Note, Quit generates events.  Although this test does NOT catch them
    # it is NECESSARY to pump messages here instead of a sleep so that the Quit
    # happens properly!
    if not WaitWhileProcessingMessages(iexplore.event):
        print("OnQuit event FAILED to fire!!!")
    iexplore = None
def OnDocumentComplete(self,
                       pDisp=pythoncom.Empty,
                       URL=pythoncom.Empty):
    """Report which thread handled OnDocumentComplete and set ``self.event``.

    Args:
        pDisp: event argument; not used here.
        URL: event argument; not used here.
    """
    #
    # Caution:  Since the main thread and events thread(s) are different
    # it may be necessary to serialize access to shared data.  Because
    # this is a simple test case, that is not required here.  Your
    # situation may be different.   Caveat programmer.
    #
    thread = win32api.GetCurrentThreadId()
    print("OnDocumentComplete event processed on thread %d" % thread)
    # Set the event our main thread is waiting on.
    win32event.SetEvent(self.event)
def OnQuit(self):
    """Report which thread handled the OnQuit event and set ``self.event``."""
    thread = win32api.GetCurrentThreadId()
    print("OnQuit event processed on thread %d" % thread)
    win32event.SetEvent(self.event)
def TestExplorerEvents():
    """Drive Internet Explorer through a load/quit cycle and check its events.

    Creates an ``InternetExplorer.Application`` object wired to
    ``ExplorerEvents``, navigates to ``readme.htm`` in the parent directory
    and then quits.  After each step it waits up to two seconds on
    ``iexplore.event`` and prints a failure message if the event was not
    signalled.
    """
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)
    thread = win32api.GetCurrentThreadId()
    print('TestExplorerEvents created IE object on thread %d' % thread)
    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error as details:
        print("Warning - could not open the test HTML file %s" % (details,))
    # In this free-threaded example, we can simply wait until an event has
    # been set - we will give it 2 seconds before giving up.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("Document load event FAILED to fire!!!")
    iexplore.Quit()
    # Now we can do the same thing to wait for exit!
    # Although Quit generates events, in this free-threaded world we
    # do *not* need to run any message pumps.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("OnQuit event FAILED to fire!!!")
    iexplore = None
    print("Finished the IE event sample!")
def serve(clsids):
    """Register class factories for ``clsids`` and pump messages until quit.

    After the factories are registered, quit messages are enabled for the
    current thread, the class objects are resumed and the message pump runs.
    When the pump returns - or raises - the factories are revoked and COM is
    uninitialised.

    Args:
        clsids: passed unchanged to ``factory.RegisterClassFactories``.
    """
    infos = factory.RegisterClassFactories(clsids)
    try:
        pythoncom.EnableQuitMessage(win32api.GetCurrentThreadId())
        pythoncom.CoResumeClassObjects()
        pythoncom.PumpMessages()
    finally:
        # Run the teardown on the error path too, so the registrations do not
        # outlive the pump.
        factory.RevokeClassFactories(infos)
        pythoncom.CoUninitialize()
def _doTestInThread(self, interp):
    """Call the interpreter object from this thread inside a COM init pair.

    When the module-level ``freeThreaded`` flag is set, ``interp`` is first
    converted with ``CoGetInterfaceAndReleaseStream`` and wrapped with
    ``win32com.client.Dispatch``.  The object is then asked to execute
    ``import win32api``.  ``CoUninitialize`` is called even if that fails.

    Args:
        interp: the interpreter object, or the stream to unmarshal it from
            when ``freeThreaded`` is set.
    """
    pythoncom.CoInitialize()
    try:
        if freeThreaded:
            # NOTE(review): the Dispatch() wrap is assumed to belong to this
            # branch together with the unmarshal - confirm against the
            # original indentation.
            interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
        interp.Exec("import win32api")
    finally:
        pythoncom.CoUninitialize()
def _DoTestMarshal(self, fn, bCoWait=0):
    """Start worker threads via ``fn`` and wait for their events to fire.

    ``fn(2)`` must return a ``(threads, events)`` pair.  The loop waits on
    ``events`` - with ``CoWaitForMultipleHandles`` when ``bCoWait`` is true,
    otherwise with ``MsgWaitForMultipleObjects`` plus explicit message
    pumping - until as many event signals as there are events have been seen
    or Ctrl+C is pressed.  Each thread is then joined for up to two seconds
    and asserted to have stopped.

    Args:
        fn: callable invoked as ``fn(2)``.
        bCoWait: selects the COM wait API instead of the Win32 one.
    """
    threads, events = fn(2)
    numFinished = 0
    while True:
        try:
            if bCoWait:
                rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
            else:
                # Specifying "bWaitAll" here will wait for messages *and* all events
                # (which is pretty useless)
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
            if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                numFinished += 1
                if numFinished >= len(events):
                    break
            elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                # This is critical - whole apartment model demo will hang.
                pythoncom.PumpWaitingMessages()
            else:  # Timeout
                print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
        except KeyboardInterrupt:
            break
    for t in threads:
        t.join(2)
        # assertFalse/is_alive replace the deprecated failIf/isAlive aliases.
        self.assertFalse(t.is_alive(), "thread failed to stop!?")
    threads = None  # threads hold references to args
    # The interface and gateway counts are deliberately not asserted to be
    # zero here: there seems to be a leak that has not been located.
def OnQuit(self):
    """Report which thread handled the OnQuit event and set ``self.event``."""
    thread = win32api.GetCurrentThreadId()
    print("OnQuit event processed on thread %d" % thread)
    win32event.SetEvent(self.event)
def TestExplorerEvents():
    """Drive Internet Explorer through a load/quit cycle and check its events.

    Creates an ``InternetExplorer.Application`` object wired to
    ``ExplorerEvents``, navigates to ``readme.htm`` in the parent directory
    and then quits.  After each step ``WaitWhileProcessingMessages`` is
    called on ``iexplore.event`` and a failure message is printed when it
    returns a false value.
    """
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)
    thread = win32api.GetCurrentThreadId()
    print('TestExplorerEvents created IE object on thread %d' % thread)
    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error as details:
        print("Warning - could not open the test HTML file %s" % (details,))
    # Wait for the event to be signalled while pumping messages.
    if not WaitWhileProcessingMessages(iexplore.event):
        print("Document load event FAILED to fire!!!")
    iexplore.Quit()
    #
    # Give IE a chance to shutdown, else it can get upset on fast machines.
    # Note, Quit generates events.  Although this test does NOT catch them
    # it is NECESSARY to pump messages here instead of a sleep so that the Quit
    # happens properly!
    if not WaitWhileProcessingMessages(iexplore.event):
        print("OnQuit event FAILED to fire!!!")
    iexplore = None
def OnDocumentComplete(self,
                       pDisp=pythoncom.Empty,
                       URL=pythoncom.Empty):
    """Report which thread handled OnDocumentComplete and set ``self.event``.

    Args:
        pDisp: event argument; not used here.
        URL: event argument; not used here.
    """
    #
    # Caution:  Since the main thread and events thread(s) are different
    # it may be necessary to serialize access to shared data.  Because
    # this is a simple test case, that is not required here.  Your
    # situation may be different.   Caveat programmer.
    #
    thread = win32api.GetCurrentThreadId()
    print("OnDocumentComplete event processed on thread %d" % thread)
    # Set the event our main thread is waiting on.
    win32event.SetEvent(self.event)
def OnQuit(self):
    """Report which thread handled the OnQuit event and set ``self.event``."""
    thread = win32api.GetCurrentThreadId()
    print("OnQuit event processed on thread %d" % thread)
    win32event.SetEvent(self.event)
def TestExplorerEvents():
    """Drive Internet Explorer through a load/quit cycle and check its events.

    Creates an ``InternetExplorer.Application`` object wired to
    ``ExplorerEvents``, navigates to ``readme.htm`` in the parent directory
    and then quits.  After each step it waits up to two seconds on
    ``iexplore.event`` and prints a failure message if the event was not
    signalled.
    """
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)
    thread = win32api.GetCurrentThreadId()
    print('TestExplorerEvents created IE object on thread %d' % thread)
    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error as details:
        print("Warning - could not open the test HTML file %s" % (details,))
    # In this free-threaded example, we can simply wait until an event has
    # been set - we will give it 2 seconds before giving up.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("Document load event FAILED to fire!!!")
    iexplore.Quit()
    # Now we can do the same thing to wait for exit!
    # Although Quit generates events, in this free-threaded world we
    # do *not* need to run any message pumps.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("OnQuit event FAILED to fire!!!")
    iexplore = None
    print("Finished the IE event sample!")
def waitForEvents(self, timeout):
    """Wait for one of ``self.handles`` or a window message.

    Args:
        timeout: integer passed to ``MsgWaitForMultipleObjects`` as its
            timeout in milliseconds; a negative value waits indefinitely.

    Returns:
        0 when window messages arrived and were pumped, 2 when one of
        ``self.handles`` was signalled, and 1 for a timeout or any other wait
        result.  1 is also returned, regardless of the wait result, when
        ``self.fInterrupted`` was set; the flag is cleared in that case.

    Raises:
        TypeError: ``timeout`` is not an ``int``.
        Exception: called from a thread other than ``self.tid``.
    """
    # Validate the argument before importing the Windows-only modules.
    if not isinstance(timeout, int):
        raise TypeError("The timeout argument is not an integer")

    from win32api import GetCurrentThreadId
    from win32event import INFINITE
    from win32event import MsgWaitForMultipleObjects, QS_ALLINPUT, WAIT_OBJECT_0
    from pythoncom import PumpWaitingMessages

    if self.tid != GetCurrentThreadId():
        raise Exception("wait for events from the same thread you inited!")

    if timeout < 0:
        cMsTimeout = INFINITE
    else:
        cMsTimeout = timeout
    rc = MsgWaitForMultipleObjects(self.handles, 0, cMsTimeout, QS_ALLINPUT)
    if WAIT_OBJECT_0 <= rc < WAIT_OBJECT_0 + len(self.handles):
        # is it possible?
        rc = 2
    elif rc == WAIT_OBJECT_0 + len(self.handles):
        # Waiting messages
        PumpWaitingMessages()
        rc = 0
    else:
        # Timeout
        rc = 1

    # check for interruption
    self.oIntCv.acquire()
    try:
        if self.fInterrupted:
            self.fInterrupted = False
            rc = 1
    finally:
        self.oIntCv.release()
    return rc
def trace(*args):
    """A function used instead of "print" for debugging output.

    Writes the current thread ID followed by each of ``args``, separated by
    single spaces, on one line.  Does nothing unless the module-level
    ``debuggingTrace`` flag is true.
    """
    if not debuggingTrace:
        return
    parts = [str(win32api.GetCurrentThreadId())]
    parts.extend(str(arg) for arg in args)
    # A single print() call works on both Python 2 and Python 3, unlike the
    # trailing-comma print statements used previously.
    print(" ".join(parts))
# Note that the DebugManager is not a COM gateway class for the
# debugger - but it does create and manage them.