def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
    """Run ``func`` in a greenlet, either right away or after a delay.

    When ``isAllowed(event, allowed_again)`` is true, the event is recorded
    with ``called(event)`` and ``func(*args, **kwargs)`` is spawned
    immediately. Otherwise the call is stored in ``queue_db`` under
    ``event``: the first such call schedules a greenlet that runs
    ``callQueue(event)`` once the remaining time has elapsed, and later
    calls that find an entry already stored only replace the stored
    ``func``/``args``/``kwargs`` while keeping that greenlet.

    Returns the greenlet that was spawned or that was already scheduled.
    """
    if isAllowed(event, allowed_again):
        # Not called recently: record the call and run it now.
        called(event)
        return gevent.spawn(func, *args, **kwargs)

    # Called recently: compute how much of the allowed_again window is left.
    elapsed = max(0, time.time() - called_db[event])
    time_left = allowed_again - elapsed
    log.debug("Added to queue (%.2fs left): %s " % (time_left, event))

    pending = queue_db.get(event)
    if pending:
        # Already queued: reuse the scheduled greenlet, refresh the parameters.
        thread = pending[3]
    else:
        # Not queued yet: schedule the deferred call.
        thread = gevent.spawn_later(time_left, lambda: callQueue(event))
    queue_db[event] = (func, args, kwargs, thread)
    return thread
# Rate limit and delay function call if needed
# Return: Wait for execution/delay then return value
# Example source code using Python's spawn_later()
def updateEndpoints( endpointActors, nextUpdate ):
    """Rebuild the global ``currentEndpoints`` set and schedule the next refresh.

    Forces a refresh on ``endpointActors``, sends a 'report' request to all
    of them and collects an ``( address, port )`` tuple from every successful
    response that carries both keys. The function then re-schedules itself
    with gevent: after ``nextUpdate`` seconds normally, or after 5 seconds
    when no endpoint was found.
    """
    global currentEndpoints
    endpointActors.forceRefresh()
    pending = endpointActors.requestFromAll( 'report' )
    found = Set()
    while pending.waitForResults( timeout = 10 ):
        for reply in pending.getNewResults():
            if not reply.isSuccess:
                continue
            info = reply.data
            if 'address' in info and 'port' in info:
                found.add( ( info[ 'address' ], info[ 'port' ] ) )
        if pending.isFinished():
            break
    currentEndpoints = found
    # Retry sooner when the list came back empty.
    delay = 5 if 0 == len( currentEndpoints ) else nextUpdate
    print( "Updated list of endpoints, found %s" % len( currentEndpoints ) )
    gevent.spawn_later( delay, updateEndpoints, endpointActors, nextUpdate )
def pollBackendAvailability( isOneOff = True ):
    """Probe the backend and record the result in ``IS_BACKEND_AVAILABLE``.

    Issues a 'list_sensors' request to ``model`` and a 'get_org_info'
    request to ``identmanager``; the backend counts as available only when
    both report success. Unless ``isOneOff`` is true the function
    re-schedules itself: after 10 seconds when available, after 2 seconds
    when not.
    """
    global IS_BACKEND_AVAILABLE
    aid = AgentId( '0.0.0.0.0' )
    aid.org_id = ADMIN_OID
    sensorsResp = model.request( 'list_sensors', { 'aid' : aid }, timeout = 2 )
    orgResp = identmanager.request( 'get_org_info', { 'include_all' : True } )
    if sensorsResp.isSuccess and orgResp.isSuccess:
        IS_BACKEND_AVAILABLE = True
        status, recheckDelay = 'Backend available', 10
    else:
        IS_BACKEND_AVAILABLE = False
        status, recheckDelay = 'Backend unavailable', 2
    print( status )
    if not isOneOff:
        gevent.spawn_later( recheckDelay, pollBackendAvailability, isOneOff = False )
