python类spawn_later()的实例源码

RateLimit.py 文件源码 项目:zeronet-debian 作者: bashrc 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
    """Run ``func`` in a greenlet now, or queue it if ``event`` is rate limited.

    When ``isAllowed(event, allowed_again)`` is true the event is marked as
    called and ``func(*args, **kwargs)`` is spawned immediately.  Otherwise
    the call is recorded in ``queue_db[event]``; only the first queued call
    creates the delayed greenlet, later calls just replace the stored
    function and arguments.

    Returns the greenlet that was spawned or that is already scheduled.
    """
    if isAllowed(event, allowed_again):
        # Not rate limited: record the call and fire right away.
        called(event)
        return gevent.spawn(func, *args, **kwargs)

    # Rate limited: work out how long is left before the event is allowed.
    elapsed = max(0, time.time() - called_db[event])
    time_left = allowed_again - elapsed
    log.debug("Added to queue (%.2fs left): %s " % (time_left, event))

    pending = queue_db.get(event)
    if pending:
        # Already queued: keep the existing greenlet, refresh the parameters.
        thread = pending[3]
    else:
        # First queued call: schedule callQueue() for when the limit expires.
        thread = gevent.spawn_later(time_left, lambda: callQueue(event))
    queue_db[event] = (func, args, kwargs, thread)
    return thread


# Rate limit and delay function call if needed
# Return: Wait for execution/delay then return value
endpoint_proxy.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def updateEndpoints( endpointActors, nextUpdate ):
    # Refresh the module-level set of ( address, port ) endpoints from the
    # 'report' replies of all endpoint actors, then re-schedule itself.
    #   endpointActors: actor handle exposing forceRefresh() / requestFromAll().
    #   nextUpdate: delay in seconds before the next refresh; a 5 second
    #               delay is used instead while no endpoint is known.
    global currentEndpoints
    endpointActors.forceRefresh()
    pending = endpointActors.requestFromAll( 'report' )

    discovered = Set()
    while pending.waitForResults( timeout = 10 ):
        for reply in pending.getNewResults():
            if not reply.isSuccess:
                continue
            # Only replies carrying both fields describe a usable endpoint.
            if 'address' in reply.data and 'port' in reply.data:
                discovered.add( ( reply.data[ 'address' ], reply.data[ 'port' ] ) )
        if pending.isFinished():
            break

    currentEndpoints = discovered

    nFound = len( currentEndpoints )
    # Retry quickly while the list is empty, otherwise use the normal period.
    delay = 5 if 0 == nFound else nextUpdate

    print( "Updated list of endpoints, found %s" % nFound )
    gevent.spawn_later( delay, updateEndpoints, endpointActors, nextUpdate )
app.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def pollBackendAvailability( isOneOff = True ):
    # Probe the backend with two requests and publish the outcome in the
    # module-level IS_BACKEND_AVAILABLE flag.
    #   isOneOff: when False the probe re-schedules itself, after 10 seconds
    #             on success and after 2 seconds on failure.
    global IS_BACKEND_AVAILABLE
    aid = AgentId( '0.0.0.0.0' )
    aid.org_id = ADMIN_OID
    sensorsResp = model.request( 'list_sensors', { 'aid' : aid }, timeout = 2 )
    orgResp = identmanager.request( 'get_org_info', { 'include_all' : True } )

    # Both requests must succeed for the backend to count as available.
    if sensorsResp.isSuccess and orgResp.isSuccess:
        isUp, status, retryIn = True, 'Backend available', 10
    else:
        isUp, status, retryIn = False, 'Backend unavailable', 2

    IS_BACKEND_AVAILABLE = isUp
    print( status )
    if not isOneOff:
        gevent.spawn_later( retryIn, pollBackendAvailability, isOneOff = False )
