def test(fn):
print "The main thread is %d" % (win32api.GetCurrentThreadId())
GIT = CreateGIT()
interp = win32com.client.Dispatch("Python.Interpreter")
cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
events = fn(4, cookie)
numFinished = 0
while 1:
try:
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
except KeyboardInterrupt:
break
GIT.RevokeInterfaceFromGlobal(cookie)
del interp
del GIT
python类MsgWaitForMultipleObjects()的实例源码
def WaitWhileProcessingMessages(event, timeout = 2):
start = time.clock()
while True:
# Wake 4 times a second - we can't just specify the
# full timeout here, as then it would reset for every
# message we process.
rc = win32event.MsgWaitForMultipleObjects( (event,), 0,
250,
win32event.QS_ALLEVENTS)
if rc == win32event.WAIT_OBJECT_0:
# event signalled - stop now!
return True
if (time.clock() - start) > timeout:
# Timeout expired.
return False
# must be a message.
pythoncom.PumpWaitingMessages()
def test(fn):
print "The main thread is %d" % (win32api.GetCurrentThreadId())
GIT = CreateGIT()
interp = win32com.client.Dispatch("Python.Interpreter")
cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
events = fn(4, cookie)
numFinished = 0
while 1:
try:
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
except KeyboardInterrupt:
break
GIT.RevokeInterfaceFromGlobal(cookie)
del interp
del GIT
def WaitWhileProcessingMessages(event, timeout = 2):
start = time.clock()
while True:
# Wake 4 times a second - we can't just specify the
# full timeout here, as then it would reset for every
# message we process.
rc = win32event.MsgWaitForMultipleObjects( (event,), 0,
250,
win32event.QS_ALLEVENTS)
if rc == win32event.WAIT_OBJECT_0:
# event signalled - stop now!
return True
if (time.clock() - start) > timeout:
# Timeout expired.
return False
# must be a message.
pythoncom.PumpWaitingMessages()
def test(fn):
print "The main thread is %d" % (win32api.GetCurrentThreadId())
GIT = CreateGIT()
interp = win32com.client.Dispatch("Python.Interpreter")
cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
events = fn(4, cookie)
numFinished = 0
while 1:
try:
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
except KeyboardInterrupt:
break
GIT.RevokeInterfaceFromGlobal(cookie)
del interp
del GIT
def test(fn):
print("The main thread is %d" % (win32api.GetCurrentThreadId()))
GIT = CreateGIT()
interp = win32com.client.Dispatch("Python.Interpreter")
cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
events = fn(4, cookie)
numFinished = 0
while 1:
try:
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
except KeyboardInterrupt:
break
GIT.RevokeInterfaceFromGlobal(cookie)
del interp
del GIT
def start(self):
if self.instance_running():
try:
self.stop()
except SystemExit:
return win32gui.PostQuitMessage(0)
# Load today's count data back from data file
super(KeyCounter, self).start()
self.hook_keyboard()
self.create_window()
self.update_tray_icon()
while 1:
try:
win32event.MsgWaitForMultipleObjects(
[],
0,
self.MSPF,
win32event.QS_ALLEVENTS
)
win32gui.PumpWaitingMessages()
except SystemExit:
win32gui.PostQuitMessage(0)
def _DoTestMarshal(self, fn, bCoWait = 0):
#print "The main thread is %d" % (win32api.GetCurrentThreadId())
threads, events = fn(2)
numFinished = 0
while 1:
try:
if bCoWait:
rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
else:
# Specifying "bWaitAll" here will wait for messages *and* all events
# (which is pretty useless)
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
except KeyboardInterrupt:
break
for t in threads:
t.join(2)
self.failIf(t.isAlive(), "thread failed to stop!?")
threads = None # threads hold references to args
# Seems to be a leak here I can't locate :(
#self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
#self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
def _DoTestMarshal(self, fn, bCoWait = 0):
#print "The main thread is %d" % (win32api.GetCurrentThreadId())
threads, events = fn(2)
numFinished = 0
while 1:
try:
if bCoWait:
rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
else:
# Specifying "bWaitAll" here will wait for messages *and* all events
# (which is pretty useless)
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
except KeyboardInterrupt:
break
for t in threads:
t.join(2)
self.failIf(t.isAlive(), "thread failed to stop!?")
threads = None # threads hold references to args
# Seems to be a leak here I can't locate :(
#self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
#self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
def demo (delay=1000, stop=10):
g = glork(delay, stop)
# Timers are message based - so we need
# To run a message loop while waiting for our timers
# to expire.
start_time = time.time()
while 1:
# We can't simply give a timeout of 30 seconds, as
# we may continouusly be recieving other input messages,
# and therefore never expire.
rc = win32event.MsgWaitForMultipleObjects(
(g.event,), # list of objects
0, # wait all
500, # timeout
win32event.QS_ALLEVENTS, # type of input
)
if rc == win32event.WAIT_OBJECT_0:
# Event signalled.
break
elif rc == win32event.WAIT_OBJECT_0+1:
# Message waiting.
if win32gui.PumpWaitingMessages():
raise RuntimeError("We got an unexpected WM_QUIT message!")
else:
# This wait timed-out.
if time.time()-start_time > 30:
raise RuntimeError("We timed out waiting for the timers to expire!")
def testMsgWaitForMultipleObjects(self):
# this function used to segfault when called with an empty list
res = win32event.MsgWaitForMultipleObjects([], 0, 0, 0)
self.assertEquals(res, win32event.WAIT_TIMEOUT)
def testMsgWaitForMultipleObjects2(self):
# test with non-empty list
event = win32event.CreateEvent(None, 0, 0, None)
res = win32event.MsgWaitForMultipleObjects([event], 0, 0, 0)
self.assertEquals(res, win32event.WAIT_TIMEOUT)
def _DoTestMarshal(self, fn, bCoWait = 0):
#print "The main thread is %d" % (win32api.GetCurrentThreadId())
threads, events = fn(2)
numFinished = 0
while 1:
try:
if bCoWait:
rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
else:
# Specifying "bWaitAll" here will wait for messages *and* all events
# (which is pretty useless)
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
except KeyboardInterrupt:
break
for t in threads:
t.join(2)
self.failIf(t.isAlive(), "thread failed to stop!?")
threads = None # threads hold references to args
# Seems to be a leak here I can't locate :(
#self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
#self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
def demo (delay=1000, stop=10):
g = glork(delay, stop)
# Timers are message based - so we need
# To run a message loop while waiting for our timers
# to expire.
start_time = time.time()
while 1:
# We can't simply give a timeout of 30 seconds, as
# we may continouusly be recieving other input messages,
# and therefore never expire.
rc = win32event.MsgWaitForMultipleObjects(
(g.event,), # list of objects
0, # wait all
500, # timeout
win32event.QS_ALLEVENTS, # type of input
)
if rc == win32event.WAIT_OBJECT_0:
# Event signalled.
break
elif rc == win32event.WAIT_OBJECT_0+1:
# Message waiting.
if win32gui.PumpWaitingMessages():
raise RuntimeError("We got an unexpected WM_QUIT message!")
else:
# This wait timed-out.
if time.time()-start_time > 30:
raise RuntimeError("We timed out waiting for the timers to expire!")
def testMsgWaitForMultipleObjects(self):
# this function used to segfault when called with an empty list
res = win32event.MsgWaitForMultipleObjects([], 0, 0, 0)
self.assertEquals(res, win32event.WAIT_TIMEOUT)
def testMsgWaitForMultipleObjects2(self):
# test with non-empty list
event = win32event.CreateEvent(None, 0, 0, None)
res = win32event.MsgWaitForMultipleObjects([event], 0, 0, 0)
self.assertEquals(res, win32event.WAIT_TIMEOUT)
def _DoTestMarshal(self, fn, bCoWait = 0):
#print "The main thread is %d" % (win32api.GetCurrentThreadId())
threads, events = fn(2)
numFinished = 0
while 1:
try:
if bCoWait:
rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
else:
# Specifying "bWaitAll" here will wait for messages *and* all events
# (which is pretty useless)
rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
numFinished = numFinished + 1
if numFinished >= len(events):
break
elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
# This is critical - whole apartment model demo will hang.
pythoncom.PumpWaitingMessages()
else: # Timeout
print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
except KeyboardInterrupt:
break
for t in threads:
t.join(2)
self.failIf(t.isAlive(), "thread failed to stop!?")
threads = None # threads hold references to args
# Seems to be a leak here I can't locate :(
#self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
#self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)