def restartTimers(self):
    """Re-arm or fire the escalation of every alert in the 'alerts' hash.

    For each key of the 'alerts' hash the alert is loaded with
    ``self.getAlert(key)``. Alerts whose state is 'RESOLVED' or
    'UNRESOLVED', and alerts for which ``self.getStateTime`` returns a
    falsy value, are deleted from the hash. For the others the time left
    until ``epoch + alerttime`` is computed: a positive remainder schedules
    ``self.escalateHigher(alert)`` with gevent and stores the greenlet in
    ``self.timers`` under the alert guid, otherwise the escalation is
    called immediately.
    """
    now = time.time()
    # Only the keys are used; the stored value is re-read through getAlert().
    for key, _ in self.rediscl.hgetall('alerts').items():
        alert = self.getAlert(key)
        if alert['state'] in ('RESOLVED', 'UNRESOLVED'):
            self.rediscl.hdel('alerts', key)
            continue

        alerttime = self.getStateTime(alert)
        if not alerttime:
            self.rediscl.hdel('alerts', key)
            continue

        epoch = alert['epoch'] or alert['lasttime']
        remainingtime = epoch + alerttime - now
        if remainingtime > 0:
            self.logger.info("Schedule escalation in %ss for state %s" % (remainingtime, alert['state']))
            timer = gevent.spawn_later(remainingtime, self.escalateHigher, alert)
            self.timers[alert['guid']] = timer
        else:
            self.escalateHigher(alert)
def _quiesce(self, environ, bypass_auth=False):
    """Set service state to quiesced and shed existing connections.

    Raises ``UnauthorizedError`` when ``bypass_auth`` is false and
    ``self._authorized_to_quiesce(environ)`` is falsy. Does nothing more if
    the service is already quiesced. Otherwise it marks the service as
    quiesced, schedules ``self._shed_connections`` for batches of at most
    ``self.shed_rate_per_sec`` connections at one-second intervals after a
    30 second delay, and schedules ``self._shutdown`` 10 seconds after the
    last batch.
    """
    authorized = bypass_auth or self._authorized_to_quiesce(environ)
    if not authorized:
        raise UnauthorizedError

    if self.quiesced:
        return
    self.quiesced = True

    # Delay shedding to allow service deregistration after quiescing.
    shed_delay_secs = 30
    # Delay between the last shed batch and terminating the service.
    termination_delay_secs = 10

    total_conns = len(self.connections)
    # Note: There's still a small chance that we miss connections
    # that came in before we set to quiesced but are
    # still being established.
    conns = self.connections.copy()

    # One batch per second; the last batch may be smaller than
    # shed_rate_per_sec so the copied set is never over-popped.
    cur_iter_sec = 0
    for remaining in xrange(total_conns, 0, -self.shed_rate_per_sec):
        cur_iter_sec += 1
        num_conns = min(remaining, self.shed_rate_per_sec)
        batch = [conns.pop() for _ in xrange(num_conns)]
        gevent.spawn_later(cur_iter_sec + shed_delay_secs,
                           self._shed_connections,
                           batch)

    # Terminate the service after shedding.
    gevent.spawn_later(shed_delay_secs + cur_iter_sec + termination_delay_secs,
                       self._shutdown)
def schedule(delay, func, *args, **kw_args):
    """Spawn ``func(*args, **kw_args)`` now and again every ``delay`` seconds.

    Each invocation spawns one greenlet for ``func`` and then queues
    ``schedule`` itself to run again after ``delay``; the function contains
    no stop condition.
    """
    # This round's call.
    gevent.spawn_later(0, func, *args, **kw_args)
    # Next round: re-enter schedule() with the same parameters.
    rearm_args = (delay, func) + args
    gevent.spawn_later(delay, schedule, *rearm_args, **kw_args)
def test_timeout(self):
    """A blocking ``recv`` is interrupted by a ``gevent.Timeout``.

    The peer's send is scheduled for 0.5s from now while the timeout is
    started with 0.1s, so ``b.recv`` is expected to raise
    ``gevent.Timeout`` before any message is sent.
    """
    a, b = self.create_bound_pair()
    g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
    timeout = gevent.Timeout(0.1)
    timeout.start()
    try:
        self.assertRaises(gevent.Timeout, b.recv)
    finally:
        # Clean up even when the assertion fails, so neither the timer nor
        # the delayed sender greenlet outlives this test.
        timeout.cancel()
        g.kill()
