python类WAIT_OBJECT_0的实例源码

edfexecution.py 文件源码 项目:isf 作者: w3h 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect_pipe(pipe, pipeName):
        overLap = pywintypes.OVERLAPPED()
        overLap.hEvent = win32event.CreateEvent(None, 1, 0, None)
        if overLap.hEvent == 0:
            raise PipeError('Could not create hEvent')

        try:
            # Wait for a pipe client connection
            ret = win32pipe.ConnectNamedPipe(pipe, overLap)
            if not ret in (0, ERROR_PIPE_CONNECTED):
                if ret == ERROR_IO_PENDING:
                    ret = win32event.WaitForSingleObject(overLap.hEvent, 
                                                         1000 * CONNECT_TIMEOUT_SECS)
                    if ret != win32event.WAIT_OBJECT_0:
                        # Timeout error
                        raise PipeError('Timeout error')
                else:
                    # API error
                    raise PipeError('API error')

                ret = win32pipe.GetOverlappedResult(pipe, overLap, True)
            if not ret in (0, ERROR_PIPE_CONNECTED):
                # API Error
                raise PipeError('API error 2')
        except PipeError:
            # Named pipe exception
            win32file.CancelIo(pipe)
            pipe.close()
            raise
        except BaseException, err:
            win32file.CancelIo(pipe)
            pipe.close()
            pipe = None
            raise PipeError('BaseException : ' + str(err))

        return pipe
winprocess.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def kill(self, gracePeriod=5000):
        """
        Kill process. Try for an orderly shutdown via WM_CLOSE.  If
        still running after gracePeriod (5 sec. default), terminate.
        """
        win32gui.EnumWindows(self.__close__, 0)
        if self.wait(gracePeriod) != win32event.WAIT_OBJECT_0:
            win32process.TerminateProcess(self.hProcess, 0)
            win32api.Sleep(100) # wait for resources to be released
timer_demo.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def demo (delay=1000, stop=10):
    g = glork(delay, stop)
    # Timers are message based - so we need
    # To run a message loop while waiting for our timers
    # to expire.
    start_time = time.time()
    while 1:
        # We can't simply give a timeout of 30 seconds, as
        # we may continouusly be recieving other input messages,
        # and therefore never expire.
        rc = win32event.MsgWaitForMultipleObjects(
                (g.event,), # list of objects
                0, # wait all
                500,  # timeout
                win32event.QS_ALLEVENTS, # type of input
                )
        if rc == win32event.WAIT_OBJECT_0:
            # Event signalled.
            break
        elif rc == win32event.WAIT_OBJECT_0+1:
            # Message waiting.
            if win32gui.PumpWaitingMessages():
                raise RuntimeError("We got an unexpected WM_QUIT message!")
        else:
            # This wait timed-out.
            if time.time()-start_time > 30:
                raise RuntimeError("We timed out waiting for the timers to expire!")
rastest.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def Connect(entryName, bUseCallback):
    if bUseCallback:
        theCallback = Callback
        win32event.ResetEvent(callbackEvent)
    else:
        theCallback = None
    #       in order to *use* the username/password of a particular dun entry, one must
    #       explicitly get those params under win95....
    try:
        dp, b = win32ras.GetEntryDialParams( None, entryName )
    except:
        print "Couldn't find DUN entry: %s" % entryName
    else:
        hras, rc = win32ras.Dial(None, None, (entryName, "", "", dp[ 3 ], dp[ 4 ], ""),theCallback)
    #       hras, rc = win32ras.Dial(None, None, (entryName, ),theCallback)
    #       print hras, rc
        if not bUseCallback and rc != 0:
            print "Could not dial the RAS connection:", win32ras.GetErrorString(rc)
            hras = HangUp( hras )
        #       don't wait here if there's no need to....
        elif bUseCallback and win32event.WaitForSingleObject(callbackEvent, 60000)!=win32event.WAIT_OBJECT_0:
            print "Gave up waiting for the process to complete!"
            #       sdk docs state one must explcitly hangup, even if there's an error....
            try:
                cs = win32ras.GetConnectStatus( hras )
            except:
                #       on error, attempt a hang up anyway....
                hras = HangUp( hras )
            else:
                if int( cs[ 0 ] ) == win32ras.RASCS_Disconnected:
                    hras = HangUp( hras )
    return hras, rc
test_win32event.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testWaitableFireLong(self):
        h = win32event.CreateWaitableTimer(None, 0, None)
        dt = int2long(-160) # 160 ns.
        win32event.SetWaitableTimer(h, dt, 0, None, None, 0)
        rc = win32event.WaitForSingleObject(h, 1000)
        self.failUnlessEqual(rc, win32event.WAIT_OBJECT_0)
test_win32event.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def assertSignaled(self, event):
        self.assertEquals(win32event.WaitForSingleObject(event, 0),
                          win32event.WAIT_OBJECT_0)