AlertService.py 文件源码 项目:lib9 作者: Jumpscale 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def restartTimers(self):
        """Re-arm escalation timers for every alert stored in the 'alerts' hash.

        Alerts in state RESOLVED/UNRESOLVED, or for which ``getStateTime``
        returns a falsy value, are removed from the hash.  For the others the
        escalation is scheduled for the time left until
        ``(epoch or lasttime) + alerttime``, or run immediately when that
        moment has already passed.
        """
        now = time.time()
        for key, _ in self.rediscl.hgetall('alerts').items():
            alert = self.getAlert(key)
            if alert['state'] in ('RESOLVED', 'UNRESOLVED'):
                self.rediscl.hdel('alerts', key)
                continue

            alerttime = self.getStateTime(alert)
            if not alerttime:
                self.rediscl.hdel('alerts', key)
                continue

            started = alert['epoch'] or alert['lasttime']
            remainingtime = (started + alerttime) - now
            if remainingtime > 0:
                self.logger.info("Schedule escalation in %ss for state %s" % (remainingtime, alert['state']))
                timer = gevent.spawn_later(remainingtime, self.escalateHigher, alert)
                self.timers[alert['guid']] = timer
            else:
                # Deadline already passed: escalate synchronously.
                self.escalateHigher(alert)
socketserver.py 文件源码 项目:reddit-service-websockets 作者: reddit 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _quiesce(self, environ, bypass_auth=False):
        """Mark the service as quiesced and gradually shed its connections.

        Raises UnauthorizedError unless ``bypass_auth`` is set or the request
        in ``environ`` is authorized.  Calling this again once the service is
        already quiesced does nothing.
        """
        if not (bypass_auth or self._authorized_to_quiesce(environ)):
            raise UnauthorizedError

        if self.quiesced:
            return
        self.quiesced = True

        # Shedding is delayed so the service can be deregistered first;
        # shutdown follows a further delay after the last batch.
        shed_delay_secs = 30
        termination_delay_secs = 10
        rate = self.shed_rate_per_sec

        # Note: connections accepted before the flag was set but still being
        #   established may be missing from this snapshot.
        total_conns = len(self.connections)
        snapshot = self.connections.copy()

        # One batch of at most ``rate`` connections per second.
        batches_scheduled = 0
        for remaining in xrange(total_conns, 0, -rate):
            batches_scheduled += 1
            # The final batch may be smaller; never pop more than is left.
            batch_size = min(remaining, rate)
            batch = [snapshot.pop() for _ in xrange(batch_size)]
            gevent.spawn_later(batches_scheduled + shed_delay_secs,
                               self._shed_connections,
                               batch)

        # Terminate the service once every batch has had its turn.
        gevent.spawn_later(shed_delay_secs + batches_scheduled +
                           termination_delay_secs,
                           self._shutdown)
util.py 文件源码 项目:Pyrlang 作者: esl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def schedule(delay, func, *args, **kw_args):
    """ Run ``func(*args, **kw_args)`` in a new greenlet every ``delay`` seconds.

    The first run is spawned right away; each invocation also re-arms
    ``schedule`` itself after ``delay``, so the cycle never stops on its own.
    """
    spawn_later = gevent.spawn_later
    # Fire this round's call with zero delay ...
    spawn_later(0, func, *args, **kw_args)
    # ... and book the next round.
    spawn_later(delay, schedule, delay, func, *args, **kw_args)
test_socket.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 64 收藏 0 点赞 0 评论 0
def test_timeout(self):
            # recv() on a socket with nothing to read must be interrupted by
            # the 0.1s gevent.Timeout, well before the delayed sender (0.5s)
            # gets a chance to run.
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            try:
                self.assertRaises(gevent.Timeout, b.recv)
            finally:
                # Always disarm the timer and the sender: if the assertion
                # fails they would otherwise fire inside a later test.
                timeout.cancel()
                g.kill()
