def __init__(self, bind_ip='127.0.0.1', bind_port=0, handlers=None, enable_null_handle=True):
    """Bind a UDP socket, build the per-message-type queues and start the worker green threads.

    bind_ip / bind_port -- local address to bind to; the address actually bound is logged.
    handlers -- optional dict mapping message type integers to functions that take the
                params (msg_params, msg_id, from_addr, sock). Entries given here override
                the built-in handlers for the same message type. None means "no extras".
    enable_null_handle -- enables a default "null handler" that does nothing with unhandled
                          message types except logging them to debug.
    """
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.sock.bind((bind_ip, bind_port))
    yatelog.info('YATESock', 'Bound %s:%s' % self.sock.getsockname())
    yatelog.info('YATESock', 'Setting up handlers and queues')

    self.pool = eventlet.GreenPool(1000)
    # Packets coming in from the remote peer go here after parsing; each message type has
    # an independent queue so QoS-type prioritisation is possible.
    self.in_queues = {}
    # Packets going out to the remote peer go here, again one queue per message type.
    self.out_queues = {}
    # To keep up performance, raw packets are queued here before parsing.
    self.parse_q = eventlet.queue.LightQueue(0)

    # A couple of standard message handlers; override by passing in new handlers.
    self.handlers = {
        MSGTYPE_CONNECT: self.handle_connect,
        MSGTYPE_UNKNOWN_PEER: self.handle_unknown_peer,
        MSGTYPE_CONNECT_ACK: self.handle_connect_ack,
        MSGTYPE_KEEPALIVE: self.handle_keepalive,
        MSGTYPE_KEEPALIVE_ACK: self.handle_keepalive_ack,
    }
    # A None default is used instead of a shared mutable {} default argument.
    if handlers is not None:
        self.handlers.update(handlers)

    self.enable_null_handle = enable_null_handle
    self.active = True

    # Peer bookkeeping is created before any worker is spawned so no worker can ever
    # observe an instance that is missing these attributes.
    # If this is a server, this set contains the clients; if it is a client it contains
    # only one member - the server.
    self.known_peers = set()
    # Timestamp of the last packet from a particular peer, so timeouts can be done.
    self.last_pack = {}

    for _ in range(10):
        self.pool.spawn_n(self.parser_thread)

    for msg_type, msg_name in msgtype_str.items():
        self.in_queues[msg_type] = eventlet.queue.LightQueue(0)
        self.out_queues[msg_type] = eventlet.queue.LightQueue(0)
        # Black magic: expose a send_<name> attribute per message type. The [8:] slice
        # presumably strips a 'MSGTYPE_' prefix from the name - verify against msgtype_str.
        setattr(self, 'send_%s' % msg_name[8:].lower(), YATESockSendMethod(msg_type, self))
        # Fall back to the null handler for message types nobody registered a handler for.
        if enable_null_handle and msg_type not in self.handlers:
            self.handlers[msg_type] = self.null_handler
        for _ in range(2):
            self.pool.spawn_n(self.msg_sender_thread, msg_type)
        for _ in range(2):
            self.pool.spawn_n(self.msg_reader_thread, msg_type)

    self.pool.spawn_n(self.recv_thread)
    # Timeout peers all in a central location, giving plenty of time for them to send
    # packets and not timeout.
    self.pool.spawn_n(self.timeout_thread)
评论列表
文章目录