def test_send(self):
    """Push data through a sending dispatcher and compare what was captured.

    A helper thread runs ``capture_server`` on a listening socket; this
    test connects a ``dispatcherwithsend_noread`` to it, sends the payload
    twice followed by a newline, drains the output buffer with
    ``asyncore.poll()`` and asserts that the captured value equals the
    payload repeated twice.
    NOTE(review): presumably ``capture_server`` stops at the newline and
    does not record it -- confirm against its definition.
    """
    done = threading.Event()
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.settimeout(3)
    port = test_support.bind_port(listener)

    captured = StringIO()
    server = threading.Thread(target=capture_server,
                              args=(done, captured, listener))
    server.start()
    try:
        # wait a little longer for the server to initialize (it sometimes
        # refuses connections on slow machines without this wait)
        time.sleep(0.2)

        data = "Suppose there isn't a 16-ton weight?"
        d = dispatcherwithsend_noread()
        d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        d.connect((HOST, port))

        # give time for socket to connect
        time.sleep(0.1)

        for chunk in (data, data, '\n'):
            d.send(chunk)

        # Pump the event loop until the output buffer is empty, giving up
        # after 1000 polls so a stuck buffer cannot hang the test.
        for _ in range(1000):
            if not d.out_buffer:
                break
            asyncore.poll()

        done.wait()

        self.assertEqual(captured.getvalue(), data * 2)
    finally:
        # Always reap the server thread, even when an assertion failed.
        server.join()
