def Run(self):
    """Service stop, admin and file-change events until asked to stop.

    Each pass waits on ``stopEvent``, ``adminEvent`` and, when it is set,
    ``watchEvent``:

    * ``stopEvent`` (first handle) ends the loop.
    * ``adminEvent`` (second handle) calls ``RefreshEvent()``.
    * any other wake-up posts ``MSG_CHECK_EXTERNAL_FILE`` to ``self.hwnd``
      and re-arms the change notification; if re-arming fails the problem
      is printed and the loop ends.

    On exit the reference to the document is dropped and the
    change-notification handle, if any, is closed.
    """
    while True:
        # Rebuilt on every pass because watchEvent may be unset or replaced.
        handles = [self.stopEvent, self.adminEvent]
        if self.watchEvent is not None:
            handles.append(self.watchEvent)
        rc = win32event.WaitForMultipleObjects(handles, 0, win32event.INFINITE)
        if rc == win32event.WAIT_OBJECT_0:
            break
        elif rc == win32event.WAIT_OBJECT_0 + 1:
            self.RefreshEvent()
        else:
            win32api.PostMessage(self.hwnd, MSG_CHECK_EXTERNAL_FILE, 0, 0)
            try:
                # If the directory has been removed underneath us, we get this error.
                win32api.FindNextChangeNotification(self.watchEvent)
            except win32api.error as exc:
                # Single pre-formatted argument so the output is the same
                # whether print is a statement (Python 2) or a function.
                print("Can not watch file %s for changes - %s"
                      % (self.doc.GetPathName(), exc.strerror))
                break
    # close a circular reference
    self.doc = None
    if self.watchEvent:
        win32api.FindCloseChangeNotification(self.watchEvent)
python类WaitForMultipleObjects()的实例源码
def CollectorThread(stopEvent, file):
    """Copy win32trace output into ``file`` until ``stopEvent`` is signalled.

    stopEvent -- event handle; signalling it ends the loop.
    file -- object with a ``write`` method that receives the trace text.

    ``win32trace.TermRead()`` is always called once the loop has been
    entered, even if reading or writing raises.
    """
    win32trace.InitRead()
    handle = win32trace.GetHandle()
    # Run this thread at a lower priority to the main message-loop (and printing output)
    # thread can keep up
    import win32process
    win32process.SetThreadPriority(win32api.GetCurrentThread(), win32process.THREAD_PRIORITY_BELOW_NORMAL)
    try:
        while True:
            rc = win32event.WaitForMultipleObjects((handle, stopEvent), 0, win32event.INFINITE)
            if rc == win32event.WAIT_OBJECT_0:
                # About the only char we can't live with is \0!
                file.write(win32trace.read().replace("\0", "<null>"))
            else:
                # Stop event
                break
    finally:
        win32trace.TermRead()
        # Call form with a single argument prints identically on Python 2 and 3.
        print("Thread dieing")
def Run(self):
    """Wait loop reacting to the stop, admin and file-change handles.

    The stop handle ends the loop, the admin handle triggers
    ``RefreshEvent()``, and any other wake-up asks the window to check the
    external file and re-arms the change notification.  Cleanup (dropping
    ``self.doc`` and closing the watch handle) happens after the loop.
    """
    stop_signalled = win32event.WAIT_OBJECT_0
    admin_signalled = win32event.WAIT_OBJECT_0 + 1

    while True:
        # The watch handle is optional and is re-read on every pass.
        wait_on = [self.stopEvent, self.adminEvent]
        if self.watchEvent is not None:
            wait_on.append(self.watchEvent)

        signalled = win32event.WaitForMultipleObjects(wait_on, 0, win32event.INFINITE)
        if signalled == stop_signalled:
            break
        if signalled == admin_signalled:
            self.RefreshEvent()
            continue

        # Neither stop nor admin: treat it as a change notification.
        win32api.PostMessage(self.hwnd, MSG_CHECK_EXTERNAL_FILE, 0, 0)
        try:
            # Fails if the watched directory has been removed underneath us.
            win32api.FindNextChangeNotification(self.watchEvent)
        except win32api.error as exc:
            print("Can not watch file", self.doc.GetPathName(), "for changes -", exc.strerror)
            break

    # Break the circular reference to the document.
    self.doc = None
    if self.watchEvent:
        win32api.FindCloseChangeNotification(self.watchEvent)