def send(self, sender, host_port, bytes_):
    """Track the send, then deliver ``bytes_`` to the target transport.

    The ``receive`` method of ``self.transports[host_port]`` is not called
    directly; it is scheduled through gevent with a near-zero delay.
    """
    self.track_send(sender, host_port, bytes_)
    target = self.transports[host_port]
    gevent.spawn_later(0.00000000001, target.receive, bytes_)
def test_spawn_later_greenlet(self):
    """A delayed greenlet that uses the tracer gets a context attached."""
    def touch_tracer():
        self.tracer.get_call_context()

    delayed = gevent.spawn_later(0.01, touch_tracer)
    delayed.join()

    # The context is expected on the greenlet itself, with no spans in it.
    context = getattr(delayed, '__datadog_context', None)
    ok_(context is not None)
    eq_(0, len(context._trace))
def test_trace_later_greenlet(self):
    """A delayed greenlet can be traced using the trace API."""
    def traced_job():
        with self.tracer.trace('greenlet') as span:
            span.resource = 'base'

    job = gevent.spawn_later(0.01, traced_job)
    job.join()

    # Exactly one trace holding exactly one span is expected.
    traces = self.tracer.writer.pop_traces()
    eq_(1, len(traces))
    only_trace = traces[0]
    eq_(1, len(only_trace))
    root = only_trace[0]
    eq_('greenlet', root.name)
    eq_('base', root.resource)
def test_trace_spawn_later_multiple_greenlets_multiple_traces(self):
    """Delayed worker greenlets are traced as children of the spawning span.

    Two workers are spawned later from inside the 'greenlet.main' span.
    Three traces are expected: one per worker, followed by the main one,
    and each worker span must have the main span as its parent.
    """
    def worker(worker_id):
        with self.tracer.trace('greenlet.worker') as span:
            span.set_tag('worker_id', worker_id)
            gevent.sleep(0.01)

    def entrypoint():
        with self.tracer.trace('greenlet.main') as span:
            span.resource = 'base'
            jobs = [gevent.spawn_later(0.01, worker, worker_id)
                    for worker_id in ('1', '2')]
            gevent.joinall(jobs)

    gevent.spawn(entrypoint).join()
    traces = self.tracer.writer.pop_traces()
    eq_(3, len(traces))
    eq_(1, len(traces[0]))
    parent_span = traces[2][0]
    worker_spans = (traces[0][0], traces[1][0])

    # check spans data and hierarchy
    eq_(parent_span.name, 'greenlet.main')
    eq_(parent_span.resource, 'base')
    for worker_span, expected_id in zip(worker_spans, ('1', '2')):
        eq_(worker_span.get_tag('worker_id'), expected_id)
        eq_(worker_span.name, 'greenlet.worker')
        eq_(worker_span.resource, 'greenlet.worker')
        eq_(worker_span.parent_id, parent_span.span_id)
def test_trace_concurrent_spawn_later_calls(self):
    """100 independently delayed greenlets yield 100 separate traces."""
    def traced_sleep():
        with self.tracer.trace('greenlet'):
            gevent.sleep(0.01)

    jobs = []
    for _ in range(100):
        jobs.append(gevent.spawn_later(0.01, traced_sleep))
    gevent.joinall(jobs)

    # One trace per greenlet is expected rather than a single shared one.
    traces = self.tracer.writer.pop_traces()
    eq_(100, len(traces))
    first_trace = traces[0]
    eq_(1, len(first_trace))
    eq_('greenlet', first_trace[0].name)
def _replace(g_class):
    """
    Utility function that replaces the gevent Greenlet class with the given one.

    Rebinds ``gevent.greenlet.Greenlet`` and ``gevent.Greenlet`` to
    ``g_class`` and points the ``gevent.spawn`` / ``gevent.spawn_later``
    shortcuts at the corresponding attributes of ``g_class``.
    """
    # replace the original Greenlet class with the new one
    gevent.greenlet.Greenlet = g_class

    # replace gevent shortcuts
    gevent.Greenlet = g_class
    gevent.spawn = g_class.spawn
    gevent.spawn_later = g_class.spawn_later
