def start_change_event_in_stream(self):
    """Start consuming the local stub's change-event stream in a thread.

    Schedules a blocking receiver with ``reactor.callInThread``. The
    receiver iterates the ``ReceiveChangeEvents`` streaming RPC and, for
    every event, asks the reactor thread to call
    ``self.change_event_queue.put(event)``.

    If the stream fails with ``StatusCode.UNAVAILABLE``, SIGTERM is sent to
    the current process. Any other stream error is logged and the receiver
    simply returns.
    """
    # Local import so this block does not depend on a file-level import.
    import signal

    def receive_change_events():
        """Blocking loop; meant to run off the reactor thread."""
        streaming_rpc_method = self.local_stub.ReceiveChangeEvents
        iterator = streaming_rpc_method(empty_pb2.Empty())
        try:
            for event in iterator:
                # The put is handed to the reactor thread rather than
                # performed directly from this worker thread.
                reactor.callFromThread(self.change_event_queue.put, event)
                # NOTE(review): the put above is only scheduled, so the
                # logged length may not yet include this event.
                log.debug('enqued-change-event',
                          change_event=event,
                          queue_len=len(self.change_event_queue.pending))
        except _Rendezvous as e:
            if e.code() == StatusCode.UNAVAILABLE:
                # Signal ourselves directly instead of spawning a shell
                # to run `kill -15 <pid>`.
                os.kill(os.getpid(), signal.SIGTERM)
            else:
                # Previously swallowed silently; record why the stream
                # ended so the dead receiver is diagnosable.
                log.error('change-event-stream-error', code=e.code())

    reactor.callInThread(receive_change_events)
# (Web-page residue, not part of the code: "comment list" /
# "table of contents" navigation labels.)