def _cleanup(self):
global _dispatcher_map
self._shutdown = True
if not self._thread:
return
log.debug("Waiting for event loop thread to join...")
self._thread.join(timeout=1.0)
if self._thread.is_alive():
log.warning(
"Event loop thread could not be joined, so shutdown may not be clean. "
"Please call Cluster.shutdown() to avoid this.")
log.debug("Event loop thread was joined")
# Ensure all connections are closed and in-flight requests cancelled
for conn in tuple(_dispatcher_map.values()):
conn.close()
log.debug("Dispatchers were closed")
python类loop()的实例源码
def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)
self.deque = deque()
self.deque_lock = Lock()
self._connect_socket()
asyncore.dispatcher.__init__(self, self._socket, _dispatcher_map)
self._writable = True
self._readable = True
self._send_options_message()
# start the event loop if needed
self._loop.maybe_start()
def testchannel(self):
i, a, c = self.setup(True)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def icond(hook, msg):
if hook == 'admin' and msg['MsgType'] == 'Logon':
sock.connect(('127.0.0.1', CHANNEL_PORT))
sock.send(json.dumps({'MsgType': 'NewOrderSingle'}))
def acond(hook, msg):
if hook == 'app' and msg['MsgType'] == 'NewOrderSingle':
a.close()
i.close()
c.close()
self.loop(i, a, icond, acond, False)
self.assertEqual(json.loads(sock.recv(8192)), {'result': 'done'})
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push(b"hello ")
c.push(b"world" + term)
c.push(b"I'm not dead yet!" + term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client(b'\n', s.port)
c.push(b"hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join()
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertGreater(len(s.buffer), 0)
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push("hello ")
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join()
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertTrue(len(s.buffer) > 0)
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push("hello ")
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join()
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertTrue(len(s.buffer) > 0)
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push(b"hello ")
c.push(b"world" + term)
c.push(b"I'm not dead yet!" + term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client(b'\n', s.port)
c.push(b"hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join()
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertGreater(len(s.buffer), 0)
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
server = BaseServer(self.family, self.addr)
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
count=500))
t.start()
self.addCleanup(t.join)
s = socket.socket(self.family, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def serve_forever(self, poll_interval):
"""
Run the :mod:`asyncore` loop until normal termination
conditions arise.
:param poll_interval: The interval, in seconds, used in the underlying
:func:`select` or :func:`poll` call by
:func:`asyncore.loop`.
"""
try:
asyncore.loop(poll_interval, map=self.sockmap)
except select.error:
# On FreeBSD 8, closing the server repeatably
# raises this error. We swallow it if the
# server has been closed.
if self.connected or self.accepting:
raise
def _fun_net_proc(self, async_ctx, req_queue):
"""
processing request queue
:param async_ctx:
:param req_queue: request queue
:return:
"""
while True:
if req_queue.empty() is False:
try:
ctl_flag, req_str = req_queue.get(timeout=0.001)
if ctl_flag is False:
break
async_ctx.network_query(req_str)
except Exception as e:
traceback.print_exc()
asyncore.loop(timeout=0.001, count=5)
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push("hello ")
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join()
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertTrue(len(s.buffer) > 0)
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push(b"hello ")
c.push(b"world" + term)
c.push(b"I'm not dead yet!" + term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client(b'\n', s.port)
c.push(b"hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertGreater(len(s.buffer), 0)
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
server = BaseServer(self.family, self.addr)
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
count=500))
t.start()
def cleanup():
t.join(timeout=TIMEOUT)
if t.is_alive():
self.fail("join() timed out")
self.addCleanup(cleanup)
s = socket.socket(self.family, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except OSError:
pass
finally:
s.close()
def serve_forever(self, poll_interval):
"""
Run the :mod:`asyncore` loop until normal termination
conditions arise.
:param poll_interval: The interval, in seconds, used in the underlying
:func:`select` or :func:`poll` call by
:func:`asyncore.loop`.
"""
try:
asyncore.loop(poll_interval, map=self._map)
except OSError:
# On FreeBSD 8, closing the server repeatably
# raises this error. We swallow it if the
# server has been closed.
if self.connected or self.accepting:
raise
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push("hello ")
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join()
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertTrue(len(s.buffer) > 0)
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push(b"hello ")
c.push(b"world" + term)
c.push(b"I'm not dead yet!" + term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def _cleanup(loop_weakref):
try:
loop = loop_weakref()
except ReferenceError:
return
loop._cleanup()
def validate(self):
assert not self._notified
self.notify_loop()
assert self._notified
self.loop(0.1)
assert not self._notified
def loop(self, timeout):
asyncore.loop(timeout=timeout, use_poll=True, map=_dispatcher_map, count=1)