def test_timeout(self):
    """A blocking ``recv`` is interrupted by a ``gevent.Timeout``.

    The peer's send is scheduled for 0.5s from now while the timeout is
    started with 0.1s, so ``b.recv`` is expected to raise
    ``gevent.Timeout`` before any message is sent.
    """
    a, b = self.create_bound_pair()
    g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
    timeout = gevent.Timeout(0.1)
    timeout.start()
    try:
        self.assertRaises(gevent.Timeout, b.recv)
    finally:
        # Clean up even when the assertion fails, so neither the timer nor
        # the delayed sender greenlet outlives this test.
        timeout.cancel()
        g.kill()
def test_timeout(self):
    """A blocking ``recv`` is interrupted by a ``gevent.Timeout``.

    The peer's send is scheduled for 0.5s from now while the timeout is
    started with 0.1s, so ``b.recv`` is expected to raise
    ``gevent.Timeout`` before any message is sent.
    """
    a, b = self.create_bound_pair()
    g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
    timeout = gevent.Timeout(0.1)
    timeout.start()
    try:
        self.assertRaises(gevent.Timeout, b.recv)
    finally:
        # Clean up even when the assertion fails, so neither the timer nor
        # the delayed sender greenlet outlives this test.
        timeout.cancel()
        g.kill()
def test_timeout(self):
    """A blocking ``recv`` is interrupted by a ``gevent.Timeout``.

    The peer's send is scheduled for 0.5s from now while the timeout is
    started with 0.1s, so ``b.recv`` is expected to raise
    ``gevent.Timeout`` before any message is sent.
    """
    a, b = self.create_bound_pair()
    g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
    timeout = gevent.Timeout(0.1)
    timeout.start()
    try:
        self.assertRaises(gevent.Timeout, b.recv)
    finally:
        # Clean up even when the assertion fails, so neither the timer nor
        # the delayed sender greenlet outlives this test.
        timeout.cancel()
        g.kill()
def test_timeout(self):
    """A blocking ``recv`` is interrupted by a ``gevent.Timeout``.

    The peer's send is scheduled for 0.5s from now while the timeout is
    started with 0.1s, so ``b.recv`` is expected to raise
    ``gevent.Timeout`` before any message is sent.
    """
    a, b = self.create_bound_pair()
    g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
    timeout = gevent.Timeout(0.1)
    timeout.start()
    try:
        self.assertRaises(gevent.Timeout, b.recv)
    finally:
        # Clean up even when the assertion fails, so neither the timer nor
        # the delayed sender greenlet outlives this test.
        timeout.cancel()
        g.kill()
def test_timeout(self):
    """A blocking ``recv`` is interrupted by a ``gevent.Timeout``.

    The peer's send is scheduled for 0.5s from now while the timeout is
    started with 0.1s, so ``b.recv`` is expected to raise
    ``gevent.Timeout`` before any message is sent.
    """
    a, b = self.create_bound_pair()
    g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
    timeout = gevent.Timeout(0.1)
    timeout.start()
    try:
        self.assertRaises(gevent.Timeout, b.recv)
    finally:
        # Clean up even when the assertion fails, so neither the timer nor
        # the delayed sender greenlet outlives this test.
        timeout.cancel()
        g.kill()
def spawn_later(self, delay, *args, **kwargs):
    """Delayed counterpart of spawning through ``self.spawn_wrap``.

    Binds ``delay`` as the first argument of ``gevent.spawn_later`` and
    passes the resulting callable, together with the remaining arguments,
    to ``self.spawn_wrap``; returns whatever ``spawn_wrap`` returns.
    """
    delayed_spawn = functools.partial(gevent.spawn_later, delay)
    return self.spawn_wrap(delayed_spawn, *args, **kwargs)