test_win32event.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testReleaseMutex(self):
        mutex = win32event.CreateMutex(None, True, None)
        res = win32event.ReleaseMutex(mutex)
        self.assertEqual(res, None)
        res = win32event.WaitForSingleObject(mutex, 0)
        self.assertEqual(res, win32event.WAIT_OBJECT_0)
        mutex.close()
        self.assertRaises(pywintypes.error, win32event.ReleaseMutex, mutex)
test_win32file.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _watcherThreadOverlapped(self, dn, dh, changes):
        flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        buf = win32file.AllocateReadBuffer(8192)
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        while 1:
            win32file.ReadDirectoryChangesW(dh,
                                            buf,
                                            False, #sub-tree
                                            flags,
                                            overlapped)
            # Wait for our event, or for 5 seconds.
            rc = win32event.WaitForSingleObject(overlapped.hEvent, 5000)
            if rc == win32event.WAIT_OBJECT_0:
                # got some data!  Must use GetOverlappedResult to find out
                # how much is valid!  0 generally means the handle has
                # been closed.  Blocking is OK here, as the event has
                # already been set.
                nbytes = win32file.GetOverlappedResult(dh, overlapped, True)
                if nbytes:
                    bits = win32file.FILE_NOTIFY_INFORMATION(buf, nbytes)
                    changes.extend(bits)
                else:
                    # This is "normal" exit - our 'tearDown' closes the
                    # handle.
                    # print "looks like dir handle was closed!"
                    return
            else:
                print "ERROR: Watcher thread timed-out!"
                return # kill the thread!
test_win32file.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def connect_thread_runner(self, expect_payload, giveup_event):
        # As Windows 2000 doesn't do ConnectEx, we need to use a non-blocking
        # accept, as our test connection may never come.  May as well use
        # AcceptEx for this...
        listener = socket.socket()
        self.addr = ('localhost', random.randint(10000,64000))
        listener.bind(self.addr)
        listener.listen(1)

        # create accept socket
        accepter = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # An overlapped
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        # accept the connection.
        if expect_payload:
            buf_size = 1024
        else:
            # when we don't expect data we must be careful to only pass the
            # exact number of bytes for the endpoint data...
            buf_size = win32file.CalculateSocketEndPointSize(listener)

        buffer = win32file.AllocateReadBuffer(buf_size)
        win32file.AcceptEx(listener, accepter, buffer, overlapped)
        # wait for the connection or our test to fail.
        events = giveup_event, overlapped.hEvent
        rc = win32event.WaitForMultipleObjects(events, False, 2000)
        if rc == win32event.WAIT_TIMEOUT:
            self.fail("timed out waiting for a connection")
        if rc == win32event.WAIT_OBJECT_0:
            # Our main thread running the test failed and will never connect.
            return
        # must be a connection.
        nbytes = win32file.GetOverlappedResult(listener.fileno(), overlapped, False)
        if expect_payload:
            self.request = buffer[:nbytes]
        accepter.send(str2bytes('some expected response'))
testMarshal.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _DoTestMarshal(self, fn, bCoWait = 0):
        #print "The main thread is %d" % (win32api.GetCurrentThreadId())
        threads, events = fn(2)
        numFinished = 0
        while 1:
            try:
                if bCoWait:
                    rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
                else:
                    # Specifying "bWaitAll" here will wait for messages *and* all events
                    # (which is pretty useless)
                    rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
                    numFinished = numFinished + 1
                    if numFinished >= len(events):
                        break
                elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else: # Timeout
                    print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
            except KeyboardInterrupt:
                break
        for t in threads:
            t.join(2)
            self.failIf(t.isAlive(), "thread failed to stop!?")
        threads = None # threads hold references to args
        # Seems to be a leak here I can't locate :(
        #self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
        #self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
eventsFreeThreaded.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def TestExplorerEvents():
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)

    thread = win32api.GetCurrentThreadId()
    print 'TestExplorerEvents created IE object on thread %d'%thread

    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error, details:
        print "Warning - could not open the test HTML file", details

    # In this free-threaded example, we can simply wait until an event has 
    # been set - we will give it 2 seconds before giving up.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print "Document load event FAILED to fire!!!"

    iexplore.Quit()
    # Now we can do the same thing to wait for exit!
    # Although Quit generates events, in this free-threaded world we
    # do *not* need to run any message pumps.

    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print "OnQuit event FAILED to fire!!!"

    iexplore = None
    print "Finished the IE event sample!"
winprocess.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def kill(self, gracePeriod=5000):
        """
        Kill process. Try for an orderly shutdown via WM_CLOSE.  If
        still running after gracePeriod (5 sec. default), terminate.
        """
        win32gui.EnumWindows(self.__close__, 0)
        if self.wait(gracePeriod) != win32event.WAIT_OBJECT_0:
            win32process.TerminateProcess(self.hProcess, 0)
            win32api.Sleep(100) # wait for resources to be released
