def makeTimer(self, alert):
    """Schedule (or reschedule) the escalation timer for *alert*.

    ``self.timers`` maps an alert guid to the greenlet returned by
    ``gevent.spawn_later``; the greenlet's first positional argument is the
    alert it was scheduled for.

    * If a timer already exists for the same guid and the same state,
      nothing is done.
    * If a timer exists for a different state, it is removed from
      ``self.timers`` and killed.
    * A new timer is then scheduled only when ``self.getStateTime(alert)``
      returns a truthy delay.

    :param alert: mapping that provides at least ``'guid'`` and ``'state'``.
    :return: None
    """
    guid = alert['guid']
    scheduled = self.timers.get(guid)
    if scheduled is not None:
        scheduledalert = scheduled.args[0]
        if scheduledalert['state'] == alert['state']:
            # A timer for exactly this state is already in place.
            return
        self.logger.info("Removing schedule for alert %s" % scheduledalert['state'])
        # Forget the timer before killing it.  Otherwise, when no new timer
        # is scheduled below, the dead greenlet would stay registered and a
        # later alert carrying the old state would be treated as already
        # scheduled and never escalate.
        self.timers.pop(guid, None)
        scheduled.kill()

    delay = self.getStateTime(alert)
    if delay:
        self.logger.info("Schedule escalation in %ss for state %s" % (delay, alert['state']))
        self.timers[guid] = gevent.spawn_later(delay, self.escalateHigher, alert)
# Example source code for Python's spawn_later()
def do_sed(message):
    """Apply a sed-style substitution from *message* to the channel history.

    The text after the first character of ``message.content`` is handed to
    ``parse_sed``, which yields ``(regex, replacement, flags, target)``.
    The stored ``(nick, line)`` entries for the channel are scanned from the
    last entry backwards; the first line that matches the regex (and, when
    *target* is set, whose lower-cased nick equals *target*) is rewritten in
    place and the result is sent back via ``message.reply``.

    The function returns silently when the channel is unknown, the
    expression cannot be parsed, or the regex does not compile.
    """
    if message.channel not in channels:
        return

    try:
        regex, replacement, flags, target = parse_sed(message.content[1:])
    except ValueError:
        return

    try:
        # Only the low 7 bits of the flags are passed to the regex compiler.
        pattern = re.compile(regex, flags & 127)
    except re.error:
        return

    current = gevent.getcurrent()

    def on_alarm(signum, frame):
        # SIGALRM handler: report the timeout and abort the greenlet that
        # started the match.
        print("timing out!", current)
        gevent.spawn(message.reply, 'fk off with ur evil regex bro')
        current.throw(gevent.GreenletExit)

    # Install a SIGALRM handler and arm a 50 ms real-time timer so that a
    # pathological pattern (e.g. s/^(a+)+$/rip/) cannot run for too long.
    previous_handler = signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, 0.05)
    try:
        search = pattern.search
        history = channels[message.channel]
        for idx in range(-1, -len(history) - 1, -1):
            nick, line = history[idx]
            if not search(line):
                continue
            if target and nick.lower() != target:
                continue
            # Bit 0x100 requests replacing every match (count 0); otherwise
            # only the first match is replaced.
            count = 0 if flags & 0x100 else 1
            rewritten = pattern.sub(replacement, line, count)[:400]
            history[idx] = nick, doTranslation(rewritten, flags)
            gevent.spawn_later(0, message.reply, '*%s*: %s' % (nick, history[idx][1]))
            break
    except re.error:
        return
    finally:
        # Disarm the timer and restore the handler that was installed before.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous_handler)
def _finish_msg_batch(self, batch, results):
    """Retry pending cleanup, report one dirty endpoint, and arm the timer.

    Steps, in order:

    1. If ``REPORT_ENDPOINT_STATUS`` is disabled in the config, log a
       warning, clear all cached status and dirty sets, and return.
    2. If a cleanup is pending, attempt it; the pending flag is cleared
       only when the attempt does not raise ``EtcdException``.
    3. If reporting is currently allowed, write the status of at most one
       dirty endpoint and then clear ``_reporting_allowed``.  A failed
       write puts the endpoint back into the newer dirty set.
    4. If no timer is scheduled and either reporting is disallowed or a
       cleanup is still pending, schedule ``_on_timer_pop`` after a
       jittered ``ENDPOINT_REPORT_DELAY``.

    :param batch: not used by this method.
    :param results: not used by this method.
    :return: None
    """
    if not self._config.REPORT_ENDPOINT_STATUS:
        _log.warning("StatusReporter called even though status reporting "
                     "disabled. Ignoring.")
        self._endpoint_status[IPV4].clear()
        self._endpoint_status[IPV6].clear()
        self._newer_dirty_endpoints.clear()
        self._older_dirty_endpoints.clear()
        return

    if self._cleanup_pending:
        try:
            self._attempt_cleanup()
        except EtcdException as e:
            # Leave _cleanup_pending set so the timer below retries it.
            _log.error("Cleanup failed: %r", e)
            _stats.increment("Status report cleanup failed")
        else:
            _stats.increment("Status report cleanup done")
            self._cleanup_pending = False

    if self._reporting_allowed:
        # We're not rate limited, go ahead and do a write to etcd.
        _log.debug("Status reporting is allowed by rate limit.")
        if not self._older_dirty_endpoints and self._newer_dirty_endpoints:
            _log.debug("_older_dirty_endpoints empty, promoting "
                       "_newer_dirty_endpoints")
            self._older_dirty_endpoints = self._newer_dirty_endpoints
            self._newer_dirty_endpoints = set()
        if self._older_dirty_endpoints:
            ep_id = self._older_dirty_endpoints.pop()
            status_v4 = self._endpoint_status[IPV4].get(ep_id)
            status_v6 = self._endpoint_status[IPV6].get(ep_id)
            status = combine_statuses(status_v4, status_v6)
            try:
                self._write_endpoint_status_to_etcd(ep_id, status)
            except EtcdException:
                _log.exception("Failed to report status for %s, will "
                               "retry", ep_id)
                # Add it into the next dirty set. Retrying in the next
                # batch ensures that we try to update all of the dirty
                # endpoints before we do any retries, ensuring fairness.
                self._newer_dirty_endpoints.add(ep_id)
            # Reset the rate limit flag.
            self._reporting_allowed = False

    if not self._timer_scheduled and ((not self._reporting_allowed) or
                                      self._cleanup_pending):
        # Schedule a timer to stop our rate limiting or retry cleanup.
        timeout = self._config.ENDPOINT_REPORT_DELAY
        timeout *= (0.9 + (random.random() * 0.2))  # Jitter by +/- 10%.
        # "async" is a reserved keyword since Python 3.7, so writing
        # async=True is a syntax error there.  Unpacking a dict passes
        # the very same keyword argument on every Python version.
        gevent.spawn_later(timeout,
                           self._on_timer_pop,
                           **{"async": True})
        self._timer_scheduled = True