def wait_for_listening_port(port_number, tries=10, sleep=0.1, pid=None):
    """Wait until process ``pid`` has a socket listening on ``port_number``.

    The process' connections are polled at most ``tries`` times, and
    ``gevent.sleep(sleep)`` is called before every poll (including the
    first one). Returns ``None`` as soon as a matching socket is seen.

    Args:
        port_number: Local port that is expected to be in LISTEN state.
        tries: Maximum number of polls.
        sleep: Value passed to ``gevent.sleep`` before each poll.
        pid: Process to inspect; defaults to the current process.

    Raises:
        RuntimeError: If no listening socket on ``port_number`` was found
            after ``tries`` polls.
    """
    target_pid = os.getpid() if pid is None else pid

    for _attempt in range(tries):
        gevent.sleep(sleep)

        # macOS requires root access for the connections API to work,
        # so only the connections of the target process are queried.
        port_is_bound = any(
            sock.status == 'LISTEN' and sock.laddr[1] == port_number
            for sock in psutil.Process(target_pid).connections()
        )
        if port_is_bound:
            return

    raise RuntimeError('{port} is not bound'.format(port=port_number))
# TODO: Figure out why this fixture can't work as session scoped
# What happens is that after one test is done, in the next one
# the server is no longer running even though the teardown has not
# been invoked.
评论列表
文章目录