transport.py 文件源码 项目:raiden 作者: raiden-network 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def send(self, sender, host_port, bytes_):
        """Record the send, then hand ``bytes_`` to the transport at ``host_port``.

        Delivery happens in a separate greenlet scheduled with a tiny
        (1e-11 s) delay rather than inline.
        """
        self.track_send(sender, host_port, bytes_)
        target = self.transports[host_port]
        deliver = target.receive
        gevent.spawn_later(1e-11, deliver, bytes_)
test_tracer.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_spawn_later_greenlet(self):
        # using the tracer inside a delayed greenlet must attach a
        # (still empty) context to that greenlet
        def touch_context():
            self.tracer.get_call_context()

        delayed = gevent.spawn_later(0.01, touch_context)
        delayed.join()

        ctx = getattr(delayed, '__datadog_context', None)
        ok_(ctx is not None)
        eq_(0, len(ctx._trace))
test_tracer.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_trace_later_greenlet(self):
        # the trace API works inside a greenlet started with spawn_later
        def traced_job():
            with self.tracer.trace('greenlet') as span:
                span.resource = 'base'

        job = gevent.spawn_later(0.01, traced_job)
        job.join()

        traces = self.tracer.writer.pop_traces()
        eq_(1, len(traces))
        only_trace = traces[0]
        eq_(1, len(only_trace))
        root = only_trace[0]
        eq_('greenlet', root.name)
        eq_('base', root.resource)
test_tracer.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_trace_spawn_later_multiple_greenlets_multiple_traces(self):
        # two delayed workers spawned from a traced entrypoint: three traces
        # are written and each worker span is parented to the entrypoint span
        def make_worker(worker_id):
            def worker():
                with self.tracer.trace('greenlet.worker') as span:
                    span.set_tag('worker_id', worker_id)
                    gevent.sleep(0.01)
            return worker

        def entrypoint():
            with self.tracer.trace('greenlet.main') as span:
                span.resource = 'base'
                jobs = [
                    gevent.spawn_later(0.01, make_worker('1')),
                    gevent.spawn_later(0.01, make_worker('2')),
                ]
                gevent.joinall(jobs)

        main = gevent.spawn(entrypoint)
        main.join()

        traces = self.tracer.writer.pop_traces()
        eq_(3, len(traces))
        eq_(1, len(traces[0]))
        # the entrypoint span is expected last, after both worker traces
        parent_span = traces[2][0]
        workers = ((traces[0][0], '1'), (traces[1][0], '2'))

        # check spans data and hierarchy
        eq_(parent_span.name, 'greenlet.main')
        eq_(parent_span.resource, 'base')
        for worker_span, expected_id in workers:
            eq_(worker_span.get_tag('worker_id'), expected_id)
            eq_(worker_span.name, 'greenlet.worker')
            eq_(worker_span.resource, 'greenlet.worker')
            eq_(worker_span.parent_id, parent_span.span_id)
test_tracer.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_trace_concurrent_spawn_later_calls(self):
        # 100 independent delayed greenlets must yield 100 separate
        # traces rather than being merged into a single one
        def traced_sleep():
            with self.tracer.trace('greenlet'):
                gevent.sleep(0.01)

        jobs = []
        for _ in range(100):
            jobs.append(gevent.spawn_later(0.01, traced_sleep))
        gevent.joinall(jobs)

        traces = self.tracer.writer.pop_traces()
        eq_(100, len(traces))
        first_trace = traces[0]
        eq_(1, len(first_trace))
        eq_('greenlet', first_trace[0].name)
