def _respond_async(self, is_ack, delivery_token, callback):
    """Queue an ack or nack so it can be delivered asynchronously.

    The tuple ``(is_ack, delivery_token, callback)`` is put on
    ``self.ack_queue``, waiting up to ``self.timeout_seconds`` for room.
    The call is a no-op when ``delivery_token`` or ``callback`` is ``None``.

    If the queue is still full after the timeout, ``callback`` is invoked
    immediately with a failed ``AckMessageResult`` whose ``is_ack`` mirrors
    the request. On success an enqueue counter is recorded via ``util``.
    """
    if delivery_token is None or callback is None:
        return
    try:
        self.ack_queue.put((is_ack, delivery_token, callback),
                           block=True,
                           timeout=self.timeout_seconds)
    except queue.Full:
        # Report the kind of response that was actually requested. This was
        # hard-coded to True, which labelled failed nacks as acks.
        callback(AckMessageResult(call_success=False,
                                  is_ack=is_ack,
                                  delivery_token=delivery_token,
                                  error_msg='ack message buffer is full'))
    else:
        # Only count messages that really made it onto the queue.
        hostport = util.get_hostport_from_delivery_token(delivery_token)
        util.stats_count(self.tchannel.name, 'consumer_ack_queue.enqueue', hostport, 1)
# Example source code for the Python class Full()
def put(self, obj, block=True, timeout=None):
    """Put ``obj`` on the queue, optionally waiting for free space.

    With ``block`` false this is a plain ``put_nowait``. Otherwise the put is
    retried each time ``self.Full`` is raised, sleeping at most
    ``self.max_timeout`` seconds between attempts. A truthy ``timeout``
    bounds the total wait: once that many seconds have elapsed the ``Full``
    error propagates. A falsy ``timeout`` (``None`` or ``0``) retries
    indefinitely.
    """
    if not block:
        return self.put_nowait(obj)

    began = time.time()
    while True:
        try:
            return self.put_nowait(obj)
        except self.Full:
            if not timeout:
                # No deadline: back off and try again.
                time.sleep(self.max_timeout)
                continue
            elapsed = time.time() - began
            if timeout > elapsed:
                # Never sleep past the caller's deadline.
                time.sleep(min(self.max_timeout, timeout - elapsed))
                continue
            raise
def put(self, obj, block=True, timeout=None):
    """Put ``obj`` on the queue, optionally waiting for free space.

    Args:
        obj: item to enqueue.
        block: when false, attempt a single ``put_nowait`` and return.
        timeout: if truthy, the maximum number of seconds to keep retrying;
            if falsy (``None`` or ``0``), retry until the put succeeds.

    Returns:
        Whatever ``self.put_nowait(obj)`` returns.

    Raises:
        BaseQueue.Full: from the non-blocking attempt, or once ``timeout``
            seconds have elapsed without a successful put.
    """
    if not block:
        # The item must be forwarded; calling put_nowait() with no argument
        # raised TypeError instead of enqueueing anything.
        return self.put_nowait(obj)

    start_time = time.time()
    while True:
        try:
            return self.put_nowait(obj)
        except BaseQueue.Full:
            if timeout:
                lasted = time.time() - start_time
                if timeout > lasted:
                    # Sleep in short slices, never past the deadline.
                    time.sleep(min(self.max_timeout, timeout - lasted))
                else:
                    raise
            else:
                time.sleep(self.max_timeout)
def put(self, obj, block=True, timeout=None):
    """Enqueue ``obj``, retrying while the queue reports ``BaseQueue.Full``.

    A non-blocking call delegates straight to ``put_nowait``. A blocking call
    keeps retrying, pausing up to ``self.max_timeout`` seconds between
    attempts; when ``timeout`` is truthy the ``Full`` error is re-raised once
    that many seconds have passed.
    """
    if block:
        t0 = time.time()
        while True:
            try:
                return self.put_nowait(obj)
            except BaseQueue.Full:
                if timeout:
                    waited = time.time() - t0
                    if not timeout > waited:
                        # Deadline reached: let the Full error escape.
                        raise
                    pause = min(self.max_timeout, timeout - waited)
                else:
                    pause = self.max_timeout
                time.sleep(pause)
    return self.put_nowait(obj)
