def run(self):
    """Consume the watch response stream and dispatch to callbacks.

    For every response read from ``self._watch_response_iterator``:

    * a ``created`` response registers ``self._callback`` under the
      response's watch id and puts that id on ``self._watch_id_queue``;
    * if a callback is registered for the response's watch id, it is
      called with a ``RevisionCompactedError`` when ``compact_revision``
      is non-zero (the watch is then cancelled), otherwise it is called
      once per entry in ``response.events``.

    If ``grpc.RpcError`` is raised while processing the stream,
    ``self.stop()`` is called and the error is passed to every
    registered callback.
    """
    try:
        for response in self._watch_response_iterator:
            if response.created:
                self._watch_id_callbacks[response.watch_id] = self._callback
                self._watch_id_queue.put(response.watch_id)

            callback = self._watch_id_callbacks.get(response.watch_id)
            if not callback:
                # No callback registered for this watch id; nothing to do.
                continue

            if response.compact_revision != 0:
                # The watcher could safely be reused, but signalling a
                # compacted revision through a new event type would
                # require an API change that breaks existing users of
                # this module. Instead, the callback is handed an
                # exception and the watch is cancelled. The caller has
                # to create a new client instance to recover.
                callback(etcd3_exceptions.RevisionCompactedError(
                    response.compact_revision))
                self.cancel(response.watch_id)
                continue

            for event in response.events:
                callback(events.new_event(event))
    except grpc.RpcError as e:
        self.stop()
        # Iterate over a snapshot: a callback may add or cancel watches,
        # and changing the dict's size while iterating its live view
        # raises RuntimeError.
        for callback in list(self._watch_id_callbacks.values()):
            callback(e)
# Comment list
# Table of contents