patch.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _replace(g_class):
    """
    Install ``g_class`` as gevent's Greenlet class and re-point the
    module-level shortcuts (``gevent.Greenlet``, ``gevent.spawn``,
    ``gevent.spawn_later``) at it.
    """
    # swap the class where gevent defines it
    greenlet_module = gevent.greenlet
    greenlet_module.Greenlet = g_class

    # keep the package-level aliases in sync with the new class
    gevent.Greenlet = g_class
    gevent.spawn = g_class.spawn
    gevent.spawn_later = g_class.spawn_later
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_timeout(self):
            # recv() on a socket with nothing to read must be interrupted by
            # the 0.1s gevent.Timeout, well before the delayed sender (0.5s)
            # gets a chance to run.
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            try:
                self.assertRaises(gevent.Timeout, b.recv)
            finally:
                # Always disarm the timer and the sender: if the assertion
                # fails they would otherwise fire inside a later test.
                timeout.cancel()
                g.kill()
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_timeout(self):
            # recv() on a socket with nothing to read must be interrupted by
            # the 0.1s gevent.Timeout, well before the delayed sender (0.5s)
            # gets a chance to run.
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            try:
                self.assertRaises(gevent.Timeout, b.recv)
            finally:
                # Always disarm the timer and the sender: if the assertion
                # fails they would otherwise fire inside a later test.
                timeout.cancel()
                g.kill()
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_timeout(self):
            # recv() on a socket with nothing to read must be interrupted by
            # the 0.1s gevent.Timeout, well before the delayed sender (0.5s)
            # gets a chance to run.
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            try:
                self.assertRaises(gevent.Timeout, b.recv)
            finally:
                # Always disarm the timer and the sender: if the assertion
                # fails they would otherwise fire inside a later test.
                timeout.cancel()
                g.kill()
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_timeout(self):
            # recv() on a socket with nothing to read must be interrupted by
            # the 0.1s gevent.Timeout, well before the delayed sender (0.5s)
            # gets a chance to run.
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            try:
                self.assertRaises(gevent.Timeout, b.recv)
            finally:
                # Always disarm the timer and the sender: if the assertion
                # fails they would otherwise fire inside a later test.
                timeout.cancel()
                g.kill()
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_timeout(self):
            # recv() on a socket with nothing to read must be interrupted by
            # the 0.1s gevent.Timeout, well before the delayed sender (0.5s)
            # gets a chance to run.
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            try:
                self.assertRaises(gevent.Timeout, b.recv)
            finally:
                # Always disarm the timer and the sender: if the assertion
                # fails they would otherwise fire inside a later test.
                timeout.cancel()
                g.kill()
plugin.py 文件源码 项目:disco 作者: b1naryth1ef 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def spawn_later(self, delay, *args, **kwargs):
        """Like ``spawn_wrap`` but starting the greenlet after ``delay`` seconds.

        ``gevent.spawn_later`` is pre-bound to ``delay`` and handed to
        ``spawn_wrap`` as the spawner; the remaining arguments pass through.
        """
        delayed_spawner = functools.partial(gevent.spawn_later, delay)
        return self.spawn_wrap(delayed_spawner, *args, **kwargs)
