python类callFromThread()的实例源码

twistedreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Initialization method.

        Note that we can't call reactor methods directly here because
        it's not thread-safe, so we schedule the reactor/connection
        stuff to be run from the event loop thread when it gets the
        chance.
        """
        Connection.__init__(self, *args, **kwargs)

        self.is_closed = True
        self.connector = None

        reactor.callFromThread(self.add_connection)
        self._loop.maybe_start()
_twistd_unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def startLogging(logfilename, sysLog, prefix, nodaemon):
    if logfilename == '-':
        if not nodaemon:
            print 'daemons cannot log to stdout'
            os._exit(1)
        logFile = sys.stdout
    elif sysLog:
        syslog.startLogging(prefix)
    elif nodaemon and not logfilename:
        logFile = sys.stdout
    else:
        logFile = app.getLogFile(logfilename or 'twistd.log')
        try:
            import signal
        except ImportError:
            pass
        else:
            def rotateLog(signal, frame):
                from twisted.internet import reactor
                reactor.callFromThread(logFile.rotate)
            signal.signal(signal.SIGUSR1, rotateLog)

    if not sysLog:
        log.startLogging(logFile)
    sys.stdout.flush()
twistedtools.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def stop_reactor():
    """Stop the reactor and join the reactor thread until it stops.
    Call this function in teardown at the module or package level to
    reset the twisted system after your tests. You *must* do this if
    you mix tests using these tools and tests using twisted.trial.
    """
    global _twisted_thread

    def stop_reactor():
        '''Helper for calling stop from withing the thread.'''
        reactor.stop()

    reactor.callFromThread(stop_reactor)
    reactor_thread.join()
    for p in reactor.getDelayedCalls():
        if p.active():
            p.cancel()
    _twisted_thread = None
NeoNode.py 文件源码 项目:neo-python 作者: CityOfZion 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def MessageReceived(self, m):

        #        self.Log("Messagereceived and processed ...: %s " % m.Command)

        if m.Command == 'verack':
            self.HandleVerack()
        elif m.Command == 'version':
            self.HandleVersion(m.Payload)
        elif m.Command == 'getaddr':
            self.SendPeerInfo()
        elif m.Command == 'getdata':
            self.HandleGetDataMessageReceived(m.Payload)
        elif m.Command == 'inv':
            self.HandleInvMessage(m.Payload)
        elif m.Command == 'block':
            self.HandleBlockReceived(m.Payload)
        elif m.Command == 'headers':
            reactor.callFromThread(self.HandleBlockHeadersReceived, m.Payload)
#            self.HandleBlockHeadersReceived(m.Payload)
        elif m.Command == 'addr':
            self.HandlePeerInfoReceived(m.Payload)
        else:
            self.Log("Command %s not implemented " % m.Command)
_twistd_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def startLogging(logfilename, sysLog, prefix, nodaemon):
    if logfilename == '-':
        if not nodaemon:
            print 'daemons cannot log to stdout'
            os._exit(1)
        logFile = sys.stdout
    elif sysLog:
        syslog.startLogging(prefix)
    elif nodaemon and not logfilename:
        logFile = sys.stdout
    else:
        logFile = app.getLogFile(logfilename or 'twistd.log')
        try:
            import signal
        except ImportError:
            pass
        else:
            def rotateLog(signal, frame):
                from twisted.internet import reactor
                reactor.callFromThread(logFile.rotate)
            signal.signal(signal.SIGUSR1, rotateLog)

    if not sysLog:
        log.startLogging(logFile)
    sys.stdout.flush()
twistedtools.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def stop_reactor():
    """Stop the reactor and join the reactor thread until it stops.
    Call this function in teardown at the module or package level to
    reset the twisted system after your tests. You *must* do this if
    you mix tests using these tools and tests using twisted.trial.
    """
    global _twisted_thread

    def stop_reactor():
        '''Helper for calling stop from withing the thread.'''
        reactor.stop()

    reactor.callFromThread(stop_reactor)
    reactor_thread.join()
    for p in reactor.getDelayedCalls():
        if p.active():
            p.cancel()
    _twisted_thread = None
util.py 文件源码 项目:py-ipv8 作者: qstokkink 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def blockingCallFromThread(reactor, f, *args, **kwargs):
    """
    Improved version of twisted's blockingCallFromThread that shows the complete
    stacktrace when an exception is raised on the reactor's thread.
    If being called from the reactor thread already, just return the result of execution of the callable.
    """
    if isInIOThread():
            return f(*args, **kwargs)
    else:
        queue = Queue.Queue()

        def _callFromThread():
            result = defer.maybeDeferred(f, *args, **kwargs)
            result.addBoth(queue.put)
        reactor.callFromThread(_callFromThread)
        result = queue.get()
        if isinstance(result, failure.Failure):
            other_thread_tb = traceback.extract_tb(result.getTracebackObject())
            this_thread_tb = traceback.extract_stack()
            logger.error("Exception raised on the reactor's thread %s: \"%s\".\n Traceback from this thread:\n%s\n"
                         " Traceback from the reactor's thread:\n %s", result.type.__name__, result.getErrorMessage(),
                         ''.join(traceback.format_list(this_thread_tb)), ''.join(traceback.format_list(other_thread_tb)))
            result.raiseException()
        return result
util.py 文件源码 项目:py-ipv8 作者: qstokkink 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def threaded_reactor():
    """
    Start the Twisted reactor in a separate thread, if not already done.
    Returns the reactor.
    """
    global _twisted_thread
    if not _twisted_thread:
        from threading import Thread

        _twisted_thread = Thread(target=lambda: reactor.run(installSignalHandlers=False), name="Twisted")
        _twisted_thread.setDaemon(True)
        _twisted_thread.start()

        def hook_observer():
            observer = log.PythonLoggingObserver()
            observer.start()
            import logging
            log.msg("PythonLoggingObserver hooked up", logLevel=logging.DEBUG)

        reactor.callFromThread(hook_observer)

    return reactor, _twisted_thread
serve.py 文件源码 项目:earmark 作者: lowerquality 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def re_run(self, utt):
        if 'wavpath' not in utt:
            return

        k = Kaldi(
            get_resource('data/nnet_a_gpu_online'),
            self.gen_hclg_filename,
            get_resource('PROTO_LANGDIR'))
        audio = numm3.sound2np(
            os.path.join(self.resources['attach'].attachdir, utt['wavpath']),
            nchannels=1,
            R=8000)
        k.push_chunk(audio.tostring())
        wds = k.get_final()
        k.stop()
        for wd in wds:
            del wd['phones']
        utt['command_words'] = wds
        utt['command'] = ' '.join([X['word'] for X in wds])

        reactor.callFromThread(self.db.onchange, None, {"type": "change",
                                                        "id": utt["_id"],
                                                        "doc": utt})
simulation.py 文件源码 项目:vulnsite 作者: itsZN 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def spawnAdmin(self, user):
        if user.permission==0:
            time.sleep(5)
        else:
            proc = subprocess.Popen(['phantomjs', 'phantom/checkMessages.js',
                globalVals.args.domain+':'+str(globalVals.args.port), ADMIN_PASS],
                stdout=subprocess.PIPE
            )
            log,_ = proc.communicate()
            print log
            f = open('phantom/messages.log','a')
            f.write(log)
            f.close()
            time.sleep(5)

        reactor.callFromThread(self.adminSendResponse, user)
paho.py 文件源码 项目:kotori 作者: daq-tools 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def connect(self):
        """
        Connect to MQTT broker.
        """
        # TODO: This is currently done synchronous which could have issues in timeout situations
        #       because it would block other subsystems.
        #       => Check if we can do asynchronous connection establishment.
        self.client = mqtt.Client(client_id=self.name, clean_session=True, userdata={'foo': 'bar'})

        if self.broker_username:
            self.client.username_pw_set(self.broker_username, self.broker_password)

        self.client.on_connect = lambda *args: reactor.callFromThread(self.on_connect, *args)
        self.client.on_message = lambda *args: reactor.callFromThread(self.on_message, *args)
        self.client.on_log     = lambda *args: reactor.callFromThread(self.on_log, *args)

        # Connect with retry
        self.connect_loop = LoopingCall(self.connect_with_retry)
        self.connect_loop.start(self.retry_interval, now=True)
twisted.py 文件源码 项目:kotori 作者: daq-tools 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def subscribe(self, *args):
        #d = self.protocol.subscribe("foo/bar/baz", 0)
        log.info(u"Subscribing to topics {subscriptions}. protocol={protocol}", subscriptions=self.subscriptions, protocol=self.protocol)
        for topic in self.subscriptions:
            log.info(u"Subscribing to topic '{topic}'", topic=topic)
            # Topic name **must not** be unicode, so casting to string
            e = self.protocol.subscribe(str(topic), 0)

        log.info(u"Setting callback handler: {callback}", callback=self.callback)
        self.protocol.setPublishHandler(self.on_message_twisted)
        """
        def cb(*args, **kwargs):
            log.info('publishHandler got called: name={name}, args={args}, kwargs={kwargs}', name=self.name, args=args, kwargs=kwargs)
            return reactor.callFromThread(self.callback, *args, **kwargs)
        self.protocol.setPublishHandler(cb)
        """
grpc_client.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def start_packet_in_stream(self):

        def receive_packet_in_stream():
            streaming_rpc_method = self.local_stub.ReceivePacketsIn
            iterator = streaming_rpc_method(empty_pb2.Empty())
            try:
                for packet_in in iterator:
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    log.debug('enqued-packet-in',
                              packet_in=packet_in,
                              queue_len=len(self.packet_in_queue.pending))
            except _Rendezvous, e:
                if e.code() == StatusCode.UNAVAILABLE:
                    os.system("kill -15 {}".format(os.getpid()))

        reactor.callInThread(receive_packet_in_stream)
grpc_client.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def start_change_event_in_stream(self):

        def receive_change_events():
            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
            iterator = streaming_rpc_method(empty_pb2.Empty())
            try:
                for event in iterator:
                    reactor.callFromThread(self.change_event_queue.put, event)
                    log.debug('enqued-change-event',
                              change_event=event,
                              queue_len=len(self.change_event_queue.pending))
            except _Rendezvous, e:
                if e.code() == StatusCode.UNAVAILABLE:
                    os.system("kill -15 {}".format(os.getpid()))

        reactor.callInThread(receive_change_events)
frameio.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def recv(self):
        """Called on the select thread when a packet arrives"""
        try:
            frame = self.rcv_frame()
        except RuntimeError as e:
            # we observed this happens sometimes right after the socket was
            # attached to a newly created veth interface. So we log it, but
            # allow to continue.
            log.warn('afpacket-recv-error', code=-1)
            return

        log.debug('frame-received', iface=self.iface_name, len=len(frame),
                  hex=hexify(frame))
        self.received +=1
        dispatched = False
        for proxy in self.proxies:
            if proxy.filter is None or proxy.filter(frame):
                log.debug('frame-dispatched')
                dispatched = True
                reactor.callFromThread(self._dispatch, proxy, frame)

        if not dispatched:
            self.discarded += 1
            log.debug('frame-discarded')
core.py 文件源码 项目:asgi_rabbitmq 作者: proofit404 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def receive_twisted(self, channels):
            """Twisted-native implementation of receive."""

            deferred = defer.Deferred()

            def resolve_deferred(future):

                reactor.callFromThread(deferred.callback, future.result())

            future = self.thread.twisted_schedule(RECEIVE_TWISTED, channels)
            future.add_done_callback(resolve_deferred)
            defer.returnValue((yield deferred))


# TODO: Is it optimal to read bytes from content frame, call python
# decode method to convert it to string and than parse it with
# msgpack?  We should minimize useless work on message receive.
strategy.py 文件源码 项目:frontera-docs-zh_CN 作者: xsren 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def work(self):
        batch, consumed = self.collect_batch()
        self.states_context.fetch()
        self.process_batch(batch)
        self.update_score.flush()
        self.states_context.release()

        # Exiting, if crawl is finished
        if self.strategy.finished():
            logger.info("Successfully reached the crawling goal.")
            logger.info("Closing crawling strategy.")
            self.strategy.close()
            logger.info("Finishing.")
            reactor.callFromThread(reactor.stop)

        self.stats['last_consumed'] = consumed
        self.stats['last_consumption_run'] = asctime()
        self.stats['consumed_since_start'] += consumed
test_threads.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_callFromThread(self):
        """
        Test callFromThread functionality: from the main thread, and from
        another thread.
        """
        def cb(ign):
            firedByReactorThread = defer.Deferred()
            firedByOtherThread = defer.Deferred()

            def threadedFunc():
                reactor.callFromThread(firedByOtherThread.callback, None)

            reactor.callInThread(threadedFunc)
            reactor.callFromThread(firedByReactorThread.callback, None)

            return defer.DeferredList(
                [firedByReactorThread, firedByOtherThread],
                fireOnOneErrback=True)
        return self._waitForThread().addCallback(cb)
test_threads.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_wakerOverflow(self):
        """
        Try to make an overflow on the reactor waker using callFromThread.
        """
        def cb(ign):
            self.failure = None
            waiter = threading.Event()
            def threadedFunction():
                # Hopefully a hundred thousand queued calls is enough to
                # trigger the error condition
                for i in xrange(100000):
                    try:
                        reactor.callFromThread(lambda: None)
                    except:
                        self.failure = failure.Failure()
                        break
                waiter.set()
            reactor.callInThread(threadedFunction)
            waiter.wait(120)
            if not waiter.isSet():
                self.fail("Timed out waiting for event")
            if self.failure is not None:
                return defer.fail(self.failure)
        return self._waitForThread().addCallback(cb)
manager.py 文件源码 项目:txasgiresource 作者: JohnDoee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _puller(self):
        logger.debug('Starting puller loop')
        while True:
            if not reactor.running or self._stop:
                logger.debug('Puller loop dying')
                reactor.callFromThread(self.stopped.callback, None)
                return

            channels = [self.send_channel] + list(self._pull_channels)
            if not channels:
                time.sleep(0.05)
                continue

            channel, message = self.channel_layer.receive(channels, block=False)
            if not channel:
                time.sleep(0.01)
                continue
            logger.debug('We got message on channel: %s' % (channel, ))

            reactor.callFromThread(self.handle_reply, channel, message)
data_router.py 文件源码 项目:rasa_nlu 作者: RasaHQ 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def deferred_from_future(future):
    """Converts a concurrent.futures.Future object to a twisted.internet.defer.Deferred obejct.
    See: https://twistedmatrix.com/pipermail/twisted-python/2011-January/023296.html
    """
    d = Deferred()

    def callback(future):
        e = future.exception()
        if e:
            if DEFERRED_RUN_IN_REACTOR_THREAD:
                reactor.callFromThread(d.errback, e)
            else:
                d.errback(e)
        else:
            if DEFERRED_RUN_IN_REACTOR_THREAD:
                reactor.callFromThread(d.callback, future.result())
            else:
                d.callback(future.result())

    future.add_done_callback(callback)
    return d
__init__.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def execute_from_command_line():
    # Limit concurrency in all thread-pools to ONE.
    from maasserver.utils import threads
    threads.install_default_pool(maxthreads=1)
    threads.install_database_unpool(maxthreads=1)
    # Disable all database connections in the reactor.
    from maasserver.utils import orm
    from twisted.internet import reactor
    assert not reactor.running, "The reactor has been started too early."
    reactor.callFromThread(orm.disable_all_database_connections)
    # Configure logging; Django is no longer responsible for this. Behave as
    # if we're always at an interactive terminal (i.e. do not wrap stdout or
    # stderr with log machinery).
    from provisioningserver import logger
    logger.configure(mode=logger.LoggingMode.COMMAND)
    # Hand over to Django.
    from django.core import management
    management.execute_from_command_line()
data_router.py 文件源码 项目:Rasa_NLU_Chi 作者: crownpku 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def deferred_from_future(future):
    """Converts a concurrent.futures.Future object to a twisted.internet.defer.Deferred obejct.
    See: https://twistedmatrix.com/pipermail/twisted-python/2011-January/023296.html
    """
    d = Deferred()

    def callback(future):
        e = future.exception()
        if e:
            if DEFERRED_RUN_IN_REACTOR_THREAD:
                reactor.callFromThread(d.errback, e)
            else:
                d.errback(e)
        else:
            if DEFERRED_RUN_IN_REACTOR_THREAD:
                reactor.callFromThread(d.callback, future.result())
            else:
                d.callback(future.result())

    future.add_done_callback(callback)
    return d
AutoTimerResource.py 文件源码 项目:enigma2-plugins 作者: opendreambox 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        req = self._req
        code = http.OK
        try: ret = self._fnc(req)
        except Exception as e:
            ret = str(e)
            code = http.INTERNAL_SERVER_ERROR
        def finishRequest():
            req.setResponseCode(code)
            if code == http.OK:
                req.setHeader('Content-type', 'application/xhtml+xml')
            req.setHeader('charset', 'UTF-8')
            req.write(ret)
            req.finish()
        if self._stillAlive:
            reactor.callFromThread(finishRequest)
AutoTimerResource.py 文件源码 项目:enigma2-plugins 作者: opendreambox 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def render(self, req):
        self._req = req
        self._stillAlive = True
        if hasattr(req, 'notifyFinish'):
            req.notifyFinish().addErrback(self.connectionLost)

        d = autotimer.parseEPGAsync().addCallback(self.epgCallback).addErrback(self.epgErrback)
        def timeout():
            if not d.called and self._stillAlive:
                reactor.callFromThread(lambda: req.write("<ignore />"))
                reactor.callLater(50, timeout)
        reactor.callLater(50, timeout)

        req.setResponseCode(http.OK)
        req.setHeader('Content-type', 'application/xhtml+xml')
        req.setHeader('charset', 'UTF-8')
        req.write("""<?xml version=\"1.0\" encoding=\"UTF-8\" ?><e2simplexmlresult>""")
        return server.NOT_DONE_YET