python类poll()的实例源码
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for a test server until the client is done.

    serv       -- server object; only its ``close()`` method is used here.
    serv_evt   -- event set once on entry and once more on exit.
    client_evt -- event the client sets when its conversation is finished.

    The loop polls at most 1000 times with a 0.01 second timeout and stops
    early when the socket map empties or the client signals completion.
    """
    serv_evt.set()

    try:
        # Prefer the poll()-based implementation when the platform has it.
        poll_fun = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break
    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
def test_readwriteexc(self):
    """Check exception handling behavior of read, write and _exception."""
    entry_points = (asyncore.read, asyncore.write, asyncore._exception)

    # An ExitNow raised inside the object's handler method must bubble all
    # the way up through each asyncore entry point.  The same dummy object
    # is reused for all three calls.
    exiting = exitingdummy()
    for entry_point in entry_points:
        self.assertRaises(asyncore.ExitNow, entry_point, exiting)

    # Any exception other than ExitNow raised by the handler must instead
    # cause handle_error to be called.  A fresh dummy is built for each
    # entry point so every check starts from a clean error_handled flag.
    for entry_point in entry_points:
        crashing = crashingdummy()
        entry_point(crashing)
        self.assertEqual(crashing.error_handled, True)
# asyncore.readwrite uses constants in the select module that
# are not present in Windows systems (see this thread:
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
# These constants should be present as long as poll is available
def test_send(self):
    """Push bytes through a sending dispatcher and compare what was captured.

    A helper thread runs ``capture_server`` on a listening socket; this
    test connects a ``dispatcherwithsend_noread`` to it, sends the payload
    twice followed by a newline, drains the output buffer with
    ``asyncore.poll()`` and asserts that the captured value equals the
    payload repeated twice.  The test fails if the server thread has not
    finished within ``TIMEOUT`` seconds.
    NOTE(review): presumably ``capture_server`` stops at the newline and
    does not record it -- confirm against its definition.
    """
    done = threading.Event()
    listener = socket.socket()
    listener.settimeout(3)
    port = support.bind_port(listener)

    captured = BytesIO()
    server = threading.Thread(target=capture_server,
                              args=(done, captured, listener))
    server.start()
    try:
        # wait a little longer for the server to initialize (it sometimes
        # refuses connections on slow machines without this wait)
        time.sleep(0.2)

        data = b"Suppose there isn't a 16-ton weight?"
        d = dispatcherwithsend_noread()
        d.create_socket()
        d.connect((support.HOST, port))

        # give time for socket to connect
        time.sleep(0.1)

        for chunk in (data, data, b'\n'):
            d.send(chunk)

        # Pump the event loop until the output buffer is empty, giving up
        # after 1000 polls so a stuck buffer cannot hang the test.
        for _ in range(1000):
            if not d.out_buffer:
                break
            asyncore.poll()

        done.wait()

        self.assertEqual(captured.getvalue(), data * 2)
    finally:
        # Bounded join: report a hung server thread as a test failure
        # rather than blocking forever.
        server.join(timeout=TIMEOUT)
        if server.is_alive():
            self.fail("join() timed out")
def lifetime_loop():
    """Run the main loop until the socket map empties or shutdown begins.

    Polls ``asyncore.socket_map`` with a 30 second timeout for as long as
    the map is non-empty and the module-level ``_shutdown_phase`` is 0.
    """
    channels = asyncore.socket_map
    poll_timeout = 30.0
    while True:
        # Stay in here until we need to shutdown.
        if not channels or _shutdown_phase != 0:
            break
        asyncore.poll(poll_timeout, channels)
def graceful_shutdown_loop():
    """Run the shutdown loop, letting services shut down gradually.

    While the socket map is non-empty and the module-level
    ``_shutdown_phase`` is below 4, every channel that defines
    ``clean_shutdown_control(phase, time_in_this_phase)`` is consulted.  A
    truthy return value vetoes moving on to the next phase; the veto is
    honoured only while the time spent in the current phase is below
    ``_shutdown_timeout``.  Without an effective veto the phase counter is
    advanced and the phase timer restarted.
    """
    global _shutdown_phase
    timestamp = time.time()
    timeout = 1.0
    channels = asyncore.socket_map
    while channels and _shutdown_phase < 4:
        time_in_this_phase = time.time() - timestamp
        veto = 0
        # Iterate over a snapshot: a handler may close its channel, which
        # removes it from the socket map, and mutating a dict while
        # iterating its live view raises RuntimeError on Python 3.
        for obj in list(channels.values()):
            try:
                fn = getattr(obj, 'clean_shutdown_control')
            except AttributeError:
                pass
            else:
                try:
                    # Short-circuits: once one channel has vetoed, the
                    # remaining channels are not consulted this round.
                    veto = veto or fn(_shutdown_phase, time_in_this_phase)
                except:
                    # Deliberately broad: any failure in a handler is
                    # routed to that channel's own error hook.
                    obj.handle_error()
        if veto and time_in_this_phase < _shutdown_timeout:
            # Any open socket handler can veto moving on to the next shutdown
            # phase. (but not forever)
            asyncore.poll(timeout, channels)
        else:
            # No vetos? That is one step closer to shutting down
            _shutdown_phase += 1
            timestamp = time.time()
def go(self, timeout=30.0, granularity=15):
    """Run the event loop until the global ``socket_map`` is empty.

    timeout     -- value passed to each ``asyncore.poll()`` call.
    granularity -- minimum number of whole seconds between scans of
                   ``self.events`` for due entries.

    ``self.events`` holds ``(when, what)`` pairs.  The scan stops at the
    first entry that is not yet due, so only a leading run of due entries
    is fired; each is called as ``what(self, now)`` after being removed
    from the list.  Every pass also samples the channel count into
    ``self.num_channels`` and tracks the maximum in ``self.max_channels``.
    """
    global socket_map
    last_event_check = 0
    while socket_map:
        now = int(time.time())
        if (now - last_event_check) >= granularity:
            last_event_check = now
            # Collect the leading run of events whose time has come.
            due = []
            for when, what in self.events:
                if not now >= when:
                    break
                due.append(what)
            if due:
                # Drop the fired prefix before invoking the callbacks.
                self.events = self.events[len(due):]
                for callback in due:
                    callback(self, now)
        # sample the number of channels
        channel_count = len(asyncore.socket_map)
        self.num_channels = channel_count
        if channel_count > self.max_channels:
            self.max_channels = channel_count
        asyncore.poll(timeout)
def run(self):
    """Poll asyncore repeatedly while ``self.continue_running`` is truthy."""
    while True:
        # The flag is re-read before every poll so that clearing it
        # stops the loop after the current poll returns.
        if not self.continue_running:
            return
        asyncore.poll()
def run_loop():
    """Run the proxy's main polling loop until ``must_exit`` becomes true.

    Each pass picks a poll interval (1 ms by default, 1 second when there
    is no background activity and either both pipes are empty or no
    shaping is configured), polls asyncore, ticks both pipes, acknowledges
    a pending flush with ``PrintMessage('OK')``, and roughly every 500 ms
    checks whether 5 seconds have passed without activity, in which case a
    manual garbage collection is triggered.

    Automatic garbage collection is disabled for the duration of the loop
    and is not re-enabled here.  On Windows the timer resolution is raised
    to 1 ms and restored on the way out, including when the loop exits
    with an exception.
    """
    global must_exit
    global in_pipe
    global out_pipe
    global needs_flush
    global flush_pipes
    global last_activity
    winmm = None

    # increase the windows timer resolution to 1ms
    if platform.system() == "Windows":
        try:
            import ctypes
            winmm = ctypes.WinDLL('winmm')
            winmm.timeBeginPeriod(1)
        except Exception:
            # Best effort: run with the default timer resolution instead.
            pass

    try:
        # NOTE(review): time.clock() was removed in Python 3.8 -- confirm
        # the interpreter this module targets.
        last_activity = time.clock()
        last_check = time.clock()
        # disable gc to avoid pauses during traffic shaping/proxying
        gc.disable()
        while not must_exit:
            # Tick every 1ms if traffic-shaping is enabled and we have data
            # or are doing background dns lookups, every 1 second otherwise
            lock.acquire()
            try:
                tick_interval = 0.001
                if background_activity_count == 0:
                    if in_pipe.next_message is None and in_pipe.queue.empty() and out_pipe.next_message is None and out_pipe.queue.empty():
                        tick_interval = 1.0
                    elif in_pipe.kbps == .0 and in_pipe.latency == 0 and out_pipe.kbps == .0 and out_pipe.latency == 0:
                        tick_interval = 1.0
            finally:
                # Release even if inspecting the pipes raised, so other
                # threads waiting on the lock are not blocked forever.
                lock.release()
            asyncore.poll(tick_interval, asyncore.socket_map)
            if needs_flush:
                flush_pipes = True
                needs_flush = False
            out_pipe.tick()
            in_pipe.tick()
            if flush_pipes:
                PrintMessage('OK')
                flush_pipes = False
            # Every 500 ms check to see if it is a good time to do a gc
            now = time.clock()
            if now - last_check > 0.5:
                last_check = now
                # manually gc after 5 seconds of idle
                if now - last_activity >= 5:
                    last_activity = now
                    logging.debug("Triggering manual GC")
                    gc.collect()
    finally:
        # Undo the timer-resolution change however the loop ended.
        if winmm is not None:
            winmm.timeEndPeriod(1)
def event_loop(catch_signals=(signal.SIGINT, signal.SIGTERM)):
    """
    Replacement for asyncore.loop(), adding timer and signal support.

    catch_signals -- signals whose handler is replaced with _raiseExitNow
                     while the loop runs.  The handlers found on the first
                     pass are remembered and reinstated whenever a pass
                     ends.

    The loop runs while asyncore.socket_map or timer_queue is non-empty,
    polling asyncore for timer.seconds_until_wakeup() and then running the
    timer queue.  ExitNow ends the loop, SystemExit propagates, and a
    ValueError reporting an out-of-range file descriptor from select()
    logs the socket map and exits the process with status 1.  Any other
    exception is logged and the loop is restarted.
    """
    old_signal_handlers = {}
    while True:
        # Only the very first pass sees the caller's original handlers;
        # later passes would just read back _raiseExitNow.
        save_sigs = not old_signal_handlers
        try:
            for sig in catch_signals:
                old = signal.signal(sig, _raiseExitNow)
                if save_sigs:
                    old_signal_handlers[sig] = old
            while asyncore.socket_map or timer_queue:
                t = timer.seconds_until_wakeup()
                if debug_event_timing:
                    logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue)
                asyncore.poll(t, asyncore.socket_map)
                timer.runq()
                if timer.gc_debug:
                    gc.collect()
                    if gc.garbage:
                        for i in gc.garbage:
                            logger.debug("GC-cycle %r", i)
                        del gc.garbage[:]
        except ExitNow:
            break
        except SystemExit:
            raise
        except ValueError as e:
            if str(e) == "filedescriptor out of range in select()":
                logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
                logger.error("Content of asyncore.socket_map:")
                for fd in sorted(asyncore.socket_map):
                    logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd])
                logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.")
                sys.exit(1)
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        except Exception as e:
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        else:
            break
        finally:
            for sig in old_signal_handlers:
                signal.signal(sig, old_signal_handlers[sig])
def event_loop(catch_signals=(signal.SIGINT, signal.SIGTERM)):
    """
    Replacement for asyncore.loop(), adding timer and signal support.

    catch_signals -- signals whose handler is replaced with _raiseExitNow
                     while the loop runs.  The handlers found on the first
                     pass are remembered and reinstated whenever a pass
                     ends.

    The loop runs while asyncore.socket_map or timer_queue is non-empty,
    polling asyncore for timer.seconds_until_wakeup() and then running the
    timer queue.  ExitNow ends the loop, SystemExit propagates, and a
    ValueError reporting an out-of-range file descriptor from select()
    logs the socket map and exits the process with status 1.  Any other
    exception is logged and the loop is restarted.
    """
    old_signal_handlers = {}
    while True:
        # Only the very first pass sees the caller's original handlers;
        # later passes would just read back _raiseExitNow.
        save_sigs = not old_signal_handlers
        try:
            for sig in catch_signals:
                old = signal.signal(sig, _raiseExitNow)
                if save_sigs:
                    old_signal_handlers[sig] = old
            while asyncore.socket_map or timer_queue:
                t = timer.seconds_until_wakeup()
                if debug_event_timing:
                    logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue)
                asyncore.poll(t, asyncore.socket_map)
                timer.runq()
                if timer.gc_debug:
                    gc.collect()
                    if gc.garbage:
                        for i in gc.garbage:
                            logger.debug("GC-cycle %r", i)
                        del gc.garbage[:]
        except ExitNow:
            break
        except SystemExit:
            raise
        except ValueError as e:
            if str(e) == "filedescriptor out of range in select()":
                logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
                logger.error("Content of asyncore.socket_map:")
                for fd in sorted(asyncore.socket_map):
                    logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd])
                logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.")
                sys.exit(1)
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        except Exception as e:
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        else:
            break
        finally:
            for sig in old_signal_handlers:
                signal.signal(sig, old_signal_handlers[sig])
def event_loop(catch_signals=(signal.SIGINT, signal.SIGTERM)):
    """
    Replacement for asyncore.loop(), adding timer and signal support.

    catch_signals -- signals whose handler is replaced with _raiseExitNow
                     while the loop runs.  The handlers found on the first
                     pass are remembered and reinstated whenever a pass
                     ends.

    The loop runs while asyncore.socket_map or timer_queue is non-empty,
    polling asyncore for timer.seconds_until_wakeup() and then running the
    timer queue.  ExitNow ends the loop, SystemExit propagates, and a
    ValueError reporting an out-of-range file descriptor from select()
    logs the socket map and exits the process with status 1.  Any other
    exception is logged and the loop is restarted.
    """
    old_signal_handlers = {}
    while True:
        # Only the very first pass sees the caller's original handlers;
        # later passes would just read back _raiseExitNow.
        save_sigs = not old_signal_handlers
        try:
            for sig in catch_signals:
                old = signal.signal(sig, _raiseExitNow)
                if save_sigs:
                    old_signal_handlers[sig] = old
            while asyncore.socket_map or timer_queue:
                t = timer.seconds_until_wakeup()
                if debug_event_timing:
                    logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue)
                asyncore.poll(t, asyncore.socket_map)
                timer.runq()
                if timer.gc_debug:
                    gc.collect()
                    if gc.garbage:
                        for i in gc.garbage:
                            logger.debug("GC-cycle %r", i)
                        del gc.garbage[:]
        except ExitNow:
            break
        except SystemExit:
            raise
        except ValueError as e:
            if str(e) == "filedescriptor out of range in select()":
                logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
                logger.error("Content of asyncore.socket_map:")
                for fd in sorted(asyncore.socket_map):
                    logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd])
                logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.")
                sys.exit(1)
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        except Exception as e:
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        else:
            break
        finally:
            for sig in old_signal_handlers:
                signal.signal(sig, old_signal_handlers[sig])
def event_loop(catch_signals=(signal.SIGINT, signal.SIGTERM)):
    """
    Replacement for asyncore.loop(), adding timer and signal support.

    catch_signals -- signals whose handler is replaced with _raiseExitNow
                     while the loop runs.  The handlers found on the first
                     pass are remembered and reinstated whenever a pass
                     ends.

    The loop runs while asyncore.socket_map or timer_queue is non-empty,
    polling asyncore for timer.seconds_until_wakeup() and then running the
    timer queue.  ExitNow ends the loop, SystemExit propagates, and a
    ValueError reporting an out-of-range file descriptor from select()
    logs the socket map and exits the process with status 1.  Any other
    exception is logged and the loop is restarted.
    """
    old_signal_handlers = {}
    while True:
        # Only the very first pass sees the caller's original handlers;
        # later passes would just read back _raiseExitNow.
        save_sigs = not old_signal_handlers
        try:
            for sig in catch_signals:
                old = signal.signal(sig, _raiseExitNow)
                if save_sigs:
                    old_signal_handlers[sig] = old
            while asyncore.socket_map or timer_queue:
                t = timer.seconds_until_wakeup()
                if debug_event_timing:
                    logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue)
                asyncore.poll(t, asyncore.socket_map)
                timer.runq()
                if timer.gc_debug:
                    gc.collect()
                    if gc.garbage:
                        for i in gc.garbage:
                            logger.debug("GC-cycle %r", i)
                        del gc.garbage[:]
        except ExitNow:
            break
        except SystemExit:
            raise
        except ValueError as e:
            if str(e) == "filedescriptor out of range in select()":
                logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
                logger.error("Content of asyncore.socket_map:")
                for fd in sorted(asyncore.socket_map):
                    logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd])
                logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.")
                sys.exit(1)
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        except Exception as e:
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        else:
            break
        finally:
            for sig in old_signal_handlers:
                signal.signal(sig, old_signal_handlers[sig])
def event_loop(catch_signals=(signal.SIGINT, signal.SIGTERM)):
    """
    Replacement for asyncore.loop(), adding timer and signal support.

    catch_signals -- signals whose handler is replaced with _raiseExitNow
                     while the loop runs.  The handlers found on the first
                     pass are remembered and reinstated whenever a pass
                     ends.

    The loop runs while asyncore.socket_map or timer_queue is non-empty,
    polling asyncore for timer.seconds_until_wakeup() and then running the
    timer queue.  ExitNow ends the loop, SystemExit propagates, and a
    ValueError reporting an out-of-range file descriptor from select()
    logs the socket map and exits the process with status 1.  Any other
    exception is logged and the loop is restarted.
    """
    old_signal_handlers = {}
    while True:
        # Only the very first pass sees the caller's original handlers;
        # later passes would just read back _raiseExitNow.
        save_sigs = not old_signal_handlers
        try:
            for sig in catch_signals:
                old = signal.signal(sig, _raiseExitNow)
                if save_sigs:
                    old_signal_handlers[sig] = old
            while asyncore.socket_map or timer_queue:
                t = timer.seconds_until_wakeup()
                if debug_event_timing:
                    logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue)
                asyncore.poll(t, asyncore.socket_map)
                timer.runq()
                if timer.gc_debug:
                    gc.collect()
                    if gc.garbage:
                        for i in gc.garbage:
                            logger.debug("GC-cycle %r", i)
                        del gc.garbage[:]
        except ExitNow:
            break
        except SystemExit:
            raise
        except ValueError as e:
            if str(e) == "filedescriptor out of range in select()":
                logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
                logger.error("Content of asyncore.socket_map:")
                for fd in sorted(asyncore.socket_map):
                    logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd])
                logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.")
                sys.exit(1)
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        except Exception as e:
            # Pass the exception so the %r placeholder is actually filled.
            logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting", e)
        else:
            break
        finally:
            for sig in old_signal_handlers:
                signal.signal(sig, old_signal_handlers[sig])