def put(self, obj, block=True, timeout=None):
    """Add ``obj`` to the queue, waiting for room when ``block`` is true.

    Blocking puts poll ``put_nowait`` and sleep for at most
    ``self.max_timeout`` seconds after each ``BaseQueue.Full``. A truthy
    ``timeout`` caps the overall wait, after which ``Full`` propagates; a
    falsy one means wait for as long as it takes.
    """
    if not block:
        return self.put_nowait(obj)

    started = time.time()
    while True:
        try:
            outcome = self.put_nowait(obj)
        except BaseQueue.Full:
            if timeout:
                spent = time.time() - started
                if timeout > spent:
                    nap = min(self.max_timeout, timeout - spent)
                else:
                    raise
            else:
                nap = self.max_timeout
            time.sleep(nap)
        else:
            return outcome
def close(self):
    '''Flush in-flight requests and shut the sender threads down nicely.

    One ``None`` sentinel per concurrent batch is offered to the pending
    queue, every thread in ``self.threads`` is joined, and a final ``None``
    is offered to the responses queue. Each put and each join waits at most
    10 seconds; a queue that stays full is skipped.'''

    def offer_sentinel(target):
        # Best effort: a queue that stays full for 10 seconds is left alone.
        try:
            target.put(None, True, 10)
        except queue.Full:
            pass

    for _ in range(self.max_concurrent_batches):
        offer_sentinel(self.pending)

    for worker in self.threads:
        worker.join(10)

    # Tell consumers of the responses queue that nothing more is coming.
    offer_sentinel(self.responses)
def _send_dropped_response(ev):
    '''Report an event dropped by sampling on the module responses queue.

    The put blocks when ``g_block_on_response`` is set; otherwise it is
    non-blocking and a full queue silently discards the response.'''
    response = dict(
        status_code=0,
        duration=0,
        metadata=ev.metadata,
        body="",
        error="event dropped due to sampling",
    )
    try:
        if not g_block_on_response:
            g_responses.put_nowait(response)
        else:
            g_responses.put(response)
    except queue.Full:
        # Nobody is draining responses fast enough; drop this one.
        pass
def send_task(self, task, force=True):
    '''
    Hand ``task`` to the fetcher through ``self.out_queue``.

    The out queue may be size-limited; when it is full and ``force`` is
    true the task is parked at the left end of ``self._send_buffer``,
    otherwise the ``Queue.Full`` error is re-raised.
    '''
    try:
        self.out_queue.put_nowait(task)
    except Queue.Full:
        if not force:
            raise
        self._send_buffer.appendleft(task)
def send_task(self, task, force=True):
    """Schedule ``task`` on the IO loop, checking HTTP client capacity first.

    When ``self.fetcher.http_client.free_size()`` is zero or negative the
    task is pushed onto the left of ``self._send_buffer`` if ``force`` is
    true; otherwise ``self.outqueue.Full`` is raised.
    """
    if self.fetcher.http_client.free_size() <= 0:
        if force:
            self._send_buffer.appendleft(task)
        else:
            raise self.outqueue.Full
    # NOTE(review): as laid out here, a forced task is both buffered above and
    # scheduled below. The original indentation was lost, so confirm whether
    # this dispatch was meant to run only when the client has free capacity.
    # The callback calls x.result() on the finished future.
    self.ioloop.add_future(self.do_task(task), lambda x: x.result())
def put_nowait(self, obj):
    """Append ``obj`` to the backing list without waiting.

    The ``full()`` check is skipped while ``lazy_limit`` is on and the last
    known size is below ``maxsize``; otherwise a full queue raises
    ``self.Full``. The size returned by ``rpush`` is cached in
    ``self.last_qsize``. Returns ``True`` on success.
    """
    under_cached_limit = self.lazy_limit and self.last_qsize < self.maxsize
    if not under_cached_limit and self.full():
        raise self.Full
    packed = umsgpack.packb(obj)
    self.last_qsize = self.redis.rpush(self.name, packed)
    return True