VirtualSensor.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _connect( self ):
        # Open a TLS connection to ( self._destServer, self._destPort ), send
        # the HCP handshake headers and start the background greenlets.
        # Returns True on success; on any failure the traceback is logged
        # and False is returned.
        try:
            # TLS socket without certificate validation ( CERT_NONE ).
            self._socket = gevent.ssl.wrap_socket( gevent.socket.socket( gevent.socket.AF_INET, 
                                                                 gevent.socket.SOCK_STREAM ), 
                                           cert_reqs = gevent.ssl.CERT_NONE )
            self._socket.connect( ( self._destServer, self._destPort ) )
            self._log( "Connected" )
            # Handshake headers: agent identity, a host name derived from the
            # md5 of the sensor id, and a randomly generated IPv4 address.
            headers = rSequence()
            headers.addSequence( Symbols.base.HCP_IDENT, AgentId( ( self._oid, self._iid, self._sid, self._plat, self._arch ) ).toJson() )
            # NOTE(review): hashlib.md5() of a str only works on Python 2;
            # Python 3 requires bytes here.
            headers.addStringA( Symbols.base.HOST_NAME, hashlib.md5( str( self._sid ) ).hexdigest() )
            headers.addIpv4( Symbols.base.IP_ADDRESS, "%d.%d.%d.%d" % ( random.randint( 0, 254 ), 
                                                                        random.randint( 0, 254 ), 
                                                                        random.randint( 0, 254 ), 
                                                                        random.randint( 0, 254 ) ) )
            # The enrollment token is optional and only sent when one is set.
            if self._enrollmentToken is not None:
                headers.addBuffer( Symbols.hcp.ENROLLMENT_TOKEN, self._enrollmentToken )
            self._sendFrame( HcpModuleId.HCP, [ headers ], timeout = 30, isNotHbs = True )
            self._log( "Handshake sent" )
            # Receiver starts immediately; the HCP sync runs after 1s, the
            # HBS sync after 10s, and the connected event is set after 2s.
            self._threads.add( gevent.spawn( self._recvThread ) )
            self._threads.add( gevent.spawn_later( 1, self._syncHcpThread ) )
            self._threads.add( gevent.spawn_later( 10, self._syncHbsThread ) )
            self._threads.add( gevent.spawn_later( 2, lambda: self._connectedEvent.set() ) )
            return True
        except:
            # NOTE(review): the bare except also catches BaseException
            # subclasses ( e.g. gevent.Timeout ); narrowing it would change
            # which failures are reported as a False return.
            self._log( "Failed to connect over TLS: %s" % traceback.format_exc() )
            return False
VirtualSensor.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _syncHbsThread( self ):
        # Run one HBS sync, then book the next one 5 minutes from now and
        # track the greenlet in self._threads.
        self._doHbsSync()
        nextRun = gevent.spawn_later( 60 * 5, self._syncHbsThread )
        self._threads.add( nextRun )
VirtualSensor.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _syncHcpThread( self ):
        # Run one HCP sync, then book the next one 10 minutes from now and
        # track the greenlet in self._threads.
        self._doHcpSync()
        nextRun = gevent.spawn_later( 60 * 10, self._syncHcpThread )
        self._threads.add( nextRun )

    ###########################################################################
    #   SEND AND RECEIVE DATA STUFF
    ###########################################################################
VirtualSensor.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _generateEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ):
        # Send the next message(s) from eventGenerator to HBS ( only while
        # connected ), then re-schedule itself unless the stop event is set.
        #   everyNSeconds: base delay between two runs.
        #   plusOrMinusNSeconds: random jitter added to that delay ( 0 = none ).
        #   upToNEvents: remaining event budget, None meaning unlimited.
        if self._connectedEvent.wait( 0 ):
            if upToNEvents is not None:
                # Budget exhausted: stop for good, no further scheduling.
                if 0 == upToNEvents:
                    return
                upToNEvents -= 1

            try:
                messages = next( eventGenerator )
            except StopIteration:
                self._log( "Scheduled event generator failed to generate, ignoring it in the future." )
                return

            # A single message is wrapped so a sequence is always sent.
            if type( messages ) not in ( tuple, list ):
                messages = ( messages, )

            self._sendFrame( HcpModuleId.HBS, messages, timeout = 30 )

        if self._stopEvent.wait( 0 ):
            return

        delay = everyNSeconds
        if 0 != plusOrMinusNSeconds:
            delay += random.randint( -plusOrMinusNSeconds, plusOrMinusNSeconds )
        # Jitter may push the delay below zero; clamp it.
        if delay < 0:
            delay = 0
        nextRun = gevent.spawn_later( delay, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents )
        self._threads.add( nextRun )

    ###########################################################################
    #   PUBLIC FUNCTIONALITY
    ###########################################################################
VirtualSensor.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def scheduleEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds = 0, upToNEvents = None ):
        # Start the recurring event generation: the first _generateEvent run
        # is spawned with zero delay and tracked in self._threads.
        firstRun = gevent.spawn_later( 0, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents )
        self._threads.add( firstRun )

    ###########################################################################
    #   MAIN
    ###########################################################################