def _connect( self ):
    """Open the TLS connection, send the handshake and start the worker greenlets.

    Connects to ``( self._destServer, self._destPort )``, sends a header
    frame built from the agent identity, an MD5 hex digest of the sensor
    id as host name, a random IPv4 address and, when set, the enrollment
    token. It then adds the receive and sync greenlets to ``self._threads``.

    Returns True on success. Any exception is logged through ``self._log``
    together with its traceback and the method returns False.
    """
    try:
        plainSocket = gevent.socket.socket( gevent.socket.AF_INET,
                                            gevent.socket.SOCK_STREAM )
        # NOTE(review): CERT_NONE means no certificate is required from the
        # server -- confirm this is intended outside of test setups.
        self._socket = gevent.ssl.wrap_socket( plainSocket,
                                               cert_reqs = gevent.ssl.CERT_NONE )
        self._socket.connect( ( self._destServer, self._destPort ) )
        self._log( "Connected" )

        headers = rSequence()
        identity = AgentId( ( self._oid, self._iid, self._sid, self._plat, self._arch ) )
        headers.addSequence( Symbols.base.HCP_IDENT, identity.toJson() )
        hostDigest = hashlib.md5( str( self._sid ) ).hexdigest()
        headers.addStringA( Symbols.base.HOST_NAME, hostDigest )
        # Each octet is drawn independently from 0..254.
        octets = tuple( random.randint( 0, 254 ) for _ in range( 4 ) )
        headers.addIpv4( Symbols.base.IP_ADDRESS, "%d.%d.%d.%d" % octets )
        if self._enrollmentToken is not None:
            headers.addBuffer( Symbols.hcp.ENROLLMENT_TOKEN, self._enrollmentToken )

        self._sendFrame( HcpModuleId.HCP, [ headers ], timeout = 30, isNotHbs = True )
        self._log( "Handshake sent" )

        # Receiver starts now; the syncs and the "connected" flag are delayed.
        self._threads.add( gevent.spawn( self._recvThread ) )
        self._threads.add( gevent.spawn_later( 1, self._syncHcpThread ) )
        self._threads.add( gevent.spawn_later( 10, self._syncHbsThread ) )
        self._threads.add( gevent.spawn_later( 2, lambda: self._connectedEvent.set() ) )
        return True
    except:
        # NOTE(review): the bare except also catches BaseException
        # subclasses (e.g. gevent.Timeout, GreenletExit) -- confirm that
        # swallowing those here is intended.
        self._log( "Failed to connect over TLS: %s" % traceback.format_exc() )
        return False
def _syncHbsThread( self ):
    """Run one HBS sync, then schedule this method again in five minutes."""
    self._doHbsSync()
    resyncDelay = 60 * 5
    nextRun = gevent.spawn_later( resyncDelay, self._syncHbsThread )
    self._threads.add( nextRun )
def _syncHcpThread( self ):
    """Run one HCP sync, then schedule this method again in ten minutes."""
    self._doHcpSync()
    resyncDelay = 60 * 10
    nextRun = gevent.spawn_later( resyncDelay, self._syncHcpThread )
    self._threads.add( nextRun )
###########################################################################
# SEND AND RECEIVE DATA STUFF
###########################################################################
def _generateEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ):
    """Send the generator's next message(s) and schedule the following run.

    When ``self._connectedEvent`` is set, one item is pulled from
    ``eventGenerator`` and sent as an HBS frame; a value that is not a
    tuple or list is wrapped in a one-element tuple first. The chain stops
    (nothing is re-scheduled) when ``upToNEvents`` has reached 0 or the
    generator is exhausted. When not connected nothing is sent, but the
    next run is still scheduled.

    Unless ``self._stopEvent`` is set, the next run is scheduled after
    ``everyNSeconds`` plus a random offset within
    ``+/- plusOrMinusNSeconds``, clamped to a minimum of 0.
    """
    if self._connectedEvent.wait( 0 ):
        if upToNEvents is not None:
            if 0 == upToNEvents:
                # Event budget used up: end the chain.
                return
            upToNEvents -= 1
        try:
            messages = next( eventGenerator )
        except StopIteration:
            self._log( "Scheduled event generator failed to generate, ignoring it in the future." )
            return
        if type( messages ) not in ( tuple, list ):
            messages = ( messages, )
        self._sendFrame( HcpModuleId.HBS, messages, timeout = 30 )

    if self._stopEvent.wait( 0 ):
        return

    nextEvent = everyNSeconds
    if 0 != plusOrMinusNSeconds:
        nextEvent += random.randint( -plusOrMinusNSeconds, plusOrMinusNSeconds )
    if nextEvent < 0:
        nextEvent = 0
    followUp = gevent.spawn_later( nextEvent, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents )
    self._threads.add( followUp )