def put_nowait(self, obj):
    """Publish ``obj`` to the queue's routing key without waiting.

    While ``lazy_limit`` is on and fewer than ``qsize_diff_limit`` puts have
    been counted, the ``full()`` check is skipped. Otherwise a full queue
    raises ``BaseQueue.Full`` and a non-full one resets the counter. The
    counter increment and the publish happen under ``self.lock``.
    """
    within_lazy_budget = self.lazy_limit and self.qsize_diff < self.qsize_diff_limit
    if not within_lazy_budget:
        if self.full():
            raise BaseQueue.Full
        self.qsize_diff = 0

    with self.lock:
        self.qsize_diff += 1
        message = amqp.Message(umsgpack.packb(obj))
        return self.channel.basic_publish(message, exchange="", routing_key=self.name)
def put_nowait(self, obj):
    """Serialize ``obj`` and put it on the connection without waiting.

    Raises ``BaseQueue.Full`` when ``self.full()`` is true; otherwise the
    packed payload is sent while holding ``self.lock`` and the connection's
    return value is passed through.
    """
    if self.full():
        raise BaseQueue.Full
    with self.lock:
        payload = umsgpack.packb(obj)
        return self.connection.put(payload)
def put_nowait(self, obj):
    """Put ``obj`` on the wrapped queue without waiting.

    The ``full()`` check is skipped while ``lazy_limit`` is on and
    ``qsize_diff`` is below ``qsize_diff_limit``. Otherwise a full queue
    raises ``BaseQueue.Full`` and a non-full one resets ``qsize_diff`` to 0.
    Returns whatever ``self.queue.put`` returns.
    """
    within_lazy_budget = self.lazy_limit and self.qsize_diff < self.qsize_diff_limit
    if not within_lazy_budget:
        if self.full():
            raise BaseQueue.Full
        self.qsize_diff = 0
    return self.queue.put(obj)
def queue_put_stoppable(self, q, obj):
    """Put ``obj`` on ``q``, giving up as soon as the thread is stopped.

    Each attempt waits up to 5 seconds; a full queue triggers another
    attempt unless ``self.stopped()`` has become true in the meantime.
    """
    while True:
        if self.stopped():
            return
        try:
            q.put(obj, timeout=5)
        except queue.Full:
            continue
        return
def big_print(self, msg):
    """Queue ``msg`` (as text) flagged ``True``; drop it if the queue is full."""
    entry = (True, str(msg))
    try:
        self.queue.put(entry, False)
    except queue.Full:
        # Output is best effort; never block the caller.
        pass
def print_(self, msg):
    """Queue ``msg`` (as text) flagged ``False`` unless ``self.terse`` is set.

    The put is non-blocking; the message is dropped if the queue is full.
    """
    if self.terse:
        return
    entry = (False, str(msg))
    try:
        self.queue.put(entry, False)
    except queue.Full:
        pass
def send(self, ev):
    '''Accept an event and queue it for sending.

    The put on ``self.pending`` blocks only when ``self.block_on_send`` is
    set. If the pending queue is full, an overflow response is pushed to
    ``self.responses`` instead (blocking only when ``self.block_on_response``
    is set) and the ``queue_overflow`` counter is incremented.'''
    sd.gauge("queue_length", self.pending.qsize())
    try:
        enqueue = self.pending.put if self.block_on_send else self.pending.put_nowait
        enqueue(ev)
        sd.incr("messages_queued")
    except queue.Full:
        overflow = dict(
            status_code=0,
            duration=0,
            metadata=ev.metadata,
            body="",
            error="event dropped; queue overflow",
        )
        if not self.block_on_response:
            try:
                self.responses.put_nowait(overflow)
            except queue.Full:
                # The responses queue is full as well, so the overflow
                # notice itself is skipped.
                pass
        else:
            self.responses.put(overflow)
        sd.incr("queue_overflow")
