test_rolld.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:rolld 作者: Hipo 项目源码 文件源码
def main():
    global rolld_proc, nginx_proc
    # start rolld
    rolld_proc = Subprocess(
        shlex.split("rolld example/rolld.yaml"),
        stdout=Subprocess.STREAM,
        stderr=Subprocess.STREAM,
    )
    out = partial(out_fn, name='rolld')
    rolld_proc.stdout.read_until_close(exit_callback, streaming_callback=out)
    rolld_proc.stderr.read_until_close(exit_callback, streaming_callback=out)

    # start nginx on port 9091
    nginx_proc = Subprocess(
        shlex.split("""nginx -p "%s" -c example/nginx.conf""" % os.path.curdir),
        stdout=Subprocess.STREAM,
        stderr=Subprocess.STREAM,
    )
    out = partial(out_fn, name='rolld')
    nginx_proc.stdout.read_until_close(exit_callback, streaming_callback=out)
    nginx_proc.stderr.read_until_close(exit_callback, streaming_callback=out)


    # now we restart everything
    def send_hub_to_rolld():
        print "sending SIGHUP to rolld"
        os.kill(rolld_proc.pid, signal.SIGHUP)

    def start_ping():
        global periodic_checker
        periodic_checker = PeriodicCallback(partial(periodic_callback, proc_pid=rolld_proc.pid), 1000)
        periodic_checker.start()

    IOLoop.instance().add_timeout(time.time() + 5, start_ping)
    IOLoop.instance().add_timeout(time.time() + 15, send_hub_to_rolld)
    IOLoop.instance().add_timeout(time.time() + 55, exit_test)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号