AutoTimerResource.py 文件源码 项目:enigma2-plugins 作者: opendreambox 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        req = self._req
        if self._stillAlive:
            req.setResponseCode(http.OK)
            req.setHeader('Content-type', 'application/xhtml+xml')
            req.setHeader('charset', 'UTF-8')
            reactor.callFromThread(lambda: req.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n<e2autotimersimulate api_version=\"" + str(API_VERSION) + "\">\n"))

        def finishRequest():
            req.write('</e2autotimersimulate>')
            req.finish()

        try: autotimer.parseEPG(simulateOnly=True, callback=self.intermediateWrite)
        except Exception as e:
            def finishRequest():
                req.write('<exception>'+str(e)+'</exception><|PURPOSEFULLYBROKENXML<')
                req.finish()

        if self._stillAlive:
            reactor.callFromThread(finishRequest)
twistedreactor.py 文件源码 项目:python-dse-driver 作者: datastax 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Initialization method.

        Note that we can't call reactor methods directly here because
        it's not thread-safe, so we schedule the reactor/connection
        stuff to be run from the event loop thread when it gets the
        chance.
        """
        Connection.__init__(self, *args, **kwargs)

        self.is_closed = True
        self.connector = None

        reactor.callFromThread(self.add_connection)
        self._loop.maybe_start()
twistedreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _cleanup(self):
        if self._thread:
            reactor.callFromThread(reactor.stop)
            self._thread.join(timeout=1.0)
            if self._thread.is_alive():
                log.warning("Event loop thread could not be joined, so "
                            "shutdown may not be clean. Please call "
                            "Cluster.shutdown() to avoid this.")
            log.debug("Event loop thread was joined")
twistedreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def add_timer(self, timer):
        self._timers.add_timer(timer)
        # callFromThread to schedule from the loop thread, where
        # the timeout task can safely be modified
        reactor.callFromThread(self._schedule_timeout, timer.end)
twistedreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def push(self, data):
        """
        This function is called when outgoing data should be queued
        for sending.

        Note that we can't call transport.write() directly because
        it is not thread-safe, so we schedule it to run from within
        the event loop when it gets the chance.
        """
        reactor.callFromThread(self.connector.transport.write, data)


问题


面经


文章

微信
公众号

扫码关注公众号