###########################################################################
# PUBLIC FUNCTIONALITY
###########################################################################
def scheduleEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds = 0, upToNEvents = None ):
    """Start a recurring event by scheduling ``self._generateEvent`` with no delay.

    All four parameters are passed through unchanged to
    ``self._generateEvent``; the spawned greenlet is added to
    ``self._threads``.
    """
    firstRun = gevent.spawn_later( 0, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents )
    self._threads.add( firstRun )
###########################################################################
# MAIN
###########################################################################
def __init__( self, maxQps, cbLog = None ):
    """Initialise the sender state and start its background greenlets.

    Stores ``maxQps`` and the optional ``cbLog`` callback, creates the
    queue and the greenlet group, then schedules ``self._sendLoop``
    immediately and ``self._resetStats`` after one second.
    """
    self._maxQps = maxQps
    self._log = cbLog
    self._q = gevent.queue.Queue()
    self._transmitted = 0
    self._lastWait = time.time()
    self._isRunning = True
    self._threads = gevent.pool.Group()
    for delay, worker in ( ( 0, self._sendLoop ), ( 1, self._resetStats ) ):
        self._threads.add( gevent.spawn_later( delay, worker ) )
def _resetStats( self ):
    """Zero ``self._transmitted`` once a second while ``self._isRunning`` is true.

    When ``self._isRunning`` is false nothing is reset and the method is
    not re-scheduled.
    """
    if not self._isRunning:
        return
    self._transmitted = 0
    nextReset = gevent.spawn_later( 1, self._resetStats )
    self._threads.add( nextReset )
def pollOutageState():
    """Refresh the global ``IS_OUTAGE_ON`` and poll again in 30 seconds.

    On a successful 'get_global_config' request, ``IS_OUTAGE_ON`` becomes
    False when 'global/outagestate' equals '0', otherwise it is set to the
    value of 'global/outagetext'. On a failed request the previous value
    is left untouched.
    """
    global IS_OUTAGE_ON
    info = deployment.request( 'get_global_config', {} )
    if info.isSuccess:
        if info.data[ 'global/outagestate' ] == '0':
            IS_OUTAGE_ON = False
        else:
            IS_OUTAGE_ON = info.data[ 'global/outagetext' ]
    gevent.spawn_later( 30, pollOutageState )
def spawn_later(self, seconds, function, *args, **kwargs):
    """Spawn a new gevent greenlet later.

    Forwards all arguments to ``gevent.spawn_later`` and returns the
    greenlet it creates.
    """
    greenlet = gevent.spawn_later(seconds, function, *args, **kwargs)
    return greenlet
def on_datamodel_in_sync(self):
    """Schedule the startup cleanup the first time the datamodel is in sync.

    Does nothing when ``self._cleanup_done`` is already true. Otherwise
    sets the flag and schedules ``self._do_cleanup(async=True)`` to run
    after ``self.config.STARTUP_CLEANUP_DELAY``.
    """
    if self._cleanup_done:
        return
    # Datamodel in sync for the first time. Give the managers some
    # time to finish processing, then trigger cleanup.
    self._cleanup_done = True
    _log.info("No cleanup scheduled, scheduling one.")
    # ``async`` is a reserved word from Python 3.7 on, so the keyword is
    # passed through dict unpacking; the call made is unchanged.
    cleanup = functools.partial(self._do_cleanup, **{"async": True})
    gevent.spawn_later(self.config.STARTUP_CLEANUP_DELAY, cleanup)
def on_datamodel_in_sync(self):
    """Schedule the startup cleanup the first time the datamodel is in sync.

    Does nothing when ``self._cleanup_done`` is already true. Otherwise
    sets the flag and schedules ``self._do_cleanup(async=True)`` to run
    after ``self.config.STARTUP_CLEANUP_DELAY``.
    """
    if self._cleanup_done:
        return
    # Datamodel in sync for the first time. Give the managers some
    # time to finish processing, then trigger cleanup.
    self._cleanup_done = True
    _log.info("No cleanup scheduled, scheduling one.")
    # ``async`` is a reserved word from Python 3.7 on, so the keyword is
    # passed through dict unpacking; the call made is unchanged.
    cleanup = functools.partial(self._do_cleanup, **{"async": True})
    gevent.spawn_later(self.config.STARTUP_CLEANUP_DELAY, cleanup)