def _run(self, group, queue):
    """Process the events queued for a single group until the queue idles.

    Repeatedly waits for an event on ``queue``; after a short settling
    delay it discards every queued event except the most recent one and
    passes that one to ``self._handler``. The loop ends once no event
    arrives within ``self._grace_period``.

    :param group: identifier of the event group; used here only in the
                  debug log message.
    :param queue: queue-like object providing ``get(timeout=...)``,
                  ``get()`` and ``empty()``; ``get`` with a timeout is
                  expected to raise ``six_queue.Empty`` when it expires.
    """
    LOG.debug("Asynchronous handler started processing %s", group)
    # NOTE(ivc): 'itertools.count()' is used instead of 'while True' on
    # purpose: it can be mocked, which gives unit-tests a controlled
    # environment (e.g. they cannot get stuck in an infinite loop).
    for _ in itertools.count():
        try:
            latest = queue.get(timeout=self._grace_period)
        except six_queue.Empty:
            # Nothing arrived within the grace period: stop processing.
            break

        # FIXME(ivc): temporary workaround to skip stale events.
        # When K8s updates a resource while the handler is still working
        # on it, the handler may fail to update an annotation at the end
        # because of a 'resourceVersion' conflict. K8sClient was changed
        # to allow *new* annotations to be set regardless of
        # 'resourceVersion', but that causes a different problem: the
        # Handler receives old events (i.e. from before the annotation
        # was set) and starts processing them 'from scratch'.
        # This hurts handlers' performance (VIFHandler creates ports only
        # to delete them later; the LBaaS handler also issues some excess
        # requests to Neutron, although with lesser impact).
        # Possible solutions (can be combined):
        #  - store data/annotations in K8s ThirdPartyResources rather
        #    than in native K8s resources (assuming Kuryr-K8s owns those
        #    resources and nobody else updates them)
        #  - feed the 'resourceVersion' returned by K8sClient's
        #    'annotate' back to Async so it can skip all events up to
        #    that version
        #  - keep the 'get-or-create' behaviour in handlers and also
        #    introduce a cache for long operations
        time.sleep(STALE_PERIOD)
        while True:
            if queue.empty():
                break
            # A newer event supersedes the one currently held.
            latest = queue.get()
            if queue.empty():
                # Queue just drained: wait once more so that late events
                # can still arrive before the loop condition is
                # re-checked.
                time.sleep(STALE_PERIOD)

        self._handler(latest)
# Comment list
# Article contents