def CollectorThread(stopEvent, file):
    """Pump win32trace output into ``file`` until ``stopEvent`` fires.

    stopEvent -- event handle that ends the loop when signalled.
    file -- object whose ``write`` method receives the trace text.
    """
    win32trace.InitRead()
    trace_handle = win32trace.GetHandle()
    # Lower this thread's priority so the main message-loop (and the thread
    # printing the output) can keep up.
    import win32process
    current_thread = win32api.GetCurrentThread()
    win32process.SetThreadPriority(current_thread, win32process.THREAD_PRIORITY_BELOW_NORMAL)
    try:
        while True:
            signalled = win32event.WaitForMultipleObjects(
                (trace_handle, stopEvent), 0, win32event.INFINITE)
            if signalled != win32event.WAIT_OBJECT_0:
                # Anything other than the trace handle means "stop".
                break
            # NUL is about the only character we can't pass through.
            text = win32trace.read()
            file.write(text.replace("\0", "<null>"))
    finally:
        win32trace.TermRead()
        print("Thread dieing")
def wait(self, state, interval=0.1, channel=None):
    """Block until the bus reaches ``state`` (or any one of several states).

    ``state`` may be a single state or a tuple/list of states.  The wait is
    done on native win32event objects, so ``interval`` is not used; the
    ``channel`` argument is not used either.  Returns immediately when the
    current state already matches.
    """
    if isinstance(state, (tuple, list)):
        if self.state in state:
            # Already there - nothing to wait for.
            return
        handles = tuple(self._get_state_event(s) for s in state)
        win32event.WaitForMultipleObjects(handles, 0, win32event.INFINITE)
        return

    if self.state != state:
        handle = self._get_state_event(state)
        win32event.WaitForSingleObject(handle, win32event.INFINITE)
def eof(self):
    """Return the child's exit status if it has finished, otherwise None.

    This never blocks.  On win32 the handles in ``self.wait_for`` are polled
    with a zero timeout; elsewhere a still-running helper thread means "not
    finished", and otherwise the child is polled with ``os.WNOHANG``.  When
    the child is found to have finished, ``self.file`` is closed and reset
    to None before the status is returned.
    """
    ### should be calling file.eof() here instead of file.close(), there
    ### may be data in the pipe or buffer after the process exits
    if sys.platform == "win32":
        r = win32event.WaitForMultipleObjects(self.wait_for, 1, 0)
        if r == win32event.WAIT_OBJECT_0:
            self.file.close()
            self.file = None
            return win32process.GetExitCodeProcess(self.child_pid)
        return None

    thread = self.thread
    if thread:
        # Thread.isAlive() was removed in Python 3.9; prefer is_alive() and
        # only fall back to the old spelling for objects that lack it.
        is_alive = getattr(thread, "is_alive", None) or thread.isAlive
        if is_alive():
            return None

    pid, status = os.waitpid(self.child_pid, os.WNOHANG)
    if pid:
        self.file.close()
        self.file = None
        return status
    return None
def close(self):
    """Close the pipe, wait for the child process(es) and return the status.

    On win32 this waits for every handle in ``self.wait_for`` and returns
    the exit code of ``self.child_pid``.  Elsewhere it joins the helper
    thread (if any) and waits on ``self.child_pid``; when that is a list of
    pids each one is waited for in order and the status of the last is
    returned (None for an empty list).
    """
    if self.file:
        self.file.close()
        self.file = None

    if sys.platform == "win32":
        win32event.WaitForMultipleObjects(self.wait_for, 1, win32event.INFINITE)
        return win32process.GetExitCodeProcess(self.child_pid)

    if self.thread:
        self.thread.join()

    if isinstance(self.child_pid, list):
        # Start from None so an empty list returns None rather than
        # failing on an unassigned local.
        status = None
        for pid in self.child_pid:
            status = os.waitpid(pid, 0)[1]
        return status

    return os.waitpid(self.child_pid, 0)[1]