def test_send(self):
    """Exercise ``Transmission.send`` in three modes using mocked queues.

    Covers a non-blocking put, a blocking put, and a non-blocking put that
    hits ``queue.Full``, checking which queue method is called and which
    counter is incremented in each case.
    """
    # Replace the stats client and the thread class on the module under test.
    transmission.sd = mock.Mock()
    ft = FakeThread()
    transmission.threading.Thread = mock.Mock(return_value=ft)
    t = transmission.Transmission()
    qsize = 4
    # Mock every queue method send() may touch so calls can be inspected.
    t.pending.qsize = mock.Mock(return_value=qsize)
    t.pending.put = mock.Mock()
    t.pending.put_nowait = mock.Mock()
    t.responses.put = mock.Mock()
    t.responses.put_nowait = mock.Mock()
    # put an event non-blocking
    ev = FakeEvent()
    ev.metadata = None
    t.send(ev)
    transmission.sd.gauge.assert_called_with("queue_length", 4)
    t.pending.put_nowait.assert_called_with(ev)
    t.pending.put.assert_not_called()
    transmission.sd.incr.assert_called_with("messages_queued")
    # Clear recorded calls before the next scenario.
    t.pending.put.reset_mock()
    t.pending.put_nowait.reset_mock()
    transmission.sd.reset_mock()
    # put an event blocking
    t.block_on_send = True
    t.send(ev)
    t.pending.put.assert_called_with(ev)
    t.pending.put_nowait.assert_not_called()
    transmission.sd.incr.assert_called_with("messages_queued")
    transmission.sd.reset_mock()
    # put an event non-blocking queue full
    t.block_on_send = False
    t.pending.put_nowait = mock.Mock(side_effect=queue.Full())
    t.send(ev)
    # assert_called_with checks the most recent call, so the overflow counter
    # must be the last thing send() increments.
    transmission.sd.incr.assert_called_with("queue_overflow")
    t.responses.put_nowait.assert_called_with({
        "status_code": 0, "duration": 0,
        "metadata": None, "body": "",
        "error": "event dropped; queue overflow",
    })
def event_producer(self):
    """Forward events from the executor's queue to ``self.event_queue``.

    Loops until ``self.stopping`` is true. Each read waits up to one second;
    the forward is non-blocking, so an event is dropped when the destination
    queue is full.
    """
    source = self.executor.get_event_queue()
    while not self.stopping:
        try:
            event = source.get(block=True, timeout=1)
            self.event_queue.put(event, False)
        except (Empty, Full):
            # Nothing to read or nowhere to put it; re-check the stop flag.
            continue
def run(self):
    """Process tasks from ``self.tasks_q`` until it has been empty for 0.1s.

    Each task is passed to ``self.do_task``; truthy results are stored with
    ``self.save_data`` while holding ``self.lock``. Any exception is logged
    and flagged on ``self.errors_q``, and the task is marked done either way.
    """
    logging.debug("{} thread {} started".format(self.DESCR, self.name))
    while True:
        try:
            task = self.tasks_q.get(block=True, timeout=0.1)
        except queue.Empty:
            # No task arrived within the timeout: stop this worker.
            break
        try:
            data = self.do_task(task)
            if data:
                with self.lock:
                    self.save_data(task, data)
        except Exception as e:
            if isinstance(e, ARouteServerError):
                # Only log this error type when it carries a message.
                if str(e):
                    logging.error(
                        "{} thread {} error: {}".format(
                            self.DESCR, self.name,
                            str(e)
                        )
                    )
            else:
                # Any other exception is logged with its traceback.
                logging.error(
                    "{} thread {} unhandled exception: {}".format(
                        self.DESCR, self.name,
                        str(e) if str(e) else "error unknown"
                    ),
                    exc_info=True
                )
            # Flag the failure without blocking; if the errors queue is
            # already full the extra flag is dropped.
            try:
                self.errors_q.put_nowait(True)
            except queue.Full:
                pass
        # Mark the task done whether it succeeded or failed.
        self.tasks_q.task_done()
    logging.debug("{} thread {} stopped".format(
        self.DESCR, self.name))