timer_demo.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def demo (delay=1000, stop=10):
    g = glork(delay, stop)
    # Timers are message based - so we need
    # To run a message loop while waiting for our timers
    # to expire.
    start_time = time.time()
    while 1:
        # We can't simply give a timeout of 30 seconds, as
        # we may continouusly be recieving other input messages,
        # and therefore never expire.
        rc = win32event.MsgWaitForMultipleObjects(
                (g.event,), # list of objects
                0, # wait all
                500,  # timeout
                win32event.QS_ALLEVENTS, # type of input
                )
        if rc == win32event.WAIT_OBJECT_0:
            # Event signalled.
            break
        elif rc == win32event.WAIT_OBJECT_0+1:
            # Message waiting.
            if win32gui.PumpWaitingMessages():
                raise RuntimeError("We got an unexpected WM_QUIT message!")
        else:
            # This wait timed-out.
            if time.time()-start_time > 30:
                raise RuntimeError("We timed out waiting for the timers to expire!")
test_win32event.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testWaitableFireLong(self):
        h = win32event.CreateWaitableTimer(None, 0, None)
        dt = int2long(-160) # 160 ns.
        win32event.SetWaitableTimer(h, dt, 0, None, None, 0)
        rc = win32event.WaitForSingleObject(h, 1000)
        self.failUnlessEqual(rc, win32event.WAIT_OBJECT_0)
test_win32event.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testWaitableFire(self):
        h = win32event.CreateWaitableTimer(None, 0, None)
        dt = -160 # 160 ns.
        win32event.SetWaitableTimer(h, dt, 0, None, None, 0)
        rc = win32event.WaitForSingleObject(h, 1000)
        self.failUnlessEqual(rc, win32event.WAIT_OBJECT_0)
test_win32event.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def assertSignaled(self, event):
        self.assertEquals(win32event.WaitForSingleObject(event, 0),
                          win32event.WAIT_OBJECT_0)
test_win32event.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testReleaseMutex(self):
        mutex = win32event.CreateMutex(None, True, None)
        res = win32event.ReleaseMutex(mutex)
        self.assertEqual(res, None)
        res = win32event.WaitForSingleObject(mutex, 0)
        self.assertEqual(res, win32event.WAIT_OBJECT_0)
        mutex.close()
        self.assertRaises(pywintypes.error, win32event.ReleaseMutex, mutex)
test_win32file.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _watcherThreadOverlapped(self, dn, dh, changes):
        flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        buf = win32file.AllocateReadBuffer(8192)
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        while 1:
            win32file.ReadDirectoryChangesW(dh,
                                            buf,
                                            False, #sub-tree
                                            flags,
                                            overlapped)
            # Wait for our event, or for 5 seconds.
            rc = win32event.WaitForSingleObject(overlapped.hEvent, 5000)
            if rc == win32event.WAIT_OBJECT_0:
                # got some data!  Must use GetOverlappedResult to find out
                # how much is valid!  0 generally means the handle has
                # been closed.  Blocking is OK here, as the event has
                # already been set.
                nbytes = win32file.GetOverlappedResult(dh, overlapped, True)
                if nbytes:
                    bits = win32file.FILE_NOTIFY_INFORMATION(buf, nbytes)
                    changes.extend(bits)
                else:
                    # This is "normal" exit - our 'tearDown' closes the
                    # handle.
                    # print "looks like dir handle was closed!"
                    return
            else:
                print("ERROR: Watcher thread timed-out!")
                return # kill the thread!
testMarshal.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _DoTestMarshal(self, fn, bCoWait = 0):
        #print "The main thread is %d" % (win32api.GetCurrentThreadId())
        threads, events = fn(2)
        numFinished = 0
        while 1:
            try:
                if bCoWait:
                    rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
                else:
                    # Specifying "bWaitAll" here will wait for messages *and* all events
                    # (which is pretty useless)
                    rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
                    numFinished = numFinished + 1
                    if numFinished >= len(events):
                        break
                elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else: # Timeout
                    print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break
        for t in threads:
            t.join(2)
            self.failIf(t.isAlive(), "thread failed to stop!?")
        threads = None # threads hold references to args
        # Seems to be a leak here I can't locate :(
        #self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
        #self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
eventsFreeThreaded.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def TestExplorerEvents():
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)

    thread = win32api.GetCurrentThreadId()
    print('TestExplorerEvents created IE object on thread %d'%thread)

    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error as details:
        print("Warning - could not open the test HTML file", details)

    # In this free-threaded example, we can simply wait until an event has 
    # been set - we will give it 2 seconds before giving up.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("Document load event FAILED to fire!!!")

    iexplore.Quit()
    # Now we can do the same thing to wait for exit!
    # Although Quit generates events, in this free-threaded world we
    # do *not* need to run any message pumps.

    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("OnQuit event FAILED to fire!!!")

    iexplore = None
    print("Finished the IE event sample!")


问题


面经


文章

微信
公众号

扫码关注公众号