def _forward_worker(self):
LOG.debug('forward worker begun')
session = neutron_db_api.get_session()
while True:
try:
def work(k, v):
LOG.debug('forward worker updating etcd key %s' % k)
if self.do_etcd_update(k, v):
return True
else:
os.sleep(1) # something went bad; breathe, in
# case we end up in a tight loop
return False
LOG.debug('forward worker reading journal')
while db.journal_read(session, work):
pass
LOG.debug('forward worker has emptied journal')
# work queue is now empty.
LOG.debug("ML2_VPP(%s): worker thread pausing" % self.__class__.__name__)
# Wait to be kicked, or (in case of emergency) run every
# few seconds in case another thread or process dumped
# work and failed to process it
try:
with eventlet.Timeout(PARANOIA_TIME) as t:
# Wait for kick
dummy = self.db_q_ev.wait()
# Clear the event - we will now process till
# we've run out of things in the backlog
# so any trigger lost in this gap is harmless
self.db_q_ev.reset()
LOG.debug("ML2_VPP(%s): worker thread kicked: %s" % (self.__class__.__name__, str(dummy)))
except eventlet.Timeout:
LOG.debug("ML2_VPP(%s): worker thread suspicious of a long pause" % self.__class__.__name__)
pass
LOG.debug("ML2_VPP(%s): worker thread active" % self.__class__.__name__)
except Exception, e:
# TODO(ijw): log exception properly
LOG.error("problems in forward worker: %s", e)
LOG.error(traceback.format_exc())
# never quit
#pass
######################################################################
评论列表
文章目录