def process_secgroup_after(self, resource, event, trigger, **kwargs):
"""Callback for handling security group/rule commit-complete events
This is when we should tell other things that a change has
happened and has been recorded permanently in the DB.
"""
# In Liberty, this is the only callback that's called.
# We use our own event names, which will identify AFTER_*
# events as the right time to commit, so in this case we
# simply call the commit function ourselves.
# This is not perfect - since we're not committing in one
# transaction we can commit the secgroup change but fail to
# propagate it to the journal and from there to etcd on a
# crash. It's all we can do for Liberty as it doesn't support
# in-transaction precommit events.
if not PRECOMMIT:
self.process_secgroup_commit(resource, event, trigger, **kwargs)
# Whatever the object that caused this, we've put something
# in the journal and now need to nudge the communicator
self.kick()
python类event()的实例源码
def __init__(self):
super(EtcdAgentCommunicator, self).__init__()
self.etcd_client = etcd.Client() # TODO(ijw): give this args
# We need certain directories to exist
self.do_etcd_mkdir(LEADIN + '/state')
self.do_etcd_mkdir(LEADIN + '/nodes')
# TODO(ijw): .../state/<host> lists all known hosts, and they
# heartbeat when they're functioning
# Get the physnets the agents know about. This is updated
# periodically in the return thread below.
self.physical_networks = set()
self._find_physnets()
self.db_q_ev = eventlet.event.Event()
self.return_thread = eventlet.spawn(self._return_worker)
self.forward_thread = eventlet.spawn(self._forward_worker)
def __init__(self):
self._ev = eventlet.event.Event()
self._cond = False
def _broadcast(self):
self._ev.send()
# Since eventlet Event doesn't allow multiple send() operations
# on an event, re-create the underlying event.
# Note: _ev.reset() is obsolete.
self._ev = eventlet.event.Event()
def start_threads(self, resource, event, trigger):
LOG.debug('Starting background threads for Neutron worker')
self.return_thread = self.make_return_worker()
self.forward_thread = self.make_forward_worker()
def __init__(self):
self._ev = eventlet.event.Event()
self._cond = False
def _broadcast(self):
self._ev.send()
# Since eventlet Event doesn't allow multiple send() operations
# on an event, re-create the underlying event.
# Note: _ev.reset() is obsolete.
self._ev = eventlet.event.Event()
def _forward_worker(self):
LOG.debug('forward worker begun')
session = neutron_db_api.get_session()
while True:
try:
def work(k, v):
LOG.debug('forward worker updating etcd key %s' % k)
if self.do_etcd_update(k, v):
return True
else:
os.sleep(1) # something went bad; breathe, in
# case we end up in a tight loop
return False
LOG.debug('forward worker reading journal')
while db.journal_read(session, work):
pass
LOG.debug('forward worker has emptied journal')
# work queue is now empty.
LOG.debug("ML2_VPP(%s): worker thread pausing" % self.__class__.__name__)
# Wait to be kicked, or (in case of emergency) run every
# few seconds in case another thread or process dumped
# work and failed to process it
try:
with eventlet.Timeout(PARANOIA_TIME) as t:
# Wait for kick
dummy = self.db_q_ev.wait()
# Clear the event - we will now process till
# we've run out of things in the backlog
# so any trigger lost in this gap is harmless
self.db_q_ev.reset()
LOG.debug("ML2_VPP(%s): worker thread kicked: %s" % (self.__class__.__name__, str(dummy)))
except eventlet.Timeout:
LOG.debug("ML2_VPP(%s): worker thread suspicious of a long pause" % self.__class__.__name__)
pass
LOG.debug("ML2_VPP(%s): worker thread active" % self.__class__.__name__)
except Exception, e:
# TODO(ijw): log exception properly
LOG.error("problems in forward worker: %s", e)
LOG.error(traceback.format_exc())
# never quit
#pass
######################################################################