def queue_put_stoppable(self, q, obj):
    """Keep trying to put ``obj`` on ``q`` until it fits or the thread stops.

    Every attempt blocks for at most 5 seconds, so a stop request is noticed
    within that window even when the queue never drains.
    """
    delivered = False
    while not (delivered or self.stopped()):
        try:
            q.put(obj, timeout=5)
            delivered = True
        except queue.Full:
            pass
def _queue_refresh_index(self):
    """Queue a background task that refreshes the search indexes.

    Protected helper for depot consumers. When the background queue reports
    it is full, the request is skipped and a message is logged instead."""
    try:
        refresh = self.repo.refresh_index
        self.__bgtask.put(refresh)
    except queue.Full:
        # Another operation is already in progress, so just log it and
        # carry on.
        cherrypy.log("Skipping indexing; another operation is "
            "already in progress.", "INDEX")
def index_0(self, *tokens):
    """Administrative interface for search indexing.

    The first token is the subcommand. ``refresh`` queues an index refresh
    for background execution; any other value is logged and answered with
    HTTP NOT_FOUND. A full background queue is answered with HTTP
    SERVICE_UNAVAILABLE. Nothing is returned on success.
    """
    cmd = tokens[0] if tokens else ""

    if cmd != "refresh":
        err = "Unknown index subcommand: {0}".format(cmd)
        cherrypy.log(err)
        raise cherrypy.HTTPError(http_client.NOT_FOUND, err)

    # The operation is queued for later execution, so the client will not
    # learn about a later failure. This is necessary because these are
    # long-running operations that would likely exceed connection timeout
    # limits.
    try:
        # Update search indexes.
        self.__bgtask.put(self.repo.refresh_index,
            pub=self._get_req_pub())
    except queue.Full:
        raise cherrypy.HTTPError(http_client.SERVICE_UNAVAILABLE,
            "Another operation is already in progress; try "
            "again later.")
def put(self, task, *args, **kwargs):
    """Schedule ``task`` for background execution unless the queue is full.

    The queue counts as full once more than nine tasks are unfinished, in
    which case ``queue.Full`` is raised. Otherwise ``(task, args, kwargs)``
    is enqueued without blocking.
    """
    backlog = self.__q.unfinished_tasks
    if backlog > 9:
        raise queue.Full()
    entry = (task, args, kwargs)
    self.__q.put_nowait(entry)
def put(self, task, *args, **kwargs):
    """Schedule ``task`` for background execution unless the queue is full.

    Raises ``queue.Full`` when the number of unfinished tasks exceeds
    ``self.size - 1``. Otherwise ``(task, args, kwargs)`` is enqueued without
    blocking and the keep-busy flag is set.
    """
    backlog = self.__q.unfinished_tasks
    if backlog > self.size - 1:
        raise queue.Full()
    entry = (task, args, kwargs)
    self.__q.put_nowait(entry)
    self.__keep_busy = True
def push_retry(self, batch):
    """Re-queue ``batch`` for another attempt with one fewer retry left.

    The put on ``self.network_deque`` is non-blocking. If the queue is full
    the batch is dropped: the error is shown through ``self.ui`` and passed
    to ``self.send_error_to_ctx``.
    """
    # A retry consumes one unit of the batch's retry budget.
    retry = batch._replace(rty_cnt=batch.rty_cnt - 1)
    try:
        self.network_deque.put(retry, block=False)
    except queue.Full:
        template = 'Dropping {} due to backfill queue full.'
        msg = template.format(retry)
        self.ui.error(msg)
        self.send_error_to_ctx(retry, msg)
