python类stop()的实例源码

imitation_scrapy.py 文件源码 项目:Spider 作者: Ctrlsman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _stop_reactor(self, _=None):
        """
        Call ``reactor.stop()``.

        The optional ``_`` argument is accepted and ignored, so this method
        can be invoked with either zero or one positional argument.
        (Presumably it is registered as a Deferred callback -- confirm
        against the caller.)
        """
        reactor.stop()
tkunzip.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def doItTkinterly(opt):
    """
    Run the optional unzip and compile steps behind a Tk progress bar.

    A hidden Tk root window is created and installed into the reactor via
    ``tksupport``, a chain of callbacks is attached to an already-fired
    Deferred, and then the reactor is run.

    @param opt: mapping of options.  ``opt['zipfile']`` and
        ``opt['compiledir']`` select which steps are added to the chain;
        ``opt['ziptargetdir']`` is read only when a zip file is given.
    """
    root = Tkinter.Tk()
    root.withdraw()
    root.title('One Moment.')
    # Closing the window stops the reactor.
    root.protocol('WM_DELETE_WINDOW', reactor.stop)
    tksupport.install(root)

    bar = ProgressBar(root, value=0, labelColor="black", width=200)
    bar.pack()

    def showWindow(window):
        # Reveal the window and pass it along to the next callback.
        window.deiconify()
        return window

    def shutdown(ignore):
        # Last link in the chain: stop the reactor, then tear down Tk.
        reactor.stop()
        root.destroy()

    # The Deferred has already fired, so each callback added below is
    # invoked as soon as the previous result is available.
    chain = defer.succeed(root)
    chain.addErrback(log.err)
    chain.addCallback(showWindow)

    if opt['zipfile']:
        unzipper = Progressor('Unpacking documentation...')
        chunkCount = zipstream.countZipFileChunks(opt['zipfile'], 4096)
        unzipper.setBar(bar, chunkCount)
        chunks = zipstream.unzipIterChunky(opt['zipfile'],
                                           opt['ziptargetdir'])
        unzipper.setIterator(chunks)
        chain.addCallback(unzipper.processAll)

    if opt['compiledir']:
        compileStep = Progressor('Compiling to pyc...')
        compileStep.setBar(bar, countPysRecursive(opt['compiledir']))
        compileStep.setIterator(compiler(opt['compiledir']))
        chain.addCallback(compileStep.processAll)

    chain.addCallback(shutdown)

    reactor.run()
stdio.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle_QUIT(self):
        """
        Run C{ColoredManhole.handle_QUIT} and then stop the reactor.
        """
        ColoredManhole.handle_QUIT(self)
        reactor.stop()
tkconch.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def deferredAskFrame(question, echo):
    """
    Ask a question in the module-level C{frame} and collect the typed reply.

    A per-character handler is installed as C{frame.callback}; it gathers
    printable characters until a carriage return arrives, then uninstalls
    itself and fires the returned Deferred with the collected text.

    @param question: text written to the frame as the prompt.
    @param echo: if true, typed characters (and backspace erasure) are
        written back to the frame.
    @return: a Deferred that fires with the response string once the user
        presses return.
    @raise ValueError: if another question is already pending, i.e.
        C{frame.callback} is already set.
    """
    if frame.callback:
        # String exceptions are not valid; raise a real exception object so
        # the intended message actually reaches the caller.
        raise ValueError("can't ask 2 questions at once!")
    d = defer.Deferred()
    resp = []

    def gotChar(ch, resp=resp):
        if not ch:
            return
        if ch == '\x03':  # C-c
            reactor.stop()
        if ch == '\r':
            frame.write('\r\n')
            stresp = ''.join(resp)
            # Uninstall the handler before firing so that a callback may
            # immediately ask another question.
            frame.callback = None
            d.callback(stresp)
            return
        elif 32 <= ord(ch) < 127:
            # Printable ASCII: record it and optionally echo it.
            resp.append(ch)
            if echo:
                frame.write(ch)
        elif ord(ch) == 8 and resp:  # BS
            if echo:
                frame.write('\x08 \x08')
            resp.pop()

    frame.callback = gotChar
    frame.write(question)
    frame.canvas.focus_force()
    return d
tkconch.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handleError():
    """
    Log the exception currently being handled, stop the reactor, re-raise.

    Sets the module-level C{exitStatus} to 2.  Must be called while an
    exception is being handled, since it ends with a bare C{raise}.
    """
    from twisted.python import failure
    global exitStatus
    exitStatus = 2
    log.err(failure.Failure())
    try:
        reactor.stop()
    except RuntimeError:
        # Stopping a reactor that is not running raises a RuntimeError
        # subclass; that must not replace the error being reported here.
        pass
    raise
tkconch.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def closed(self):
        """
        Log that this object was closed and, when C{self.conn.channels}
        holds exactly one entry, stop the reactor.
        """
        log.msg('closed %s' % self)
        if len(self.conn.channels) == 1: # just us left
            reactor.stop()
cftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handleError():
    """
    Stop the reactor (best effort), log the current exception, re-raise.

    Sets the module-level C{exitStatus} to 2.  Must be called while an
    exception is being handled, since it ends with a bare C{raise}.
    """
    from twisted.python import failure
    global exitStatus
    exitStatus = 2
    # Capture the failure before attempting to stop the reactor, so that an
    # error raised (and swallowed) by the stop attempt cannot be the one
    # that ends up in the log.
    f = failure.Failure()
    try:
        reactor.stop()
    except RuntimeError:
        # The reactor was not running; nothing to stop.
        pass
    log.err(f)
    raise
cftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _ebExit(f):
    """
    Errback that prints the failure and stops the reactor (best effort).

    @param f: the failure being handled.  If C{f.value} has a C{value}
        attribute, that is printed; otherwise C{str(f)} is printed.
    """
    if hasattr(f.value, 'value'):
        s = f.value.value
    else:
        s = str(f)
    # Function-call form prints the same single value on Python 2 and is
    # also valid syntax on Python 3.
    print(s)
    try:
        reactor.stop()
    except RuntimeError:
        # The reactor was not running; nothing to stop.
        pass
conch.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _stopReactor():
    """
    Stop the reactor, ignoring the error raised if it is not running.
    """
    try:
        reactor.stop()
    except RuntimeError:
        # Only the "reactor not running" family of errors is expected here;
        # anything else (including KeyboardInterrupt) should propagate.
        pass
pyuisupport.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _guiUpdate(reactor, delay):
    """
    Draw and update pyui once, then either reschedule or shut down.

    @param reactor: the reactor used for rescheduling and stopping.
    @param delay: seconds to wait before the next update.

    When C{pyui.update()} returns 0, pyui is quit and the reactor is
    stopped; otherwise this function schedules itself to run again.
    """
    pyui.draw()
    keepGoing = pyui.update() != 0
    if keepGoing:
        reactor.callLater(delay, _guiUpdate, reactor, delay)
        return
    pyui.quit()
    reactor.stop()
stdio_test_hostpeer.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connectionLost(self, reason):
        """
        Stop the reactor when the connection is lost.

        @param reason: not used by this method.
        """
        reactor.stop()
stdio_test_consumer.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connectionLost(self, reason):
        """
        Stop the reactor when the connection is lost.

        @param reason: not used by this method.
        """
        reactor.stop()
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testIterate(self):
        """
        Check that C{reactor.iterate(0)} returns without blocking.
        """
        began = time.time()
        # Twisted timers are separate from the underlying event loop's
        # timers, so this fail-safe probably will not keep a failure from
        # hanging the test.
        failsafe = reactor.callLater(10, reactor.crash)
        reactor.iterate(0)  # must return promptly
        elapsed = time.time() - began
        self.failUnless(elapsed < 8)
        failsafe.cancel()
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stopProducing(self):
        """
        Record that this producer was told to stop by appending C{'stop'}
        to C{self.events}.
        """
        recorded = self.events
        recorded.append('stop')
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_unconnectedFileDescriptor(self):
        """
        Registering a producer on a file descriptor that is already marked
        as disconnected must call the producer's stopProducing() method.
        """
        descriptor = abstract.FileDescriptor()
        descriptor.disconnected = 1
        producer = DummyProducer()
        descriptor.registerProducer(producer, 0)
        expected = ['stop']
        self.assertEquals(producer.events, expected)
test_udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def testShutdownFromDatagramReceived(self):
        """
        Test reactor shutdown while in a recvfrom() loop.

        A datagram is written to the server's own port; when the server's
        C{packetReceived} Deferred fires, the transport's connectionLost()
        is invoked directly to imitate a shutdown.  The returned Deferred
        fires on a later reactor iteration, after which any logged errors
        are flushed.

        @return: a Deferred that fires once the simulated shutdown has been
            processed.
        """

        # udp.Port's doRead calls recvfrom() in a loop, as an optimization.
        # It is important this loop terminate under various conditions.
        # Previously, if datagramReceived synchronously invoked
        # reactor.stop(), under certain reactors, the Port's socket would
        # synchronously disappear, causing an AttributeError inside that
        # loop.  This was mishandled, causing the loop to spin forever.
        # This test is primarily to ensure that the loop never spins
        # forever.

        finished = defer.Deferred()
        pr = self.server.packetReceived = defer.Deferred()

        def pktRece(ignored):
            # Simulate reactor.stop() behavior :(
            self.server.transport.connectionLost()
            # Then delay this Deferred chain until the protocol has been
            # disconnected, as the reactor should do in an error condition
            # such as we are inducing.  This is very much a whitebox test.
            reactor.callLater(0, finished.callback, None)
        pr.addCallback(pktRece)

        def flushErrors(ignored):
            # We are breaking abstraction and calling private APIs, any
            # number of horrible errors might occur.  As long as the reactor
            # doesn't hang, this test is satisfied.  (There may be room for
            # another, stricter test.)
            self.flushLoggedErrors()
        finished.addCallback(flushErrors)
        # Send a 64-byte datagram to the server's own address to trigger
        # packetReceived.
        self.server.transport.write('\0' * 64, ('127.0.0.1',
                                    self.server.transport.getHost().port))
        return finished


问题


面经


文章

微信
公众号

扫码关注公众号