def spawn_after(self, delay, f, *args, **kwargs):
"""
Spawns a greenlet that will start after delay seconds. Otherwise, same as Module.spawn
"""
g = gevent.Greenlet(f, *args, **kwargs)
g.link_exception(self._on_error)
g.link(lambda v: self._running_greenlets.discard(g))
self._running_greenlets.add(g)
g.start_later(delay)
return g
python类Greenlet()的实例源码
def start(self, right_away=True):
if self._greenlet:
raise RuntimeError("Periodic already started.")
self._greenlet = Greenlet(self._run)
self._greenlet.link(self._discard_greenlet)
if right_away:
self._greenlet.start()
else:
self._greenlet.start_later(self.interval)
def __init__(self):
self.logger = logging.getLogger('%s.%s' % (self.__class__.__module__,
self.__class__.__name__))
self._component_types = {} # Component metadata, keyed by component name
# type: Dict[str, Graph]
self._graphs = {} # Graph instances, keyed by graph ID
# type: Dict[str, Tuple[Greenlet, Network]]
self._executors = {} # GraphExecutor instances, keyed by graph ID
self.logger.debug('Initialized runtime!')
def _on_worker_died(self, watch_greenlet):
"""
Greenlet: spawned by the gevent Hub if our worker thread dies.
"""
_log.critical("Worker greenlet died: %s; exiting.", watch_greenlet)
sys.exit(1)
def __init__(self, qualifier=None):
self._event_queue = collections.deque()
# Set to True when the main loop is actively processing the input
# queue or has been scheduled to do so. Set to False when the loop
# runs out of work and switches to the Hub to wait for more.
self._scheduled = True
# (Monotonic time) timestamp of last schedule.
self._last_scheduled = None
# Cache the gevent Hub and main loop.
self._gevent_hub = gevent.get_hub()
self._gevent_loop = self._gevent_hub.loop
self.greenlet = gevent.Greenlet(self._loop)
self._op_count = 0
self._current_msg = None
self.started = False
# Message being processed; purely for logging.
self.msg_id = None
# Logging parameters
self.qualifier = qualifier
if qualifier:
self.name = "%s(%s)" % (self.__class__.__name__, qualifier)
else:
self.name = self.__class__.__name__
# Can't use str(self) yet, it might not be ready until subclass
# constructed.
_log.info("%s created.", self.name)
def __init__(self, config, hosts_ipset):
super(EtcdAPI, self).__init__(config.ETCD_ADDRS,
etcd_scheme=config.ETCD_SCHEME,
etcd_key=config.ETCD_KEY_FILE,
etcd_cert=config.ETCD_CERT_FILE,
etcd_ca=config.ETCD_CA_FILE)
self._config = config
# Timestamp storing when the EtcdAPI started. This info is needed
# in order to report uptime to etcd.
self._start_time = monotonic_time()
# Create an Actor to report per-endpoint status into etcd. We defer
# startup of this and our other workers until we get started.
self.status_reporter = EtcdStatusReporter(config)
# Create the main etcd-watching greenlet.
self._watcher = _FelixEtcdWatcher(config,
self,
self.status_reporter,
hosts_ipset)
self._watcher.link(self._on_worker_died)
# Create a greenlet to trigger periodic resyncs.
self._resync_greenlet = gevent.Greenlet(self._periodically_resync)
self._resync_greenlet.link_exception(self._on_worker_died)
# Create a greenlet to report felix's liveness into etcd.
self.done_first_status_report = False
self._status_reporting_greenlet = gevent.Greenlet(
self._periodically_report_status
)
self._status_reporting_greenlet.link_exception(self._on_worker_died)
self.status_reporter.greenlet.link(self._on_worker_died)
def _periodically_report_status(self):
"""
Greenlet: periodically writes Felix's status into etcd.
:return: Does not return, unless reporting disabled.
"""
_log.info("Started status reporting thread. Waiting for config.")
self._watcher.configured.wait()
ttl = self._config.REPORTING_TTL_SECS
interval = self._config.REPORTING_INTERVAL_SECS
_log.debug("Reporting interval: %s, TTL: %s", interval, ttl)
if interval == 0:
_log.info("Interval is 0, status reporting disabled.")
return
while True:
try:
self._update_felix_status(ttl)
except EtcdException as e:
_log.warning("Error when trying to check into etcd (%r), "
"retrying after %s seconds.", e, RETRY_DELAY)
self.reconnect()
gevent.sleep(RETRY_DELAY)
else:
# Jitter by 10% of interval.
jitter = random.random() * 0.1 * interval
sleep_time = interval + jitter
gevent.sleep(sleep_time)
def _on_worker_died(self, watch_greenlet):
"""
Greenlet: spawned by the gevent Hub if the etcd watch loop ever
stops, kills the process.
"""
_log.critical("Worker greenlet died: %s; exiting.", watch_greenlet)
sys.exit(1)
def __init__(self, config, etcd_api, status_reporter, hosts_ipset):
super(_FelixEtcdWatcher, self).__init__()
self._config = config
self._etcd_api = etcd_api
self._status_reporter = status_reporter
self.hosts_ipset = hosts_ipset
# Whether we've been in sync with etcd at some point.
self._been_in_sync = False
# Keep track of the config loaded from etcd so we can spot if it
# changes.
self.last_global_config = None
self.last_host_config = None
self.my_config_dir = dir_for_per_host_config(self._config.HOSTNAME)
# Events triggered by the EtcdAPI Actor to tell us to load the config
# and start polling. These are one-way flags.
self.load_config = Event()
self.begin_polling = Event()
# Event that we trigger once the config is loaded.
self.configured = Event()
# Polling state initialized at poll start time.
self.splitter = None
# Next-hop IP addresses of our hosts, if populated in etcd.
self.ipv4_by_hostname = {}
# Forces a resync after the current poll if set. Safe to set from
# another thread. Automatically reset to False after the resync is
# triggered.
self.resync_requested = False
self.dispatcher = PathDispatcher()
# The Popen object for the driver.
self._driver_process = None
# Stats.
self.read_count = 0
self.msgs_processed = 0
self.last_rate_log_time = monotonic_time()
# Register for events when values change.
self._register_paths()
self._usage_report_greenlet = gevent.Greenlet(
self._periodically_usage_report
)
def _periodically_usage_report(self):
"""
Greenlet: periodically report the cluster existence to
projectcalico.org. Period is about once per day.
:return: Does not return, unless USAGE_REPORT disabled.
"""
interval = 86400 # Once every 24 hours minus 12 minute jitter
jitter = random.random() * 0.01 * interval
try:
calico_version = str(pkg_resources.require("calico")[0].version)
except ResolutionError:
calico_version = "NA"
_log.info("Started usage report thread. Usage report interval: %s, pre-jitter: %s", interval, jitter)
# Pre-Jitter the reporting thread start by 1% of interval (about 12 minutes)
# Jitter prevents thundering herd for large clusters when the cluster first starts
# Do pre-jitter only for clusters greater than 25.
felix_count = self.estimated_host_count()
if (felix_count >= 25):
gevent.sleep(jitter)
while True:
config = self._config
felix_count = self.estimated_host_count()
cluster_type = "NA"
if self._config.USAGE_REPORT:
_log.info("usage report is enabled")
report_usage_and_get_warnings(calico_version, config.HOSTNAME, config.CLUSTER_GUID, felix_count, cluster_type)
# Jitter by 10% of interval (about 120 minutes)
jitter = random.random() * 0.1 * interval
sleep_time = interval - jitter
_log.info("Usage report interval: %s, sleep-time: %s", interval, sleep_time)
gevent.sleep(sleep_time)
def __init__(self, *args, **kwargs):
self._inbox = Queue()
self._running = True
Greenlet.__init__(self)
self.start()
self._args = args
self._kwargs = kwargs
def run_parallel(self, hosts, cmd):
codes = {"total": 0, "error": 0, "success": 0}
def worker(host, cmd):
p = Popen(self.get_parallel_ssh_options(host, cmd), stdout=PIPE, stderr=PIPE)
while True:
outs, _, _ = select([p.stdout, p.stderr], [], [])
if p.stdout in outs:
outline = p.stdout.readline()
else:
outline = ""
if p.stderr in outs:
errline = p.stderr.readline()
else:
errline = ""
if outline == "" and errline == "" and p.poll() is not None:
break
if outline != "":
print("%s: %s" % (colored(host, "blue", attrs=["bold"]), outline.strip()))
if errline != "":
print("%s: %s" % (colored(host, "blue", attrs=["bold"]), colored(errline.strip(), "red")))
if p.poll() == 0:
codes["success"] += 1
else:
codes["error"] += 1
codes["total"] += 1
pool = Pool(self.ssh_threads)
for host in hosts:
pool.start(Greenlet(worker, host, cmd))
pool.join()
self.print_exec_results(codes)
def ping_parallel(self, hosts, pc):
"""ping:\n pings host (using shell cmd)"""
codes = {"total": 0, "error": 0, "success": 0}
def worker(host):
if pc == 0:
args = ["ping", host]
else:
args = ["ping", "-c", str(pc), host]
p = Popen(args, stdout=PIPE, stderr=PIPE)
while True:
outs, _, _ = select([p.stdout, p.stderr], [], [])
if p.stdout in outs:
outline = p.stdout.readline()
else:
outline = ""
if p.stderr in outs:
errline = p.stderr.readline()
else:
errline = ""
if outline == "" and errline == "" and p.poll() is not None:
break
if outline != "":
print("%s: %s" % (colored(host, "blue", attrs=["bold"]), outline.strip()))
if errline != "":
print("%s: %s" % (colored(host, "blue", attrs=["bold"]), colored(errline.strip(), "red")))
if p.poll() == 0:
codes["success"] += 1
else:
codes["error"] += 1
codes["total"] += 1
pool = Pool(self.ssh_threads)
for host in hosts:
pool.start(Greenlet(worker, host))
pool.join()
self.print_exec_results(codes)
def defer(f=None, predicate=None):
"""
Schedule a function to run in a cpu_bound thread, returns a AsyncFuture
Optional predicate parameter to determine if the function should be dispatched
"""
if f is None:
def p_wrap(f):
return defer(f, predicate)
return p_wrap
else:
def f_wrap(f, *args, **kwargs):
if CPUThread._thread is None:
CPUThread._thread = CPUThread()
return CPUThread._thread.apply(f, args, kwargs)
def wrapper(*args, **kwargs):
a = AsyncFuture(None, None)
# TODO: unit test this
if (predicate is not None and not predicate) or utils.in_cpubound_thread():
v = f(*args, **kwargs)
a._value = v
else:
g = Greenlet(f_wrap, f, *args, **kwargs)
g.start()
a._future = g
return a
return wrapper
def _on_child_hook():
# This is called in the hub greenlet. To let the function
# do more useful work, like use blocking functions,
# we run it in a new greenlet; see gevent.hub.signal
if callable(_child_handler):
# None is a valid value for the frame argument
from gevent import Greenlet
greenlet = Greenlet(_child_handler, _signal.SIGCHLD, None)
greenlet.switch()
def __init__(self, config, ip_type,
iptables_updater,
workload_disp_chains,
host_disp_chains,
rules_manager,
fip_manager,
status_reporter):
super(EndpointManager, self).__init__(qualifier=ip_type)
# Configuration and version to use
self.config = config
self.ip_type = ip_type
self.ip_version = futils.IP_TYPE_TO_VERSION[ip_type]
# Peers/utility classes.
self.iptables_updater = iptables_updater
self.workload_disp_chains = workload_disp_chains
self.host_disp_chains = host_disp_chains
self.rules_mgr = rules_manager
self.status_reporter = status_reporter
self.fip_manager = fip_manager
# All endpoint dicts that are on this host.
self.endpoints_by_id = {}
# Dict that maps from interface name ("tap1234") to endpoint ID.
self.endpoint_id_by_iface_name = {}
# Cache of IPs applied to host endpoints. (I.e. any interfaces that
# aren't workload interfaces.)
self.host_ep_ips_by_iface = {}
# Host interface dicts by ID. We'll resolve these with the IPs above
# and inject the (resolved) ones as endpoints.
self.host_eps_by_id = {}
# Cache of interfaces that we've resolved and injected as endpoints.
self.resolved_host_eps = {}
# Set of endpoints that are live on this host. I.e. ones that we've
# increffed.
self.local_endpoint_ids = set()
# Index tracking what policy applies to what endpoints.
self.policy_index = LabelValueIndex()
self.policy_index.on_match_started = self.on_policy_match_started
self.policy_index.on_match_stopped = self.on_policy_match_stopped
self._label_inherit_idx = LabelInheritanceIndex(self.policy_index)
# Tier orders by tier ID. We use this to look up the order when we're
# sorting the tiers.
self.tier_orders = {}
# Cache of the current ordering of tier IDs.
self.tier_sequence = []
# And their associated orders.
self.profile_orders = {}
# Set of profile IDs to apply to each endpoint ID.
self.pol_ids_by_ep_id = MultiDict()
self.endpoints_with_dirty_policy = set()
self._data_model_in_sync = False
self._iface_poll_greenlet = gevent.Greenlet(self._interface_poll_loop)
self._iface_poll_greenlet.link_exception(self._on_worker_died)
def run_collapse(self, hosts, cmd):
progress = None
if self.progressbar:
from progressbar import ProgressBar, Percentage, Bar, ETA, FileTransferSpeed
progress = ProgressBar(
widgets=["Running: ", Percentage(), ' ', Bar(marker='.'), ' ', ETA(), ' ', FileTransferSpeed()],
maxval=len(hosts))
codes = {"total": 0, "error": 0, "success": 0}
outputs = defaultdict(list)
def worker(host, cmd):
p = Popen(self.get_parallel_ssh_options(host, cmd), stdout=PIPE, stderr=PIPE)
o = ""
while True:
outs, _, _ = select([p.stdout, p.stderr], [], [])
outline = errline = ""
if p.stdout in outs:
outline = p.stdout.readline()
if p.stderr in outs:
errline = p.stderr.readline()
o += outline + errline
if outline == "" and errline == "" and p.poll() is not None:
break
if o == "":
o = colored("[ No Output ]\n", "yellow")
outputs[o].append(host)
if p.poll() == 0:
codes["success"] += 1
else:
codes["error"] += 1
codes["total"] += 1
if self.progressbar:
progress.update(codes["total"])
pool = Pool(self.ssh_threads)
if self.progressbar:
progress.start()
for host in hosts:
pool.start(Greenlet(worker, host, cmd))
try:
pool.join()
except KeyboardInterrupt:
pass
if self.progressbar:
progress.finish()
self.print_exec_results(codes)
print()
for output, hosts in outputs.items():
msg = " %s " % ','.join(hosts)
table_width = min([len(msg) + 2, terminal_size()[0]])
cprint("=" * table_width, "blue", attrs=["bold"])
cprint(msg, "blue", attrs=["bold"])
cprint("=" * table_width, "blue", attrs=["bold"])
print(output)