def run(self):
    """Pull message batches from the output host until stopped.

    Repeatedly issues a ``receiveMessageBatch`` request and pushes each
    received message, paired with a delivery token, onto ``self.msg_queue``.
    The loop ends when ``self.stop_signal`` is set. Any exception raised
    during an iteration is logged and the loop continues.
    """
    # The same request object is reused for every call. The receive timeout
    # is one second shorter than self.timeout_seconds, but at least 1.
    request = cherami.ReceiveMessageBatchRequest(destinationPath=self.path,
                                                 consumerGroupName=self.consumer_group_name,
                                                 maxNumberOfMessages=self.msg_batch_size,
                                                 receiveTimeout=max(1, self.timeout_seconds - 1)
                                                 )
    while not self.stop_signal.is_set():
        # possible optimization: if we don't have enough capacity in the queue,
        # backoff for a bit before pulling from Cherami again
        try:
            result = util.execute_output_host(tchannel=self.tchannel,
                                              headers=self.headers,
                                              hostport=self.hostport,
                                              timeout=self.timeout_seconds,
                                              method_name='receiveMessageBatch',
                                              request=request)
            util.stats_count(self.tchannel.name,
                             'receiveMessageBatch.messages',
                             self.hostport,
                             len(result.messages))
            for msg in result.messages:
                # if the queue is full, keep trying until there's free slot, or the thread has been shutdown
                while not self.stop_signal.is_set():
                    try:
                        self.msg_queue.put((util.create_delivery_token(msg.ackId, self.hostport), msg),
                                           block=True,
                                           timeout=5)
                        util.stats_count(self.tchannel.name,
                                         'consumer_msg_queue.enqueue',
                                         self.hostport,
                                         1)
                        break
                    except Full:
                        # Timed out waiting for room; re-check the stop
                        # signal and try again.
                        pass
        except Exception as e:
            # Log the failure and fall through to the next loop iteration.
            self.logger.info({
                'msg': 'error receiving msg from output host',
                'hostport': self.hostport,
                'traceback': traceback.format_exc(),
                'exception': str(e)
            })
def _send(self, ev):
    '''Send a single event to Honeycomb; only call this from the sender.

    The event is POSTed to ``<api_host>/1/events/<dataset>``. The outcome
    (status code, body or error text, duration in milliseconds, and the
    event's metadata) is pushed onto ``self.responses``. That put blocks only
    when ``self.block_on_response`` is set; otherwise a full queue drops the
    response.

    Exceptions derived from ``Exception`` are reported through the response
    rather than raised. Anything else (for example ``KeyboardInterrupt``)
    propagates unchanged.'''
    start = get_now()
    try:
        url = urljoin(urljoin(ev.api_host, "/1/events/"), ev.dataset)
        req = requests.Request('POST', url, data=str(ev))
        event_time = ev.created_at.isoformat()
        if ev.created_at.tzinfo is None:
            # Mark naive timestamps with a trailing "Z".
            event_time += "Z"
        req.headers.update({
            "X-Event-Time": event_time,
            "X-Honeycomb-Team": ev.writekey,
            "X-Honeycomb-SampleRate": str(ev.sample_rate)})
        preq = self.session.prepare_request(req)
        resp = self.session.send(preq)
        if (resp.status_code == 200):
            sd.incr("messages_sent")
        else:
            sd.incr("send_errors")
        response = {
            "status_code": resp.status_code,
            "body": resp.text,
            "error": "",
        }
    except Exception as e:
        # Sometimes the ELB returns SSL issues for no good reason. Sometimes
        # Honeycomb will timeout. We shouldn't influence the calling app's
        # stack, so catch these and hand them to the responses queue.
        sd.incr("send_errors")
        response = {
            "status_code": 0,
            "body": "",
            "error": repr(e),
        }

    # Deliberately not in a ``finally`` clause: if something other than an
    # Exception escapes above, ``response`` was never bound, and touching it
    # from ``finally`` would replace the real error with UnboundLocalError.
    dur = get_now() - start
    response["duration"] = dur.total_seconds() * 1000  # report in milliseconds
    response["metadata"] = ev.metadata
    if self.block_on_response:
        self.responses.put(response)
    else:
        try:
            self.responses.put_nowait(response)
        except queue.Full:
            # Nobody is draining responses fast enough; drop this one.
            pass