def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
while self.alive:
# notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
if not self.is_parent_alive():
break
# hanle keepalive timeouts
self.murder_keepalived()
self.tpool.shutdown(False)
self.poller.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
python类FIRST_COMPLETED的实例源码
def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
while self.alive:
# notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
if not self.is_parent_alive():
break
# hanle keepalive timeouts
self.murder_keepalived()
self.tpool.shutdown(False)
self.poller.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
while self.alive:
# notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
if not self.is_parent_alive():
break
# hanle keepalive timeouts
self.murder_keepalived()
self.tpool.shutdown(False)
self.poller.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
while self.alive:
# notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
if not self.is_parent_alive():
break
# hanle keepalive timeouts
self.murder_keepalived()
self.tpool.shutdown(False)
self.poller.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
while self.alive:
# notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
if not self.is_parent_alive():
break
# hanle keepalive timeouts
self.murder_keepalived()
self.tpool.shutdown(False)
self.poller.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def perform_requests(self):
signal.signal(signal.SIGINT, self.exit_fast)
signal.signal(signal.SIGTERM, self.exit_fast)
self.state = b'E'
for q_batch in self.get_batch():
for (batch, data) in self.split_batch(q_batch):
hook = partial(self._response_callback, batch=batch)
r = requests.Request(
method='POST',
url=self.endpoint,
headers=self.headers,
data=data,
auth=(self.user, self.api_token),
hooks={'response': hook})
self.n_requests += 1
while True:
self.futures = [i for i in self.futures if not i.done()]
if len(self.futures) < self.concurrency:
self.state = b'R'
f = self._executor.submit(self._request, r)
f.add_done_callback(self.request_cb)
self.futures.append(f)
break
else:
self.state = b'F'
wait(self.futures, return_when=FIRST_COMPLETED)
yield
# wait for all batches to finish before returning
self.state = b'W'
while self.futures:
f_len = len(self.futures)
self.futures = [i for i in self.futures if not i.done()]
if f_len != len(self.futures):
self.ui.debug('Waiting for final requests to finish. '
'remaining requests: {}'
''.format(len(self.futures)))
wait(self.futures, return_when=FIRST_COMPLETED)
self.state = b'D'
yield True
def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
while self.alive:
# notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
if not self.is_parent_alive():
break
# hanle keepalive timeouts
self.murder_keepalived()
self.tpool.shutdown(False)
self.poller.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)