hcp_helpers.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__( self, maxQps, cbLog = None ):
        # Set up the queue, counters and greenlet group, then start the send
        # loop ( immediately ) and the stats reset ( after 1 second ).
        #   maxQps: stored as self._maxQps.
        #   cbLog: optional logging callback, stored as self._log.
        self._maxQps = maxQps
        self._q = gevent.queue.Queue()
        self._log = cbLog
        self._transmitted = 0
        self._lastWait = time.time()
        self._isRunning = True
        self._threads = gevent.pool.Group()
        for delay, worker in ( ( 0, self._sendLoop ), ( 1, self._resetStats ) ):
            self._threads.add( gevent.spawn_later( delay, worker ) )
hcp_helpers.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 98 收藏 0 点赞 0 评论 0
def _resetStats( self ):
        # Zero the transmitted counter once per second for as long as the
        # object is running; once stopped, the chain is not re-armed.
        if not self._isRunning:
            return
        self._transmitted = 0
        nextRun = gevent.spawn_later( 1, self._resetStats )
        self._threads.add( nextRun )
app.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def pollOutageState():
    # Refresh the module-level IS_OUTAGE_ON from the global config and poll
    # again in 30 seconds. IS_OUTAGE_ON becomes False when the outage state
    # is '0', otherwise it holds the outage text. A failed request leaves
    # the previous value untouched.
    global IS_OUTAGE_ON
    resp = deployment.request( 'get_global_config', {} )
    if resp.isSuccess:
        if resp.data[ 'global/outagestate' ] == '0':
            IS_OUTAGE_ON = False
        else:
            IS_OUTAGE_ON = resp.data[ 'global/outagetext' ]
    gevent.spawn_later( 30, pollOutageState )
gevent.py 文件源码 项目:django-rest-framework-reactive 作者: genialis 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def spawn_later(self, seconds, function, *args, **kwargs):
        """Schedule ``function(*args, **kwargs)`` in a gevent greenlet.

        The greenlet starts after ``seconds`` and is returned to the caller.
        """
        greenlet = gevent.spawn_later(seconds, function, *args, **kwargs)
        return greenlet
splitter.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def on_datamodel_in_sync(self):
        """Schedule the one-off startup cleanup the first time we are in sync.

        The first call sets ``_cleanup_done`` and schedules ``_do_cleanup``
        to run after ``config.STARTUP_CLEANUP_DELAY``; later calls only
        re-assert the flag.
        """
        if not self._cleanup_done:
            # Datamodel in sync for the first time.  Give the managers some
            # time to finish processing, then trigger cleanup.
            self._cleanup_done = True
            _log.info("No cleanup scheduled, scheduling one.")
            # "async" is a reserved word from Python 3.7 on, so writing it
            # as a literal keyword argument no longer parses; passing it via
            # a dict yields the same call on every Python version.
            gevent.spawn_later(self.config.STARTUP_CLEANUP_DELAY,
                               functools.partial(self._do_cleanup,
                                                 **{"async": True}))
        self._cleanup_done = True
splitter.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def on_datamodel_in_sync(self):
        """Schedule the one-off startup cleanup the first time we are in sync.

        The first call sets ``_cleanup_done`` and schedules ``_do_cleanup``
        to run after ``config.STARTUP_CLEANUP_DELAY``; later calls only
        re-assert the flag.
        """
        if not self._cleanup_done:
            # Datamodel in sync for the first time.  Give the managers some
            # time to finish processing, then trigger cleanup.
            self._cleanup_done = True
            _log.info("No cleanup scheduled, scheduling one.")
            # "async" is a reserved word from Python 3.7 on, so writing it
            # as a literal keyword argument no longer parses; passing it via
            # a dict yields the same call on every Python version.
            gevent.spawn_later(self.config.STARTUP_CLEANUP_DELAY,
                               functools.partial(self._do_cleanup,
                                                 **{"async": True}))
        self._cleanup_done = True


问题


面经


文章

微信
公众号

扫码关注公众号