def wait(self, state, interval=0.1, channel=None):
    """Wait for the given state, or for any one of a tuple/list of states.

    Native win32event objects do the waiting, so ``interval`` has no
    effect; ``channel`` is likewise unused here.  Nothing is waited on if
    the current state already satisfies the request.
    """
    several = isinstance(state, (tuple, list))

    if several and self.state not in state:
        pending = tuple(self._get_state_event(wanted) for wanted in state)
        win32event.WaitForMultipleObjects(pending, 0, win32event.INFINITE)
    elif not several and self.state != state:
        win32event.WaitForSingleObject(
            self._get_state_event(state), win32event.INFINITE)
def signalCheckerThread(self):
    """Dispatch signalled event handles to ``signalHandler`` until shutdown.

    Each pass waits on the admin handle plus every handle in
    ``self.event_handles``.  The admin handle (slot 0, paired with a signum
    of None) only wakes the loop so it can re-check ``shutdown_requested``
    and rebuild the handle list; any other handle results in
    ``self.signalHandler(signum, None)``.
    """
    while not self.shutdown_requested:
        # Slot 0 is the admin handle; its "signum" is None.
        handles = [self.admin_event_handle]
        signums = [None]
        registered = list(self.event_handles.items())
        signums.extend(signum for signum, _ in registered)
        handles.extend(handle for _, handle in registered)

        woke = win32event.WaitForMultipleObjects(
            handles, False, win32event.INFINITE)
        logger.debug("signalCheckerThread awake with %s" % woke)

        signum = signums[woke - win32event.WAIT_OBJECT_0]
        if signum is None:
            # Admin event - either shutdown, or new event object created.
            continue

        logger.debug("signalCheckerThread calling %s" % signum)
        self.signalHandler(signum, None)
        logger.debug("signalCheckerThread back")
    logger.debug("signalCheckerThread stopped")
def pacemaker(self, timeout=60):
    """Start a background thread that fires ``self['event']()`` periodically.

    timeout -- pulse period in seconds (multiplied by 1000 for the timer).

    The thread object is stored in ``self.thread``.  On Windows the thread
    sets ``self.running`` and loops while it stays true, firing the event
    handler each time the waitable timer is signalled.  On other platforms
    the thread only prints a notice (when ``self.options['Verbose']`` is
    set) and exits.  Errors are reported through ``self.notifyOfError``.
    """
    # This is a stand-alone heartbeat generator. To pulse from your own control loop,
    # call your AbstractLog subclass instance event handler (e.g. AbstractLog['event']()
    def _target(timeout=60):
        if platform.uname()[0].lower() == "windows":
            import win32con
            import win32event
            self.running = True
            kill = win32event.CreateEvent(None, 1, 0, None)
            pulse = win32event.CreateWaitableTimer(None, 0, None)
            win32event.SetWaitableTimer(pulse, 0, timeout * 1000, None, None, False)
            while self.running:
                try:
                    result = win32event.WaitForMultipleObjects([kill, pulse], False, 1000)
                    # if kill signal received, break loop
                    if result == win32con.WAIT_OBJECT_0:
                        break
                    # elif timeout has passed, generate a pulse
                    elif result == win32con.WAIT_OBJECT_0 + 1:
                        self['event']()
                except Exception:
                    # Report the failure and set the kill event so the
                    # next wait ends the loop.
                    self.notifyOfError("Pacemaker shutdown. Heartbeats will not be generated.")
                    win32event.SetEvent(kill)
        elif self.options['Verbose']:
            # Call form with one argument prints the same on Python 2 and 3.
            print("Pacemaker only supported in Windows at this time. ")

    try:
        self.thread = threading.Thread(target=_target, args=(timeout,))
        self.thread.start()
    except Exception:
        self.notifyOfError("Pacemaker thread exception. Heartbeats will not be generated.")
