def SvcStop(self):
    """Handle a stop request for the service.

    Reports ``SERVICE_STOP_PENDING`` through ``ReportServiceStatus`` and
    then signals the ``self.hWaitStop`` event.
    """
    self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
    # Signal the stop event only after the pending status has been reported.
    stop_handle = self.hWaitStop
    win32event.SetEvent(stop_handle)
python类SetEvent()的实例源码
def testSetEvent(self):
    """Exercise ``win32event.SetEvent`` on a live and on a closed event.

    The event must start non-signaled, ``SetEvent`` must return ``None``
    and leave the event signaled, and calling ``SetEvent`` on the handle
    after it has been closed must raise ``pywintypes.error``.
    """
    event = win32event.CreateEvent(None, True, False, None)
    self.assertNotSignaled(event)
    res = win32event.SetEvent(event)
    # assertEquals is a deprecated alias that was removed in Python 3.12;
    # assertIsNone states the same expectation directly.
    self.assertIsNone(res)
    self.assertSignaled(event)
    event.close()
    # A closed handle must be rejected rather than silently accepted.
    self.assertRaises(pywintypes.error, win32event.SetEvent, event)
def _DocumentStateChanged(self):
    """Signal ``self.adminEvent`` to announce a document state change."""
    admin_event = self.adminEvent
    win32event.SetEvent(admin_event)
def SignalStop(self):
    """Signal ``self.stopEvent``."""
    stop_event = self.stopEvent
    win32event.SetEvent(stop_event)
def _StopThread(self):
    """Signal the ``self.hStopThread`` event, then drop the reference to it."""
    stop_handle = self.hStopThread
    win32event.SetEvent(stop_handle)
    # Cleared only after SetEvent returns, so a failure leaves the
    # attribute untouched.
    self.hStopThread = None
def _testInterpInThread(self, stopEvent, interp):
    """Run ``self._doTestInThread(interp)`` and always signal *stopEvent*.

    Args:
        stopEvent: event handle that is set once the test body finishes.
        interp: object passed straight through to ``_doTestInThread``.
    """
    try:
        self._doTestInThread(interp)
    finally:
        # Signal even if the test body raised; the exception still propagates.
        win32event.SetEvent(stopEvent)
def OnDocumentComplete(self,
                       pDisp=pythoncom.Empty,
                       URL=pythoncom.Empty):
    """Event handler: report the handling thread and signal ``self.event``.

    Args:
        pDisp: event argument; not used by this handler.
        URL: event argument; not used by this handler.
    """
    thread = win32api.GetCurrentThreadId()
    # print() call form: the old print statement is a SyntaxError on
    # Python 3, while this single-argument form prints the same text on
    # both Python 2 and Python 3.
    print("OnDocumentComplete event processed on thread %d" % thread)
    # Set the event our main thread is waiting on.
    win32event.SetEvent(self.event)
def OnQuit(self):
    """Event handler: report the handling thread and signal ``self.event``."""
    thread = win32api.GetCurrentThreadId()
    # print() call form keeps the output identical on Python 2 and makes
    # the line valid syntax on Python 3.
    print("OnQuit event processed on thread %d" % thread)
    win32event.SetEvent(self.event)
def OnDocumentComplete(self,
                       pDisp=pythoncom.Empty,
                       URL=pythoncom.Empty):
    """Event handler: report the handling thread and signal ``self.event``.

    Args:
        pDisp: event argument; not used by this handler.
        URL: event argument; not used by this handler.
    """
    #
    # Caution:  Since the main thread and events thread(s) are different
    # it may be necessary to serialize access to shared data.  Because
    # this is a simple test case, that is not required here.  Your
    # situation may be different.   Caveat programmer.
    #
    thread = win32api.GetCurrentThreadId()
    # print() call form: the old print statement is a SyntaxError on
    # Python 3, while this single-argument form prints the same text on
    # both Python 2 and Python 3.
    print("OnDocumentComplete event processed on thread %d" % thread)
    # Set the event our main thread is waiting on.
    win32event.SetEvent(self.event)
def OnQuit(self):
    """Event handler: report the handling thread and signal ``self.event``."""
    thread = win32api.GetCurrentThreadId()
    # Written as a print() call so the line parses on Python 3; the
    # single-argument form emits the same text on Python 2.
    print("OnQuit event processed on thread %d" % thread)
    win32event.SetEvent(self.event)
def SvcStop(self):
    """Handle a stop request for the service.

    Reports ``SERVICE_STOP_PENDING``, signals the ``self.hWaitStop``
    event and finally clears the ``self.isAlive`` flag.
    """
    self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
    stop_handle = self.hWaitStop
    win32event.SetEvent(stop_handle)
    # NOTE(review): the flag is cleared after the event is signaled; if a
    # waiter checks isAlive immediately on wake-up, confirm this order is
    # the intended one.
    self.isAlive = False
