def test_fetch_and_prepare_message(mocker):
mocker.patch('iris.bin.sender.message_send_enqueue')
from iris.bin.sender import (
fetch_and_prepare_message, message_queue, per_mode_send_queues
)
init_queue_with_item(message_queue, {'message_id': 1234, 'plan_id': None})
fetch_and_prepare_message()
assert message_queue.qsize() == 0
send_queue = per_mode_send_queues.setdefault('email', gevent.queue.Queue())
init_queue_with_item(send_queue, {'message_id': 1234, 'plan_id': None})
assert message_queue.qsize() == 0
assert send_queue.qsize() == 1
m = send_queue.get()
assert m['message_id'] == 1234
def getLocalIp(self):
i = 0
local_ip = None
    # local_ip is written into conf/sysconf.ini by cmd_agent; retry a few times in case it has not been written yet
while i < 3:
gevent.sleep(3)
        sys_conf = os.path.join(_agentBasePath, "conf", "sysconf.ini")
conf = ConfigParser.ConfigParser()
conf.optionxform = str
if os.path.exists(sys_conf):
conf.read(sys_conf)
            if conf.has_option('sys', 'local_ip'):
                local_ip = conf.get('sys', 'local_ip')
if local_ip:
break
else:
            # sysconf.ini is not there yet; log and try again
logger.error('not found local_ip, will retry')
i += 1
return local_ip
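For reference, a minimal sketch of the sysconf.ini that getLocalIp polls for; the [sys] section and local_ip key are taken from the code above, while the path comment and address value are purely illustrative:

# conf/sysconf.ini, written by cmd_agent (hypothetical contents)
[sys]
local_ip = 10.0.0.12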
def handle_customize(self):
self.generate_uuid()
# self.inner_ip = self.getLocalIp()
# if not self.inner_ip:
# logger.error('not found local_ip, please restart agent')
# sys.exit(1)
server_groups = self.conf.get('report', 'server_groups')
job_list = []
job_list.append(gevent.spawn(self.localReport))
job_list.append(gevent.spawn(self.localJsonReport))
jobs = self.send_to_server_groups(server_groups, self.config["linger_ms"], self.config["max_queued_messages"])
job_list.extend(jobs)
gevent.joinall(job_list)
def processJsonRep(self, socket, address):
    org = self.conf.get('base', 'client_id')
    jsonSocket = jsonSession(socket=socket, org=org)
    while True:
try:
code, data = jsonSocket.recv()
if code != 0:
logger.error("local receive error (%s %s)"%(code, data))
socket.close()
break
try:
_reportQueue.put_nowait(data)
except gevent.queue.Full:
logger.error("report queue is full")
jsonSocket.send_response(conf.global_vars.ErrCode.QueueFull, 'ok')
continue
jsonSocket.send_response(0, 'ok')
        except Exception as e:
logger.error("uncaught error, e={}, traceback={}".format(e, traceback.format_exc()))
socket.close()
break
def processRep(self, socket, address):
    org = self.conf.get('base', 'client_id')
    pbSocket = pbSession(socket=socket, org=org)
    while True:
try:
code, data = pbSocket.recv(decode=False)
if code != 0:
if "connection closed" not in data:
logger.error("local receive error (%s %s)"%(code, data))
socket.close()
break
try:
_reportQueue.put_nowait(data)
except gevent.queue.Full:
logger.error("report queue is full")
pbSocket.send_response(conf.global_vars.ErrCode.QueueFull, 'ok')
continue
pbSocket.send_response(0, 'ok')
        except Exception as e:
logger.error("uncaught error, e={}, traceback={}".format(e, traceback.format_exc()))
socket.close()
break
def enqueue(self, queue_event_list, max_queued_messages):
    if not queue_event_list:
return
while True:
try:
# get msg
task_msg = _reportQueue.get()
if not task_msg:
continue
dataid, org, ip = task_msg[0][-3:]
logger.debug('recv msg, org: %s dataid: %s' %(org, dataid))
# enqueue
for (q, flush_ready_event) in queue_event_list:
if not q.full():
q.put_nowait(task_msg)
else:
logger.error("queue full")
if q.qsize() >= max_queued_messages and not flush_ready_event.is_set():
flush_ready_event.set()
        except Exception as e:
logger.error(e)
def on_message_received(self, namespace, message):
consumers = self.consumers.get(namespace, [])
# Compress the message
if len(message) >= MIN_COMPRESS_SIZE:
compressed = make_compressed_frame(message, COMPRESSOR)
else:
compressed = None
message = Message(compressed=compressed, raw=message)
with self.metrics.timer("dispatch"):
for consumer in consumers:
consumer.put(message)
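The conditional-compression pattern above in a self-contained sketch: only payloads at or past a size threshold are worth deflating, smaller ones ride along uncompressed. zlib here stands in for the snippet's module-specific make_compressed_frame/COMPRESSOR, and the threshold value is illustrative:

import zlib

MIN_COMPRESS_SIZE = 512  # illustrative; tiny payloads gain little from deflate

def maybe_compress(message):
    # Returns (compressed, raw); compressed stays None below the threshold
    if len(message) >= MIN_COMPRESS_SIZE:
        return zlib.compress(message), message
    return None, message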
def listen(self, namespace, max_timeout):
"""Register to listen to a namespace and yield messages as they arrive.
If no messages arrive within `max_timeout` seconds, this will yield a
`None` to allow clients to do periodic actions like send PINGs.
This will run forever and yield items as an iterable. Use it in a loop
and break out of it when you want to deregister.
"""
queue = gevent.queue.Queue()
namespace = namespace.rstrip("/")
for ns in _walk_namespace_hierarchy(namespace):
self.consumers.setdefault(ns, []).append(queue)
try:
while True:
# jitter the timeout a bit to ensure we don't herd
timeout = max_timeout - random.uniform(0, max_timeout / 2)
try:
yield queue.get(block=True, timeout=timeout)
except gevent.queue.Empty:
yield None
# ensure we're not starving others by spinning
gevent.sleep()
finally:
for ns in _walk_namespace_hierarchy(namespace):
self.consumers[ns].remove(queue)
if not self.consumers[ns]:
del self.consumers[ns]
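A hedged usage sketch of listen(), following its docstring: iterate over the generator, treat a None yield as "no traffic arrived, send a keepalive", and break out to deregister. dispatcher, send_ping, and handle are hypothetical client-side names:

def consume(dispatcher):
    for message in dispatcher.listen('/chat/room1', max_timeout=30):
        if message is None:
            send_ping()       # idle window elapsed; keep the connection alive
            continue
        if not handle(message):
            break             # leaving the loop deregisters via the finally block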
def _create_greenlet_worker(self, queue):
def greenlet_worker():
while True:
try:
func = queue.get()
if func is _STOP:
break
func()
except Empty:
continue
except Exception as exc:
log.warning("Exception in worker greenlet")
log.exception(exc)
return gevent.spawn(greenlet_worker)
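A small usage sketch of the worker above: callables pushed onto the queue run in order on the spawned greenlet, and the _STOP sentinel from the snippet asks the loop to exit. obj and the task body are hypothetical:

queue = gevent.queue.Queue()
worker = obj._create_greenlet_worker(queue)
queue.put(lambda: log.info('task ran'))   # executed by the greenlet
queue.put(_STOP)                          # sentinel: worker breaks out of its loop
worker.join()                             # wait for the greenlet to finish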
def init_queue_with_item(queue, item=None):
# drain out queue
while queue.qsize() > 0:
queue.get()
if item:
queue.put(item)
def test_handle_api_request_v0_send(mocker):
from iris.bin.sender import message_send_enqueue
from iris.sender.rpc import handle_api_request, send_funcs
from iris.sender.shared import per_mode_send_queues
send_funcs['message_send_enqueue'] = message_send_enqueue
send_queue = per_mode_send_queues.setdefault('email', gevent.queue.Queue())
# support expanding target
mocker.patch('iris.sender.cache.targets_for_role', lambda role, target: [target])
mocker.patch('iris.bin.sender.db')
mocker.patch('iris.metrics.stats')
mocker.patch('iris.bin.sender.set_target_contact').return_value = True
mock_address = mocker.MagicMock()
mock_socket = mocker.MagicMock()
mock_socket.recv.return_value = msgpack.packb({
'endpoint': 'v0/send',
'data': fake_notification,
})
while send_queue.qsize() > 0:
send_queue.get()
handle_api_request(mock_socket, mock_address)
assert send_queue.qsize() == 1
m = send_queue.get()
assert m['subject'] == '[%s] %s' % (fake_notification['application'],
fake_notification['subject'])
def localJsonReport(self):
import platform
if platform.system() == 'Windows':
        rep_port = int(self.conf.get('report', 'local_json_port'))  # ConfigParser returns strings; the port must be an int
server = StreamServer(('127.0.0.1', rep_port), self.processJsonRep)
server.serve_forever()
else:
from libs.unixSocket import bind_unix_listener
        unix_sock_name = os.path.join(_agentBasePath, 'localJsonReport.sock')
server = StreamServer(bind_unix_listener(unix_sock_name), self.processJsonRep)
os.chmod(unix_sock_name, 0o777)
server.serve_forever()
# Send batched report messages from a local queue to one server group
#@profile
def sendToServer(self, group_name, server_list, local_queue, flush_ready_event, linger_ms, max_queued_messages):
connected = False
rs = None
while True:
try:
# get msg
task_msgs = self.batch_fetch(local_queue, flush_ready_event, linger_ms, max_queued_messages)
if not task_msgs:
continue
            # retry (reconnecting as needed) until the batch is sent
while True:
# check connection
if connected is False:
if rs is not None:
rs.session.close()
rs = self.get_report_server(group_name, server_list)
if rs.connect() != 0:
gevent.sleep(3)
continue
else:
connected = True
# send data
ret = rs.batch_send_data(task_msgs)
if ret == 0:
break
logger.error("send msg error!, ret={}".format(ret))
connected = False
        except Exception as e:
connected = False
logger.error("Uncaught error here! e={}, traceback={}".format(e, traceback.format_exc()))
#@profile
def batch_fetch(self, queue, event, linger_ms, max_queued_messages):
if queue.qsize() < max_queued_messages:
        event.wait(linger_ms / 1000.0)  # milliseconds -> seconds; avoid Python 2 integer division
if event.is_set():
event.clear()
batch_msgs = [queue.get() for _ in range(queue.qsize())]
return batch_msgs
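How the pieces above fit together, as a hypothetical wiring sketch: enqueue() fans each message out to per-group queues and sets flush_ready_event once a queue reaches max_queued_messages, while batch_fetch() wakes either on that event or after the linger window, so batches are bounded in both size and latency. reporter and server_list are stand-ins for the agent's state, and the parameter values are illustrative:

import gevent
import gevent.event
import gevent.queue

q = gevent.queue.Queue()
flush_ready = gevent.event.Event()
# reporter is a hypothetical instance of the class the methods above belong to
gevent.spawn(reporter.enqueue, [(q, flush_ready)], 1000)
gevent.spawn(reporter.sendToServer, 'group1', server_list, q, flush_ready, 500, 1000)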