def test_send(self):
    """Check how ``Transmission.send`` enqueues an event.

    Three scenarios are exercised in order:

    1. ``send`` without ``block_on_send`` set uses ``pending.put_nowait``
       and reports the queue length gauge;
    2. with ``block_on_send = True``, ``send`` uses ``pending.put``;
    3. with ``block_on_send = False`` and a full queue, ``send`` increments
       ``queue_overflow`` and posts an error response through
       ``responses.put_nowait``.

    ``transmission.sd`` and ``transmission.threading.Thread`` are replaced
    only for the duration of the test and restored afterwards.
    """
    ft = FakeThread()
    # Scope both replacements to this test. Assigning the mocks directly
    # would leave them installed after the test finishes (and
    # ``transmission.threading`` is presumably the shared stdlib module, so
    # the leak would be visible to every other user of ``threading.Thread``).
    with mock.patch.object(transmission, "sd", new=mock.Mock()), \
            mock.patch.object(transmission.threading, "Thread",
                              new=mock.Mock(return_value=ft)):
        t = transmission.Transmission()
        qsize = 4
        t.pending.qsize = mock.Mock(return_value=qsize)
        t.pending.put = mock.Mock()
        t.pending.put_nowait = mock.Mock()
        t.responses.put = mock.Mock()
        t.responses.put_nowait = mock.Mock()

        # put an event non-blocking
        ev = FakeEvent()
        ev.metadata = None
        t.send(ev)
        transmission.sd.gauge.assert_called_with("queue_length", qsize)
        t.pending.put_nowait.assert_called_with(ev)
        t.pending.put.assert_not_called()
        transmission.sd.incr.assert_called_with("messages_queued")
        t.pending.put.reset_mock()
        t.pending.put_nowait.reset_mock()
        transmission.sd.reset_mock()

        # put an event blocking
        t.block_on_send = True
        t.send(ev)
        t.pending.put.assert_called_with(ev)
        t.pending.put_nowait.assert_not_called()
        transmission.sd.incr.assert_called_with("messages_queued")
        transmission.sd.reset_mock()

        # put an event non-blocking queue full
        t.block_on_send = False
        t.pending.put_nowait = mock.Mock(side_effect=queue.Full())
        t.send(ev)
        transmission.sd.incr.assert_called_with("queue_overflow")
        # The dropped event is reported back as a synthetic error response.
        t.responses.put_nowait.assert_called_with({
            "status_code": 0, "duration": 0,
            "metadata": None, "body": "",
            "error": "event dropped; queue overflow",
        })
# Comment list
# Table of contents