def connect_thread_runner(self, expect_payload, giveup_event):
    """Accept one connection with an overlapped AcceptEx and answer it.

    expect_payload -- when true, a 1024-byte buffer is used and the bytes
        received with the connection are stored in ``self.request``.
    giveup_event -- event handle; if it is signalled first the function
        returns without accepting anything.

    The address being listened on is published as ``self.addr``.  The wait
    is limited to 2000 ms, after which ``self.fail`` is called.
    """
    # As Windows 2000 doesn't do ConnectEx, we need to use a non-blocking
    # accept, as our test connection may never come. May as well use
    # AcceptEx for this...
    server_sock = socket.socket()
    port = random.randint(10000,64000)
    self.addr = ('localhost', port)
    server_sock.bind(self.addr)
    server_sock.listen(1)

    # Socket that AcceptEx will attach the incoming connection to.
    client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    ov = pywintypes.OVERLAPPED()
    ov.hEvent = win32event.CreateEvent(None, 0, 0, None)

    # When we don't expect data we must be careful to only pass the exact
    # number of bytes for the endpoint data...
    buf_size = 1024 if expect_payload else win32file.CalculateSocketEndPointSize(server_sock)
    recv_buf = win32file.AllocateReadBuffer(buf_size)
    win32file.AcceptEx(server_sock, client_sock, recv_buf, ov)

    # Wait for the connection or for our test to fail.
    outcome = win32event.WaitForMultipleObjects((giveup_event, ov.hEvent), False, 2000)
    if outcome == win32event.WAIT_TIMEOUT:
        self.fail("timed out waiting for a connection")
    if outcome == win32event.WAIT_OBJECT_0:
        # Our main thread running the test failed and will never connect.
        return

    # Must be a connection.
    received = win32file.GetOverlappedResult(server_sock.fileno(), ov, False)
    if expect_payload:
        self.request = recv_buf[:received]
    accepter_reply = str2bytes('some expected response')
    client_sock.send(accepter_reply)
def connect_thread_runner(self, expect_payload, giveup_event):
    """Listen on a random local port and service a single AcceptEx accept.

    expect_payload -- true if data is expected along with the connection;
        that data is then saved to ``self.request``.
    giveup_event -- event handle that, when signalled, makes the function
        return without a connection.

    ``self.addr`` is set to the listening address.  ``self.fail`` is called
    if nothing happens within 2000 ms.
    """
    # As Windows 2000 doesn't do ConnectEx, we need to use a non-blocking
    # accept, as our test connection may never come. May as well use
    # AcceptEx for this...
    listen_sock = socket.socket()
    self.addr = ('localhost', random.randint(10000,64000))
    listen_sock.bind(self.addr)
    listen_sock.listen(1)
    # Create the accept socket.
    accept_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # An overlapped structure carrying the completion event.
    olap = pywintypes.OVERLAPPED()
    olap.hEvent = win32event.CreateEvent(None, 0, 0, None)
    # Size the buffer for the accept.
    if not expect_payload:
        # When we don't expect data we must be careful to only pass the
        # exact number of bytes for the endpoint data...
        size = win32file.CalculateSocketEndPointSize(listen_sock)
    else:
        size = 1024
    data = win32file.AllocateReadBuffer(size)
    win32file.AcceptEx(listen_sock, accept_sock, data, olap)
    # Wait for the connection or our test to fail.
    waitables = (giveup_event, olap.hEvent)
    result = win32event.WaitForMultipleObjects(waitables, False, 2000)
    if result == win32event.WAIT_TIMEOUT:
        self.fail("timed out waiting for a connection")
    if result == win32event.WAIT_OBJECT_0:
        # Our main thread running the test failed and will never connect.
        return
    # Must be a connection.
    count = win32file.GetOverlappedResult(listen_sock.fileno(), olap, False)
    if expect_payload:
        self.request = data[:count]
    accept_sock.send(str2bytes('some expected response'))