def JobTransferred(self, job):
    """Callback: print the job, complete it and signal ``StopEvent``.

    Args:
        job: object exposing a ``Complete()`` method.
    """
    print('Job Transferred', job)
    job.Complete()
    # Exit the message pump.
    win32event.SetEvent(StopEvent)
def _DocumentStateChanged(self):
    """Set the ``self.adminEvent`` event."""
    event_handle = self.adminEvent
    win32event.SetEvent(event_handle)
def SignalStop(self):
    """Set the ``self.stopEvent`` event."""
    event_handle = self.stopEvent
    win32event.SetEvent(event_handle)
def _StopThread(self):
    """Set the ``self.hStopThread`` event and then reset the attribute to None."""
    thread_stop_event = self.hStopThread
    win32event.SetEvent(thread_stop_event)
    # The attribute is reset only once the event has been signaled.
    self.hStopThread = None
def increment(self, id, time):
    """Timer callback: print and advance ``self.x``; stop once past ``self.max``.

    Args:
        id: timer identifier, passed to ``timer.kill_timer`` when done.
        time: not used by this callback.
    """
    print('x = %d' % self.x)
    self.x += 1
    # If we've reached the max count, kill off the timer.
    if self.x > self.max:
        # We could have used 'self.id' here, too.
        timer.kill_timer(id)
        win32event.SetEvent(self.event)
# create a counter that will count from '1' thru '10', incrementing
# once a second, and then stop.
def SvcStop(self):
    """Service stop handler.

    Calls ``ReportServiceStatus`` with ``SERVICE_STOP_PENDING`` and then
    sets the ``self.hWaitStop`` event.
    """
    self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
    wait_stop = self.hWaitStop
    win32event.SetEvent(wait_stop)
def testSetEvent(self):
    """Verify ``win32event.SetEvent`` behaviour before and after close.

    A freshly created event is expected to be non-signaled; ``SetEvent``
    must return ``None`` and signal it. After ``event.close()`` a further
    ``SetEvent`` call must raise ``pywintypes.error``.
    """
    event = win32event.CreateEvent(None, True, False, None)
    self.assertNotSignaled(event)
    res = win32event.SetEvent(event)
    # The deprecated assertEquals alias no longer exists on Python 3.12+,
    # so use the supported assertion instead.
    self.assertIsNone(res)
    self.assertSignaled(event)
    event.close()
    # Using the handle after close must fail loudly.
    self.assertRaises(pywintypes.error, win32event.SetEvent, event)
def test_connect_with_payload(self):
    """Connect with ``ConnectEx`` while sending an initial payload.

    A worker thread runs ``self.connect_thread_runner``; this thread
    connects to ``self.addr`` with a request payload, reads the reply via
    overlapped ``WSARecv`` and checks both the request and the response.

    Raises:
        TestSkipped: when ``ConnectEx`` fails with WSAEINVAL (10022).
    """
    giveup_event = win32event.CreateEvent(None, 0, 0, None)
    t = threading.Thread(target=self.connect_thread_runner,
                         args=(True, giveup_event))
    t.start()
    time.sleep(0.1)
    s2 = socket.socket()
    ol = pywintypes.OVERLAPPED()
    s2.bind(('0.0.0.0', 0))  # connectex requires the socket be bound beforehand
    try:
        win32file.ConnectEx(s2, self.addr, ol, str2bytes("some expected request"))
    except win32file.error as exc:
        # Signal the give-up event before leaving the test.
        win32event.SetEvent(giveup_event)
        if exc.winerror == 10022:  # WSAEINVAL
            raise TestSkipped("ConnectEx is not available on this platform")
        raise  # some other error we don't expect.
    win32file.GetOverlappedResult(s2.fileno(), ol, 1)
    # Use a fresh OVERLAPPED structure for the receive.
    ol = pywintypes.OVERLAPPED()
    buff = win32file.AllocateReadBuffer(1024)
    win32file.WSARecv(s2, buff, ol, 0)
    length = win32file.GetOverlappedResult(s2.fileno(), ol, 1)
    self.response = buff[:length]
    self.assertEqual(self.response, str2bytes('some expected response'))
    self.assertEqual(self.request, str2bytes('some expected request'))
    t.join(5)
    # Thread.isAlive() was removed in Python 3.9 and the failIf alias in
    # Python 3.12; is_alive()/assertFalse are the supported spellings.
    self.assertFalse(t.is_alive(), "worker thread didn't terminate")
def _testInterpInThread(self, stopEvent, interp):
    """Call ``self._doTestInThread(interp)``, then set *stopEvent* regardless.

    Args:
        stopEvent: event handle set when the test body has finished.
        interp: object handed unchanged to ``_doTestInThread``.
    """
    try:
        self._doTestInThread(interp)
    finally:
        # Runs on success and on failure alike; any exception from the
        # test body continues to propagate afterwards.
        win32event.SetEvent(stopEvent)